大数据SQL调优专题——Spark执行原理

引入

在深入MapReduce中有提到,MapReduce虽然通过“分而治之”的思想,解决了海量数据的计算处理问题,但性能还是不太理想,这体现在两个方面:

  1. 每个任务都有比较大的overhead,都需要预先把程序复制到各个 worker 节点,然后启动进程;
  2. 所有的中间数据都要读写多次硬盘。map 的输出结果要写到硬盘上,reduce抓取数据排序合并之后,也要先写到本地硬盘上再进行读取,所以快不起来。

除此之外,MapReduce还有以下的问题:

  1. 算子不够丰富,仅有Map和Reduce,复杂算子的实现极为烦琐;
  2. MapReduce在处理迭代计算、实时查询和交互式数据挖掘任务时效率较低,因为每次迭代都需要将数据写入磁盘,导致大量的I/O开销;
  3. 无法支持血缘或上下游依赖的概念,失败重试只能从头开始,变相地无法实现迭代计算。

针对以上的缺陷,不同的计算引擎采取了不同的优化策略。例如Tez简化了MapReduce过程,支持DAG(Directed Acyclic Graph,有向无环图)​,细化MapReduce环节并灵活组合。Impala则专注于单节点纯内存计算。而Spark依托DAG Lineage、纯内存计算、RDD(分布式弹性数据集)等特性,以及与Hadoop生态极佳的兼容性,支持例如图计算、机器学习、流(Micro-Batch)计算等多样化的功能或场景,在一系列大数据引擎中脱颖而出,成为当今最主流的计算引擎之一。

Spark最初由加州大学伯克利分校的AMPLab开发,后来成为Apache软件基金会的顶级项目。

Spark的核心概念

下面通过Spark的一些核心概念,去进一步了解它。

RDD(弹性分布式数据集)

定义

RDD(Resilient Distributed Dataset)是Spark的核心抽象,是一个不可变的、分布式的数据集合,可以并行操作。RDD可以通过在存储系统中的文件或已有的Scala、Java、Python集合以及通过其他RDD的转换操作来创建。

特性

  • 不可变性:RDD一旦创建,其数据就不能被修改。

  • 分区:RDD的数据被划分为多个分区,每个分区可以独立计算。

  • 依赖关系:RDD之间通过转换操作形成依赖关系,这些依赖关系构成了DAG(有向无环图)

DAG(有向无环图)

定义

DAG是Spark中用于表示RDD之间依赖关系的有向无环图。DAG调度器负责将DAG分解成多个阶段(stages),每个阶段是一系列并行的任务。

工作原理

  1. 操作解析:当代码被提交到Spark的解释器时,系统会生成一个操作图,称为Operator Graph,记录了所有的Transformation操作及其依赖关系。

  2. DAG调度器:当一个Action操作(如collect或save)被触发时,DAG调度器会将Operator Graph分解为多个Stage(阶段)。窄依赖(Narrow Dependency)的操作如map和filter不需要数据重新分区,属于同一阶段;宽依赖(Wide Dependency)的操作如reduceByKey需要数据Shuffle,不同阶段之间以宽依赖为界。

  3. 任务调度器:每个Stage会被拆分为多个基于数据分区的Task。Task调度器将这些Task分发到集群的Worker节点上执行。

  4. 执行与结果:每个Worker节点执行分配的Task,并将结果返回给Driver程序。DAG确保各个阶段按依赖顺序执行,并通过内存优化中间结果存储,最大限度减少I/O和通信开销。

作用

  • 任务依赖分析:DAG调度器通过分析RDD之间的依赖关系,决定任务的执行顺序。

  • 内存计算优化:通过减少Shuffle和磁盘读写,DAG提高了计算效率。

  • 全局优化:DAG确保每个Stage都包含最少的任务,避免重复计算。

Shuffle机制

定义

Shuffle是Spark中的一种数据重新分区操作,通常在宽依赖(Wide Dependency)的操作中发生,如reduceByKey、groupByKey等。

工作原理

  1. 分区:Shuffle操作会将数据重新分区,通常会根据键(key)进行分区。

  2. 数据传输:数据从一个节点传输到另一个节点,以确保相同键的数据位于同一个节点上。

  3. 排序和分组:在目标节点上,数据会被排序和分组,以便进行后续的聚合操作。

优化策略

  • 减少数据传输:通过数据本地性优化,尽量减少数据在节点之间的传输。

  • 压缩:在Shuffle过程中,可以启用数据压缩,减少网络传输的开销。

  • 缓存:在Shuffle之前,可以将数据缓存到内存中,减少重复计算。

数据缓存机制

定义

