大数据SQL调优专题——Hive执行原理

引入

Apache Hive 是基于Hadoop的数据仓库工具,它可以使用SQL来读取、写入和管理存在分布式文件系统中的海量数据。在Hive中,HQL默认转换成MapReduce程序运行到Yarn集群中,大大降低了非Java开发者数据分析的门槛,并且Hive提供命令行工具和JDBC驱动程序,方便用户连接到Hive进行数据分析操作。

严格意义上,Hive并不属于计算引擎,而是建立在Hadoop生态之上的数据仓库管理工具。它将繁杂的MapReduce作业抽象成SQL,使得开发及维护成本大幅降低。得益于HDFS的存储和MapReduce的读写能力,Hive展现出了强大的兼容能力、数据吞吐能力和服务稳定性,时至今日依然是大数据架构中不可或缺的一部分。

Hive的核心特点

  • Hive是基于Hadoop的数仓工具,底层数据存储在HDFS中;

  • Hive提供标准SQL功能,支持SQL语法访问操作数据;

  • Hive适合OLAP数据分析场景,不适合OLTP数据处理场景,所以适合数据仓库构建;

  • HQL默认转换成MapReduce任务执行,也可以配置转换成Apache Spark、Apache Tez任务运行;

  • Hive中支持定义UDF、UDAF、UDTF函数扩展功能。

Hive的架构设计

Hive用户接口

访问Hive可以通过CLI、Beeline、JDBC/ODBC、WebUI几种方式。在Hive早期版本中可以使用Hive CLI来操作Hive,Hive CLI并发性能差、脚本执行能力有限并缺乏JDBC驱动支持,从Hive 4.x版本起废弃了Hive CLI推荐使用Beeline。Beeline是一个基于JDBC的Hive客户端,支持并发环境、复杂脚本执行、JDBC驱动等,在Hive集群内连接Hive可以使用Beeline方式。在Hive集群外,通过代码或者工具连接操作Hive时可以通过JDBC/ODBC方式。通过WebUI方式可以通过浏览器查看到Hive集群的一些信息。

HiveServer2服务

HiveServer2服务提供JDBC/ODBC接口,主要用于代理远程客户端对Hive的访问,是一种基于Thrift协议的服务。例如通过JDBC或者Beeline连接访问Hive时就需要启动HiveServer2服务,就算Beeline访问本机上的Hive服务也需要启动HiveServer2服务。

HiveServer2代理远程客户端对Hive操作时会涉及到操作HDFS数据,就会有操作权限问题,那么操作HDFS中数据的用户是启动HiveServer2的用户还是远程客户端的用户需要通过“hive.server2.enable.doAs” 参数决定,该参数默认为true,表示HiveServer2操作HDFS时的用户为远程客户端用户,如果设置为false表示操作HDFS数据的用户为启动HiveServer2的用户。

MetaStore服务

MetaStore服务负责存储和管理Hive元数据,为HiverServer2提供元数据访问接口。Hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(表拥有者、是否为外部表等),表的数据所在目录等。

Hive MetaStore可以将元数据存储在mysql、derby数据库中。

Hive Driver

Driver中包含解释器(SQL Parser)、编译器(Compiler)、优化器(Optimizer),负责完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中,并在随后有执行器(Executor)调用MapReduce执行。

对于Hive有了一个初步认识,我们下面开始梳理Hive的执行原理。

Hive的执行原理

Hive无论采用哪种调用方式,最终都会辗转到org.apache.hadoop.hive.ql.Driver类。SQL语句在Driver类中,通过Antlr框架进行解析编译,将SQL转换成最终执行的MapReduce任务。

如果直接盲目的去看Driver类的代码,会很容易看懵逼,我们需要再往前一点。

SQLOperation

先看org.apache.hive.service.cli.operation.SQLOperation 类,它负责创建Driver对象、编译SQL、异步执行SQL。其中核心的就是 runInternal()方法,主要进行如下两个步骤:

  1. Driver对象创建并编译SQL,将SQL编译成Query Plan执行计划。
  2. 对QueryPaln 进行处理,转换成MR 任务执行。

