当前位置: 首页 > news >正文

从Hive源码解读大数据开发为什么可以脱离SQL、Java、Scala

从Hive源码解读大数据开发为什么可以脱离SQL、Java、Scala

前言

【本文适合有一定计算机基础/半年工作经验的读者食用。立个Flg,愿天下不再有肤浅的SQL Boy】

谈到大数据开发,占据绝大多数人口的就是SQL Boy,不接受反驳,毕竟大数据主要就是为机器学习和统计报表服务的,自然从Oracle数据库开发转过来并且还是只会写几句SQL的人不在少数,个别会Python写个spark.sql(“一个sql字符串”)的已经是SQL Boy中的人才。这种只能处理结构化表的最基础的大数据开发人员,就是我们常提到的梗:肤浅的SQL Boy。。。对大数据完全不懂,思想还停留在数据库时代,大数据组件也都是拿来当RDBMS来用。。。这种业务开发人员的技术水平其实不敢恭维。

还有从Java后端开发转过来的,虽然不适应,但还是可以一个Main方法流畅地操作Spark、Flink,手写个JDBC,做点简单的二开,这种就是平台开发人员,技术水平要更高一些。Java写得好,Scala其实上手也快。

但是。。。这并不代表做大数据只能用SQL/Java/Scala。。。这么局限的话,也不比SQL Boy强到哪里去。

笔者最早还搞过嵌入式开发,自然明白C/C#/C++也可以搞大数据。。。

本文将以大数据开发中最常见的数仓组件Hive的drop table为例,抛砖引玉,解读为神马大数据开发可以脱离SQL、Java、Scala。

为神马可以脱离SQL

数据不外乎结构化数据和非结构化数据,SQL只能处理极其有限的结构化表【RDBMS、整齐的csv/tsv等】,绝大多数的半结构化、非结构化数据SQL是无能为力的【log日志文件、音图等】。古代的MapReduce本身就不可以用SQL,Spark和Flink老版本都是基于API的,没有SQL的年代大家也活得好好的。大数据组件对SQL的支持日渐友好都是后来的事情,主要是为了降低门槛,让SQL Boy也可以用上大数据技术。

肤浅的SQL Boy们当然只知道:

drop table db_name.tb_name;

正常情况这个Hive表就会被drop掉,认知也就局限于Hive是个数据库。

但是大数据平台开发知道去翻看Hive的Java API:

https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.1.3/api/index.html

知道还有这种方式:

package com.zhiyong;import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;/*** @program: zhiyong_study* @description: 测试MetaStore* @author: zhiyong* @create: 2023-03-22 22:57**/
public class MetaStoreDemo {public static void main(String[] args) throws Exception{HiveConf hiveConf = new HiveConf();HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);client.dropTable("db_name","tb_name");}
}

通过调用API的方式,同样可以drop掉表。显然不一定要用DDL。通过HiveMetaStoreClient的方式,还可以create建表等操作。

懂大数据底层的平台开发当然还有更狠的方式:直接连Hive存元数据的MySQL,对元数据表的数据做精准crud。。。

对结构化表的ETL或者其它的运算处理完全可以用Spark的DataFrame、Flink的DataStream编程,纯API方式实现,SQL能实现的Java和Scala都能实现,至于SQL实现不了的Java和Scala也能实现。。。

笔者实在是想不到除了RDBMS和各类包皮产品【在开源的Apache组件基础上做一些封装】,还有哪些场景是只能用SQL的。。。

至此,可以说明大数据可以脱离SQL。

为神马可以脱离Java

虽然Hive底层是Java写的,但是这并不意味着只能用Java操作Hive。认知这么肤浅的话,也就活该一辈子调参调API了。。。

找到dropTable的实际入口

从Hive3.1.2源码,可以找到dropTable方法:

@Overridepublic void dropTable(String dbname, String name, boolean deleteData,boolean ignoreUnknownTab) throws MetaException, TException,NoSuchObjectException, UnsupportedOperationException {dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, null);}@Overridepublic void dropTable(String dbname, String name, boolean deleteData,boolean ignoreUnknownTab, boolean ifPurge) throws TException {dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, ifPurge);}@Overridepublic void dropTable(String dbname, String name) throws TException {dropTable(getDefaultCatalog(conf), dbname, name, true, true, null);}@Overridepublic void dropTable(String catName, String dbName, String tableName, boolean deleteData,boolean ignoreUnknownTable, boolean ifPurge) throws TException {//build new environmentContext with ifPurge;EnvironmentContext envContext = null;if(ifPurge){Map<String, String> warehouseOptions;warehouseOptions = new HashMap<>();warehouseOptions.put("ifPurge", "TRUE");envContext = new EnvironmentContext(warehouseOptions);}dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext);}

虽然有多个同名方法,但是底层调用的还是同一个方法:

  /*** Drop the table and choose whether to: delete the underlying table data;* throw if the table doesn't exist; save the data in the trash.** @param catName catalog name* @param dbname database name* @param name table name* @param deleteData*          delete the underlying data or just delete the table in metadata* @param ignoreUnknownTab*          don't throw if the requested table doesn't exist* @param envContext*          for communicating with thrift* @throws MetaException*           could not drop table properly* @throws NoSuchObjectException*           the table wasn't found* @throws TException*           a thrift communication error occurred* @throws UnsupportedOperationException*           dropping an index table is not allowed* @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String,*      java.lang.String, boolean)*/public void dropTable(String catName, String dbname, String name, boolean deleteData,boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,NoSuchObjectException, UnsupportedOperationException {Table tbl;try {tbl = getTable(catName, dbname, name);} catch (NoSuchObjectException e) {if (!ignoreUnknownTab) {throw e;}return;}HiveMetaHook hook = getHook(tbl);if (hook != null) {hook.preDropTable(tbl);}boolean success = false;try {drop_table_with_environment_context(catName, dbname, name, deleteData, envContext);if (hook != null) {hook.commitDropTable(tbl, deleteData || (envContext != null && "TRUE".equals(envContext.getProperties().get("ifPurge"))));}success=true;} catch (NoSuchObjectException e) {if (!ignoreUnknownTab) {throw e;}} finally {if (!success && (hook != null)) {hook.rollbackDropTable(tbl);}}}

主要就是获取了表对象,然后做了preDropTable预提交和commitDropTable实际的提交。这种2PC方式表面上还是很严谨。。。

可以发现HiveMetaHook这其实是个接口:

package org.apache.hadoop.hive.metastore;/*** HiveMetaHook defines notification methods which are invoked as part* of transactions against the metastore, allowing external catalogs* such as HBase to be kept in sync with Hive's metastore.**<p>** Implementations can use {@link MetaStoreUtils#isExternalTable} to* distinguish external tables from managed tables.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HiveMetaHook {public String ALTER_TABLE_OPERATION_TYPE = "alterTableOpType";public List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS");/*** Called before a table definition is removed from the metastore* during DROP TABLE.** @param table table definition*/public void preDropTable(Table table)throws MetaException;/*** Called after failure removing a table definition from the metastore* during DROP TABLE.** @param table table definition*/public void rollbackDropTable(Table table)throws MetaException;/*** Called after successfully removing a table definition from the metastore* during DROP TABLE.** @param table table definition** @param deleteData whether to delete data as well; this should typically* be ignored in the case of an external table*/public void commitDropTable(Table table, boolean deleteData)throws MetaException;
}

继承关系:

在这里插入图片描述

显然不是这个:

package org.apache.hadoop.hive.metastore;public abstract class DefaultHiveMetaHook implements HiveMetaHook {/*** Called after successfully INSERT [OVERWRITE] statement is executed.* @param table table definition* @param overwrite true if it is INSERT OVERWRITE** @throws MetaException*/public abstract void commitInsertTable(Table table, boolean overwrite) throws MetaException;/*** called before commit insert method is called* @param table table definition* @param overwrite true if it is INSERT OVERWRITE** @throws MetaException*/public abstract void preInsertTable(Table table, boolean overwrite) throws MetaException;/*** called in case pre commit or commit insert fail.* @param table table definition* @param overwrite true if it is INSERT OVERWRITE** @throws MetaException*/public abstract void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
}

更不可能是这个test的Mock类:

/*** Mock class used for unit testing.* {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2#testLockingOnInsertIntoNonNativeTables()}*/
public class StorageHandlerMock extends DefaultStorageHandler {}

所以是AccumuloStorageHandler这个类:

package org.apache.hadoop.hive.accumulo;/*** Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.*/
public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,HiveStoragePredicateHandler {}

但是:

  @Overridepublic void preDropTable(Table table) throws MetaException {// do nothing}

这个do nothing!!!一言难尽。这种2PC方式表面上确实很严谨。。。

所以dropTable的入口是:

  @Overridepublic void commitDropTable(Table table, boolean deleteData) throws MetaException {String tblName = getTableName(table);if (!isExternalTable(table)) {try {if (deleteData) {TableOperations tblOpts = connectionParams.getConnector().tableOperations();if (tblOpts.exists(tblName)) {tblOpts.delete(tblName);}}} catch (AccumuloException e) {throw new MetaException(StringUtils.stringifyException(e));} catch (AccumuloSecurityException e) {throw new MetaException(StringUtils.stringifyException(e));} catch (TableNotFoundException e) {throw new MetaException(StringUtils.stringifyException(e));}}}

按照最简单的内部表、需要删数据来看,实际上调用的是这个delete方法。而TableOperations又是个接口:

package org.apache.accumulo.core.client.admin;/*** Provides a class for administering tables**/public interface TableOperations {/*** Delete a table** @param tableName*          the name of the table* @throws AccumuloException*           if a general error occurs* @throws AccumuloSecurityException*           if the user does not have permission* @throws TableNotFoundException*           if the table does not exist*/void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
}

继承关系简单:

在这里插入图片描述

当然就是这个实现类:

package org.apache.accumulo.core.client.impl;public class TableOperationsImpl extends TableOperationsHelper {@Overridepublic void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {checkArgument(tableName != null, "tableName is null");List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)));Map<String,String> opts = new HashMap<>();try {doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE, args, opts);} catch (TableExistsException e) {// should not happenthrow new AssertionError(e);}}
}