数据缓存机制是Spark中用于提高数据处理效率的一种机制,通过将数据缓存到内存中,减少重复计算的开销。

实现方式

  • cache()cache()方法是persist()的简化版,其底层实现直接调用persist(StorageLevel.MEMORY_AND_DISK),默认将数据存储在内存中,如果内存不足,则溢写到磁盘。

  • persist()persist()方法允许用户选择存储级别StorageLevel,如MEMORY_ONLYMEMORY_AND_DISKDISK_ONLY等。

作用

  • 加速重复计算:通过缓存数据,避免重复计算DAG中的父节点。

  • 灵活的存储策略persist()方法提供了更灵活的存储策略,适应内存、磁盘等不同环境。

适用场景

  • 数据需要被多次使用:适用于数据需要被多次使用,但不需要跨作业的容错能力的场景。

  • 计算代价大:适用于计算代价大,但内存能够容纳数据的场景。

错误容忍机制

RDD的DAG Lineage(血缘)

指创建RDD所依赖的转换操作序列。当某个RDD的分区数据丢失时,Spark可以通过Lineage信息重新计算该分区的数据。

RDD的 DAG Lineage 主要用于描述数据从源到目标的转换过程,包括数据的流动、处理、转换等各个步骤。DAG Lineage能够清晰地展示数据的来源、去向以及数据在不同阶段的变化,帮助用户了解数据的全生命周期。

Checkpoint(检查点)

通过将RDD的状态保存到可靠的存储系统(如HDFS、S3等),以支持容错和优化长计算链。当Spark应用程序出现故障时,可以从检查点恢复状态。

故障恢复

  • 节点故障:当Worker节点故障时,Spark会利用RDD的血统信息重新计算丢失的数据分区。如果设置了检查点,Spark会从检查点位置开始重新执行,减少计算开销。
  • 驱动节点故障:如果驱动节点故障,Spark会通过Apache Mesos等集群管理器重新启动驱动节点,并恢复执行状态。

内存管理机制

内存模型

执行内存(Execution Memory):主要用于存储任务执行过程中的临时数据,如Shuffle的中间结果等。这部分内存主要用于任务的执行期间,任务完成后会被释放。

存储内存(Storage Memory):用于缓存中间结果(RDD)和DataFrame/DataSet的持久化数据。这部分内存是为了加速重复计算而存在的,数据可以被多次复用。

内存分配

内存分配比例:内存分配比例可以通过配置项spark.executor.memory来设置总的内存大小,并通过spark.storage.memoryFraction来指定存储内存所占的比例,默认为0.6。这意味着默认情况下,Executor的60%的内存用于存储,剩余的40%用于执行。

内存回收

LRU缓存淘汰策略:Spark采用LRU(Least Recently Used)缓存淘汰策略来管理存储内存中的数据。当存储内存不足时,Spark会根据LRU算法淘汰最近最少使用的数据。

Spill to Disk:当执行内存不足时,Spark会将一部分数据溢写到磁盘,以释放内存空间。例如,在Shuffle操作期间,如果内存不足以存放所有中间结果,Spark会将部分数据写入磁盘。

动态内存管理

动态调整内存分配:在Spark 2.x版本之后,引入了更先进的内存管理机制,支持动态调整执行内存和存储内存之间的比例。这意味着在运行时,Spark可以根据实际内存使用情况动态调整内存分配,从而更好地利用资源。

内存配置

  • spark.executor.memory:设置Executor的总内存大小。
  • spark.storage.memoryFraction:设置存储内存所占的比例。
  • spark.shuffle.spill.compress:是否启用Shuffle数据的压缩。
  • spark.serializer:设置序列化库,默认为org.apache.spark.serializer.KryoSerializer。
  • spark.kryoserializer.buffer.max:设置Kryo序列化器的最大缓冲区大小。

Spark的执行原理

由于本专栏的重点是SQL,所以我们主要看Spark SQL的执行过程。相比于Hive的源码,Spark就贴心很多了,提供了org.apache.spark.sql.execution.QueryExecution类,这个类是Spark SQL查询执行的核心,它封装了从SQL解析到最终执行的整个过程,为开发者提供了丰富的接口来理解和调试查询执行的整个过程。

QueryExecution源码注释如下:

The primary workflow for executing relational queries using Spark.  Designed to allow easy access to the intermediate phases of query execution for developers.
While this is not a public class, we should avoid changing the function names for the sake of changing them, because a lot of developers use the feature for debugging.

翻译:

使用 Spark 执行关系查询的主要工作流程。设计目的是让开发者能够轻松访问查询执行的中间阶段。
虽然这不是一个公共类,但我们应尽可能避免改变函数名称,因为许多开发者使用此功能进行调试。

