引入
在深入MapReduce中有提到,MapReduce虽然通过“分而治之”的思想,解决了海量数据的计算处理问题,但性能还是不太理想,这体现在两个方面:
- 每个任务都有比较大的overhead,都需要预先把程序复制到各个 worker 节点,然后启动进程;
- 所有的中间数据都要读写多次硬盘。map 的输出结果要写到硬盘上,reduce抓取数据排序合并之后,也要先写到本地硬盘上再进行读取,所以快不起来。
除此之外,MapReduce还有以下的问题:
- 算子不够丰富,仅有Map和Reduce,复杂算子的实现极为烦琐;
- MapReduce在处理迭代计算、实时查询和交互式数据挖掘任务时效率较低,因为每次迭代都需要将数据写入磁盘,导致大量的I/O开销;
- 无法支持血缘或上下游依赖的概念,失败重试只能从头开始,变相地无法实现迭代计算。
针对以上的缺陷,不同的计算引擎采取了不同的优化策略。例如Tez简化了MapReduce过程,支持DAG(Directed Acyclic Graph,有向无环图),细化MapReduce环节并灵活组合。Impala则专注于单节点纯内存计算。而Spark依托DAG Lineage、纯内存计算、RDD(分布式弹性数据集)等特性,以及与Hadoop生态极佳的兼容性,支持例如图计算、机器学习、流(Micro-Batch)计算等多样化的功能或场景,在一系列大数据引擎中脱颖而出,成为当今最主流的计算引擎之一。
Spark最初由加州大学伯克利分校的AMPLab开发,后来成为Apache软件基金会的顶级项目。
Spark的核心概念
下面通过Spark的一些核心概念,去进一步了解它。
RDD(弹性分布式数据集)
定义
RDD(Resilient Distributed Dataset)是Spark的核心抽象,是一个不可变的、分布式的数据集合,可以并行操作。RDD可以通过在存储系统中的文件或已有的Scala、Java、Python集合以及通过其他RDD的转换操作来创建。
特性
-
不可变性:RDD一旦创建,其数据就不能被修改。
-
分区:RDD的数据被划分为多个分区,每个分区可以独立计算。
-
依赖关系:RDD之间通过转换操作形成依赖关系,这些依赖关系构成了DAG(有向无环图)。
DAG(有向无环图)
定义
DAG是Spark中用于表示RDD之间依赖关系的有向无环图。DAG调度器负责将DAG分解成多个阶段(stages),每个阶段是一系列并行的任务。
工作原理
-
操作解析:当代码被提交到Spark的解释器时,系统会生成一个操作图,称为Operator Graph,记录了所有的Transformation操作及其依赖关系。
-
DAG调度器:当一个Action操作(如collect或save)被触发时,DAG调度器会将Operator Graph分解为多个Stage(阶段)。窄依赖(Narrow Dependency)的操作如map和filter不需要数据重新分区,属于同一阶段;宽依赖(Wide Dependency)的操作如reduceByKey需要数据Shuffle,不同阶段之间以宽依赖为界。
-
任务调度器:每个Stage会被拆分为多个基于数据分区的Task。Task调度器将这些Task分发到集群的Worker节点上执行。
-
执行与结果:每个Worker节点执行分配的Task,并将结果返回给Driver程序。DAG确保各个阶段按依赖顺序执行,并通过内存优化中间结果存储,最大限度减少I/O和通信开销。
作用
-
任务依赖分析:DAG调度器通过分析RDD之间的依赖关系,决定任务的执行顺序。
-
内存计算优化:通过减少Shuffle和磁盘读写,DAG提高了计算效率。
-
全局优化:DAG确保每个Stage都包含最少的任务,避免重复计算。
Shuffle机制
定义
Shuffle是Spark中的一种数据重新分区操作,通常在宽依赖(Wide Dependency)的操作中发生,如reduceByKey、groupByKey等。
工作原理
-
分区:Shuffle操作会将数据重新分区,通常会根据键(key)进行分区。
-
数据传输:数据从一个节点传输到另一个节点,以确保相同键的数据位于同一个节点上。
-
排序和分组:在目标节点上,数据会被排序和分组,以便进行后续的聚合操作。
优化策略
-
减少数据传输:通过数据本地性优化,尽量减少数据在节点之间的传输。
-
压缩:在Shuffle过程中,可以启用数据压缩,减少网络传输的开销。
-
缓存:在Shuffle之前,可以将数据缓存到内存中,减少重复计算。
数据缓存机制
定义
数据缓存机制是Spark中用于提高数据处理效率的一种机制,通过将数据缓存到内存中,减少重复计算的开销。
实现方式
-
cache():
cache()
方法是persist()
的简化版,其底层实现直接调用persist(StorageLevel.MEMORY_AND_DISK)
,默认将数据存储在内存中,如果内存不足,则溢写到磁盘。 -
persist():
persist()
方法允许用户选择存储级别(StorageLevel
),如MEMORY_ONLY
、MEMORY_AND_DISK
、DISK_ONLY
等。
作用
-
加速重复计算:通过缓存数据,避免重复计算DAG中的父节点。
-
灵活的存储策略:
persist()
方法提供了更灵活的存储策略,适应内存、磁盘等不同环境。
适用场景
-
数据需要被多次使用:适用于数据需要被多次使用,但不需要跨作业的容错能力的场景。
-
计算代价大:适用于计算代价大,但内存能够容纳数据的场景。
错误容忍机制
RDD的DAG Lineage(血缘)
指创建RDD所依赖的转换操作序列。当某个RDD的分区数据丢失时,Spark可以通过Lineage信息重新计算该分区的数据。
RDD的 DAG Lineage 主要用于描述数据从源到目标的转换过程,包括数据的流动、处理、转换等各个步骤。DAG Lineage能够清晰地展示数据的来源、去向以及数据在不同阶段的变化,帮助用户了解数据的全生命周期。
Checkpoint(检查点)
通过将RDD的状态保存到可靠的存储系统(如HDFS、S3等),以支持容错和优化长计算链。当Spark应用程序出现故障时,可以从检查点恢复状态。
故障恢复
- 节点故障:当Worker节点故障时,Spark会利用RDD的血统信息重新计算丢失的数据分区。如果设置了检查点,Spark会从检查点位置开始重新执行,减少计算开销。
- 驱动节点故障:如果驱动节点故障,Spark会通过Apache Mesos等集群管理器重新启动驱动节点,并恢复执行状态。
内存管理机制
内存模型
执行内存(Execution Memory):主要用于存储任务执行过程中的临时数据,如Shuffle的中间结果等。这部分内存主要用于任务的执行期间,任务完成后会被释放。
存储内存(Storage Memory):用于缓存中间结果(RDD)和DataFrame/DataSet的持久化数据。这部分内存是为了加速重复计算而存在的,数据可以被多次复用。
内存分配
内存分配比例:内存分配比例可以通过配置项spark.executor.memory来设置总的内存大小,并通过spark.storage.memoryFraction来指定存储内存所占的比例,默认为0.6。这意味着默认情况下,Executor的60%的内存用于存储,剩余的40%用于执行。
内存回收
LRU缓存淘汰策略:Spark采用LRU(Least Recently Used)缓存淘汰策略来管理存储内存中的数据。当存储内存不足时,Spark会根据LRU算法淘汰最近最少使用的数据。
Spill to Disk:当执行内存不足时,Spark会将一部分数据溢写到磁盘,以释放内存空间。例如,在Shuffle操作期间,如果内存不足以存放所有中间结果,Spark会将部分数据写入磁盘。
动态内存管理
动态调整内存分配:在Spark 2.x版本之后,引入了更先进的内存管理机制,支持动态调整执行内存和存储内存之间的比例。这意味着在运行时,Spark可以根据实际内存使用情况动态调整内存分配,从而更好地利用资源。
内存配置
- spark.executor.memory:设置Executor的总内存大小。
- spark.storage.memoryFraction:设置存储内存所占的比例。
- spark.shuffle.spill.compress:是否启用Shuffle数据的压缩。
- spark.serializer:设置序列化库,默认为org.apache.spark.serializer.KryoSerializer。
- spark.kryoserializer.buffer.max:设置Kryo序列化器的最大缓冲区大小。
Spark的执行原理
由于本专栏的重点是SQL,所以我们主要看Spark SQL的执行过程。相比于Hive的源码,Spark就贴心很多了,提供了org.apache.spark.sql.execution.QueryExecution类,这个类是Spark SQL查询执行的核心,它封装了从SQL解析到最终执行的整个过程,为开发者提供了丰富的接口来理解和调试查询执行的整个过程。
QueryExecution源码注释如下:
The primary workflow for executing relational queries using Spark. Designed to allow easy access to the intermediate phases of query execution for developers.
While this is not a public class, we should avoid changing the function names for the sake of changing them, because a lot of developers use the feature for debugging.翻译:
使用 Spark 执行关系查询的主要工作流程。设计目的是让开发者能够轻松访问查询执行的中间阶段。
虽然这不是一个公共类,但我们应尽可能避免改变函数名称,因为许多开发者使用此功能进行调试。
QueryExecution类的2.x源码如下:
/*** QueryExecution类负责执行关系查询的主要工作流程。* 它设计用于让开发人员可以轻松访问查询执行的中间阶段。** @param sparkSession 用于执行查询的SparkSession* @param logical 要执行的逻辑计划*/
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {// TODO: 将planner和optimizer从SessionState移动到这里/** 获取查询计划器 */protected def planner: SparkPlanner = sparkSession.sessionState.plannerdef assertAnalyzed(): Unit = analyzeddef assertSupported(): Unit = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}/** 懒加载已分析的逻辑计划 */// SQL解析lazy val analyzed: LogicalPlan = {SparkSession.setActiveSession(sparkSession)sparkSession.sessionState.analyzer.executeAndCheck(logical)}/** 懒加载带缓存数据的逻辑计划 */lazy val withCachedData: LogicalPlan = {assertAnalyzed()assertSupported()sparkSession.sharedState.cacheManager.useCachedData(analyzed)}/** 懒加载优化后的逻辑计划 */// 逻辑优化处理lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)/** 懒加载Spark物理计划 */// 将逻辑计划转换成物理计划// 逻辑计划是不区分引擎的,而这里的物理计划(SparkPlan)是面向Spark执行的lazy val sparkPlan: SparkPlan = {SparkSession.setActiveSession(sparkSession)// TODO: 目前我们使用next(),即取planner返回的第一个计划,// 但我们将来会实现选择最佳计划的功能。planner.plan(ReturnAnswer(optimizedPlan)).next()}/** * executedPlan 不应用于初始化任何 SparkPlan。* 它仅应用于执行。*/// 预提交阶段lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)/** RDD的内部版本。避免复制且没有schema */// 最终提交阶段lazy val toRdd: RDD[InternalRow] = {if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) {new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)} else {executedPlan.execute()}}/*** 为执行准备计划好的SparkPlan。* 根据需要插入shuffle操作和内部行格式转换。** @param plan 要准备执行的SparkPlan* @return 准备好执行的SparkPlan*/protected def prepareForExecution(plan: SparkPlan): SparkPlan = {preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }}/** * 在执行前应用于物理计划的规则序列。* 这些规则将按顺序应用。** @return 要应用的规则序列*/protected def preparations: Seq[Rule[SparkPlan]] = Seq(PlanSubqueries(sparkSession),EnsureRequirements(sparkSession.sessionState.conf),CollapseCodegenStages(sparkSession.sessionState.conf),ReuseExchange(sparkSession.sessionState.conf),ReuseSubquery(sparkSession.sessionState.conf))/*** 尝试执行给定的操作并返回结果字符串。* 如果发生AnalysisException,则返回异常的字符串表示。** @param f 要执行的操作* @return 操作结果的字符串表示或异常信息*/protected def stringOrError[A](f: => A): String =try f.toString catch { case e: AnalysisException => e.toString }/*** 以Hive兼容的字符串序列形式返回结果。* 这在测试和CLI应用程序的SparkSQLDriver中使用。** @return Hive兼容的结果字符串序列*/def hiveResultString(): Seq[String] = executedPlan match {case ExecutedCommandExec(desc: DescribeTableCommand) =>// 如果是Hive表的describe命令,我们希望输出格式与Hive相似desc.run(sparkSession).map {case Row(name: String, dataType: String, comment) =>Seq(name, dataType,Option(comment.asInstanceOf[String]).getOrElse("")).map(s => String.format("%-20s", s)).mkString("\t")}// Hive中的SHOW TABLES只输出表名,而我们的输出包括数据库、表名和是否为临时表case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>command.executeCollect().map(_.getString(1))case other =>val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq// 我们需要类型信息以输出结构字段名val types = analyzed.output.map(_.dataType)// 重新格式化以匹配Hive的制表符分隔输出result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))}/*** 根据给定的数据类型格式化数据,并返回其字符串表示。** @param a 包含数据和其对应DataType的元组* @return 格式化后的字符串表示*/private def toHiveString(a: (Any, DataType)): String = {val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType,BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)/*** 格式化BigDecimal,去除尾随的零。** @param d 要格式化的BigDecimal* @return 格式化后的字符串*/def formatDecimal(d: java.math.BigDecimal): String = {if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {java.math.BigDecimal.ZERO.toPlainString} else {d.stripTrailingZeros().toPlainString}}/*** Hive输出struct字段的方式与最外层属性略有不同。* 这个方法处理struct、数组和map类型的特殊格式化。** @param a 包含数据和其对应DataType的元组* @return 格式化后的字符串*/def toHiveStructString(a: (Any, DataType)): String = a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "null"case (s: String, StringType) => "\"" + s + "\""case (decimal, DecimalType()) => decimal.toStringcase (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes contains tpe => other.toString}// 主要的格式化逻辑a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "NULL"case (d: Date, DateType) =>DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))case (t: Timestamp, TimestampType) =>DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)case (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes.contains(tpe) => other.toString}}/*** 返回执行计划的简单字符串表示。** @return 包含物理计划树的字符串*/def simpleString: String = withRedaction {s"""== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = false))}""".stripMargin.trim}/*** 返回查询执行的详细字符串表示。** @return 包含解析的逻辑计划、分析的逻辑计划、优化的逻辑计划和物理计划的字符串*/override def toString: String = withRedaction {def output = Utils.truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")val analyzedPlan = Seq(stringOrError(output),stringOrError(analyzed.treeString(verbose = true))).filter(_.nonEmpty).mkString("\n")s"""== Parsed Logical Plan ==|${stringOrError(logical.treeString(verbose = true))}|== Analyzed Logical Plan ==|$analyzedPlan|== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 返回带有统计信息的查询执行字符串表示。** @return 包含优化的逻辑计划和物理计划(带统计信息)的字符串*/def stringWithStats: String = withRedaction {// 触发计算逻辑计划的统计信息optimizedPlan.stats// 只显示优化的逻辑计划和物理计划s"""== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 对给定的字符串中的敏感信息进行编辑。** @param message 要编辑的原始消息* @return 编辑后的消息*/private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}/** * 一个特殊的命名空间,包含可用于调试查询执行的命令。*/// scalastyle:offobject debug {// scalastyle:on/*** 将此计划中找到的所有生成的代码打印到标准输出。* 即打印每个WholeStageCodegen子树的输出。*/def codegen(): Unit = {// scalastyle:off printlnprintln(org.apache.spark.sql.execution.debug.codegenString(executedPlan))// scalastyle:on println}/*** 获取查询计划中的WholeStageCodegenExec子树及其生成的代码。** @return WholeStageCodegen子树及其对应生成代码的序列*/def codegenToSeq(): Seq[(String, String)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}}
}
可以看到这个类确实是封装了SparkSQL查询执行的整个过程,从逻辑计划到物理计划,再到最终的执行。
我们简单梳理一下其主要的执行阶段如下:
- analyzed: 对逻辑计划进行分析
- withCachedData: 使用缓存数据
- optimizedPlan: 优化逻辑计划
- sparkPlan: 生成物理计划
- executedPlan: 准备提交执行物理计划
- toRdd: 最终执行,将物理计划转换为RDD
而QueryExecution类的3.x源码如下:
/*** QueryExecution 类代表了使用 Spark 执行关系查询的主要工作流程。* * 该类设计用于让开发者能够轻松访问查询执行的中间阶段,便于调试。** @param sparkSession 当前的 SparkSession* @param logical 要执行的逻辑计划* @param tracker 用于跟踪查询计划的 QueryPlanningTracker* @param mode 命令执行模式*/
class QueryExecution(val sparkSession: SparkSession, // 当前的 Spark 会话,包含 Spark 的运行时环境val logical: LogicalPlan, // 查询的逻辑计划,表示对数据的高级操作val tracker: QueryPlanningTracker = new QueryPlanningTracker, // 跟踪查询计划的执行阶段和时间val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL // 指定命令执行的模式
) extends Logging {val id: Long = QueryExecution.nextExecutionId // 为每个查询执行分配唯一 ID// 暂未使用,计划将 planner 和 optimizer 移至此处protected def planner = sparkSession.sessionState.planner// 确保逻辑计划已被分析def assertAnalyzed() = analyzed// 确保查询操作受支持def assertSupported() = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}// 延迟计算,确保逻辑计划已被分析lazy val analyzed: LogicalPlan = {// 执行分析阶段val plan = executePhase(QueryPlanningTracker.ANALYSIS) {sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)}tracker.setAnalyzed(plan)plan}// 根据执行模式决定是否立即执行命令lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}// 定义命令执行名称private def commandExecutionName(command: Command): String = command match {case _: CreateTableAsSelect => "create"case _: ReplaceTableAsSelect => "replace"case _: AppendData => "append"case _: OverwriteByExpression => "overwrite"case _: OverwritePartitionsDynamic => "overwritePartitions"case _ => "command"}// 立即执行命令private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// 设置查询计划追踪器,标记查询已准备好执行tracker.setReadyForExecution()// 执行命令计划val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)// 执行命令计划,获取结果val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {qe.executedPlan.executeCollect()}// 创建命令结果CommandResult(qe.analyzed.output,qe.commandExecuted,qe.executedPlan,result)case other => other // 其他情况保持不变}// 对查询计划进行归一化lazy val normalized: LogicalPlan = {val normalizationRules = sparkSession.sessionState.planNormalizationRulesif (normalizationRules.isEmpty) {commandExecuted} else {val planChangeLogger = new PlanChangeLogger[LogicalPlan]()// 应用归一化规则val normalized = normalizationRules.foldLeft(commandExecuted) { (p, rule) =>val result = rule.apply(p)planChangeLogger.logRule(rule.ruleName, p, result)result}// 记录归一化过程planChangeLogger.logBatch("Plan Normalization", commandExecuted, normalized)normalized}}lazy val withCachedData: LogicalPlan = sparkSession.withActive {assertAnalyzed()assertSupported()// 克隆计划,避免状态共享sparkSession.sharedState.cacheManager.useCachedData(normalized.clone())}// 确保命令已执行def assertCommandExecuted(): Unit = commandExecuted// 获得优化后的逻辑计划lazy val optimizedPlan: LogicalPlan = {assertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// 克隆计划,避免状态共享val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// 标记计划为已分析plan.setAnalyzed()plan}}// 确保计划已优化def assertOptimized(): Unit = optimizedPlan// 获得物理执行计划lazy val sparkPlan: SparkPlan = {assertOptimized()executePhase(QueryPlanningTracker.PLANNING) {// 创建 Spark 计划QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())}}// 确保物理计划已准备好执行def assertSparkPlanPrepared(): Unit = sparkPlan// 获得实际执行计划lazy val executedPlan: SparkPlan = {assertOptimized()val plan = executePhase(QueryPlanningTracker.PLANNING) {// 准备执行计划QueryExecution.prepareForExecution(preparations, sparkPlan.clone())}// 标记查询已准备好执行tracker.setReadyForExecution()plan}// 确保实际执行计划已准备好执行def assertExecutedPlanPrepared(): Unit = executedPlan// 获取 RDD 内部版本lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)// 获得查询计划的指标def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)protected def preparations: Seq[Rule[SparkPlan]] = {QueryExecution.preparations(sparkSession,Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)}// 执行查询阶段protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {tracker.measurePhase(phase)(block)}}// 获取查询计划的简单字符串表示def simpleString: String = {val concat = new PlanStringConcat()simpleString(false, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查询计划的简单字符串表示private def simpleString(formatted: Boolean,maxFields: Int,append: String => Unit): Unit = {append("== Physical Plan ==\n")if (formatted) {try {ExplainUtils.processPlan(executedPlan, append)} catch {case e: AnalysisException => append(e.toString)case e: IllegalArgumentException => append(e.toString)}} else {QueryPlan.append(executedPlan,append, verbose = false, addSuffix = false, maxFields = maxFields)}append("\n")}// 获取查询计划的解释字符串def explainString(mode: ExplainMode): String = {val concat = new PlanStringConcat()explainString(mode, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 解释查询计划,生成字符串表示private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {val queryExecution = if (logical.isStreaming) {new IncrementalExecution(sparkSession, logical, OutputMode.Append(), "<unknown>",UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0),WatermarkPropagator.noop())} else {this}mode match {case SimpleMode =>queryExecution.simpleString(false, maxFields, append)case ExtendedMode =>queryExecution.toString(maxFields, append)case CodegenMode =>try {org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)} catch {case e: AnalysisException => append(e.toString)}case CostMode =>queryExecution.stringWithStats(maxFields, append)case FormattedMode =>queryExecution.simpleString(formatted = true, maxFields = maxFields, append)}}// 写入各个阶段的计划private def writePlans(append: String => Unit, maxFields: Int): Unit = {val (verbose, addSuffix) = (true, false)append("== Parsed Logical Plan ==\n")QueryPlan.append(logical, append, verbose, addSuffix, maxFields)append("\n== Analyzed Logical Plan ==\n")try {if (analyzed.output.nonEmpty) {append(truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))append("\n")}QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)append("\n== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)} catch {case e: AnalysisException => append(e.toString)}}// 重写 toString 方法override def toString: String = withRedaction {val concat = new PlanStringConcat()toString(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查询计划的字符串表示private def toString(maxFields: Int, append: String => Unit): Unit = {writePlans(append, maxFields)}// 获取包含统计信息的计划字符串def stringWithStats: String = {val concat = new PlanStringConcat()stringWithStats(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成包含统计信息的计划字符串private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {try {// 触发逻辑计划的统计计算optimizedPlan.collectWithSubqueries {case plan => plan.stats}} catch {case e: AnalysisException => append(e.toString + "\n")}// 只显示优化后的逻辑计划和物理计划append("== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)append("\n")}// 内部敏感信息的脱敏处理private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}// 用于调试查询执行的特殊命名空间object debug {// 打印生成的代码def codegen(): Unit = {println(org.apache.spark.sql.execution.debug.codegenString(executedPlan))}// 获取生成的代码和统计信息def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}// 将调试信息写入文件def toFile(path: String,maxFields: Int = Int.MaxValue,explainMode: Option[String] = None): Unit = {val filePath = new Path(path)val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))try {val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)explainString(mode, maxFields, writer.write)if (mode != CodegenMode) {writer.write("\n== Whole Stage Codegen ==\n")org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)}log.info(s"Debug information was written at: $filePath")} finally {writer.close()}}}
}
/*** SPARK-35378: 命令应立即执行,以便像 `sql("INSERT ...")` 这样的查询可以立即触发表插入,无需调用 `.collect()`。* 为了避免无穷递归,我们应在递归执行命令时使用 `NON_ROOT`。此外,我们不能执行带有命令叶节点的查询计划,* 因为许多命令返回 `GenericInternalRow` 并不能直接放入查询计划中,否则查询引擎可能会尝试将 `GenericInternalRow`* 转换为 `UnsafeRow` 并失败。当运行 EXPLAIN 或命令包含在其他命令中时,应使用 `SKIP` 来避免立即触发命令执行。*/
object CommandExecutionMode extends Enumeration {val SKIP, NON_ROOT, ALL = Value // 定义不同模式的值
}
object QueryExecution {private val _nextExecutionId = new AtomicLong(0) // 原子操作,确保线程安全private def nextExecutionId: Long = _nextExecutionId.getAndIncrement // 获取下一个查询执行的 ID// 构建用于准备执行的规则序列private[execution] def preparations(sparkSession: SparkSession,adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,subquery: Boolean): Seq[Rule[SparkPlan]] = {adaptiveExecutionRule.toSeq ++Seq(CoalesceBucketsInJoin,PlanDynamicPruningFilters(sparkSession),PlanSubqueries(sparkSession),RemoveRedundantProjects,EnsureRequirements(),ReplaceHashWithSortAgg,RemoveRedundantSorts,RemoveRedundantWindowGroupLimits,DisableUnnecessaryBucketedScan,ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules, outputsColumnar = false),CollapseCodegenStages()) ++(if (subquery) Nil else Seq(ReuseExchangeAndSubquery))}// 准备 SparkPlan 以供执行private[execution] def prepareForExecution(preparations: Seq[Rule[SparkPlan]],plan: SparkPlan): SparkPlan = {val planChangeLogger = new PlanChangeLogger[SparkPlan]()val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>val result = rule.apply(sp)planChangeLogger.logRule(rule.ruleName, sp, result)result}planChangeLogger.logBatch("Preparations", plan, preparedPlan)preparedPlan}// 将逻辑计划转换为 Spark 计划def createSparkPlan(sparkSession: SparkSession,planner: SparkPlanner,plan: LogicalPlan): SparkPlan = {planner.plan(ReturnAnswer(plan)).next()}// 准备 SparkPlan 以供执行def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {prepareForExecution(preparations(spark, subquery = true), plan)}// 准备子查询的执行计划def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())prepareExecutedPlan(spark, sparkPlan)}// 使用自适应执行上下文准备执行计划def prepareExecutedPlan(session: SparkSession,plan: LogicalPlan,context: AdaptiveExecutionContext): SparkPlan = {val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)prepareForExecution(preparationRules, sparkPlan.clone())}// 将断言和空指针异常转换为内部错误private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>SparkException.internalError(msg + " You hit a bug in Spark or the Spark plugins you use. Please, report this bug " +"to the corresponding communities or vendors, and provide the full stack trace.",e)case e: Throwable =>e}// 捕获断言和空指针异常,并将其转换为内部错误private[sql] def withInternalError[T](msg: String)(block: => T): T = {try {block} catch {case e: Throwable => throw toInternalError(msg, e)}}
}
可以看到Spark 3.x实现的整个执行流程是和2.x区别不大的,主要是引入了两个新参数如下:
- tracker: 用于跟踪查询计划过程的工具
- mode: 命令执行模式
引入 tracker
和 mode
参数的主要目的是为了增强查询执行的灵活性和可追踪性。tracker
参数使得对查询计划的执行过程可以进行更细粒度的监控,而 mode
参数则提供了更灵活的查询执行模式,使得 QueryExecution
类能够根据不同的需求进行不同的操作。同时通过CommandExecutionMode 枚举定义了命令执行的不同模式,来够灵活地控制命令的执行方式。这些改进的目的都是为了提高 Spark SQL 查询的性能和可调试性。
总结
本文介绍了Spark,并通过源码梳理了Spark SQL的执行原理,其核心思路也是和我们在引入篇以及Hive执行原理,提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)是一致的。
Spark SQL的具体执行过程主要可以分为以下几个步骤:
- 输入SQL语句经过Antlr4解析,生成未解决的逻辑计划;
- 绑定分析器,例如函数适配、通过Catalog获取字段等,生成已解决的逻辑计划;
- 优化器对已解决的逻辑计划进行优化,基于CBO和RBO转换,生成优化后的逻辑计划;
- 将优化后的逻辑计划转换为多个可被识别或执行的物理计划;
- 基于CBO在多个物理计划中,选择执行开销最小的物理计划;
- 转为具体的RDDs执行。
感兴趣的小伙伴可以深入源码去探索一下具体的解析和优化实现。