所以实际入口是这里的doTableFateOperation方法。枚举体的FateOperation.TABLE_DELETE=2。

找到doTableFateOperation方法的调用栈

跳转到:

  private void doTableFateOperation(String tableOrNamespaceName, Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException {try {doFateOperation(op, args, opts, tableOrNamespaceName);} }

继续跳转:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName) throws AccumuloSecurityException,TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {return doFateOperation(op, args, opts, tableOrNamespaceName, true);}

继续跳转:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName, boolean wait)throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {Long opid = null;try {opid = beginFateOperation();executeFateOperation(opid, op, args, opts, !wait);if (!wait) {opid = null;return null;}String ret = waitForFateOperation(opid);return ret;} catch (ThriftSecurityException e) {switch (e.getCode()) {case TABLE_DOESNT_EXIST:throw new TableNotFoundException(null, tableOrNamespaceName, "Target table does not exist");case NAMESPACE_DOESNT_EXIST:throw new NamespaceNotFoundException(null, tableOrNamespaceName, "Target namespace does not exist");default:String tableInfo = Tables.getPrintableTableInfoFromName(context.getInstance(), tableOrNamespaceName);throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);}} catch (ThriftTableOperationException e) {switch (e.getType()) {case EXISTS:throw new TableExistsException(e);case NOTFOUND:throw new TableNotFoundException(e);case NAMESPACE_EXISTS:throw new NamespaceExistsException(e);case NAMESPACE_NOTFOUND:throw new NamespaceNotFoundException(e);case OFFLINE:throw new TableOfflineException(context.getInstance(), Tables.getTableId(context.getInstance(), tableOrNamespaceName));default:throw new AccumuloException(e.description, e);}} catch (Exception e) {throw new AccumuloException(e.getMessage(), e);} finally {Tables.clearCache(context.getInstance());// always finish table op, even when exceptionif (opid != null)try {finishFateOperation(opid);} catch (Exception e) {log.warn(e.getMessage(), e);}}}

在这里可以发现一些奇怪的现象,居然catch了好多Thrift相关的Exception。继续跳转:

  // This method is for retrying in the case of network failures; anything else it passes to the caller to deal withprivate void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)throws ThriftSecurityException, TException, ThriftTableOperationException {while (true) {MasterClientService.Iface client = null;try {client = MasterClient.getConnectionWithRetry(context);client.executeFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid, op, args, opts, autoCleanUp);break;} catch (TTransportException tte) {log.debug("Failed to call executeFateOperation(), retrying ... ", tte);UtilWaitThread.sleep(100);} finally {MasterClient.close(client);}}}

这个死循环里获取了Client对象。但是这个Client一看就没那么简单。。。调用的executeFateOperation方法还不能直接Idea点开,需要手动定位。

分析client对象

package org.apache.accumulo.core.client.impl;import com.google.common.net.HostAndPort;public class MasterClient {private static final Logger log = LoggerFactory.getLogger(MasterClient.class);public static MasterClientService.Client getConnectionWithRetry(ClientContext context) {while (true) {MasterClientService.Client result = getConnection(context);if (result != null)return result;UtilWaitThread.sleep(250);}}
}

实际上又是这个:

public static class Client extends FateService.Client implements Iface {
}

所以其父类是:

package org.apache.accumulo.core.master.thrift;@SuppressWarnings({"unchecked", "serial", "rawtypes", "unused"}) public class FateService {public interface Iface {public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, org.apache.thrift.TException;}public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, org.apache.thrift.TException
{send_executeFateOperation(tinfo, credentials, opid, op, arguments, options, autoClean);recv_executeFateOperation();
}public static class Client extends org.apache.thrift.TServiceClient implements Iface {}
}

所以这种client对象才可以执行executeFateOperation方法。

查看executeFateOperation方法

分为2步,字面意思send_executeFateOperation方法发送了啥,recv_executeFateOperation方法又接收了啥。显然发送消息是需要重点关心的:

public void send_executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.thrift.TException
{executeFateOperation_args args = new executeFateOperation_args();args.setTinfo(tinfo);args.setCredentials(credentials);args.setOpid(opid);args.setOp(op);args.setArguments(arguments);args.setOptions(options);args.setAutoClean(autoClean);sendBase("executeFateOperation", args);
}

这个发送的方法把入参的表名、操作类型【Drop表】设置为sendBase方法的入参。

package org.apache.thrift;/*** A TServiceClient is used to communicate with a TService implementation* across protocols and transports.*/
public abstract class TServiceClient {protected void sendBase(String methodName, TBase<?,?> args) throws TException {sendBase(methodName, args, TMessageType.CALL);}private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));args.write(oprot_);oprot_.writeMessageEnd();oprot_.getTransport().flush();}}