QueryExecution类的2.x源码如下:

/*** QueryExecution类负责执行关系查询的主要工作流程。* 它设计用于让开发人员可以轻松访问查询执行的中间阶段。** @param sparkSession 用于执行查询的SparkSession* @param logical 要执行的逻辑计划*/
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {// TODO: 将planner和optimizer从SessionState移动到这里/** 获取查询计划器 */protected def planner: SparkPlanner = sparkSession.sessionState.plannerdef assertAnalyzed(): Unit = analyzeddef assertSupported(): Unit = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}/** 懒加载已分析的逻辑计划 */// SQL解析lazy val analyzed: LogicalPlan = {SparkSession.setActiveSession(sparkSession)sparkSession.sessionState.analyzer.executeAndCheck(logical)}/** 懒加载带缓存数据的逻辑计划 */lazy val withCachedData: LogicalPlan = {assertAnalyzed()assertSupported()sparkSession.sharedState.cacheManager.useCachedData(analyzed)}/** 懒加载优化后的逻辑计划 */// 逻辑优化处理lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)/** 懒加载Spark物理计划 */// 将逻辑计划转换成物理计划// 逻辑计划是不区分引擎的,而这里的物理计划(SparkPlan)是面向Spark执行的lazy val sparkPlan: SparkPlan = {SparkSession.setActiveSession(sparkSession)// TODO: 目前我们使用next(),即取planner返回的第一个计划,//       但我们将来会实现选择最佳计划的功能。planner.plan(ReturnAnswer(optimizedPlan)).next()}/** * executedPlan 不应用于初始化任何 SparkPlan。* 它仅应用于执行。*/// 预提交阶段lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)/** RDD的内部版本。避免复制且没有schema */// 最终提交阶段lazy val toRdd: RDD[InternalRow] = {if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) {new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)} else {executedPlan.execute()}}/*** 为执行准备计划好的SparkPlan。* 根据需要插入shuffle操作和内部行格式转换。** @param plan 要准备执行的SparkPlan* @return 准备好执行的SparkPlan*/protected def prepareForExecution(plan: SparkPlan): SparkPlan = {preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }}/** * 在执行前应用于物理计划的规则序列。* 这些规则将按顺序应用。** @return 要应用的规则序列*/protected def preparations: Seq[Rule[SparkPlan]] = Seq(PlanSubqueries(sparkSession),EnsureRequirements(sparkSession.sessionState.conf),CollapseCodegenStages(sparkSession.sessionState.conf),ReuseExchange(sparkSession.sessionState.conf),ReuseSubquery(sparkSession.sessionState.conf))/*** 尝试执行给定的操作并返回结果字符串。* 如果发生AnalysisException,则返回异常的字符串表示。** @param f 要执行的操作* @return 操作结果的字符串表示或异常信息*/protected def stringOrError[A](f: => A): String =try f.toString catch { case e: AnalysisException => e.toString }/*** 以Hive兼容的字符串序列形式返回结果。* 这在测试和CLI应用程序的SparkSQLDriver中使用。** @return Hive兼容的结果字符串序列*/def hiveResultString(): Seq[String] = executedPlan match {case ExecutedCommandExec(desc: DescribeTableCommand) =>// 如果是Hive表的describe命令,我们希望输出格式与Hive相似desc.run(sparkSession).map {case Row(name: String, dataType: String, comment) =>Seq(name, dataType,Option(comment.asInstanceOf[String]).getOrElse("")).map(s => String.format("%-20s", s)).mkString("\t")}// Hive中的SHOW TABLES只输出表名,而我们的输出包括数据库、表名和是否为临时表case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>command.executeCollect().map(_.getString(1))case other =>val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq// 我们需要类型信息以输出结构字段名val types = analyzed.output.map(_.dataType)// 重新格式化以匹配Hive的制表符分隔输出result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))}/*** 根据给定的数据类型格式化数据,并返回其字符串表示。** @param a 包含数据和其对应DataType的元组* @return 格式化后的字符串表示*/private def toHiveString(a: (Any, DataType)): String = {val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType,BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)/*** 格式化BigDecimal,去除尾随的零。** @param d 要格式化的BigDecimal* @return 格式化后的字符串*/def formatDecimal(d: java.math.BigDecimal): String = {if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {java.math.BigDecimal.ZERO.toPlainString} else {d.stripTrailingZeros().toPlainString}}/*** Hive输出struct字段的方式与最外层属性略有不同。* 这个方法处理struct、数组和map类型的特殊格式化。** @param a 包含数据和其对应DataType的元组* @return 格式化后的字符串*/def toHiveStructString(a: (Any, DataType)): String = a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "null"case (s: String, StringType) => "\"" + s + "\""case (decimal, DecimalType()) => decimal.toStringcase (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes contains tpe => other.toString}// 主要的格式化逻辑a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "NULL"case (d: Date, DateType) =>DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))case (t: Timestamp, TimestampType) =>DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)case (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes.contains(tpe) => other.toString}}/*** 返回执行计划的简单字符串表示。** @return 包含物理计划树的字符串*/def simpleString: String = withRedaction {s"""== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = false))}""".stripMargin.trim}/*** 返回查询执行的详细字符串表示。** @return 包含解析的逻辑计划、分析的逻辑计划、优化的逻辑计划和物理计划的字符串*/override def toString: String = withRedaction {def output = Utils.truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")val analyzedPlan = Seq(stringOrError(output),stringOrError(analyzed.treeString(verbose = true))).filter(_.nonEmpty).mkString("\n")s"""== Parsed Logical Plan ==|${stringOrError(logical.treeString(verbose = true))}|== Analyzed Logical Plan ==|$analyzedPlan|== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 返回带有统计信息的查询执行字符串表示。** @return 包含优化的逻辑计划和物理计划(带统计信息)的字符串*/def stringWithStats: String = withRedaction {// 触发计算逻辑计划的统计信息optimizedPlan.stats// 只显示优化的逻辑计划和物理计划s"""== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 对给定的字符串中的敏感信息进行编辑。** @param message 要编辑的原始消息* @return 编辑后的消息*/private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}/** * 一个特殊的命名空间,包含可用于调试查询执行的命令。*/// scalastyle:offobject debug {// scalastyle:on/*** 将此计划中找到的所有生成的代码打印到标准输出。* 即打印每个WholeStageCodegen子树的输出。*/def codegen(): Unit = {// scalastyle:off printlnprintln(org.apache.spark.sql.execution.debug.codegenString(executedPlan))// scalastyle:on println}/*** 获取查询计划中的WholeStageCodegenExec子树及其生成的代码。** @return WholeStageCodegen子树及其对应生成代码的序列*/def codegenToSeq(): Seq[(String, String)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}}
}