runInternal() 方法源码内容如下:

  /*** 内部运行方法,用于执行SQL操作。** @throws HiveSQLException 如果在执行过程中发生Hive SQL异常。*/public void runInternal() throws HiveSQLException {// 设置操作状态为PENDINGsetState(OperationState.PENDING);// 判断是否应该异步运行boolean runAsync = shouldRunAsync();// 判断是否应该异步编译final boolean asyncPrepare = runAsync&& HiveConf.getBoolVar(queryState.getConf(),HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);// 如果不是异步编译,则同步准备查询if (!asyncPrepare) {//创建Driver对象,编译SQL//Driver经过:SQL -> AST(抽象语法树) -> QueryBlock(查询块) -> Operator(e逻辑执行计划) -> TaskTree(物理执行计划) -> QueryPlan(查询计划)prepare(queryState);}// 如果不是异步运行,则同步运行查询if (!runAsync) {runQuery();} else {// 我们将在后台线程中传递ThreadLocals,从前台(处理程序)线程传递。// 1) ThreadLocal Hive对象需要在后台线程中设置// 2) Hive中的元数据存储客户端与正确的用户相关联。// 3) 当前UGI将在元数据存储处于嵌入式模式时被元数据存储使用Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);try {// 如果没有可用的后台线程来运行此操作,此提交将阻塞Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);// 设置后台操作句柄setBackgroundHandle(backgroundHandle);} catch (RejectedExecutionException rejected) {// 设置操作状态为ERRORsetState(OperationState.ERROR);// 抛出HiveSQLException异常throw new HiveSQLException("The background threadpool cannot accept" +" new task for execution, please retry the operation", rejected);}}}

1.Driver对象创建并编译SQL,将SQL编译成Query Plan执行计划

其中核心的是prepare()方法,它的源码在2.x和3.x、4.x有一些区别,不过其核心功能是没变的,主要是创建Driver对象,并编译SQL,然后通过Driver将SQL最终转换成Query Plan。

prepare()方法3.x的源码如下:

  /*** 准备执行SQL查询的操作。* 此方法负责初始化Driver,设置查询超时,编译查询语句,并处理可能的异常。** @param queryState 包含查询状态信息的对象。* @throws HiveSQLException 如果在准备过程中发生Hive SQL异常。*/public void prepare(QueryState queryState) throws HiveSQLException {// 设置操作状态为运行中setState(OperationState.RUNNING);try {// 创建Driver实例,返回的Driver对象是 ReExecDriverdriver = DriverFactory.newDriver(queryState, getParentSession().getUserName(), queryInfo);// 如果查询超时时间大于0,则启动一个定时任务来取消查询if (queryTimeout > 0) {// 创建一个单线程的定时任务执行器timeoutExecutor = new ScheduledThreadPoolExecutor(1);// 创建一个定时任务,在查询超时后取消查询Runnable timeoutTask = new Runnable() {@Overridepublic void run() {try {// 获取查询IDString queryId = queryState.getQueryId();// 记录日志,查询超时并取消执行LOG.info("Query timed out after: " + queryTimeout+ " seconds. Cancelling the execution now: " + queryId);// 取消查询SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {// 记录日志,取消查询时发生错误LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);} finally {// 关闭定时任务执行器timeoutExecutor.shutdown();}}};// 安排定时任务在查询超时后执行timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);}// 设置查询显示信息queryInfo.setQueryDisplay(driver.getQueryDisplay());// 设置操作句柄信息,以便Thrift API用户可以使用操作句柄查找Yarn ATS中的查询信息String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier().toTHandleIdentifier().getGuid()).trim();driver.setOperationId(guid64);// 编译SQL查询并响应 ReExecDriver.compileAndRespond(...) -> Driver.compileAndRespond(...)response = driver.compileAndRespond(statement);// 如果响应代码不为0,则抛出异常if (0 != response.getResponseCode()) {throw toSQLException("Error while compiling statement", response);}// 设置是否有结果集setHasResultSet(driver.hasResultSet());} catch (HiveSQLException e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw e;} catch (Throwable e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw new HiveSQLException("Error running query: " + e.toString(), e);}}

2.x与3.x源码最核心的区别就是在创建Driver,其对应源码是:

driver = new Driver(queryState, getParentSession().getUserName());

而4.x与3.x源码最核心的区别如下:

  1. 利用 Java 8 的 Lambda 表达式特性,简化代码逻辑,提高代码的可读性和可维护性。
  2. 通过将 queryTimeout 的类型改为 long,支持了更大的超时值,避免了溢出问题。
  3. 在资源管理方面,对调度器的生命周期管理也进行了优化,不需要显式的关闭操作。

4.x对应源码是:

if (queryTimeout > 0L) {timeoutExecutor = Executors.newSingleThreadScheduledExecutor();timeoutExecutor.schedule(() -> {try {final String queryId = queryState.getQueryId();log.info("Query timed out after: {} seconds. Cancelling the execution now: {}", queryTimeout, queryId);SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {log.error("Error cancelling the query after timeout: {} seconds", queryTimeout, e);}return null;}, queryTimeout, TimeUnit.SECONDS);
}

DriverFactory.newDriver()方法中返回 ReExecDriver对象,该对象表示执行过程失败可重试的Driver对象,然后调用 Driver.compileAndRespond() 方法进行编译SQL。

2.对QueryPaln 进行处理,转换成MR 任务执行

BackgroundWork是一个线程,负责异步处理QueryPlan,通过submitBackgroundOperation(work)提交运行,执行到SQLOperator.BackgroundOperation.run()方法,最终调用到Driver.run() 方法。

Driver

下面我们再来Driver类,它在不同版本中也有一些差别,比如2.x版本是直接 implements CommandProcessor,而3.x和4.x版本则是implements IDriver,而IDriver 则是 extends CommandProcessor。本质是为了更好的解耦和扩展性,使得代码更加模块化,易于维护和扩展。同时,通过继承 CommandProcessor 接口,也保持了与旧版本的兼容性,确保了功能的连续性。不过其核心功能是没变的,主要包含编译、优化及执行。

执行步骤

为了方便理解,我们先梳理整个执行步骤如下:

  1. 通过Antlr解析SQL语法规则和语法解析,将SQL语法转换成AST(抽象语法树)

  2. 遍历AST(抽象语法树) 将其转化成Query Block(查询块,可以看成查询基本执行单元)

  3. 将Query Block(查询块) 转换成OperatorTree(逻辑执行计划),并进行优化。

  4. OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划,每个Task对应一个MR Job任务)

  5. TaskTree(物理执行计划)最终包装成Query Plan(查询计划)

简单总结执行流程如下:

SQL -> AST(抽象语法树) -> QueryBlock(查询块) -> Operator(逻辑执行计划) -> TaskTree(物理执行计划) -> QueryPlan(查询计划)。

下面我们再结合SQLOperation调用的Driver类里面的核心方法,来看看底层源码是如何实现的:

compileAndRespond方法

首先第一个核心方法是

response = driver.compileAndRespond(statement);

compileAndRespond()方法2.x源码如下:

    /*** 编译给定的 SQL 命令并返回一个命令处理器响应。* 此方法调用 compileInternal 方法进行实际的编译操作,并使用编译结果创建一个命令处理器响应。** @param command 要编译的 SQL 命令* @return 包含编译结果的命令处理器响应*/public CommandProcessorResponse compileAndRespond(String command) {return createProcessorResponse(compileInternal(command, false));}

3.x和4.x会有些区别,会返回以下方法的调用结果:

coreDriver.compileAndRespond(statement);

无论哪个版本,最终compileAndRespond()方法都会调用到 compileInternal()方法,我们继续看2.x版本compileInternal()方法源码如下:

    private int compileInternal(String command, boolean deferClose) {int ret;// 获取Metrics实例,如果存在则增加等待编译操作的计数器Metrics metrics = MetricsFactory.getInstance();if(metrics != null) {metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 尝试获取编译锁,如果获取失败则返回编译锁超时错误码final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, command);if(compileLock == null) {return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();}try {// 如果Metrics实例存在,减少等待编译操作的计数器if(metrics != null) {metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 进行Hive SQL编译ret = compile(command, true, deferClose);} finally {// 无论编译结果如何,最终都要释放编译锁compileLock.unlock();}// 如果编译失败,尝试释放锁并回滚事务if(ret != 0) {try {releaseLocksAndCommitOrRollback(false, null);} catch(LockException e) {// 记录释放锁时的异常信息LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e));}}// 保存编译时的性能日志,用于WebUI展示// 执行时的性能日志由另一个线程的PerfLogger或重置后的PerfLogger完成PerfLogger perfLogger = SessionState.getPerfLogger();queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());return ret;}

3.x和4.x的源码相比起来有一些区别,但是都是通过执行Driver.compile()方法,由于4.x代码这块改动较大,做了很多解耦的操作,其核心内容还是变化不大,加上目前几乎很少应用4.x版本的hive,下面我们重点看看2.x和3.x版本的compile()方法内容。

compile方法

compile()方法2.x源码如下:

/*** 编译一个新的查询,可选择重置任务ID计数器并决定是否延迟关闭。* * @param command      要编译的HiveQL查询。* @param resetTaskIds 如果为true,则重置任务ID计数器。* @param deferClose   如果为true,则在编译过程被中断时延迟关闭/销毁操作。* @return 0表示编译成功,否则返回错误代码。*/
// deferClose 表示当进程被中断时,是否应该推迟关闭/销毁操作。如果 compile 方法是在另一个方法(如 runInternal)中被调用,并且该方法会将关闭操作推迟到其内部处理,那么 deferClose 应该设置为 true。
public int compile(String command, boolean resetTaskIds, boolean deferClose) {// 获取性能日志记录器,并开始记录编译过程的性能PerfLogger perfLogger = SessionState.getPerfLogger(true);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);// 锁定驱动状态,将驱动状态设置为编译中lDrvState.stateLock.lock();try {lDrvState.driverState = DriverState.COMPILING;} finally {lDrvState.stateLock.unlock();}// 对查询命令进行变量替换command = new VariableSubstitution(new HiveVariableSource() {@Overridepublic Map<String, String> getHiveVariable() {return SessionState.get().getHiveVariables();}}).substitute(conf, command);// 存储查询字符串String queryStr = command;try {// 对查询命令进行脱敏处理,避免记录敏感数据queryStr = HookUtils.redactLogString(conf, command);} catch(Exception e) {// 若脱敏失败,记录警告信息LOG.warn("WARNING! Query command could not be redacted." + e);}// 检查编译过程是否被中断,若中断则处理中断并返回错误代码if(isInterrupted()) {return handleInterruption("at beginning of compilation."); //indicate if need clean resource}// 如果上下文不为空且解释分析状态不为运行中,则关闭现有上下文if(ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {// close the existing ctx etc before compiling a new query, but does not destroy drivercloseInProcess(false);}// 如果需要重置任务ID,则重置任务工厂的IDif(resetTaskIds) {TaskFactory.resetId();}// 获取查询IDString queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);// 保存查询信息,用于Web UI显示this.queryDisplay.setQueryStr(queryStr);this.queryDisplay.setQueryId(queryId);// 记录编译开始信息LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);// 设置查询的当前时间戳SessionState.get().setupQueryCurrentTimestamp();// 标记编译过程中是否发生错误boolean compileError = false;try {// 初始化事务管理器final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);// 移除旧的关闭hookShutdownHookManager.removeShutdownHook(shutdownRunner);// 创建新的关闭hook,用于在JVM关闭时释放锁shutdownRunner = new Runnable() {@Overridepublic void run() {try {releaseLocksAndCommitOrRollback(false, txnManager);} catch(LockException e) {// 若释放锁时发生异常,记录警告信息LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " + e.getMessage());}}};// 添加新的关闭hookShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);// 再次检查编译过程是否被中断if(isInterrupted()) {return handleInterruption("before parsing and analysing the query");}// 如果上下文为空,则创建新的上下文if(ctx == null) {ctx = new Context(conf);}// 设置上下文的重试次数、命令和HDFS清理标志ctx.setTryCount(getTryCount());ctx.setCmd(command);ctx.setHDFSCleanup(true);/*** 把 HQL命令 翻译成一个 ASTNode Tree* 封装了 ParseDriver 对 HQL 的解析工作* ParseDriver 对 command 进行词法分析和语法解析(统称为语法分析),返回一个抽象语法树AST*/// 开始记录解析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);// 解析查询命令,得到抽象语法树ASTNode tree = ParseUtils.parse(command, ctx);// 结束记录解析过程的性能perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);// 加载查询hookqueryHooks = loadQueryHooks();// 如果查询hook不为空且不为空列表,则触发查询hook的编译前操作if(queryHooks != null && !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.beforeCompile(qhc);}}// 开始记录语义分析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);// 获取语义分析器BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);// 获取语义分析hookList<HiveSemanticAnalyzerHook> saHooks = getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, HiveSemanticAnalyzerHook.class);// 刷新元数据存储缓存,确保获取最新的元数据Hive.get().getMSC().flushCache();// 进行语义分析和计划生成if(saHooks != null && !saHooks.isEmpty()) {HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();hookCtx.setConf(conf);hookCtx.setUserName(userName);hookCtx.setIpAddress(SessionState.get().getUserIpAddress());hookCtx.setCommand(command);hookCtx.setHiveOperation(queryState.getHiveOperation());// 触发语义分析hook的预分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {tree = hook.preAnalyze(hookCtx, tree);}/*** sem 是一个 SemanticAnalyzer(语义分析器) 对象* 主要的工作是将 ASTNode 转化为 TaskTree,包括可能的 optimize,过程比较复杂** tree:  AST  抽象语法树   ===> TaskTree*        TaskTree : 物理执行计划**   把抽象语法树交给 SemanticAnalyzer 执行语法解析*   1、从 AST 转成 解析树*   2、通过解析树 再生成 QB 在查询快*   3、从 QB 树在生成 OperatorTree (Logical Plan)*   4、逻辑执行计划的优化*   5、OperatorTree转变成TaskTree*   6、再针对物理执行计划执行优化*   7、生成QueryPlan*/// 进行语义分析sem.analyze(tree, ctx);// 更新hook上下文hookCtx.update(sem);// 触发语义分析hook的后分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {hook.postAnalyze(hookCtx, sem.getAllRootTasks());}} else {// 若没有语义分析hook,直接进行语义分析sem.analyze(tree, ctx);}// 记录查询中发现的ACID文件接收器acidSinks = sem.getAcidFileSinks();// 记录语义分析完成信息LOG.info("Semantic Analysis Completed");// 验证语义分析生成的计划是否有效sem.validate();// 检查查询中是否包含ACID操作acidInQuery = sem.hasAcidInQuery();// 结束语义分析阶段的性能日志记录perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);// 检查编译过程是否被中断,如果中断则处理中断情况并返回if(isInterrupted()) {return handleInterruption("after analyzing query.");}// 根据语义分析结果和配置信息获取查询的输出模式schema = getSchema(sem, conf);/*** 把 TaskTree 生成一个 QueryPlan* 通过  Exeuctor 提交的方法,要接受的参数就是 QueryPlan*/// 根据查询字符串、语义分析器、开始时间、查询ID、操作类型和输出模式创建查询计划plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);// 设置查询字符串到配置中conf.setQueryString(queryStr);// 设置MapReduce工作流ID到配置中conf.set("mapreduce.workflow.id", "hive_" + queryId);// 设置MapReduce工作流名称到配置中conf.set("mapreduce.workflow.name", queryStr);// 如果查询计划中包含FetchTask,则对其进行初始化if(plan.getFetchTask() != null) {plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());}// 进行授权检查,如果语义分析不跳过授权且开启了授权功能if(!sem.skipAuthorization() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {try {// 开始记录授权过程的性能日志perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);// 执行授权操作doAuthorization(queryState.getHiveOperation(), sem, command);} catch(AuthorizationException authExp) {// 如果授权失败,打印错误信息并设置错误状态和返回码console.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to " + "get" + " more details.");errorMessage = authExp.getMessage();SQLState = "42000";return 403;} finally {// 结束记录授权过程的性能日志perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);}}// 如果配置中开启了记录EXPLAIN输出的功能if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 获取查询的EXPLAIN输出String explainOutput = getExplainOutput(sem, plan, tree);if(explainOutput != null) {if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 记录EXPLAIN输出到日志中LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput);}if(conf.isWebUiQueryInfoCacheEnabled()) {// 如果开启了Web UI查询信息缓存,将EXPLAIN计划设置到查询显示信息中queryDisplay.setExplainPlan(explainOutput);}}}// 编译成功,返回0return 0;} catch(Exception e) {// 如果编译过程中被中断,处理中断情况并返回if(isInterrupted()) {return handleInterruption("during query compilation: " + e.getMessage());}// 标记编译过程出现错误compileError = true;// 获取错误信息ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());// 构建错误消息errorMessage = "FAILED: " + e.getClass().getSimpleName();if(error != ErrorMsg.GENERIC_ERROR) {errorMessage += " [Error " + error.getErrorCode() + "]:";}// HIVE-4889if((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {errorMessage += " " + e.getCause().getMessage();} else {errorMessage += " " + e.getMessage();}if(error == ErrorMsg.TXNMGR_NOT_ACID) {errorMessage += ". Failed command: " + queryStr;}// 设置SQL状态码SQLState = error.getSQLState();// 记录下游错误信息downstreamError = e;// 打印错误信息和详细堆栈跟踪console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));// 返回错误代码return error.getErrorCode();// since it exceeds valid range of shell return values} finally {// 触发编译后的hook函数try {if(queryHooks != null && !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.afterCompile(qhc, compileError);}}} catch(Exception e) {// 如果触发hook函数时出现异常,记录警告信息LOG.warn("Failed when invoking query after-compilation hook.", e);}/*** 计算任务总耗时*/// 结束编译阶段的性能日志记录并计算耗时double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE) / 1000.00;// 获取编译过程中HMS调用的时间统计信息ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");// 设置查询显示信息中的HMS时间统计信息queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);// 检查编译过程是否被中断boolean isInterrupted = isInterrupted();if(isInterrupted && !deferClose) {// 如果被中断且不延迟关闭,关闭正在进行的操作closeInProcess(true);}// 锁定驱动状态lDrvState.stateLock.lock();try {if(isInterrupted) {// 如果被中断,根据是否延迟关闭设置驱动状态lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;} else {// 如果未被中断,根据编译是否出错设置驱动状态lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;}} finally {// 解锁驱动状态lDrvState.stateLock.unlock();}if(isInterrupted) {// 如果编译过程被中断,记录中断信息LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");} else {// 如果编译过程未被中断,记录编译完成信息LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " " + "seconds");}}
}

compile()方法在3.x和4.x有一些区别,但是都有以下三个核心方法:

  1. 首先是通过ParseUtils.parse(command, ctx)将Hive SQL转换成AST(抽象语法树),即:HQL -> AST(抽象语法树)转换;
  2. 然后是通过BaseSemanticAnalyzer.analyze()方法将AST(抽象语法树)解析生成TaskTree(物理执行计划)
  3. 最后将BaseSemanticAnalyzer传入QueryPlan的构造函数,来创建QueryPlan(查询计划)

其核心正是引入篇我们提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)

下面我们深入这几个方法看看:

parse方法

compile()方法中,首先是通过ParseUtils.parse(command, ctx)进行词法分析与解析,将Hive HQL转换成AST抽象语法树。

我们来看看parse()方法的源码:

/*** 解析 HQL。* * 此方法接收一个 Hive 查询命令和上下文对象,调用另一个重载的 parse 方法进行实际的解析操作,* 并将视图的全限定名参数设为 null。* * @param command 要解析的 Hive 查询命令* @param ctx 查询的上下文对象* @return 解析后的 AST 节点* @throws ParseException 如果解析过程中出现异常*/
public static ASTNode parse(String command, Context ctx) throws ParseException {return parse(command, ctx, null);
}

继续往里走,对应源码如下:

  /*** 解析HQL * ParseDriver对command进行词法分析和语法解析(统称为语法分析),返回一个抽象语法树AST* * @param command 要解析的Hive查询命令* @param ctx 查询上下文信息* @param viewFullyQualifiedName 视图的完全限定名称* @return 解析后的AST节点* @throws ParseException 如果解析过程中出现错误*/public static ASTNode parse(String command, Context ctx, String viewFullyQualifiedName) throws ParseException {// 创建一个ParseDriver实例用于解析命令ParseDriver pd = new ParseDriver();// 使用ParseDriver解析命令,得到AST节点ASTNode tree = pd.parse(command, ctx, viewFullyQualifiedName);// 查找根节点中第一个具有非空令牌的节点tree = findRootNonNullToken(tree);// 处理设置列引用的情况handleSetColRefs(tree);// 返回处理后的AST节点return tree;}

pd.parse()方法中,核心调用的是HiveLexer和HiveParser这两个类,它们分别负责SQL的词法分析和语法解析,我们继续看看其中源码:

/*** 解析给定的命令字符串,将其转换为抽象语法树(AST)节点。** @param command 要解析的命令字符串。* @param ctx 解析上下文,可包含配置信息和tokens重写流。* @param viewFullyQualifiedName 视图的完全限定名称,如果不是视图解析则为 null。* @return 解析后的 AST 节点。* @throws ParseException 如果解析过程中出现错误。*/public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)throws ParseException {// 如果启用了调试日志,则记录正在解析的命令if (LOG.isDebugEnabled()) {LOG.debug("Parsing command: " + command);}/***  Antlr对语法文件 HiveLexer.g 编译后自动生成的词法解析和语法解析类(HiveLexerX,HiveParser)*  文件 HiveLexer.g 定义了一些 hive 的关键字,form、where,数字的定义格式【0–9】,分隔符,比较符之类的。*  每一个关键字分支都会变成一个 token。**  HiveLexerX 是 antlr 根据词法规则文件,通过编译生成的一个代码类*  能够执行词法和语法的解析   *  最终生成一个 ASTNode*/// 创建一个不区分大小写的字符流,并使用它初始化词法分析器HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));/***  根据词法分析的结果得到tokens的,此时不只是单纯的字符串,*  而是具有特殊意义的字符串的封装,其本身是一个流。*  lexer 把 SQL 语句中的各个语法分支,都转换成底层引擎能识别的各种 Token*/// 创建一个tokens重写流,用于处理词法分析器生成的tokensTokenRewriteStream tokens = new TokenRewriteStream(lexer);// 如果提供了上下文,则根据是否为视图设置tokens重写流,并设置词法分析器的配置if (ctx != null) {if (viewFullyQualifiedName == null) {// 顶层查询ctx.setTokenRewriteStream(tokens);} else {// 这是一个视图ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);}lexer.setHiveConf(ctx.getConf());}// 语法解析 HiveParser是 Antlr 根据 HiveParser.g 生成的文件// 使用tokens重写流初始化语法解析器HiveParser parser = new HiveParser(tokens);// 如果提供了上下文,则设置解析器的配置if (ctx != null) {parser.setHiveConf(ctx.getConf());}// 设置解析器的树适配器,用于创建 AST 节点parser.setTreeAdaptor(adaptor);// 声明一个变量来存储解析结果HiveParser.statement_return r = null;try {/*** 转化为 ASTTree 放在 ASTNode 中的 tree 属性中。 通过 r.getTree() 获取返回。* 当前这句代码完成了从 Tok 树到 AST 的转变* 把结果放在了 HiveParser.statement_return*/// 调用解析器的 statement 方法进行解析r = parser.statement();} catch (RecognitionException e) {// 打印异常堆栈跟踪信息e.printStackTrace();// 如果解析过程中出现识别异常,则抛出解析异常throw new ParseException(parser.errors);}// 检查词法分析器和解析器是否有错误if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {// 如果没有错误,则记录解析完成的日志LOG.debug("Parse Completed");} else if (lexer.getErrors().size() != 0) {// 如果词法分析器有错误,则抛出解析异常throw new ParseException(lexer.getErrors());} else {// 如果解析器有错误,则抛出解析异常throw new ParseException(parser.errors);}// 获取解析结果的树,并将其转换为 AST 节点ASTNode tree = (ASTNode) r.getTree();// 设置 AST 节点的未知tokens边界tree.setUnknownTokenBoundaries();// 返回解析后的 AST 节点return tree;}

pd.parse()方法将sql语法转换成抽象语法树 AST,Hive中通过使用 Antlr(Another Tool for Language Recognition)进行词法分析和语法解析。

Antlr主要作用:

  • 词法分析:将输入的HiveQL查询字符串分解成一系列的Token,这些Token是语法分析的基础。Antlr生成的词法分析器(Lexer)负责将输入的HiveQL查询字符串分解成一个个Token,这些Token表示查询中的关键字、标识符、运算符等基本元素。
  • 语法解析:根据词法分析器生成的Token序列,解析HiveQL查询语句,生成AST抽象语法树。Antlr生成的语法解析器(Parser)负责读取Token序列,并根据语法规则解析这些Token,生成对应的AST抽象语法树。Token 对应 SQL中的每个关键字。

analyze方法

通过上一个步骤并获取到 ASTNode之后,需要对其进行进一步的抽象和结构化处理,以便能够更便捷地将其转换为MapReduce程序。为此将会初始化类BaseSemanticAnalyzer,并通过SemanticAnalyzerFactory确定SQL的类型,进而调用analyze()方法进行分析,其对应源码如下:

BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx);

其中 sem 是一个 SemanticAnalyzer(语义分析器)对象,主要的工作是将 ASTNode 转化为 TaskTree(物理执行计划),包括可能的 optimize(优化),也就是前面执行步骤第2~5步做的内容。

首先看analyze()对应源码如下:

    /*** 分析给定的抽象语法树(AST)节点,并使用提供的上下文进行初始化。* * 此方法首先初始化上下文,然后初始化分析器的内部状态。* 最后,调用 `analyzeInternal` 方法对 AST 进行实际的分析。* * @param ast 要分析的抽象语法树节点。* @param ctx 分析过程中使用的上下文。* @throws SemanticException 如果在分析过程中发生语义错误。*/public void analyze(ASTNode ast, Context ctx) throws SemanticException {// 初始化上下文initCtx(ctx);// 初始化分析器的内部状态,清除部分缓存init(true);// 调用内部分析方法对 AST 进行分析analyzeInternal(ast);}

可以看到,除了进行必要的初始化之外,还会调用analyzeInternal()方法,对应源码如下:

    /*** 对抽象语法树(AST)进行内部语义分析。* 此方法为抽象方法,具体实现需在子类中完成。* 它负责对传入的AST进行详细的语义分析,以确保查询语句的合法性和正确性。** @param ast 待分析的抽象语法树节点* @throws SemanticException 如果在语义分析过程中发现错误*/public abstract void analyzeInternal(ASTNode ast) throws SemanticException;

可以看到analyzeInternal()是一个抽象方法,它有多种具体实现,通过断点查看,会发现流程是跳转到了org.apache.hadoop.hive.ql.parse.SemanticAnalyzer类,其源码注释如下:

Implementation of the semantic analyzer. It generates the query plan.
There are other specific semantic analyzers for some hive operations such as DDLSemanticAnalyzer for ddl operations.

翻译:

语义分析器的实现。它用于生成查询计划。
对于某些 Hive 操作,还有其他特定的语义分析器,例如用于 DDL 操作的 DDLSemanticAnalyzer。

这个类有点复杂,Hive优化的秘密全在于此,将AST抽象语法树解析生成TaskTree(物理执行计划)的全流程,包括逻辑执行计划、逻辑执行计划的优化、物理执行计划的切分、物理执行计划的优化、以及 MapReduce 任务的生成全部都在其中,下面我们就看看其中实现的analyzeInternal()方法源码:

  /*** 对传入的AST节点进行内部分析,生成查询计划。** @param ast 抽象语法树节点* @param pcf 计划上下文工厂* @throws SemanticException 语义分析异常*/void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {LOG.info("Starting Semantic Analysis");// 1. 从语法树生成解析树boolean needsTransform = needsTransform();// 改变位置别名处理的位置processPositionAlias(ast);PlannerContext plannerCtx = pcf.create();if (!genResolvedParseTree(ast, plannerCtx)) {return;}if (HiveConf.getBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY)) {for (String alias : qb.getSubqAliases()) {removeOBInSubQuery(qb.getSubqForAlias(alias));}}// 检查查询结果缓存。// 如果不需要进行掩码/过滤,则可以在生成操作符树和进行CBO之前检查缓存。// 否则,必须等到掩码/过滤步骤之后。boolean isCacheEnabled = isResultsCacheEnabled();QueryResultsCache.LookupInfo lookupInfo = null;if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {lookupInfo = createLookupInfoForQuery(ast);if (checkResultsCache(lookupInfo)) {return;}}ASTNode astForMasking;if (isCBOExecuted() && needsTransform &&(qb.isCTAS() || qb.isView() || qb.isMaterializedView() || qb.isMultiDestQuery())) {// 如果使用CBO并且可能应用掩码/过滤策略,则创建ast的副本。// 原因是操作符树的生成可能会修改初始ast,但如果需要第二次解析,我们希望解析未修改的ast。astForMasking = (ASTNode) ParseDriver.adaptor.dupTree(ast);} else {astForMasking = ast;}// 2. 从解析树生成操作符树Operator sinkOp = genOPTree(ast, plannerCtx);boolean usesMasking = false;if (!unparseTranslator.isEnabled() &&(tableMask.isEnabled() && analyzeRewrite == null)) {// 在这里重写 * 以及掩码表ASTNode rewrittenAST = rewriteASTWithMaskAndFilter(tableMask, astForMasking, ctx.getTokenRewriteStream(),ctx, db, tabNameToTabObject, ignoredTokens);if (astForMasking != rewrittenAST) {usesMasking = true;plannerCtx = pcf.create();ctx.setSkipTableMasking(true);init(true);// 改变位置别名处理的位置processPositionAlias(rewrittenAST);genResolvedParseTree(rewrittenAST, plannerCtx);if (this instanceof CalcitePlanner) {((CalcitePlanner) this).resetCalciteConfiguration();}sinkOp = genOPTree(rewrittenAST, plannerCtx);}}// 检查查询结果缓存// 在需要进行行或列掩码/过滤的情况下,不支持缓存。// TODO: 为带有掩码/过滤的查询启用缓存if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache()) {lookupInfo = createLookupInfoForQuery(ast);if (checkResultsCache(lookupInfo)) {return;}}// 3. 推导结果集模式if (createVwDesc != null && !this.ctx.isCboSucceeded()) {resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());} else {// 如果满足以下条件,resultSchema将为null:// (1) cbo被禁用;// (2) 或者cbo启用但使用AST返回路径(无论是否成功,resultSchema都将重新初始化)// 只有在cbo启用且使用新返回路径并且成功时,resultSchema才不为null。if (resultSchema == null) {resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));}}// 4. 为优化器和物理编译器生成解析上下文copyInfoToQueryProperties(queryProperties);ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,// 使用菱形操作符简化泛型类型声明new HashSet<>(joinContext.keySet()),// 使用菱形操作符简化泛型类型声明new HashSet<>(smbMapJoinContext.keySet()),loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,queryProperties, viewProjectToTableSchema, acidFileSinks);// 在解析上下文中设置半连接提示pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));// 如果需要禁用映射连接提示,则设置pCtx.setDisableMapJoin(disableMapJoinWithHint(getQB().getParseInfo().getHintList()));// 5. 处理视图创建if (createVwDesc != null) {if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {return;}if (!ctx.isCboSucceeded()) {saveViewDefinition();}// 此时验证创建视图语句,createVwDesc包含语义检查所需的所有信息validateCreateView();if (createVwDesc.isMaterialized()) {createVwDesc.setTablesUsed(getTablesUsed(pCtx));} else {// 由于我们只是创建视图(不执行它),因此不需要优化或转换计划(实际上,这些过程可能会干扰视图创建)。所以跳过此方法的其余部分。ctx.setResDir(null);ctx.setResFile(null);try {PlanUtils.addInputsForView(pCtx);} catch (HiveException e) {throw new SemanticException(e);}// 为创建视图语句生成谱系信息// 如果配置了LineageLoggerhook。// 添加计算谱系信息的转换。Set<String> postExecHooks = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));if (postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {// 使用菱形操作符简化泛型类型声明ArrayList<Transform> transformations = new ArrayList<>();transformations.add(new HiveOpConverterPostProc());transformations.add(new Generator(postExecHooks));for (Transform t : transformations) {pCtx = t.transform(pCtx);}// 我们仅使用视图名称作为位置。queryState.getLineageState().mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);}return;}}// 6. 如果需要,生成表访问统计信息if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());}// 7. 执行逻辑优化if (LOG.isDebugEnabled()) {LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));}// 创建一个优化器实例,并对解析上下文进行逻辑优化。Optimizer optm = new Optimizer();// 设置优化器的解析上下文optm.setPctx(pCtx);// 初始化优化器optm.initialize(conf);// 执行优化操作,并更新解析上下文pCtx = optm.optimize();// 检查优化后的解析上下文中是否包含列访问信息if (pCtx.getColumnAccessInfo() != null) {// 设置列访问信息,用于视图列授权setColumnAccessInfo(pCtx.getColumnAccessInfo());}// 如果启用了调试日志,则输出优化后的操作符树信息if (LOG.isDebugEnabled()) {LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));}// 8. Generate column access stats if required - wait until column pruning// takes place during optimization// 检查是否需要收集列访问信息用于授权或统计boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);if (isColumnInfoNeedForAuth|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {// 创建列访问分析器实例ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);// 分析列访问信息,并更新列访问信息// view column access info is carried by this.getColumnAccessInfo().setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));}// 9. Optimize Physical op tree & Translate to target execution engine (MR,// TEZ..)// 检查是否需要进行逻辑解释,如果不需要则进行物理操作树的优化和编译if (!ctx.getExplainLogical()) {// 获取任务编译器实例TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);// 初始化任务编译器compiler.init(queryState, console, db);// 编译解析上下文,生成任务和输入输出信息compiler.compile(pCtx, rootTasks, inputs, outputs);// 获取获取任务fetchTask = pCtx.getFetchTask();}//find all Acid FileSinkOperatorS// 创建查询计划后处理器实例,但该实例未被使用,后续可考虑移除QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers// 查找根任务列表中的第一个TezTaskfinal Optional<TezTask> optionalTezTask =rootTasks.stream().filter(task -> task instanceof TezTask).map(task -> (TezTask) task).findFirst();if (optionalTezTask.isPresent()) {// 获取第一个TezTask实例final TezTask tezTask = optionalTezTask.get();// 遍历根任务列表,为满足条件的DDLWork添加插入提交hook任务rootTasks.stream()// 过滤出工作类型为DDLWork的任务.filter(task -> task.getWork() instanceof DDLWork)// 将任务转换为DDLWork类型.map(task -> (DDLWork) task.getWork())// 过滤出预插入表描述不为空的DDLWork.filter(ddlWork -> ddlWork.getPreInsertTableDesc() != null)// 获取预插入表描述.map(ddlWork -> ddlWork.getPreInsertTableDesc())// 创建插入提交hook描述.map(ddlPreInsertTask -> new InsertCommitHookDesc(ddlPreInsertTask.getTable(),ddlPreInsertTask.isOverwrite()))// 为TezTask添加依赖任务.forEach(insertCommitHookDesc -> tezTask.addDependentTask(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), insertCommitHookDesc), conf)));}LOG.info("Completed plan generation");// 11. put accessed columns to readEntity// 检查是否需要收集扫描列的统计信息if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {// 将访问的列信息添加到读取实体中putAccessedColumnsToReadEntity(inputs, columnAccessInfo);}// 检查是否启用了查询结果缓存,并且查找信息不为空if (isCacheEnabled && lookupInfo != null) {// 检查查询是否可以被缓存if (queryCanBeCached()) {// 创建缓存查询信息QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);// Specify that the results of this query can be cached.// 指定该查询的结果可以被缓存setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));}}}

简单总结一下,首先输入的是AST抽象语法树,主要经历了以下步骤:

  1. Generate Resolved Parse tree from syntax tree 从语法树生成解析树
  2. Gen OP Tree from resolved Parse Tree 从解析树生成Gen OP树  OperatorTree
  3. Deduce Resultset Schema(selct ...... 每个字段,我给你构造成一个 Field)推导结果集模式  CBO优化
  4. Generate ParseContext for Optimizer & Physical compiler 为优化器和物理编译器生成解析上下文
  5. Take care of view creation 注意视图创建
  6. Generate table access stats if required 生成表访问统计信息(如果需要)
  7. Perform Logical optimization 执行逻辑执行计划的优化
  8. Generate column access stats if required - wait until column pruning takes place during optimization
    根据需要生成列访问统计信息-等待优化期间进行列裁剪
    sql当中写了很多的无用的字段,但是最终执行逻辑不需要这些字段,就需要列裁剪。
  9. Optimize Physical op tree & Translate to target execution engine (MR, Spark, TEZ..) 优化物理操作树并转换为目标执行引擎(MR,TEZ ..)
  10. put accessed columns to readEntity 将访问的列放入 ReadEntity(要读取的列的信息)
  11. if desired check we're not going over partition scan limits 如果需要检查,我们不会超过分区扫描限制

生成QueryPlan

这一系列操作完成后,最后就是把得到的 TaskTree 生成一个 QueryPlan,相关源码如下:

/*** 创建一个新的 QueryPlan 对象。* * @param queryStr 要执行的查询字符串* @param sem 语义分析器对象,用于对查询进行语义分析* @param startTime 驱动程序开始运行的时间,通过 perfLogger 获取* @param queryId 查询的唯一标识符* @param hiveOperation Hive 操作类型* @param schema 查询结果的输出模式*/
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);

总结

本文介绍了Hive,并通过源码梳理了Hive的执行原理,其核心正是引入篇我们提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)

总结起来主要有以下四个步骤:

  1. 词法分析与解析
    将SQL语法转换成AST(抽象语法树) 
  2. 语义分析
    将AST进行进一步的抽象和结构化处理,通过遍历AST(抽象语法树) 将其转化成Query Block
  3. 逻辑优化
    到了第三步时,操作符树虽然已经勾勒出执行任务的先后顺序和上下游依赖,但细节还比较粗糙,例如存在重复的数据扫描、不必要的Shuffle操作等,因此还需要进行进一步优化。通过优化,Hive可以改进查询的执行计划,并生成更高效的作业图以在分布式计算框架中执行。这些优化可以提高查询的性能和效率,并减少资源开销。
  4. 物理优化
    在逻辑优化阶段结束后,输入的SQL语句也逐步转换为优化后的逻辑计划,不过此时的逻辑计划仍然不能直接执行,还需要进一步转换成可以识别并执行的MapReduce Task,首先将优化后的OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划,每个Task对应一个MR Job任务),并对物理执行计划进行一些优化,然后依次调用执行。

有朋友看了初版觉得写的不够细,私信让我迭代丰富一下,但还有一些有意思的细节,比如4.x源码的区别等,感兴趣的小伙伴可以自行深入探索一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18189.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

30天开发操作系统 第 20 天 -- API

前言 大家早上好&#xff0c;今天我们继续努力哦。 昨天我们已经实现了应用程序的运行, 今天我们来实现由应用程序对操作系统功能的调用(即API, 也叫系统调用)。 为什么这样的功能称为“系统调用”(system call)呢&#xff1f;因为它是由应用程序来调用(操作)系统中的功能来完…

UE5.2后 Bake Out Materials失效

这个问题出现在5.3&#xff0c;5.4&#xff0c;5.5没有测试 烘焙贴图后会找不到贴图位置&#xff0c; 这个是5.2的正常状态 默认是生成在模型当前目录里&#xff0c;包括新的材质 但是这个bug会让材质和贴图都消失&#xff0c;无法定位 暂时没有办法解决&#xff0c;等官方 …

macOS部署DeepSeek-r1

好奇&#xff0c;跟着网友们的操作试了一下 网上方案很多&#xff0c;主要参考的是这篇 DeepSeek 接入 PyCharm&#xff0c;轻松助力编程_pycharm deepseek-CSDN博客 方案是&#xff1a;PyCharm CodeGPT插件 DeepSeek-r1:1.5b 假设已经安装好了PyCharm PyCharm: the Pyth…

架构设计系列(二):CI/CD

一、概述 CI/CD 是 持续集成&#xff08;Continuous Integration&#xff09; 和 持续交付/持续部署&#xff08;Continuous Delivery/Continuous Deployment&#xff09; 的缩写&#xff0c;是现代软件开发中的一套核心实践和工具链&#xff0c;旨在提高软件交付的效率、质量…

Windows 11 搭建私有知识库(docker、dify、deepseek、ollama)

一、操作系统信息 版本 Windows 11 家庭中文版 版本号 23H2 安装日期 ‎2023/‎8/‎21 操作系统版本 22631.4460二、搭建思路 ollama拉取deepseek、bge-m3模型docker拉取dify的镜像dify链接ollama使用模型&#xff0c;并上传文件搭建知识库&#xff0c;创建应用 三、搭建步骤…

本地部署DeepSeek摆脱服务器繁忙

由于图片和格式解析问题&#xff0c;可前往 阅读原文 最近DeepSeek简直太火了&#xff0c;频频霸榜热搜打破春节的平静&#xff0c;大模型直接开源让全球科技圈都为之震撼&#xff01;再次证明了中国AI的换道超车与崛起 DeepSeek已经成了全民ai&#xff0c;使用量也迅速上去了…

‌CBA认证‌(业务架构师认证)简介---适用人群、考试内容与形式、含金量与职业前景,以及‌CBA、TOGAF认证对比表格

‌CBA认证‌&#xff0c;即业务架构师认证&#xff08;Certified Business Architect&#xff0c;CBA&#xff09;&#xff0c;是由业务架构师协会&#xff08;Business Architecture Institute&#xff09;推出的一项国际认证计划。该认证旨在评估和认证业务架构师的专业能力和…

保姆级GitHub大文件(100mb-2gb)上传教程

GLF&#xff08;Git Large File Storage&#xff09;安装使用 使用GitHub desktop上传大于100mb的文件时报错 The following files are over 100MB. lf you commit these files, you will no longer beable to push this repository to GitHub.com.term.rarWe recommend you a…

使用 Visual Studio Code (VS Code) 开发 Python 图形界面程序

安装Python、VS Code Documentation for Visual Studio Code Python Releases for Windows | Python.org 更新pip >python.exe -m pip install --upgrade pip Requirement already satisfied: pip in c:\users\xxx\appdata\local\programs\python\python312\lib\site-pa…

Python的那些事第二十一篇:Python Web开发的“秘密武器”Flask

基于 Flask 框架的 Python Web 开发研究 摘要 在 Web 开发的江湖里,Python 是一位武林高手,而 Flask 则是它手中那把小巧却锋利的匕首。本文以 Flask 框架为核心,深入探讨了它在 Python Web 开发中的应用。通过幽默风趣的笔触,结合实例和表格,分析了 Flask 的特性、优势以…

Qt开发①Qt的概念+发展+优点+应用+使用

目录 1. Qt的概念和发展 1.1 Qt的概念 1.2 Qt 的发展史&#xff1a; 1.3 Qt 的版本 2. Qt 的优点和应用 2.1 Qt 的优点&#xff1a; 2.2 Qt 的应用场景 2.3 Qt 的应用案例 3. 搭建 Qt 开发环境 3.1 Qt 的开发工具 3.2 Qt SDK 的下载和安装 3.3 Qt 环境变量配置和使…

【第4章:循环神经网络(RNN)与长短时记忆网络(LSTM)— 4.3 RNN与LSTM在自然语言处理中的应用案例】

咱今天来聊聊在人工智能领域里,特别重要的两个神经网络:循环神经网络(RNN)和长短时记忆网络(LSTM),主要讲讲它们在自然语言处理里的应用。你想想,平常咱们用手机和别人聊天、看新闻、听语音助手说话,背后说不定就有 RNN 和 LSTM 在帮忙呢! 二、RNN 是什么? (一)…

DeepSeek应用——与PyCharm的配套使用

目录 一、配置方法 二、使用方法 三、注意事项 1、插件市场无continue插件 2、无结果返回&#xff0c;且在本地模型报错 记录自己学习应用DeepSeek的过程&#xff0c;使用的是自己电脑本地部署的私有化蒸馏模型...... &#xff08;举一反三&#xff0c;这个不单单是可以用…

国自然地区基金|影像组学联合病理组学预测进展期胃癌术后预后的研究|基金申请·25-02-13

小罗碎碎念 今天和大家分享一个国自然地区科学项目&#xff0c;执行年限为2020.01&#xff5e;2023.12&#xff0c;直接费用为34万元。 胃癌在我国发病形势严峻&#xff0c;现有TNM分期预后评估存在局限&#xff0c;难以满足精准医疗需求。本项目运用“医工结合&#xff0c;学科…

【Java集合一】集合概述

一、集合简介 Java 集合框架&#xff08;Collection Framework&#xff09;是 Java 提供的一组用于存储和操作对象的类和接口集合。这些集合类提供了不同的数据结构&#xff0c;使得数据的管理和操作更加方便和高效。 Java 集合框架提供了各种类型的数据结构&#xff0c;如列…

k8s集群搭建参考(by lqw)

文章目录 声明配置yum源安装docker安装 kubeadm&#xff0c;kubelet 和 kubectl部署主节点其他节点加入集群安装网络插件 声明 由于看了几个k8s的教程&#xff0c;都存在各种问题&#xff0c;自己搭建的时候&#xff0c;踩了不少坑&#xff0c;最后还是靠百度csdnchatGPT才搭建…

MySQL 插入替换语句(replace into statement)

我们日常使用 insert into 语句向表中插入数据时&#xff0c;一定遇到过主键或唯一索引冲突的情况&#xff0c;MySQL的反应是报错并停止执行后续的语句&#xff0c;而replace into语句可以实现强制插入。 文章目录 一、replace into 语句简介1.1 基本用法1.2 使用set语句 二、注…

日语发音的节拍

短音 每个假名&#xff08;包括清音、浊音、半浊音&#xff09;都占 1 拍。 长音 长音占 2 拍&#xff0c;发音时间比短音长 不同母音的长音形式不同&#xff08;あ段あ&#xff0c;い段い&#xff0c;う段う&#xff0c;え段い/え&#xff0c;お段う/お&#xff09; 促音 …

[AI]从零开始的llama.cpp部署与DeepSeek格式转换、量化、运行教程

一、前言 在上一次的DeepSeek的部署教程中&#xff0c;我们使用Ollama与LM Studio很轻松的部署了DeepSeek并且也完成了相关API的调用&#xff0c;如果还有不会的小伙伴请看下面的教程&#xff1a; DeepSeek本地部署&#xff1a;[AI]从零开始的DeepSeek本地部署及本地API调用教…

基于SSM+uniapp的数学辅导小程序+LW示例参考

1.项目介绍 系统角色&#xff1a;管理员、普通用户功能模块&#xff1a;用户管理、学习中心、知识分类管理、学习周报管理、口算练习管理、试题管理、考试管理、错题本等技术选型&#xff1a;SSM&#xff0c;Vue&#xff08;后端管理web&#xff09;&#xff0c;uniapp等测试环…