其中:

package org.apache.thrift.protocol;/*** Message type constants in the Thrift protocol.**/
public final class TMessageType {public static final byte CALL  = 1;public static final byte REPLY = 2;public static final byte EXCEPTION = 3;public static final byte ONEWAY = 4;
}

这个type传入的其实是1。用于构造方法:

package org.apache.thrift.protocol;/*** Helper class that encapsulates struct metadata.**/
public final class TMessage {public TMessage(String n, byte t, int s) {name = n;type = t;seqid = s;}public final String name;public final byte type;public final int seqid;}

另一个泛型TBase:

package org.apache.thrift;import java.io.Serializable;import org.apache.thrift.protocol.TProtocol;/*** Generic base interface for generated Thrift objects.**/
public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable {/*** Reads the TObject from the given input protocol.** @param iprot Input protocol*/public void read(TProtocol iprot) throws TException;/*** Writes the objects out to the protocol** @param oprot Output protocol*/public void write(TProtocol oprot) throws TException;
}

按照注释可以知道write方法是把Java的对象输出给协议。

executeFateOperation_args类:

public static class executeFateOperation_args implements org.apache.thrift.TBase<executeFateOperation_args, executeFateOperation_args._Fields>, java.io.Serializable, Cloneable, Comparable<executeFateOperation_args>   {
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {schemes.get(oprot.getScheme()).getScheme().write(oprot, this);}   
}

它的write方法:

package org.apache.thrift.scheme;import org.apache.thrift.TBase;public interface IScheme<T extends TBase> {public void read(org.apache.thrift.protocol.TProtocol iproto, T struct) throws org.apache.thrift.TException;public void write(org.apache.thrift.protocol.TProtocol oproto, T struct) throws org.apache.thrift.TException;}

又是跳转到接口。。。

在这里插入图片描述

可以看到有2大抽象类。

getScheme拿到的:


package org.apache.thrift.protocol;import java.nio.ByteBuffer;import org.apache.thrift.TException;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.transport.TTransport;/*** Protocol interface definition.**/
public abstract class TProtocol {public Class<? extends IScheme> getScheme() {return StandardScheme.class;}public abstract void writeMessageBegin(TMessage message) throws TException;
}

显然get到的是StandardScheme类。而writeMessageBegin又是这个抽象类的抽象方法。

该抽象类的继承关系:

在这里插入图片描述

至此可以知道原生支持的协议有这些。最常用的当然就是二进制协议:TBinaryProtocol。

查看TBinaryProtocol二进制协议

package org.apache.thrift.protocol;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;/*** Binary protocol implementation for thrift.**/
public class TBinaryProtocol extends TProtocol {public void writeMessageBegin(TMessage message) throws TException {if (strictWrite_) {int version = VERSION_1 | message.type;writeI32(version);writeString(message.name);writeI32(message.seqid);} else {writeString(message.name);writeByte(message.type);writeI32(message.seqid);}}
}

可以看出writeMessageBegin方法就是实际的写数据操作,把消息拆分后写出。

public void writeString(String str) throws TException {try {byte[] dat = str.getBytes("UTF-8");writeI32(dat.length);trans_.write(dat, 0, dat.length);} catch (UnsupportedEncodingException uex) {throw new TException("JVM DOES NOT SUPPORT UTF-8");}
}

以此为例。会去把数据作为字节数组写出:

package org.apache.thrift.transport;import java.io.Closeable;/*** Generic class that encapsulates the I/O layer. This is basically a thin* wrapper around the combined functionality of Java input/output streams.**/
public abstract class TTransport implements Closeable {/*** Reads up to len bytes into buffer buf, starting at offset off.** @param buf Array to read into* @param off Index to start reading at* @param len Maximum number of bytes to read* @return The number of bytes actually read* @throws TTransportException if there was an error reading data*/public abstract int read(byte[] buf, int off, int len)throws TTransportException;/*** Writes up to len bytes from the buffer.** @param buf The output data buffer* @param off The offset to start writing from* @param len The number of bytes to write* @throws TTransportException if there was an error writing data*/public abstract void write(byte[] buf, int off, int len)throws TTransportException;
}

这才是真正的传输对象。其继承关系:

在这里插入图片描述

搞过嵌入式开发的一定很熟悉这个Socket!!!就是IP+port的那个Socket。应用层与TCP/IP传输层间的抽象层。。。

查看TIOStreamTransport传输类