可以看到这个类确实是封装了SparkSQL查询执行的整个过程,从逻辑计划到物理计划,再到最终的执行。

我们简单梳理一下其主要的执行阶段如下:

  1. analyzed: 对逻辑计划进行分析
  2. withCachedData: 使用缓存数据
  3. optimizedPlan: 优化逻辑计划
  4. sparkPlan: 生成物理计划
  5. executedPlan: 准备提交执行物理计划
  6. toRdd: 最终执行,将物理计划转换为RDD

而QueryExecution类的3.x源码如下:


/*** QueryExecution 类代表了使用 Spark 执行关系查询的主要工作流程。* * 该类设计用于让开发者能够轻松访问查询执行的中间阶段,便于调试。** @param sparkSession 当前的 SparkSession* @param logical 要执行的逻辑计划* @param tracker 用于跟踪查询计划的 QueryPlanningTracker* @param mode 命令执行模式*/
class QueryExecution(val sparkSession: SparkSession,         // 当前的 Spark 会话,包含 Spark 的运行时环境val logical: LogicalPlan,             // 查询的逻辑计划,表示对数据的高级操作val tracker: QueryPlanningTracker = new QueryPlanningTracker, // 跟踪查询计划的执行阶段和时间val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL // 指定命令执行的模式
) extends Logging {val id: Long = QueryExecution.nextExecutionId // 为每个查询执行分配唯一 ID// 暂未使用,计划将 planner 和 optimizer 移至此处protected def planner = sparkSession.sessionState.planner// 确保逻辑计划已被分析def assertAnalyzed() = analyzed// 确保查询操作受支持def assertSupported() = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}// 延迟计算,确保逻辑计划已被分析lazy val analyzed: LogicalPlan = {// 执行分析阶段val plan = executePhase(QueryPlanningTracker.ANALYSIS) {sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)}tracker.setAnalyzed(plan)plan}// 根据执行模式决定是否立即执行命令lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}// 定义命令执行名称private def commandExecutionName(command: Command): String = command match {case _: CreateTableAsSelect => "create"case _: ReplaceTableAsSelect => "replace"case _: AppendData => "append"case _: OverwriteByExpression => "overwrite"case _: OverwritePartitionsDynamic => "overwritePartitions"case _ => "command"}// 立即执行命令private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// 设置查询计划追踪器,标记查询已准备好执行tracker.setReadyForExecution()// 执行命令计划val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)// 执行命令计划,获取结果val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {qe.executedPlan.executeCollect()}// 创建命令结果CommandResult(qe.analyzed.output,qe.commandExecuted,qe.executedPlan,result)case other => other // 其他情况保持不变}// 对查询计划进行归一化lazy val normalized: LogicalPlan = {val normalizationRules = sparkSession.sessionState.planNormalizationRulesif (normalizationRules.isEmpty) {commandExecuted} else {val planChangeLogger = new PlanChangeLogger[LogicalPlan]()// 应用归一化规则val normalized = normalizationRules.foldLeft(commandExecuted) { (p, rule) =>val result = rule.apply(p)planChangeLogger.logRule(rule.ruleName, p, result)result}// 记录归一化过程planChangeLogger.logBatch("Plan Normalization", commandExecuted, normalized)normalized}}lazy val withCachedData: LogicalPlan = sparkSession.withActive {assertAnalyzed()assertSupported()// 克隆计划,避免状态共享sparkSession.sharedState.cacheManager.useCachedData(normalized.clone())}// 确保命令已执行def assertCommandExecuted(): Unit = commandExecuted// 获得优化后的逻辑计划lazy val optimizedPlan: LogicalPlan = {assertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// 克隆计划,避免状态共享val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// 标记计划为已分析plan.setAnalyzed()plan}}// 确保计划已优化def assertOptimized(): Unit = optimizedPlan// 获得物理执行计划lazy val sparkPlan: SparkPlan = {assertOptimized()executePhase(QueryPlanningTracker.PLANNING) {// 创建 Spark 计划QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())}}// 确保物理计划已准备好执行def assertSparkPlanPrepared(): Unit = sparkPlan// 获得实际执行计划lazy val executedPlan: SparkPlan = {assertOptimized()val plan = executePhase(QueryPlanningTracker.PLANNING) {// 准备执行计划QueryExecution.prepareForExecution(preparations, sparkPlan.clone())}// 标记查询已准备好执行tracker.setReadyForExecution()plan}// 确保实际执行计划已准备好执行def assertExecutedPlanPrepared(): Unit = executedPlan// 获取 RDD 内部版本lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)// 获得查询计划的指标def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)protected def preparations: Seq[Rule[SparkPlan]] = {QueryExecution.preparations(sparkSession,Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)}// 执行查询阶段protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {tracker.measurePhase(phase)(block)}}// 获取查询计划的简单字符串表示def simpleString: String = {val concat = new PlanStringConcat()simpleString(false, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查询计划的简单字符串表示private def simpleString(formatted: Boolean,maxFields: Int,append: String => Unit): Unit = {append("== Physical Plan ==\n")if (formatted) {try {ExplainUtils.processPlan(executedPlan, append)} catch {case e: AnalysisException => append(e.toString)case e: IllegalArgumentException => append(e.toString)}} else {QueryPlan.append(executedPlan,append, verbose = false, addSuffix = false, maxFields = maxFields)}append("\n")}// 获取查询计划的解释字符串def explainString(mode: ExplainMode): String = {val concat = new PlanStringConcat()explainString(mode, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 解释查询计划,生成字符串表示private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {val queryExecution = if (logical.isStreaming) {new IncrementalExecution(sparkSession, logical, OutputMode.Append(), "<unknown>",UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0),WatermarkPropagator.noop())} else {this}mode match {case SimpleMode =>queryExecution.simpleString(false, maxFields, append)case ExtendedMode =>queryExecution.toString(maxFields, append)case CodegenMode =>try {org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)} catch {case e: AnalysisException => append(e.toString)}case CostMode =>queryExecution.stringWithStats(maxFields, append)case FormattedMode =>queryExecution.simpleString(formatted = true, maxFields = maxFields, append)}}// 写入各个阶段的计划private def writePlans(append: String => Unit, maxFields: Int): Unit = {val (verbose, addSuffix) = (true, false)append("== Parsed Logical Plan ==\n")QueryPlan.append(logical, append, verbose, addSuffix, maxFields)append("\n== Analyzed Logical Plan ==\n")try {if (analyzed.output.nonEmpty) {append(truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))append("\n")}QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)append("\n== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)} catch {case e: AnalysisException => append(e.toString)}}// 重写 toString 方法override def toString: String = withRedaction {val concat = new PlanStringConcat()toString(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查询计划的字符串表示private def toString(maxFields: Int, append: String => Unit): Unit = {writePlans(append, maxFields)}// 获取包含统计信息的计划字符串def stringWithStats: String = {val concat = new PlanStringConcat()stringWithStats(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成包含统计信息的计划字符串private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {try {// 触发逻辑计划的统计计算optimizedPlan.collectWithSubqueries {case plan => plan.stats}} catch {case e: AnalysisException => append(e.toString + "\n")}// 只显示优化后的逻辑计划和物理计划append("== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)append("\n")}// 内部敏感信息的脱敏处理private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}// 用于调试查询执行的特殊命名空间object debug {// 打印生成的代码def codegen(): Unit = {println(org.apache.spark.sql.execution.debug.codegenString(executedPlan))}// 获取生成的代码和统计信息def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}// 将调试信息写入文件def toFile(path: String,maxFields: Int = Int.MaxValue,explainMode: Option[String] = None): Unit = {val filePath = new Path(path)val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))try {val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)explainString(mode, maxFields, writer.write)if (mode != CodegenMode) {writer.write("\n== Whole Stage Codegen ==\n")org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)}log.info(s"Debug information was written at: $filePath")} finally {writer.close()}}}
}
/*** SPARK-35378: 命令应立即执行,以便像 `sql("INSERT ...")` 这样的查询可以立即触发表插入,无需调用 `.collect()`。* 为了避免无穷递归,我们应在递归执行命令时使用 `NON_ROOT`。此外,我们不能执行带有命令叶节点的查询计划,* 因为许多命令返回 `GenericInternalRow` 并不能直接放入查询计划中,否则查询引擎可能会尝试将 `GenericInternalRow`* 转换为 `UnsafeRow` 并失败。当运行 EXPLAIN 或命令包含在其他命令中时,应使用 `SKIP` 来避免立即触发命令执行。*/
object CommandExecutionMode extends Enumeration {val SKIP, NON_ROOT, ALL = Value // 定义不同模式的值
}
object QueryExecution {private val _nextExecutionId = new AtomicLong(0) // 原子操作,确保线程安全private def nextExecutionId: Long = _nextExecutionId.getAndIncrement // 获取下一个查询执行的 ID// 构建用于准备执行的规则序列private[execution] def preparations(sparkSession: SparkSession,adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,subquery: Boolean): Seq[Rule[SparkPlan]] = {adaptiveExecutionRule.toSeq ++Seq(CoalesceBucketsInJoin,PlanDynamicPruningFilters(sparkSession),PlanSubqueries(sparkSession),RemoveRedundantProjects,EnsureRequirements(),ReplaceHashWithSortAgg,RemoveRedundantSorts,RemoveRedundantWindowGroupLimits,DisableUnnecessaryBucketedScan,ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules, outputsColumnar = false),CollapseCodegenStages()) ++(if (subquery) Nil else Seq(ReuseExchangeAndSubquery))}// 准备 SparkPlan 以供执行private[execution] def prepareForExecution(preparations: Seq[Rule[SparkPlan]],plan: SparkPlan): SparkPlan = {val planChangeLogger = new PlanChangeLogger[SparkPlan]()val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>val result = rule.apply(sp)planChangeLogger.logRule(rule.ruleName, sp, result)result}planChangeLogger.logBatch("Preparations", plan, preparedPlan)preparedPlan}// 将逻辑计划转换为 Spark 计划def createSparkPlan(sparkSession: SparkSession,planner: SparkPlanner,plan: LogicalPlan): SparkPlan = {planner.plan(ReturnAnswer(plan)).next()}// 准备 SparkPlan 以供执行def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {prepareForExecution(preparations(spark, subquery = true), plan)}// 准备子查询的执行计划def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())prepareExecutedPlan(spark, sparkPlan)}// 使用自适应执行上下文准备执行计划def prepareExecutedPlan(session: SparkSession,plan: LogicalPlan,context: AdaptiveExecutionContext): SparkPlan = {val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)prepareForExecution(preparationRules, sparkPlan.clone())}// 将断言和空指针异常转换为内部错误private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>SparkException.internalError(msg + " You hit a bug in Spark or the Spark plugins you use. Please, report this bug " +"to the corresponding communities or vendors, and provide the full stack trace.",e)case e: Throwable =>e}// 捕获断言和空指针异常,并将其转换为内部错误private[sql] def withInternalError[T](msg: String)(block: => T): T = {try {block} catch {case e: Throwable => throw toInternalError(msg, e)}}
}

