做网站美工未来规划,中国机械加工网1717,网站建设分哪几种,寄生虫网站排名代做引入
Apache Hive 是基于Hadoop的数据仓库工具#xff0c;它可以使用SQL来读取、写入和管理存在分布式文件系统中的海量数据。在Hive中#xff0c;HQL默认转换成MapReduce程序运行到Yarn集群中#xff0c;大大降低了非Java开发者数据分析的门槛#xff0c;并且Hive提供命令…引入
Apache Hive 是基于Hadoop的数据仓库工具它可以使用SQL来读取、写入和管理存在分布式文件系统中的海量数据。在Hive中HQL默认转换成MapReduce程序运行到Yarn集群中大大降低了非Java开发者数据分析的门槛并且Hive提供命令行工具和JDBC驱动程序方便用户连接到Hive进行数据分析操作。 严格意义上Hive并不属于计算引擎而是建立在Hadoop生态之上的数据仓库管理工具。它将繁杂的MapReduce作业抽象成SQL使得开发及维护成本大幅降低。得益于HDFS的存储和MapReduce的读写能力Hive展现出了强大的兼容能力、数据吞吐能力和服务稳定性时至今日依然是大数据架构中不可或缺的一部分。 Hive的核心特点 Hive是基于Hadoop的数仓工具底层数据存储在HDFS中 Hive提供标准SQL功能支持SQL语法访问操作数据 Hive适合OLAP数据分析场景不适合OLTP数据处理场景所以适合数据仓库构建 HQL默认转换成MapReduce任务执行也可以配置转换成Apache Spark、Apache Tez任务运行 Hive中支持定义UDF、UDAF、UDTF函数扩展功能。
Hive的架构设计 Hive用户接口
访问Hive可以通过CLI、Beeline、JDBC/ODBC、WebUI几种方式。在Hive早期版本中可以使用Hive CLI来操作HiveHive CLI并发性能差、脚本执行能力有限并缺乏JDBC驱动支持从Hive 4.x版本起废弃了Hive CLI推荐使用Beeline。Beeline是一个基于JDBC的Hive客户端支持并发环境、复杂脚本执行、JDBC驱动等在Hive集群内连接Hive可以使用Beeline方式。在Hive集群外通过代码或者工具连接操作Hive时可以通过JDBC/ODBC方式。通过WebUI方式可以通过浏览器查看到Hive集群的一些信息。
HiveServer2服务
HiveServer2服务提供JDBC/ODBC接口主要用于代理远程客户端对Hive的访问是一种基于Thrift协议的服务。例如通过JDBC或者Beeline连接访问Hive时就需要启动HiveServer2服务就算Beeline访问本机上的Hive服务也需要启动HiveServer2服务。
HiveServer2代理远程客户端对Hive操作时会涉及到操作HDFS数据就会有操作权限问题那么操作HDFS中数据的用户是启动HiveServer2的用户还是远程客户端的用户需要通过“hive.server2.enable.doAs” 参数决定该参数默认为true表示HiveServer2操作HDFS时的用户为远程客户端用户如果设置为false表示操作HDFS数据的用户为启动HiveServer2的用户。
MetaStore服务
MetaStore服务负责存储和管理Hive元数据为HiverServer2提供元数据访问接口。Hive中的元数据包括表的名字表的列和分区及其属性表的属性表拥有者、是否为外部表等表的数据所在目录等。
Hive MetaStore可以将元数据存储在mysql、derby数据库中。
Hive Driver
Driver中包含解释器SQL Parser、编译器Compiler、优化器Optimizer负责完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中并在随后有执行器Executor调用MapReduce执行。
对于Hive有了一个初步认识我们下面开始梳理Hive的执行原理。
Hive的执行原理
Hive无论采用哪种调用方式最终都会辗转到org.apache.hadoop.hive.ql.Driver类。SQL语句在Driver类中通过Antlr框架进行解析编译将SQL转换成最终执行的MapReduce任务。
如果直接盲目的去看Driver类的代码会很容易看懵逼我们需要再往前一点。
SQLOperation
先看org.apache.hive.service.cli.operation.SQLOperation 类它负责创建Driver对象、编译SQL、异步执行SQL。其中核心的就是 runInternal()方法主要进行如下两个步骤
Driver对象创建并编译SQL将SQL编译成Query Plan执行计划。对QueryPaln 进行处理转换成MR 任务执行。
runInternal() 方法源码内容如下 /*** 内部运行方法用于执行SQL操作。** throws HiveSQLException 如果在执行过程中发生Hive SQL异常。*/public void runInternal() throws HiveSQLException {// 设置操作状态为PENDINGsetState(OperationState.PENDING);// 判断是否应该异步运行boolean runAsync shouldRunAsync();// 判断是否应该异步编译final boolean asyncPrepare runAsync HiveConf.getBoolVar(queryState.getConf(),HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);// 如果不是异步编译则同步准备查询if (!asyncPrepare) {//创建Driver对象编译SQL//Driver经过SQL - AST(抽象语法树) - QueryBlock(查询块) - Operator(e逻辑执行计划) - TaskTree(物理执行计划) - QueryPlan(查询计划)prepare(queryState);}// 如果不是异步运行则同步运行查询if (!runAsync) {runQuery();} else {// 我们将在后台线程中传递ThreadLocals从前台处理程序线程传递。// 1) ThreadLocal Hive对象需要在后台线程中设置// 2) Hive中的元数据存储客户端与正确的用户相关联。// 3) 当前UGI将在元数据存储处于嵌入式模式时被元数据存储使用Runnable work new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);try {// 如果没有可用的后台线程来运行此操作此提交将阻塞Future? backgroundHandle getParentSession().submitBackgroundOperation(work);// 设置后台操作句柄setBackgroundHandle(backgroundHandle);} catch (RejectedExecutionException rejected) {// 设置操作状态为ERRORsetState(OperationState.ERROR);// 抛出HiveSQLException异常throw new HiveSQLException(The background threadpool cannot accept new task for execution, please retry the operation, rejected);}}}1.Driver对象创建并编译SQL将SQL编译成Query Plan执行计划
其中核心的是prepare()方法它的源码在2.x和3.x、4.x有一些区别不过其核心功能是没变的主要是创建Driver对象并编译SQL然后通过Driver将SQL最终转换成Query Plan。
prepare()方法3.x的源码如下 /*** 准备执行SQL查询的操作。* 此方法负责初始化Driver设置查询超时编译查询语句并处理可能的异常。** param queryState 包含查询状态信息的对象。* throws HiveSQLException 如果在准备过程中发生Hive SQL异常。*/public void prepare(QueryState queryState) throws HiveSQLException {// 设置操作状态为运行中setState(OperationState.RUNNING);try {// 创建Driver实例返回的Driver对象是 ReExecDriverdriver DriverFactory.newDriver(queryState, getParentSession().getUserName(), queryInfo);// 如果查询超时时间大于0则启动一个定时任务来取消查询if (queryTimeout 0) {// 创建一个单线程的定时任务执行器timeoutExecutor new ScheduledThreadPoolExecutor(1);// 创建一个定时任务在查询超时后取消查询Runnable timeoutTask new Runnable() {Overridepublic void run() {try {// 获取查询IDString queryId queryState.getQueryId();// 记录日志查询超时并取消执行LOG.info(Query timed out after: queryTimeout seconds. Cancelling the execution now: queryId);// 取消查询SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {// 记录日志取消查询时发生错误LOG.error(Error cancelling the query after timeout: queryTimeout seconds, e);} finally {// 关闭定时任务执行器timeoutExecutor.shutdown();}}};// 安排定时任务在查询超时后执行timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);}// 设置查询显示信息queryInfo.setQueryDisplay(driver.getQueryDisplay());// 设置操作句柄信息以便Thrift API用户可以使用操作句柄查找Yarn ATS中的查询信息String guid64 Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier().toTHandleIdentifier().getGuid()).trim();driver.setOperationId(guid64);// 编译SQL查询并响应 ReExecDriver.compileAndRespond(...) - Driver.compileAndRespond(...)response driver.compileAndRespond(statement);// 如果响应代码不为0则抛出异常if (0 ! response.getResponseCode()) {throw toSQLException(Error while compiling statement, response);}// 设置是否有结果集setHasResultSet(driver.hasResultSet());} catch (HiveSQLException e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw e;} catch (Throwable e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw new HiveSQLException(Error running query: e.toString(), e);}}2.x与3.x源码最核心的区别就是在创建Driver其对应源码是 driver new Driver(queryState, getParentSession().getUserName()); 而4.x与3.x源码最核心的区别如下 利用 Java 8 的 Lambda 表达式特性简化代码逻辑提高代码的可读性和可维护性。通过将 queryTimeout 的类型改为 long支持了更大的超时值避免了溢出问题。在资源管理方面对调度器的生命周期管理也进行了优化不需要显式的关闭操作。 4.x对应源码是 if (queryTimeout 0L) {timeoutExecutor Executors.newSingleThreadScheduledExecutor();timeoutExecutor.schedule(() - {try {final String queryId queryState.getQueryId();log.info(Query timed out after: {} seconds. Cancelling the execution now: {}, queryTimeout, queryId);SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {log.error(Error cancelling the query after timeout: {} seconds, queryTimeout, e);}return null;}, queryTimeout, TimeUnit.SECONDS);
} 在DriverFactory.newDriver()方法中返回 ReExecDriver对象该对象表示执行过程失败可重试的Driver对象然后调用 Driver.compileAndRespond() 方法进行编译SQL。
2.对QueryPaln 进行处理转换成MR 任务执行
BackgroundWork是一个线程负责异步处理QueryPlan通过submitBackgroundOperation(work)提交运行执行到SQLOperator.BackgroundOperation.run()方法最终调用到Driver.run() 方法。
Driver
下面我们再来Driver类它在不同版本中也有一些差别比如2.x版本是直接 implements CommandProcessor而3.x和4.x版本则是implements IDriver而IDriver 则是 extends CommandProcessor。本质是为了更好的解耦和扩展性使得代码更加模块化易于维护和扩展。同时通过继承 CommandProcessor 接口也保持了与旧版本的兼容性确保了功能的连续性。不过其核心功能是没变的主要包含编译、优化及执行。
执行步骤
为了方便理解我们先梳理整个执行步骤如下 通过Antlr解析SQL语法规则和语法解析将SQL语法转换成AST(抽象语法树) 。 遍历AST(抽象语法树) 将其转化成Query Block查询块可以看成查询基本执行单元。 将Query Block(查询块) 转换成OperatorTree逻辑执行计划并进行优化。 OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划每个Task对应一个MR Job任务)。 TaskTree(物理执行计划)最终包装成Query Plan(查询计划)。 简单总结执行流程如下 SQL - AST(抽象语法树) - QueryBlock(查询块) - Operator(逻辑执行计划) - TaskTree(物理执行计划) - QueryPlan(查询计划)。 下面我们再结合SQLOperation调用的Driver类里面的核心方法来看看底层源码是如何实现的
compileAndRespond方法
首先第一个核心方法是
response driver.compileAndRespond(statement);
compileAndRespond()方法2.x源码如下 /*** 编译给定的 SQL 命令并返回一个命令处理器响应。* 此方法调用 compileInternal 方法进行实际的编译操作并使用编译结果创建一个命令处理器响应。** param command 要编译的 SQL 命令* return 包含编译结果的命令处理器响应*/public CommandProcessorResponse compileAndRespond(String command) {return createProcessorResponse(compileInternal(command, false));}3.x和4.x会有些区别会返回以下方法的调用结果
coreDriver.compileAndRespond(statement);
无论哪个版本最终compileAndRespond()方法都会调用到 compileInternal()方法我们继续看2.x版本compileInternal()方法源码如下 private int compileInternal(String command, boolean deferClose) {int ret;// 获取Metrics实例如果存在则增加等待编译操作的计数器Metrics metrics MetricsFactory.getInstance();if(metrics ! null) {metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 尝试获取编译锁如果获取失败则返回编译锁超时错误码final ReentrantLock compileLock tryAcquireCompileLock(isParallelEnabled, command);if(compileLock null) {return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();}try {// 如果Metrics实例存在减少等待编译操作的计数器if(metrics ! null) {metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 进行Hive SQL编译ret compile(command, true, deferClose);} finally {// 无论编译结果如何最终都要释放编译锁compileLock.unlock();}// 如果编译失败尝试释放锁并回滚事务if(ret ! 0) {try {releaseLocksAndCommitOrRollback(false, null);} catch(LockException e) {// 记录释放锁时的异常信息LOG.warn(Exception in releasing locks. org.apache.hadoop.util.StringUtils.stringifyException(e));}}// 保存编译时的性能日志用于WebUI展示// 执行时的性能日志由另一个线程的PerfLogger或重置后的PerfLogger完成PerfLogger perfLogger SessionState.getPerfLogger();queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());return ret;}3.x和4.x的源码相比起来有一些区别但是都是通过执行Driver.compile()方法由于4.x代码这块改动较大做了很多解耦的操作其核心内容还是变化不大加上目前几乎很少应用4.x版本的hive下面我们重点看看2.x和3.x版本的compile()方法内容。
compile方法
compile()方法2.x源码如下
/*** 编译一个新的查询可选择重置任务ID计数器并决定是否延迟关闭。* * param command 要编译的HiveQL查询。* param resetTaskIds 如果为true则重置任务ID计数器。* param deferClose 如果为true则在编译过程被中断时延迟关闭/销毁操作。* return 0表示编译成功否则返回错误代码。*/
// deferClose 表示当进程被中断时是否应该推迟关闭/销毁操作。如果 compile 方法是在另一个方法如 runInternal中被调用并且该方法会将关闭操作推迟到其内部处理那么 deferClose 应该设置为 true。
public int compile(String command, boolean resetTaskIds, boolean deferClose) {// 获取性能日志记录器并开始记录编译过程的性能PerfLogger perfLogger SessionState.getPerfLogger(true);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);// 锁定驱动状态将驱动状态设置为编译中lDrvState.stateLock.lock();try {lDrvState.driverState DriverState.COMPILING;} finally {lDrvState.stateLock.unlock();}// 对查询命令进行变量替换command new VariableSubstitution(new HiveVariableSource() {Overridepublic MapString, String getHiveVariable() {return SessionState.get().getHiveVariables();}}).substitute(conf, command);// 存储查询字符串String queryStr command;try {// 对查询命令进行脱敏处理避免记录敏感数据queryStr HookUtils.redactLogString(conf, command);} catch(Exception e) {// 若脱敏失败记录警告信息LOG.warn(WARNING! Query command could not be redacted. e);}// 检查编译过程是否被中断若中断则处理中断并返回错误代码if(isInterrupted()) {return handleInterruption(at beginning of compilation.); //indicate if need clean resource}// 如果上下文不为空且解释分析状态不为运行中则关闭现有上下文if(ctx ! null ctx.getExplainAnalyze() ! AnalyzeState.RUNNING) {// close the existing ctx etc before compiling a new query, but does not destroy drivercloseInProcess(false);}// 如果需要重置任务ID则重置任务工厂的IDif(resetTaskIds) {TaskFactory.resetId();}// 获取查询IDString queryId conf.getVar(HiveConf.ConfVars.HIVEQUERYID);// 保存查询信息用于Web UI显示this.queryDisplay.setQueryStr(queryStr);this.queryDisplay.setQueryId(queryId);// 记录编译开始信息LOG.info(Compiling command(queryId queryId ): queryStr);// 设置查询的当前时间戳SessionState.get().setupQueryCurrentTimestamp();// 标记编译过程中是否发生错误boolean compileError false;try {// 初始化事务管理器final HiveTxnManager txnManager SessionState.get().initTxnMgr(conf);// 移除旧的关闭hookShutdownHookManager.removeShutdownHook(shutdownRunner);// 创建新的关闭hook用于在JVM关闭时释放锁shutdownRunner new Runnable() {Overridepublic void run() {try {releaseLocksAndCommitOrRollback(false, txnManager);} catch(LockException e) {// 若释放锁时发生异常记录警告信息LOG.warn(Exception when releasing locks in ShutdownHook for Driver: e.getMessage());}}};// 添加新的关闭hookShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);// 再次检查编译过程是否被中断if(isInterrupted()) {return handleInterruption(before parsing and analysing the query);}// 如果上下文为空则创建新的上下文if(ctx null) {ctx new Context(conf);}// 设置上下文的重试次数、命令和HDFS清理标志ctx.setTryCount(getTryCount());ctx.setCmd(command);ctx.setHDFSCleanup(true);/*** 把 HQL命令 翻译成一个 ASTNode Tree* 封装了 ParseDriver 对 HQL 的解析工作* ParseDriver 对 command 进行词法分析和语法解析统称为语法分析返回一个抽象语法树AST*/// 开始记录解析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);// 解析查询命令得到抽象语法树ASTNode tree ParseUtils.parse(command, ctx);// 结束记录解析过程的性能perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);// 加载查询hookqueryHooks loadQueryHooks();// 如果查询hook不为空且不为空列表则触发查询hook的编译前操作if(queryHooks ! null !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.beforeCompile(qhc);}}// 开始记录语义分析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);// 获取语义分析器BaseSemanticAnalyzer sem SemanticAnalyzerFactory.get(queryState, tree);// 获取语义分析hookListHiveSemanticAnalyzerHook saHooks getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, HiveSemanticAnalyzerHook.class);// 刷新元数据存储缓存确保获取最新的元数据Hive.get().getMSC().flushCache();// 进行语义分析和计划生成if(saHooks ! null !saHooks.isEmpty()) {HiveSemanticAnalyzerHookContext hookCtx new HiveSemanticAnalyzerHookContextImpl();hookCtx.setConf(conf);hookCtx.setUserName(userName);hookCtx.setIpAddress(SessionState.get().getUserIpAddress());hookCtx.setCommand(command);hookCtx.setHiveOperation(queryState.getHiveOperation());// 触发语义分析hook的预分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {tree hook.preAnalyze(hookCtx, tree);}/*** sem 是一个 SemanticAnalyzer(语义分析器) 对象* 主要的工作是将 ASTNode 转化为 TaskTree包括可能的 optimize过程比较复杂** tree: AST 抽象语法树 TaskTree* TaskTree 物理执行计划** 把抽象语法树交给 SemanticAnalyzer 执行语法解析* 1、从 AST 转成 解析树* 2、通过解析树 再生成 QB 在查询快* 3、从 QB 树在生成 OperatorTree (Logical Plan)* 4、逻辑执行计划的优化* 5、OperatorTree转变成TaskTree* 6、再针对物理执行计划执行优化* 7、生成QueryPlan*/// 进行语义分析sem.analyze(tree, ctx);// 更新hook上下文hookCtx.update(sem);// 触发语义分析hook的后分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {hook.postAnalyze(hookCtx, sem.getAllRootTasks());}} else {// 若没有语义分析hook直接进行语义分析sem.analyze(tree, ctx);}// 记录查询中发现的ACID文件接收器acidSinks sem.getAcidFileSinks();// 记录语义分析完成信息LOG.info(Semantic Analysis Completed);// 验证语义分析生成的计划是否有效sem.validate();// 检查查询中是否包含ACID操作acidInQuery sem.hasAcidInQuery();// 结束语义分析阶段的性能日志记录perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);// 检查编译过程是否被中断如果中断则处理中断情况并返回if(isInterrupted()) {return handleInterruption(after analyzing query.);}// 根据语义分析结果和配置信息获取查询的输出模式schema getSchema(sem, conf);/*** 把 TaskTree 生成一个 QueryPlan* 通过 Exeuctor 提交的方法要接受的参数就是 QueryPlan*/// 根据查询字符串、语义分析器、开始时间、查询ID、操作类型和输出模式创建查询计划plan new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);// 设置查询字符串到配置中conf.setQueryString(queryStr);// 设置MapReduce工作流ID到配置中conf.set(mapreduce.workflow.id, hive_ queryId);// 设置MapReduce工作流名称到配置中conf.set(mapreduce.workflow.name, queryStr);// 如果查询计划中包含FetchTask则对其进行初始化if(plan.getFetchTask() ! null) {plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());}// 进行授权检查如果语义分析不跳过授权且开启了授权功能if(!sem.skipAuthorization() HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {try {// 开始记录授权过程的性能日志perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);// 执行授权操作doAuthorization(queryState.getHiveOperation(), sem, command);} catch(AuthorizationException authExp) {// 如果授权失败打印错误信息并设置错误状态和返回码console.printError(Authorization failed: authExp.getMessage() . Use SHOW GRANT to get more details.);errorMessage authExp.getMessage();SQLState 42000;return 403;} finally {// 结束记录授权过程的性能日志perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);}}// 如果配置中开启了记录EXPLAIN输出的功能if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 获取查询的EXPLAIN输出String explainOutput getExplainOutput(sem, plan, tree);if(explainOutput ! null) {if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 记录EXPLAIN输出到日志中LOG.info(EXPLAIN output for queryid queryId : explainOutput);}if(conf.isWebUiQueryInfoCacheEnabled()) {// 如果开启了Web UI查询信息缓存将EXPLAIN计划设置到查询显示信息中queryDisplay.setExplainPlan(explainOutput);}}}// 编译成功返回0return 0;} catch(Exception e) {// 如果编译过程中被中断处理中断情况并返回if(isInterrupted()) {return handleInterruption(during query compilation: e.getMessage());}// 标记编译过程出现错误compileError true;// 获取错误信息ErrorMsg error ErrorMsg.getErrorMsg(e.getMessage());// 构建错误消息errorMessage FAILED: e.getClass().getSimpleName();if(error ! ErrorMsg.GENERIC_ERROR) {errorMessage [Error error.getErrorCode() ]:;}// HIVE-4889if((e instanceof IllegalArgumentException) e.getMessage() null e.getCause() ! null) {errorMessage e.getCause().getMessage();} else {errorMessage e.getMessage();}if(error ErrorMsg.TXNMGR_NOT_ACID) {errorMessage . Failed command: queryStr;}// 设置SQL状态码SQLState error.getSQLState();// 记录下游错误信息downstreamError e;// 打印错误信息和详细堆栈跟踪console.printError(errorMessage, \n org.apache.hadoop.util.StringUtils.stringifyException(e));// 返回错误代码return error.getErrorCode();// since it exceeds valid range of shell return values} finally {// 触发编译后的hook函数try {if(queryHooks ! null !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.afterCompile(qhc, compileError);}}} catch(Exception e) {// 如果触发hook函数时出现异常记录警告信息LOG.warn(Failed when invoking query after-compilation hook., e);}/*** 计算任务总耗时*/// 结束编译阶段的性能日志记录并计算耗时double duration perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE) / 1000.00;// 获取编译过程中HMS调用的时间统计信息ImmutableMapString, Long compileHMSTimings dumpMetaCallTimingWithoutEx(compilation);// 设置查询显示信息中的HMS时间统计信息queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);// 检查编译过程是否被中断boolean isInterrupted isInterrupted();if(isInterrupted !deferClose) {// 如果被中断且不延迟关闭关闭正在进行的操作closeInProcess(true);}// 锁定驱动状态lDrvState.stateLock.lock();try {if(isInterrupted) {// 如果被中断根据是否延迟关闭设置驱动状态lDrvState.driverState deferClose ? DriverState.EXECUTING : DriverState.ERROR;} else {// 如果未被中断根据编译是否出错设置驱动状态lDrvState.driverState compileError ? DriverState.ERROR : DriverState.COMPILED;}} finally {// 解锁驱动状态lDrvState.stateLock.unlock();}if(isInterrupted) {// 如果编译过程被中断记录中断信息LOG.info(Compiling command(queryId queryId ) has been interrupted after duration seconds);} else {// 如果编译过程未被中断记录编译完成信息LOG.info(Completed compiling command(queryId queryId ); Time taken: duration seconds);}}
}compile()方法在3.x和4.x有一些区别但是都有以下三个核心方法
首先是通过ParseUtils.parse(command, ctx)将Hive SQL转换成AST抽象语法树即HQL - AST抽象语法树转换然后是通过BaseSemanticAnalyzer.analyze()方法将AST抽象语法树解析生成TaskTree(物理执行计划)最后将BaseSemanticAnalyzer传入QueryPlan的构造函数来创建QueryPlan(查询计划)。
其核心正是引入篇我们提到的解析Parsing、校验Validation、优化Optimization和执行Execution。
下面我们深入这几个方法看看
parse方法
在compile()方法中首先是通过ParseUtils.parse(command, ctx)进行词法分析与解析将Hive HQL转换成AST抽象语法树。
我们来看看parse()方法的源码
/*** 解析 HQL。* * 此方法接收一个 Hive 查询命令和上下文对象调用另一个重载的 parse 方法进行实际的解析操作* 并将视图的全限定名参数设为 null。* * param command 要解析的 Hive 查询命令* param ctx 查询的上下文对象* return 解析后的 AST 节点* throws ParseException 如果解析过程中出现异常*/
public static ASTNode parse(String command, Context ctx) throws ParseException {return parse(command, ctx, null);
}继续往里走对应源码如下 /*** 解析HQL * ParseDriver对command进行词法分析和语法解析统称为语法分析返回一个抽象语法树AST* * param command 要解析的Hive查询命令* param ctx 查询上下文信息* param viewFullyQualifiedName 视图的完全限定名称* return 解析后的AST节点* throws ParseException 如果解析过程中出现错误*/public static ASTNode parse(String command, Context ctx, String viewFullyQualifiedName) throws ParseException {// 创建一个ParseDriver实例用于解析命令ParseDriver pd new ParseDriver();// 使用ParseDriver解析命令得到AST节点ASTNode tree pd.parse(command, ctx, viewFullyQualifiedName);// 查找根节点中第一个具有非空令牌的节点tree findRootNonNullToken(tree);// 处理设置列引用的情况handleSetColRefs(tree);// 返回处理后的AST节点return tree;}在pd.parse()方法中核心调用的是HiveLexer和HiveParser这两个类它们分别负责SQL的词法分析和语法解析我们继续看看其中源码
/*** 解析给定的命令字符串将其转换为抽象语法树AST节点。** param command 要解析的命令字符串。* param ctx 解析上下文可包含配置信息和tokens重写流。* param viewFullyQualifiedName 视图的完全限定名称如果不是视图解析则为 null。* return 解析后的 AST 节点。* throws ParseException 如果解析过程中出现错误。*/public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)throws ParseException {// 如果启用了调试日志则记录正在解析的命令if (LOG.isDebugEnabled()) {LOG.debug(Parsing command: command);}/*** Antlr对语法文件 HiveLexer.g 编译后自动生成的词法解析和语法解析类HiveLexerXHiveParser* 文件 HiveLexer.g 定义了一些 hive 的关键字form、where数字的定义格式【0–9】分隔符比较符之类的。* 每一个关键字分支都会变成一个 token。** HiveLexerX 是 antlr 根据词法规则文件通过编译生成的一个代码类* 能够执行词法和语法的解析 * 最终生成一个 ASTNode*/// 创建一个不区分大小写的字符流并使用它初始化词法分析器HiveLexerX lexer new HiveLexerX(new ANTLRNoCaseStringStream(command));/*** 根据词法分析的结果得到tokens的此时不只是单纯的字符串* 而是具有特殊意义的字符串的封装其本身是一个流。* lexer 把 SQL 语句中的各个语法分支都转换成底层引擎能识别的各种 Token*/// 创建一个tokens重写流用于处理词法分析器生成的tokensTokenRewriteStream tokens new TokenRewriteStream(lexer);// 如果提供了上下文则根据是否为视图设置tokens重写流并设置词法分析器的配置if (ctx ! null) {if (viewFullyQualifiedName null) {// 顶层查询ctx.setTokenRewriteStream(tokens);} else {// 这是一个视图ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);}lexer.setHiveConf(ctx.getConf());}// 语法解析 HiveParser是 Antlr 根据 HiveParser.g 生成的文件// 使用tokens重写流初始化语法解析器HiveParser parser new HiveParser(tokens);// 如果提供了上下文则设置解析器的配置if (ctx ! null) {parser.setHiveConf(ctx.getConf());}// 设置解析器的树适配器用于创建 AST 节点parser.setTreeAdaptor(adaptor);// 声明一个变量来存储解析结果HiveParser.statement_return r null;try {/*** 转化为 ASTTree 放在 ASTNode 中的 tree 属性中。 通过 r.getTree() 获取返回。* 当前这句代码完成了从 Tok 树到 AST 的转变* 把结果放在了 HiveParser.statement_return*/// 调用解析器的 statement 方法进行解析r parser.statement();} catch (RecognitionException e) {// 打印异常堆栈跟踪信息e.printStackTrace();// 如果解析过程中出现识别异常则抛出解析异常throw new ParseException(parser.errors);}// 检查词法分析器和解析器是否有错误if (lexer.getErrors().size() 0 parser.errors.size() 0) {// 如果没有错误则记录解析完成的日志LOG.debug(Parse Completed);} else if (lexer.getErrors().size() ! 0) {// 如果词法分析器有错误则抛出解析异常throw new ParseException(lexer.getErrors());} else {// 如果解析器有错误则抛出解析异常throw new ParseException(parser.errors);}// 获取解析结果的树并将其转换为 AST 节点ASTNode tree (ASTNode) r.getTree();// 设置 AST 节点的未知tokens边界tree.setUnknownTokenBoundaries();// 返回解析后的 AST 节点return tree;}pd.parse()方法将sql语法转换成抽象语法树 ASTHive中通过使用 AntlrAnother Tool for Language Recognition进行词法分析和语法解析。 Antlr主要作用 词法分析将输入的HiveQL查询字符串分解成一系列的Token这些Token是语法分析的基础。Antlr生成的词法分析器Lexer负责将输入的HiveQL查询字符串分解成一个个Token这些Token表示查询中的关键字、标识符、运算符等基本元素。语法解析根据词法分析器生成的Token序列解析HiveQL查询语句生成AST抽象语法树。Antlr生成的语法解析器Parser负责读取Token序列并根据语法规则解析这些Token生成对应的AST抽象语法树。Token 对应 SQL中的每个关键字。 analyze方法
通过上一个步骤并获取到 ASTNode之后需要对其进行进一步的抽象和结构化处理以便能够更便捷地将其转换为MapReduce程序。为此将会初始化类BaseSemanticAnalyzer并通过SemanticAnalyzerFactory确定SQL的类型进而调用analyze()方法进行分析其对应源码如下
BaseSemanticAnalyzer sem SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx);
其中 sem 是一个 SemanticAnalyzer语义分析器对象主要的工作是将 ASTNode 转化为 TaskTree物理执行计划包括可能的 optimize优化也就是前面执行步骤第2~5步做的内容。
首先看analyze()对应源码如下 /*** 分析给定的抽象语法树AST节点并使用提供的上下文进行初始化。* * 此方法首先初始化上下文然后初始化分析器的内部状态。* 最后调用 analyzeInternal 方法对 AST 进行实际的分析。* * param ast 要分析的抽象语法树节点。* param ctx 分析过程中使用的上下文。* throws SemanticException 如果在分析过程中发生语义错误。*/public void analyze(ASTNode ast, Context ctx) throws SemanticException {// 初始化上下文initCtx(ctx);// 初始化分析器的内部状态清除部分缓存init(true);// 调用内部分析方法对 AST 进行分析analyzeInternal(ast);}可以看到除了进行必要的初始化之外还会调用analyzeInternal()方法对应源码如下 /*** 对抽象语法树AST进行内部语义分析。* 此方法为抽象方法具体实现需在子类中完成。* 它负责对传入的AST进行详细的语义分析以确保查询语句的合法性和正确性。** param ast 待分析的抽象语法树节点* throws SemanticException 如果在语义分析过程中发现错误*/public abstract void analyzeInternal(ASTNode ast) throws SemanticException;可以看到analyzeInternal()是一个抽象方法它有多种具体实现通过断点查看会发现流程是跳转到了org.apache.hadoop.hive.ql.parse.SemanticAnalyzer类其源码注释如下 Implementation of the semantic analyzer. It generates the query plan. There are other specific semantic analyzers for some hive operations such as DDLSemanticAnalyzer for ddl operations. 翻译 语义分析器的实现。它用于生成查询计划。 对于某些 Hive 操作还有其他特定的语义分析器例如用于 DDL 操作的 DDLSemanticAnalyzer。 这个类有点复杂Hive优化的秘密全在于此将AST抽象语法树解析生成TaskTree物理执行计划的全流程包括逻辑执行计划、逻辑执行计划的优化、物理执行计划的切分、物理执行计划的优化、以及 MapReduce 任务的生成全部都在其中下面我们就看看其中实现的analyzeInternal()方法源码 /*** 对传入的AST节点进行内部分析生成查询计划。** param ast 抽象语法树节点* param pcf 计划上下文工厂* throws SemanticException 语义分析异常*/void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {LOG.info(Starting Semantic Analysis);// 1. 从语法树生成解析树boolean needsTransform needsTransform();// 改变位置别名处理的位置processPositionAlias(ast);PlannerContext plannerCtx pcf.create();if (!genResolvedParseTree(ast, plannerCtx)) {return;}if (HiveConf.getBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY)) {for (String alias : qb.getSubqAliases()) {removeOBInSubQuery(qb.getSubqForAlias(alias));}}// 检查查询结果缓存。// 如果不需要进行掩码/过滤则可以在生成操作符树和进行CBO之前检查缓存。// 否则必须等到掩码/过滤步骤之后。boolean isCacheEnabled isResultsCacheEnabled();QueryResultsCache.LookupInfo lookupInfo null;if (isCacheEnabled !needsTransform queryTypeCanUseCache()) {lookupInfo createLookupInfoForQuery(ast);if (checkResultsCache(lookupInfo)) {return;}}ASTNode astForMasking;if (isCBOExecuted() needsTransform (qb.isCTAS() || qb.isView() || qb.isMaterializedView() || qb.isMultiDestQuery())) {// 如果使用CBO并且可能应用掩码/过滤策略则创建ast的副本。// 原因是操作符树的生成可能会修改初始ast但如果需要第二次解析我们希望解析未修改的ast。astForMasking (ASTNode) ParseDriver.adaptor.dupTree(ast);} else {astForMasking ast;}// 2. 从解析树生成操作符树Operator sinkOp genOPTree(ast, plannerCtx);boolean usesMasking false;if (!unparseTranslator.isEnabled() (tableMask.isEnabled() analyzeRewrite null)) {// 在这里重写 * 以及掩码表ASTNode rewrittenAST rewriteASTWithMaskAndFilter(tableMask, astForMasking, ctx.getTokenRewriteStream(),ctx, db, tabNameToTabObject, ignoredTokens);if (astForMasking ! rewrittenAST) {usesMasking true;plannerCtx pcf.create();ctx.setSkipTableMasking(true);init(true);// 改变位置别名处理的位置processPositionAlias(rewrittenAST);genResolvedParseTree(rewrittenAST, plannerCtx);if (this instanceof CalcitePlanner) {((CalcitePlanner) this).resetCalciteConfiguration();}sinkOp genOPTree(rewrittenAST, plannerCtx);}}// 检查查询结果缓存// 在需要进行行或列掩码/过滤的情况下不支持缓存。// TODO: 为带有掩码/过滤的查询启用缓存if (isCacheEnabled needsTransform !usesMasking queryTypeCanUseCache()) {lookupInfo createLookupInfoForQuery(ast);if (checkResultsCache(lookupInfo)) {return;}}// 3. 推导结果集模式if (createVwDesc ! null !this.ctx.isCboSucceeded()) {resultSchema convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());} else {// 如果满足以下条件resultSchema将为null// (1) cbo被禁用// (2) 或者cbo启用但使用AST返回路径无论是否成功resultSchema都将重新初始化// 只有在cbo启用且使用新返回路径并且成功时resultSchema才不为null。if (resultSchema null) {resultSchema convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));}}// 4. 为优化器和物理编译器生成解析上下文copyInfoToQueryProperties(queryProperties);ParseContext pCtx new ParseContext(queryState, opToPartPruner, opToPartList, topOps,// 使用菱形操作符简化泛型类型声明new HashSet(joinContext.keySet()),// 使用菱形操作符简化泛型类型声明new HashSet(smbMapJoinContext.keySet()),loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,queryProperties, viewProjectToTableSchema, acidFileSinks);// 在解析上下文中设置半连接提示pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));// 如果需要禁用映射连接提示则设置pCtx.setDisableMapJoin(disableMapJoinWithHint(getQB().getParseInfo().getHintList()));// 5. 处理视图创建if (createVwDesc ! null) {if (ctx.getExplainAnalyze() AnalyzeState.RUNNING) {return;}if (!ctx.isCboSucceeded()) {saveViewDefinition();}// 此时验证创建视图语句createVwDesc包含语义检查所需的所有信息validateCreateView();if (createVwDesc.isMaterialized()) {createVwDesc.setTablesUsed(getTablesUsed(pCtx));} else {// 由于我们只是创建视图不执行它因此不需要优化或转换计划实际上这些过程可能会干扰视图创建。所以跳过此方法的其余部分。ctx.setResDir(null);ctx.setResFile(null);try {PlanUtils.addInputsForView(pCtx);} catch (HiveException e) {throw new SemanticException(e);}// 为创建视图语句生成谱系信息// 如果配置了LineageLoggerhook。// 添加计算谱系信息的转换。SetString postExecHooks Sets.newHashSet(Splitter.on(,).trimResults().omitEmptyStrings().split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));if (postExecHooks.contains(org.apache.hadoop.hive.ql.hooks.PostExecutePrinter)|| postExecHooks.contains(org.apache.hadoop.hive.ql.hooks.LineageLogger)|| postExecHooks.contains(org.apache.atlas.hive.hook.HiveHook)) {// 使用菱形操作符简化泛型类型声明ArrayListTransform transformations new ArrayList();transformations.add(new HiveOpConverterPostProc());transformations.add(new Generator(postExecHooks));for (Transform t : transformations) {pCtx t.transform(pCtx);}// 我们仅使用视图名称作为位置。queryState.getLineageState().mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);}return;}}// 6. 如果需要生成表访问统计信息if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {TableAccessAnalyzer tableAccessAnalyzer new TableAccessAnalyzer(pCtx);setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());}// 7. 执行逻辑优化if (LOG.isDebugEnabled()) {LOG.debug(Before logical optimization\n Operator.toString(pCtx.getTopOps().values()));}// 创建一个优化器实例并对解析上下文进行逻辑优化。Optimizer optm new Optimizer();// 设置优化器的解析上下文optm.setPctx(pCtx);// 初始化优化器optm.initialize(conf);// 执行优化操作并更新解析上下文pCtx optm.optimize();// 检查优化后的解析上下文中是否包含列访问信息if (pCtx.getColumnAccessInfo() ! null) {// 设置列访问信息用于视图列授权setColumnAccessInfo(pCtx.getColumnAccessInfo());}// 如果启用了调试日志则输出优化后的操作符树信息if (LOG.isDebugEnabled()) {LOG.debug(After logical optimization\n Operator.toString(pCtx.getTopOps().values()));}// 8. Generate column access stats if required - wait until column pruning// takes place during optimization// 检查是否需要收集列访问信息用于授权或统计boolean isColumnInfoNeedForAuth SessionState.get().isAuthorizationModeV2() HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);if (isColumnInfoNeedForAuth|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {// 创建列访问分析器实例ColumnAccessAnalyzer columnAccessAnalyzer new ColumnAccessAnalyzer(pCtx);// 分析列访问信息并更新列访问信息// view column access info is carried by this.getColumnAccessInfo().setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));}// 9. Optimize Physical op tree Translate to target execution engine (MR,// TEZ..)// 检查是否需要进行逻辑解释如果不需要则进行物理操作树的优化和编译if (!ctx.getExplainLogical()) {// 获取任务编译器实例TaskCompiler compiler TaskCompilerFactory.getCompiler(conf, pCtx);// 初始化任务编译器compiler.init(queryState, console, db);// 编译解析上下文生成任务和输入输出信息compiler.compile(pCtx, rootTasks, inputs, outputs);// 获取获取任务fetchTask pCtx.getFetchTask();}//find all Acid FileSinkOperatorS// 创建查询计划后处理器实例但该实例未被使用后续可考虑移除QueryPlanPostProcessor qp new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers// 查找根任务列表中的第一个TezTaskfinal OptionalTezTask optionalTezTask rootTasks.stream().filter(task - task instanceof TezTask).map(task - (TezTask) task).findFirst();if (optionalTezTask.isPresent()) {// 获取第一个TezTask实例final TezTask tezTask optionalTezTask.get();// 遍历根任务列表为满足条件的DDLWork添加插入提交hook任务rootTasks.stream()// 过滤出工作类型为DDLWork的任务.filter(task - task.getWork() instanceof DDLWork)// 将任务转换为DDLWork类型.map(task - (DDLWork) task.getWork())// 过滤出预插入表描述不为空的DDLWork.filter(ddlWork - ddlWork.getPreInsertTableDesc() ! null)// 获取预插入表描述.map(ddlWork - ddlWork.getPreInsertTableDesc())// 创建插入提交hook描述.map(ddlPreInsertTask - new InsertCommitHookDesc(ddlPreInsertTask.getTable(),ddlPreInsertTask.isOverwrite()))// 为TezTask添加依赖任务.forEach(insertCommitHookDesc - tezTask.addDependentTask(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), insertCommitHookDesc), conf)));}LOG.info(Completed plan generation);// 11. put accessed columns to readEntity// 检查是否需要收集扫描列的统计信息if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {// 将访问的列信息添加到读取实体中putAccessedColumnsToReadEntity(inputs, columnAccessInfo);}// 检查是否启用了查询结果缓存并且查找信息不为空if (isCacheEnabled lookupInfo ! null) {// 检查查询是否可以被缓存if (queryCanBeCached()) {// 创建缓存查询信息QueryResultsCache.QueryInfo queryInfo createCacheQueryInfoForQuery(lookupInfo);// Specify that the results of this query can be cached.// 指定该查询的结果可以被缓存setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));}}}简单总结一下首先输入的是AST抽象语法树主要经历了以下步骤
Generate Resolved Parse tree from syntax tree 从语法树生成解析树Gen OP Tree from resolved Parse Tree 从解析树生成Gen OP树 OperatorTreeDeduce Resultset Schemaselct ...... 每个字段我给你构造成一个 Field推导结果集模式 CBO优化Generate ParseContext for Optimizer Physical compiler 为优化器和物理编译器生成解析上下文Take care of view creation 注意视图创建Generate table access stats if required 生成表访问统计信息如果需要Perform Logical optimization 执行逻辑执行计划的优化Generate column access stats if required - wait until column pruning takes place during optimization 根据需要生成列访问统计信息-等待优化期间进行列裁剪 sql当中写了很多的无用的字段但是最终执行逻辑不需要这些字段就需要列裁剪。Optimize Physical op tree Translate to target execution engine (MR, Spark, TEZ..) 优化物理操作树并转换为目标执行引擎MRTEZ ..put accessed columns to readEntity 将访问的列放入 ReadEntity(要读取的列的信息)if desired check were not going over partition scan limits 如果需要检查我们不会超过分区扫描限制
生成QueryPlan
这一系列操作完成后最后就是把得到的 TaskTree 生成一个 QueryPlan相关源码如下
/*** 创建一个新的 QueryPlan 对象。* * param queryStr 要执行的查询字符串* param sem 语义分析器对象用于对查询进行语义分析* param startTime 驱动程序开始运行的时间通过 perfLogger 获取* param queryId 查询的唯一标识符* param hiveOperation Hive 操作类型* param schema 查询结果的输出模式*/
plan new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);总结
本文介绍了Hive并通过源码梳理了Hive的执行原理其核心正是引入篇我们提到的解析Parsing、校验Validation、优化Optimization和执行Execution。
总结起来主要有以下四个步骤
词法分析与解析 将SQL语法转换成AST(抽象语法树) 语义分析 将AST进行进一步的抽象和结构化处理通过遍历AST(抽象语法树) 将其转化成Query Block逻辑优化 到了第三步时操作符树虽然已经勾勒出执行任务的先后顺序和上下游依赖但细节还比较粗糙例如存在重复的数据扫描、不必要的Shuffle操作等因此还需要进行进一步优化。通过优化Hive可以改进查询的执行计划并生成更高效的作业图以在分布式计算框架中执行。这些优化可以提高查询的性能和效率并减少资源开销。物理优化 在逻辑优化阶段结束后输入的SQL语句也逐步转换为优化后的逻辑计划不过此时的逻辑计划仍然不能直接执行还需要进一步转换成可以识别并执行的MapReduce Task首先将优化后的OperatorTree逻辑执行计划转换成TaskTree物理执行计划每个Task对应一个MR Job任务并对物理执行计划进行一些优化然后依次调用执行。 有朋友看了初版觉得写的不够细私信让我迭代丰富一下但还有一些有意思的细节比如4.x源码的区别等感兴趣的小伙伴可以自行深入探索一下。