package org.apache.thrift.transport;/*** This is the most commonly used base transport. It takes an InputStream* and an OutputStream and uses those to perform all transport operations.* This allows for compatibility with all the nice constructs Java already* has to provide a variety of types of streams.**/
public class TIOStreamTransport extends TTransport {public int read(byte[] buf, int off, int len) throws TTransportException {if (inputStream_ == null) {throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");}int bytesRead;try {bytesRead = inputStream_.read(buf, off, len);} catch (IOException iox) {throw new TTransportException(TTransportException.UNKNOWN, iox);}if (bytesRead < 0) {throw new TTransportException(TTransportException.END_OF_FILE);}return bytesRead;}/*** Writes to the underlying output stream if not null.*/public void write(byte[] buf, int off, int len) throws TTransportException {if (outputStream_ == null) {throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");}try {outputStream_.write(buf, off, len);} catch (IOException iox) {throw new TTransportException(TTransportException.UNKNOWN, iox);}}/*** Flushes the underlying output stream if not null.*/public void flush() throws TTransportException {if (outputStream_ == null) {throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");}try {outputStream_.flush();} catch (IOException iox) {throw new TTransportException(TTransportException.UNKNOWN, iox);}}
}

其子类TSocket重写了IP、Port和init等。

小结Drop表的流程

至此可以得知Java用API操作Hive的原理,大致是这样:

顶层API【dropTable】→表操作实现类【TableOperationsImpl】的删表方法【doTableFateOperation】
→executeFateOperation方法→Client类的实例对象的executeFateOperation方法
→sendBase方法→executeFateOperation_args静态类的实例对象的write方法输出数据给传输协议TProtocol
→传输协议类的write方法具体把数据写出给ThriftServerThriftServer接收到消息后执行对应的操作

最出名的Thrift当然是Hive自己的Hive Server【Standalone】和Hive Server2,还有Spark的Thrift Server,借助它们,可以用JDBC或者Cli的方式去操作Hive。

但是!!!Thrift的初衷就是实现语言无关,毕竟底层只需要能把数据传输到位即可,数据传输并不是Java的特权。

其它语言的Thrift

在这里插入图片描述

service-rpc这个路径下,可以发现有cpp、Java、php、py,rb的包!!!

Hive的官方文档写的很明白:

https://cwiki.apache.org/confluence/display/Hive/HiveClient#HiveClient-ThriftJavaClient

The command line client currently only supports an embedded server. The JDBC and Thrift-Java clients support both embedded and standalone servers. Clients in other languages only support standalone servers.

命令行模式目前只能用于嵌入式服务,JDBC和Thrift-Java的Client可以支持嵌入式和独立部署的服务。别的语言的Client只支持在独立部署的服务使用。

Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
Statement stmt = con.createStatement();

这种古代的Hive Server就是嵌入模式。。。

Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");

这种Hive Server2就是独立部署模式。

官方还给出了python的案例:

#!/usr/bin/env pythonimport sysfrom hive import ThriftHive
from hive.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocoltry:transport = TSocket.TSocket('localhost', 10000)transport = TTransport.TBufferedTransport(transport)protocol = TBinaryProtocol.TBinaryProtocol(transport)client = ThriftHive.Client(protocol)transport.open()client.execute("CREATE TABLE r(a STRING, b INT, c DOUBLE)")client.execute("LOAD TABLE LOCAL INPATH '/path' INTO TABLE r")client.execute("SELECT * FROM r")while (1):row = client.fetchOne()if (row == None):breakprint rowclient.execute("SELECT * FROM r")print client.fetchAll()transport.close()except Thrift.TException, tx:print '%s' % (tx.message)

以及PHP的案例:

<?php
// set THRIFT_ROOT to php directory of the hive distribution
$GLOBALS['THRIFT_ROOT'] = '/lib/php/';
// load the required files for connecting to Hive
require_once $GLOBALS['THRIFT_ROOT'] . 'packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'protocol/TBinaryProtocol.php';
// Set up the transport/protocol/client
$transport = new TSocket('localhost', 10000);
$protocol = new TBinaryProtocol($transport);
$client = new ThriftHiveClient($protocol);
$transport->open();// run queries, metadata calls etc
$client->execute('SELECT * from src');
var_dump($client->fetchAll());
$transport->close();

Ruby好歹也给了个参考: https://github.com/forward3d/rbhive

至于Java、C++就不给Client的案例了。。。也是很容易理解。。。毕竟Java有JDBC和高层API,一般不会有人去用底层API了。

如果是做平台开发或者组件开发这种真正用得上底层API的情况,地方支援中央发型的老Java程序猿,查API填参数让程序跑起来,这点工程能力还是有的。

至于C++程序猿强悍的造轮子功力,没准像临摹Kafka的Red Panda那样,哪天也照猫画虎折腾出个C++版的Hive。。。

既然可以通过Thrift实现语言无关,那么调用组件就不必局限于Java或者Scala。而造轮子从来也不是Java和Scala的专利。

这就是为神马大数据开发可以脱离Java和Scala。

尾言

大数据并不是趋向SQL化,只是为了扩大受众群体,让广大技术水平不高的业务开发人员也能吃上大数据技术的红利。且SQL在处理结构化表的特定场景下开发效率更高。
但是。。。哪怕是这种极度细分的场景,SQL还是有很多缺陷,虽然API的方式也没有好到哪里去。

造轮子和组件调用,就更是语言无关的事情了。。。编程语言往往只是个表达思想的载体,技术栈足够全面才有做选择的权力。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129742904
在这里插入图片描述

相关文章:

从Hive源码解读大数据开发为什么可以脱离SQL、Java、Scala

从Hive源码解读大数据开发为什么可以脱离SQL、Java、Scala 前言 【本文适合有一定计算机基础/半年工作经验的读者食用。立个Flg&#xff0c;愿天下不再有肤浅的SQL Boy】 谈到大数据开发&#xff0c;占据绝大多数人口的就是SQL Boy&#xff0c;不接受反驳&#xff0c;毕竟大…...

RocketMQ 事务消息 原理及使用方法解析

&#x1f34a; Java学习&#xff1a;Java从入门到精通总结 &#x1f34a; 深入浅出RocketMQ设计思想&#xff1a;深入浅出RocketMQ设计思想 &#x1f34a; 绝对不一样的职场干货&#xff1a;大厂最佳实践经验指南 &#x1f4c6; 最近更新&#xff1a;2023年3月24日 &#x…...

为什么 ChatGPT 输出时经常会中断,需要输入“继续” 才可以继续输出?

作者&#xff1a;明明如月学长&#xff0c; CSDN 博客专家&#xff0c;蚂蚁集团高级 Java 工程师&#xff0c;《性能优化方法论》作者、《解锁大厂思维&#xff1a;剖析《阿里巴巴Java开发手册》》、《再学经典&#xff1a;《EffectiveJava》独家解析》专栏作者。 热门文章推荐…...

PyTorch 之 基于经典网络架构训练图像分类模型

文章目录一、 模块简单介绍1. 数据预处理部分2. 网络模块设置3. 网络模型保存与测试二、数据读取与预处理操作1. 制作数据源2. 读取标签对应的实际名字3. 展示数据三、模型构建与实现1. 加载 models 中提供的模型&#xff0c;并且直接用训练的好权重当做初始化参数2. 参考 pyto…...

Scrapy的callback进入不了回调方法

一、前言 有的时候&#xff0c;Scrapy的callback方法直接被略过了&#xff0c;不去执行其中的回调方法&#xff0c;可能排查好久都排查不出来&#xff0c;我来教大家集中解决方法。 yield Request(urlurl, callbackself.parse_detail, cb_kwargs{item: item})二、解决方法 1…...

第二十一天 数据库开发-MySQL

目录 数据库开发-MySQL 前言 1. MySQL概述 1.1 安装 1.2 数据模型 1.3 SQL介绍 1.4 项目开发流程 2. 数据库设计-DDL 2.1 数据库操作 2.2 图形化工具 2.3 表操作 3. 数据库操作-DML 3.1 增加(insert) 3.2 修改(update) 3.3 删除(delete) 数据库开发-MySQL 前言 …...

蓝桥杯每日一真题—— [蓝桥杯 2021 省 AB2] 完全平方数(数论,质因数分解)

文章目录[蓝桥杯 2021 省 AB2] 完全平方数题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1样例 #2样例输入 #2样例输出 #2提示思路&#xff1a;理论补充&#xff1a;完全平方数的一个性质&#xff1a;完全平方数的质因子的指数一定为偶数最终思路&#xff1a;小插曲&am…...

Linux编辑器-vim

一、vim简述1&#xff09;vi/vim2&#xff09;检查vim是否安装2)如何用vim打开文件3)vim的几种模式命令模式插入模式末行模式可视化模式二、vim的基本操作1)进入vim&#xff08;命令行模式&#xff09;2)[命令行模式]切换至[插入模式]3)[插入模式]切换至[命令行模式]4)[命令行模…...

5G将在五方面彻底改变制造业

想象一下这样一个未来&#xff0c;智能机器人通过在工厂车间重新配置自己&#xff0c;从多条生产线上组装产品。安全无人机处理着从监视入侵者到确认员工停车等繁琐的任务。自动驾驶汽车不仅可以在建筑物之间运输零部件&#xff0c;还可以在全国各地运输。工厂检查可以在千里之…...

http和https的区别?

http和https的区别&#xff1f;HTTPHTTPSHTTP与HTTPS区别HTTPS相比于HTTP协议的优点和缺点HTTP http是超文本传输协议 HTTP协议是基于传输层的TCP协议进行通信&#xff0c;通用无状态的协议。80端口 HTTPS https—安全的超文本传输协议 是以安全为目标的HTTP通道&#xff0c;…...

【Spring Cloud Alibaba】4.创建服务消费者

文章目录简介开始搭建创建项目修改POM文件添加启动类添加配置项添加Controller添加配置文件启动项目测试访问Nacos访问接口查看端点检查简介 接下来我们创建一个服务消费者&#xff0c;本操作先要完成之前的步骤&#xff0c;详情请参照【Spring Cloud Alibaba】Spring Cloud A…...

C语言——动态内存管理 malloc、calloc、realloc、free的使用

目录 一、为什么存在动态内存分配 二、动态内存函数的介绍 2.1malloc和free 2.2calloc 2.3realloc 三、常见的动态内存错误 3.1对NULL指针的解引用操作 3.2对动态开辟空间的越界访问 3.3对非动态开辟的内存使用free释放 3.4使用free释放一块动态开辟内存的一部分 3.5…...

技术分享——Java8新特性

技术分享——Java8新特性1.背景2. 新特性主要内容3. Lambda表达式4. 四大内置核心函数式接口4.1 Consumer<T>消费型接口4.2 Supplier<T>供给型接口4.3 Function<T,R>函数型接口4.4 Predicate<T> 断定型接口5. Stream流操作5.1 什么是流以及流的类型5.2…...

vue基础知识大全

1&#xff0c;指令作用 以v-开头&#xff0c;由vue提供的attribute&#xff0c;为渲染DOM应用提供特殊的响应式行为&#xff0c;也即是在表达式的值发生变化的时候响应式的更新DOM。其内容为可以被求值的js代码&#xff0c;可以写在return后面被返回的表达式。 指令的简写指令简…...

第2篇|文献研读|nature climate change|减缓气候变化和促进热带生物多样性的碳储量走廊

研究背景 从 2000 年到 2012 年&#xff0c;潮湿和干燥热带地区的森林总损失超过 90,000 平方公里 yr-1&#xff0c;这主要是由农业扩张驱动的。热带森林砍伐向大气中排放 0:95 Pg C yr-1 并导致广泛的生物多样性丧失。保护区的生物多样性取决于与保护区所在的更广泛景观的生态…...

从暴力递归到动态规划(2)小乖,你也在为转移方程而烦恼吗?

前引&#xff1a;继上篇我们讲到暴力递归的过程&#xff0c;这一篇blog我们将继续对从暴力递归到动态规划的实现过程&#xff0c;与上篇类似&#xff0c;我们依然采用题目的方式对其转化过程进行论述。上篇博客&#xff1a;https://blog.csdn.net/m0_65431718/article/details/…...

Leetcode.1638 统计只差一个字符的子串数目

题目链接 Leetcode.1638 统计只差一个字符的子串数目 Rating &#xff1a; 1745 题目描述 给你两个字符串 s和 t&#xff0c;请你找出 s中的非空子串的数目&#xff0c;这些子串满足替换 一个不同字符 以后&#xff0c;是 t串的子串。换言之&#xff0c;请你找到 s和 t串中 恰…...

KoTime:v2.3.9新增线程管理(线程统计、状态查询等)

功能概览 KoTime的开源版本已经迭代到了V2.3.9&#xff0c;目前功能如下&#xff1a; 实时监听方法&#xff0c;统计运行时长web展示方法调用链路&#xff0c;瓶颈可视化追踪追踪系统异常&#xff0c;精确定位到方法接口超时邮件通知&#xff0c;无需实时查看线上热更新&…...

直面风口,未来不仅是中文版ChatGPT,还有AGI大时代在等着我们

说到标题的AI2.0这个概念的研究早在2015年就研究起步了&#xff0c;其实大家早已知道&#xff0c;人工智能技术必然是未来科技发展战略中的重要一环&#xff0c;今天我们就从AI2.0入手&#xff0c;以GPT-4及文心一言的发布为切入角度&#xff0c;来谈一谈即将降临的AGI时代。 关…...

若依微服务(ruoyi-cloud)保姆版容器编排运行

一、简介 项目gitee地址&#xff1a;https://gitee.com/y_project/RuoYi-Cloud 由于该项目运行有很多坑&#xff0c;大家可以在git克隆拷贝到本地后&#xff0c;执行下面的命令使master版本回退到本篇博客的版本&#xff1a; git reset --hard 05ca78e82fb4e074760156359d09a…...

vue2图片预览插件

学习&#xff1a;vue插件开发实例-图片预览插件 vue2-pre-img-plugin的gitee代码 准备工作 准备图片与基础的样式 将iconfont下载的字体图标资源放在src/assets/iconfont目录下将准备预览的图片放到src/static/images目录下 PrevImg.vue 在plugins/PrevImg目录下&#xff…...

手写Promise源码的实现思路

Promise的使用&#xff1a; let promise new Promise((resolve, reject) > {resolve("OK");// reject("Error"); });console.log(promise);promise.then(value > {console.log("success"); }, error > {console.log("fail"…...

【数据结构】-关于树的概念和性质你了解多少??

作者&#xff1a;小树苗渴望变成参天大树 作者宣言&#xff1a;认真写好每一篇博客 作者gitee:gitee 如 果 你 喜 欢 作 者 的 文 章 &#xff0c;就 给 作 者 点 点 关 注 吧&#xff01; 树前言一、树概念及结构1.1树的概念1.2 树的相关概念1.3 树的表示1.4树在实际中的运用…...

【前端之旅】NPM必知必会

一名软件工程专业学生的前端之旅,记录自己对三件套(HTML、CSS、JavaScript)、Jquery、Ajax、Axios、Bootstrap、Node.js、Vue、小程序开发(UniApp)以及各种UI组件库、前端框架的学习。 【前端之旅】Web基础与开发工具 【前端之旅】手把手教你安装VS Code并附上超实用插件…...

Android SQLite使用事务来确保所有语句都以原子方式执行及保证数据完整性一次执行多条语句示例

execSQL 不支持用分号分隔一次执行多个 SQL 语句&#xff0c;虽然理论上可以实现。但是&#xff0c;并不建议这样做&#xff0c;因为这可能会导致潜在的 SQL 注入漏洞。相反&#xff0c;建议使用 execSQL 或 rawQuery 分别执行每个语句。 在下面的代码块中&#xff0c;我们正在…...

nodejs+vue校园超市小卖部零食在线购物商城系统

21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&#xff0c;科学化的管理&#xff0c;使信息存储达到…...

Karl Guttag:论相机对焦技术在AR/VR中的沿用

近期&#xff0c;AR/VR光学专家Karl Guttag介绍了两家在CES 2023展出光学传感技术的公司&#xff1a;poLight和CML&#xff08;剑桥机电一体化&#xff09;。​同时介绍两家公司的原因&#xff0c;是因为他们提供了实现AR/VR“光学微动”&#xff08;Optics Micromovement&…...

ECL@SS学习笔记(3)-概念数据模型

ECLSS 是产品&#xff0c;服务的分类和描述系统。本文介绍其内部的数据模型。ECLSS的作用ECLSS 标准的目标是为了实现工业界数据交换的标准化。这个标准主要作用是产品的分类和描述。分类为了有效地物料管理&#xff0c;供应链管理和电子商务&#xff0c;需要对物料进行分类和编…...

206. 反转链表

给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1] 示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&#xff1a;[2,1] 示例 3&#xff1a; 输…...