可以看到Spark 3.x实现的整个执行流程是和2.x区别不大的,主要是引入了两个新参数如下:

  • tracker: 用于跟踪查询计划过程的工具
  • mode: 命令执行模式

引入 trackermode 参数的主要目的是为了增强查询执行的灵活性和可追踪性。tracker 参数使得对查询计划的执行过程可以进行更细粒度的监控,而 mode 参数则提供了更灵活的查询执行模式,使得 QueryExecution 类能够根据不同的需求进行不同的操作。同时通过CommandExecutionMode 枚举定义了命令执行的不同模式,来够灵活地控制命令的执行方式。这些改进的目的都是为了提高 Spark SQL 查询的性能和可调试性。

总结

本文介绍了Spark,并通过源码梳理了Spark SQL的执行原理,其核心思路也是和我们在引入篇以及Hive执行原理,提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)是一致的。

Spark SQL的具体执行过程主要可以分为以下几个步骤:

  1. 输入SQL语句经过Antlr4解析,生成未解决的逻辑计划;
  2. 绑定分析器,例如函数适配、通过Catalog获取字段等,生成已解决的逻辑计划;
  3. 优化器对已解决的逻辑计划进行优化,基于CBO和RBO转换,生成优化后的逻辑计划;
  4. 将优化后的逻辑计划转换为多个可被识别或执行的物理计划;
  5. 基于CBO在多个物理计划中,选择执行开销最小的物理计划;
  6. 转为具体的RDDs执行。

