背景
本文基于 Spark 3.5
关于ShuffleLocalRead
的作用简单的来说,就是会按照一定的规则,从一个 map Task
中连续读取多个 reduce数据
的任务,(正常的情况下是读取所有map Task
中特定的一个reduce数据
任务),具体可以参考Spark AQE中的CoalesceShufflePartitions和OptimizeLocalShuffleReader
分析
直接上OptimizeShuffleWithLocalRead
代码:
override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.LOCAL_SHUFFLE_READER_ENABLED)) {return plan}plan match {case s: SparkPlan if canUseLocalShuffleRead(s) =>createLocalRead(s)case s: SparkPlan =>createProbeSideLocalRead(s)}}...def canUseLocalShuffleRead(plan: SparkPlan): Boolean = plan match {case s: ShuffleQueryStageExec =>s.mapStats.isDefined && isSupported(s.shuffle)case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>s.mapStats.isDefined && isSupported(s.shuffle) &&s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTScase _ => false}...private def createLocalRead(plan: SparkPlan): AQEShuffleReadExec = {plan match {case c @ AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>AQEShuffleReadExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length)))case s: ShuffleQueryStageExec =>AQEShuffleReadExec(s, getPartitionSpecs(s, None))}
}
这里有两种情况会引入LocalshuffleRead
:
第一种是引入了REBALANCE
hint的场景。这种情况下,在Spark的内部表示 ShuffleOrigin 为 REBALANCE_PARTITIONS_BY_NONE,这种情况下 是hint为REBALANCE
而不是REBALANCE(c)
或者REBALANCE(num)
的情况;
第二种是SMJ 转变为 BHJ的
场景。
至于为啥会存在AQEShuffleReadExec(s: ShuffleQueryStageExec, _)
这种情况是因为CoalesceShufflePartitions
这个规则会进行分区的合并等
所以在代码中会有两个case:
-
SparkPlan if canUseLocalShuffleRead(s)
如果满足是REBALANCE hint
的情况或者是Spark内部加的(为了满足Shuffle上下算子的数据分布要求)就强加上AQEShuffleReadExec
-
createProbeSideLocalRead
这里是进行SMJ 转 BHJ
的BuildBroadcast
的另一边进行ShuffleLocalRead
的情况,这种情况下,因为已经进行broadcast了,所以参与BuildBroadcast
的另一边也可以进行shufflelocalRead
的
针对于第一种情况 强制加上 AQEShuffleReadExec
, 这种情况下在ensureRequirements
规则下,有可能会增加额外的Shuffle操作,这种情况就是负优化了,所以在进行了reOptimize
操作后,会进行一个判断是否有增益:
val afterReOptimize = reOptimize(logicalPlan)if (afterReOptimize.isDefined) {val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.getval origCost = costEvaluator.evaluateCost(currentPhysicalPlan)val newCost = costEvaluator.evaluateCost(newPhysicalPlan)if (newCost < origCost ||(newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {logOnLevel("Plan changed:\n" +sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))cleanUpTempTags(newPhysicalPlan)currentPhysicalPlan = newPhysicalPlancurrentLogicalPlan = newLogicalPlanstagesToReplace = Seq.empty[QueryStageExec]}
这里的条件默认是根据shuffle的个数来计算的,如果优化后的shuffle数有增加,则会回退到之前的物理计划中去,当然用户也可以配置spark.sql.adaptive.customCostEvaluatorClass
来实现自己的是否有增益的逻辑。
针对第二种情况,这种情况一般来说都是有正向的提升效果的,但是也会经过第一种情况的逻辑判断。
额外话题
我们这边只说到了对于Shuffle
或者Broadcast中Build另一侧的处理
,那对于Broadcast中Build一侧的
的处理是在哪里呢?
我们知道 Spark中有对SMJ 转 BHJ
有两个地方,一个是正常的流程下经过物理规则的转换(JoinSelection),另一个是在AQE期间,根据指标再次进行转换,
具体的可以参考:Spark在生产中是否要禁止掉BHJ(BroadcastHashJoin)
,
-
对于第一种情况来说,经过
EnsureRequirements
规则的时候,是不会在Broadcast
子节点中增加Shuffle操作的,所以这里就增加不了AQEShuffleReadExec
-
对于第二种情况来说, 因为在正常流程下,还是
SMJ
操作,所以会在SMJ字节点中有Shuffle
,操作,所以在AQE阶段,可以适用OptimizeShuffleWithLocalRead
规则,所以可以看到在这种情况下,Broadcast
会有AQEShuffleReadExec
子节点。