文心一言 vs GPT-4 —— 全面横向比较

文心一言 vs GPT-4 —— 全面横向比较 3月15日凌晨&#xff0c;OpenAI发布“迄今为止功能最强大的模型”——GPT-4。我第一时间为大家奉上了体验报告《OpenAI 发布GPT-4——全网抢先体验》。 时隔一日&#xff0c;3月16日下午百度发布大语言模型——文心一言。发布会上&#…...

企业门户定制网站建设公司/seo 0xu

2019独角兽企业重金招聘Python工程师标准>>> 一、初始化本地git项目 1. git init 2. git add -A 3. git commit -m "初始化仓库"####二、在github上创建项目 如&#xff1a; https://github.com/zkj/easyjava.git三、将github上的项目pull下来 git pull o…...

厦门定制网站建设/网络公司名字

关于前端图表开发有很多基于Flash&#xff0c;HTML5&#xff0c;JavaScript的开源组件&#xff0c;而Java开源图表组件JFreeChart也是众所周知&#xff0c;网上关于JFreeChart的资料非常之多&#xff0c;可见其在Java开发中应用的相当广泛。写这篇文章一是为了记录一下关于JFre…...

做平面设计去哪些网站找图/浙江网络推广公司

nginx的ngx_pagespeed是一个前段加速模块 安装需要的nginx依赖环境 yum install gcc gcc-c pcre* zlib-devel openssl-devel gd-devel php php-mysql php-fpm geoip-devel -y 下载nginx的12版本 不建议用高版本的会有bug wget http://nginx.org/download/nginx-1.12.2.tar.gz …...

邯郸教育网站建设/天津关键词优化平台

草稿未验证1 epoll编程&#xff0c;如何实现高并发服务器开发&#xff1f; - 知乎 https://www.zhihu.com/question/21516827/answer/55127881 nginx 多进程网络编程的巅峰 memcached 多线程网络编程的巅峰 redis单线程网络编程的巅峰~~ 2 为什么说 event-loop 在 IO 密集型…...

大岭山网站建设公司/企业网站建设公司

已结贴√问题点数&#xff1a;20 回复次数&#xff1a;7Android传感器API之&#xff1a;加速度Accelerometer功能源码加速度传感器&#xff0c;主要是感应手机的运动。捕获三个参数&#xff0c;分别表示空间坐标系中X、Y、Z轴方向上的加速度减去重力加速度在相应轴上的分量&…...

做网站的公司上海/竞价运营是做什么的

研发过程中&#xff0c;文档很重要&#xff0c;但更重要的可能是「惯性思维」 开发到底要不要写文档&#xff08;注释&#xff09;&#xff0c;要写多少文档&#xff0c;要怎么写文档&#xff0c;想必在大家工作的各个阶段都会有不同的体会&#xff0c;不同人也会有不同的意见。…...