感兴趣的小伙伴可以深入源码去探索一下具体的解析和优化实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/19888.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AI实践】deepseek支持升级git

当前Windows 11 WSL的git是2.17&#xff0c;Android Studio提示需要升级到2.19版本 网上找到指导文章 安装git 2.19.2 cd /usr/src wget https://www.kernel.org/pub/software/scm/git/git-2.19.2.tar.gz tar xzf git-2.19.2.tar.gz cd git-2.19.2 make prefix/usr/l…

从零复现R1之路[3/3]:一文速览Open R1——对DeepSeek R1训练流程前两个阶段的复现(SFT和GRPO训练)

前言 根据R1的GitHub可知 类别开源内容未开源内容模型权重R1、R1-Zero 及蒸馏模型权重&#xff08;MIT 协议&#xff09;原始训练数据 未公开冷启动数据、RL 训练数据集或合成数据的具体内容&#xff0c;仅提供依赖的公开数据集名称&#xff08;如 AI-MO、NuminaMath-TIR&…

大语言模型简史:从Transformer(2017)到DeepSeek-R1(2025)的进化之路

2025年初&#xff0c;中国推出了具有开创性且高性价比的「大型语言模型」&#xff08;Large Language Model — LLM&#xff09;DeepSeek-R1&#xff0c;引发了AI的巨大变革。本文回顾了LLM的发展历程&#xff0c;起点是2017年革命性的Transformer架构&#xff0c;该架构通过「…

在线考试系统(代码+数据库+LW)

摘 要 使用旧方法对在线考试系统的信息进行系统化管理已经不再让人们信赖了&#xff0c;把现在的网络信息技术运用在在线考试系统的管理上面可以解决许多信息管理上面的难题&#xff0c;比如处理数据时间很长&#xff0c;数据存在错误不能及时纠正等问题。这次开发的在线考试…

2025百度快排技术分析:模拟点击与发包算法的背后原理

一晃做SEO已经15年了&#xff0c;2025年还有人问我如何做百度快速排名&#xff0c;我能给出的答案就是&#xff1a;做好内容的前提下&#xff0c;多刷刷吧&#xff01;百度的SEO排名算法一直是众多SEO从业者研究的重点&#xff0c;模拟算法、点击算法和发包算法是百度快速排名的…

【Spring+MyBatis】留言墙的实现

目录 1. 添加依赖 2. 配置数据库 2.1 创建数据库与数据表 2.2 创建与数据库对应的实体类 3. 后端代码 3.1 目录结构 3.2 MessageController类 3.3 MessageService类 3.4 MessageMapper接口 4. 前端代码 5. 单元测试 5.1 后端接口测试 5.2 使用前端页面测试 在Spri…

EtherNet/IP转Modbus TCP:新能源风电监控与分析实用案例

EtherNet/IP转Modbus TCP&#xff1a;新能源风电监控与分析实用案例 一、案例背景 在某新能源汽车电池生产线上&#xff0c;需要将采用EtherNet/IP协议的电池检测设备与采用ProfiNet协议的生产线控制系统进行集成&#xff0c;以实现对电池生产过程的全面监控和数据采集。 二、…

管理WSL实例 以及安装 Ubuntu 作为 WSL 子系统 流程

安装ubuntu wsl --install -d Ubuntu分类命令说明安装相关wsl --install在 Windows 10/11 上以管理员身份在 PowerShell 中运行此命令&#xff0c;可安装 WSLwsl --install -d <distribution name>在 PowerShell 中使用此命令安装特定版本的 Linux 发行版&#xff0c;如…

Spring框架中都用到了哪些设计模式?

大家好&#xff0c;我是锋哥。今天分享关于【Spring框架中都用到了哪些设计模式&#xff1f;】面试题。希望对大家有帮助&#xff1b; Spring框架中都用到了哪些设计模式&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Spring框架中使用了大量的设计模…

最新VS code配置C/C++环境(tasks.json, launch.json,c_cpp_properties.json)及运行多个文件、配置Cmake

目录 一、VScode配置C/C环境&#xff0c;需设置tasks.json, launch.json文件 二、安装C/C扩展&#xff0c;配置tasks.json、launch.json、c_cpp_properties.json文件 (1)安装c/c扩展 (2)配置tasks.json文件 (3)配置launch.json文件 (4)配置中的参数(属性)说明 (5)运行程序(运行…

Java零基础入门笔记:(3)程序控制

前言 本笔记是学习狂神的java教程&#xff0c;建议配合视频&#xff0c;学习体验更佳。 【狂神说Java】Java零基础学习视频通俗易懂_哔哩哔哩_bilibili Scanner对象 之前我们学的基本语法中我们并没有实现程序和人的交互&#xff0c;但是Java给我们提供了这样一个工具类&…

Spring Boot 原理分析

spring-boot.version&#xff1a;2.4.3.RELEASE Spring Boot 依赖管理 spring-boot-starter-parent 配置文件管理 <resources> <resource> <directory>${basedir}/src/main/resources</directory> <filtering>true&l…

Word中接入大模型教程

前言 为什么要在word中接入大模型呢&#xff1f; 个人觉得最大的意义就是不用来回切换与复制粘贴了吧。 今天分享一下昨天实践的在word中接入大模型的教程。 在word中接入大模型最简单的方式就是使用vba。 vba代码要做的事&#xff0c;拆分一下就是&#xff1a; 获取用户…

【原创】vue-element-admin-plus完成编辑页面中嵌套列表功能

前言 vue-element-admin-plus对于复杂业务的支持程度确实不怎么样&#xff0c;我这里就遇到了编辑页面中还要嵌套列表的真实案例&#xff0c;比如字典&#xff0c;主字典嵌套子信息&#xff0c;类似于一个树状结构。目前vue-element-admin-plus给出的例子是无法满足这个需求的…

OpenCV中的边缘检测

边缘检测是图像处理和计算机视觉中的关键技术之一&#xff0c;旨在识别图像中像素强度发生显著变化的区域&#xff0c;这些区域通常对应于物体的边界或轮廓。边缘检测在机器视觉中具有重要的需求背景&#xff0c;主要体现在以下几个方面&#xff1a; 图像分割&#xff1a;边缘…

vscode的一些实用操作

1. 焦点切换(比如主要用到使用快捷键在编辑区和终端区进行切换操作) 2. 跳转行号 使用ctrl g,然后输入指定的文件内容&#xff0c;即可跳转到相应位置。 使用ctrl p,然后输入指定的行号&#xff0c;回车即可跳转到相应行号位置。

Redis(高阶篇)02章——BigKey

一、面试题 阿里广告平台&#xff0c;海量数据里查询某一个固定前缀的key小红书&#xff0c;你如何生产上限制 keys* /flushdb/flushall等危险命令以防止阻塞或误删数据&#xff1f;美团&#xff0c;memory usage命令你用过吗&#xff1f;BigKey问题&#xff0c;多大算big&…

《Zookeeper 分布式过程协同技术详解》读书笔记-2

目录 zk的一些内部原理和应用请求&#xff0c;事务和标识读写操作事务标识&#xff08;zxid&#xff09; 群首选举Zab协议&#xff08;ZooKeeper Atomic Broadcast protocol&#xff09;文件系统和监听通知机制分布式配置中心, 简单Demojava code 集群管理code 分布式锁 zk的一…

53倍性能提升!TiDB 全局索引如何优化分区表查询?

作者&#xff1a; Defined2014 原文来源&#xff1a; https://tidb.net/blog/7077577f 什么是 TiDB 全局索引 在 TiDB 中&#xff0c;全局索引是一种定义在分区表上的索引类型&#xff0c;它允许索引分区与表分区之间建立一对多的映射关系&#xff0c;即一个索引分区可以对…

unity学习39:连续动作之间的切换,用按键控制角色的移动

目录 1 不同状态之间的切换模式 1.1 在1个连续状态和一个连续状态之间的transition&#xff0c;使用trigger 1.2 在2个连续状态之间的转换&#xff0c;使用bool值切换转换 2 至少现在有2种角色的移动控制方式 2.1 用CharacterController 控制角色的移动 2.2 用animator…