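Contents
- What Is a Runtime Filter
- A JOIN Example
- What a Runtime Filter Does
- Types of Runtime Filters
- Filter Expressions on Partition Columns
- DynamicPruningSubquery
- Filter Expressions on Non-Partition Columns
- BloomFilterSubquery
- InSubquery
- How Runtime Filters Are Generated on Non-Partition Columns
- Injecting Runtime Filters
- Entry Point of the Rule
- Updating the Plan Bottom-Up to Inject Runtime Filters
- Checking That the Application-Side Plan Qualifies
- Extracting Filter + Scan Information from the Creation-Side Plan
- Building a Logical Plan with a BloomFilter/InSubquery
- Building a New Logical Plan with a BloomFilter
- Building a New Logical Plan with an IN Expression
- Optimizing ScalarSubquery/InSubquery
- Background
- How Subqueries Are Executed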
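What Is a Runtime Filter
The familiar `Partition Filter` and `Row Filter` are static expressions generated by the SQL optimizer, such as `a = 1`. Both their left operand and their right operand are known. They can therefore easily be pushed down to the data source's scan operator, where they prune partitions or filter rows as the data is read.
With correlated subqueries or JOINs, the right operand of the expression is usually unknown until runtime. Examples are the subquery in `a IN (SELECT aa FROM b)` and `b.id` in `a.id = b.id`. Because this dataset cannot be determined during planning, such a condition cannot be pushed down like an ordinary predicate.
What if these unknown expressions were computed in advance so that they became constants? They could then be pushed down. This is possible. Spark's Subquery concept and execution mechanism support exactly this idea. Depending on the case, the unknown right operand is converted into a different kind of Subquery. By the time the whole expression is evaluated, the right operand has already become a constant. The result is what is called a runtime filter.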
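A JOIN Example
For example, start with the following SQL, where `a.id` and `b.id` are both 32-bit integers:
```sql
SELECT a.*, b.* FROM a JOIN b ON a.id = b.id
```
With a runtime filter, it becomes the SQL below. The scan of table `a` gains an IN filter that contains a subquery. That subquery, `SELECT id FROM b`, is the new subquery generated when the runtime filter is built:
```sql
SELECT a.*, b.* FROM (SELECT * FROM a WHERE a.id IN (SELECT id FROM b)) a
JOIN b ON a.id = b.id
```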
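The equivalence this rewrite relies on can be checked on in-memory data. In the sketch below, `RowA`, `RowB` and `RuntimeFilterJoinDemo` are hypothetical names, not Spark APIs. The subquery `SELECT id FROM b` is evaluated once into a constant set and used to prune `a`. The inner join result does not change, but fewer rows of `a` reach the join.

```scala
// Hypothetical in-memory model; RowA, RowB and RuntimeFilterJoinDemo are not Spark APIs.
final case class RowA(id: Int, name: String)
final case class RowB(id: Int, value: String)

object RuntimeFilterJoinDemo {
  // Plain inner equi-join on id (nested loops, for clarity only)
  def innerJoin(a: Seq[RowA], b: Seq[RowB]): Seq[(RowA, RowB)] =
    for {
      ra <- a
      rb <- b
      if ra.id == rb.id
    } yield (ra, rb)

  // Evaluate "SELECT id FROM b" once into a constant set, prune a with it, then join.
  // Returns the join result and the number of rows of a that reached the join.
  def innerJoinWithRuntimeFilter(a: Seq[RowA], b: Seq[RowB]): (Seq[(RowA, RowB)], Int) = {
    val bIds: Set[Int] = b.map(_.id).toSet
    val prunedA = a.filter(r => bIds.contains(r.id))
    (innerJoin(prunedA, b), prunedA.size)
  }
}
```

The filter is only safe here because rows of `a` without a match are discarded by an inner join anyway.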
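What a Runtime Filter Does
As the name suggests, a runtime filter is a filter condition that can only filter data at runtime. It complements the filter conditions that can already be applied during planning. The aim is to filter the data on one side again after it has been read, which reduces the input to downstream operators.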
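Types of Runtime Filters
Filter Expressions on Partition Columns
These are mainly handled by the `PartitionPruning` optimization rule. Its logic was described in a previous article.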
DynamicPruningSubquery
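The right operand of the filter expression is only known at runtime. An example is `WHERE a IN (SELECT a FROM b WHERE b.a > 0)`, where `a` is the partition column of the scanned table. The subquery `SELECT a FROM b WHERE b.a > 0` is converted into a `DynamicPruningSubquery(...)`, which is pushed down to the Scan operator.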
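Here is a toy sketch of what the pushed-down subquery achieves. It assumes an in-memory table that is split into partitions keyed by the partition column `a`. `Partition` and `DynamicPruningDemo` are hypothetical names, not Spark classes. The subquery result is computed first, and the scan then skips every partition whose key is not in that result.

```scala
// Hypothetical model of dynamic partition pruning; Partition and DynamicPruningDemo are not Spark APIs.
final case class Partition(key: Int, rows: Seq[String])

object DynamicPruningDemo {
  // Evaluated first, playing the role of the DynamicPruningSubquery:
  // SELECT a FROM b WHERE b.a > 0
  def subqueryResult(bA: Seq[Int]): Set[Int] = bA.filter(_ > 0).toSet

  // The scan of the partitioned table only reads partitions whose key is in the subquery result
  def scanWithPruning(partitions: Seq[Partition], allowed: Set[Int]): Seq[Partition] =
    partitions.filter(p => allowed.contains(p.key))
}
```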
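Filter Expressions on Non-Partition Columns
These are handled by the `InjectRuntimeFilter` optimization rule, which is the main subject of this article. The rule mainly targets JOINs.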
BloomFilterSubquery
The optimized expression uses Spark's own Bloom filter implementation, the built-in class `org.apache.spark.util.sketch.BloomFilter`.
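In brief, take `a JOIN b ON a.id = b.id AND a.name = b.name`:
- for the condition `a.id = b.id`, a BloomFilter is built over the `id` column of table `b`, which yields a new expression roughly like `bloom_filter_might_contain(a.id)`;
- for the condition `a.name = b.name`, a BloomFilter is built over the `name` column of table `b`, which yields something like `bloom_filter_might_contain(a.name)`.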
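To make `bloom_filter_might_contain` concrete, here is a minimal Bloom filter sketch. It is not `org.apache.spark.util.sketch.BloomFilter`. The class name `SimpleBloomFilter`, the SplitMix64 mixing function and the double-hashing scheme are illustrative assumptions. What it does show is the property runtime filters rely on. A key from the build side is never reported as absent (no false negatives). An absent key is occasionally reported as present (a false positive), which is harmless because the join still compares the actual keys.

```scala
import java.util.BitSet

// Minimal Bloom filter sketch; NOT org.apache.spark.util.sketch.BloomFilter (hashing differs).
final class SimpleBloomFilter(numBits: Int, numHashFunctions: Int) {
  require(numBits > 0 && numHashFunctions > 0, "numBits and numHashFunctions must be positive")

  private val bits = new BitSet(numBits)

  // SplitMix64 finalizer, used here as a cheap 64-bit mixing function
  private def mix64(z0: Long): Long = {
    var z = z0
    z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L
    z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL
    z ^ (z >>> 31)
  }

  // Double hashing: position_i = (h1 + i * h2) mod numBits, for i = 0 until numHashFunctions
  private def positions(item: Long): Seq[Int] = {
    val h1 = mix64(item)
    val h2 = mix64(item ^ 0x9e3779b97f4a7c15L) | 1L
    (0 until numHashFunctions).map { i =>
      java.lang.Math.floorMod(h1 + i.toLong * h2, numBits.toLong).toInt
    }
  }

  // Build side: set the k bits of every key
  def put(item: Long): Unit = positions(item).foreach(p => bits.set(p))

  // Probe side: false means "definitely absent"; true means "might be present"
  def mightContain(item: Long): Boolean = positions(item).forall(p => bits.get(p))
}
```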
InSubquery
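If a BloomFilter cannot be used (in the code below, this is the case when `runtimeFilterBloomFilterEnabled` is off), the rule can instead try to build an `InSubquery` on the JOIN key from one side of the JOIN. A later optimization step can then convert the `InSubquery` expression into a `SEMI JOIN`. This is similar to the dynamic partition pruning logic covered in a previous article.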
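You can check on plain collections why the rewrite to a semi join is legal. In this sketch the names are hypothetical and NULL handling is ignored. `a.id IN (SELECT id FROM b)` and `a LEFT SEMI JOIN b ON a.id = b.id` keep exactly the same rows of `a`, including duplicates. Neither one returns columns from `b`.

```scala
// Hypothetical sketch: IN subquery vs LEFT SEMI JOIN on plain collections (NULLs ignored).
object InToSemiJoinDemo {
  // a.id IN (SELECT id FROM b): membership test against the subquery's result set
  def inSubquery(a: Seq[Int], b: Seq[Int]): Seq[Int] = {
    val subqueryResult = b.toSet
    a.filter(subqueryResult.contains)
  }

  // a LEFT SEMI JOIN b ON a.id = b.id: keep each left row once if any right row matches;
  // right-side duplicates never multiply left rows, and no right columns are returned
  def leftSemiJoin(a: Seq[Int], b: Seq[Int]): Seq[Int] =
    a.filter(x => b.exists(_ == x))
}
```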
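How Runtime Filters Are Generated on Non-Partition Columns
Injecting Runtime Filters
This is handled by the `InjectRuntimeFilter` optimization rule, which only attempts an injection under restricted conditions. The following sections analyze the implementation.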
Entry Point of the Rule
```scala
object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // A correlated Subquery (possibly not yet decorrelated) is left untouched
    case s: Subquery if s.correlated => plan
    // Neither semi-join reduction nor the Bloom filter optimization is enabled: nothing to do
    case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
      !conf.runtimeFilterBloomFilterEnabled => plan
    case _ =>
      // At least one of the two optimizations is enabled, so try to inject runtime filters
      val newPlan = tryInjectRuntimeFilter(plan)
      if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan)) {
        RewritePredicateSubquery(newPlan)
      } else {
        newPlan
      }
  }
}
```
Updating the Plan Bottom-Up to Inject Runtime Filters
```scala
private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
  var filterCounter = 0
  val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
  plan transformUp {
    // Only plans that contain an equi-join are candidates for this optimization
    case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
      var newLeft = left
      var newRight = right
      (leftKeys, rightKeys).zipped.foreach((l, r) => {
        // Check if:
        // 1. There is already a DPP filter on the key
        // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
        // 3. The keys are simple cheap expressions
        if (filterCounter < numFilterThreshold && // the number of runtime filters must stay below the threshold
          !hasDynamicPruningSubquery(left, right, l, r) &&
          !hasRuntimeFilter(newLeft, newRight, l, r) &&
          isSimpleExpression(l) && isSimpleExpression(r)) {
          // Proceed only if the filter count is below the configurable numFilterThreshold,
          // neither subtree already has a DPP filter or a runtime filter for this join key,
          // and both join keys are simple expressions.
          val oldLeft = newLeft
          val oldRight = newRight
          if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, hint)) {
            // For INNER, LEFT SEMI and RIGHT OUTER joins, unmatched left rows are dropped,
            // so the left plan can be pruned. In addition, key `l` must be traceable to a leaf
            // of the left plan, and the right plan must contain a filter that actually filters data.
            newLeft = injectFilter(l, newLeft, r, right)
          }
          // Did we actually inject on the left? If not, try on the right
          if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
            filteringHasBenefit(right, left, r, hint)) {
            // Mirror image of the left-side logic: try to inject a runtime filter into the right plan
            newRight = injectFilter(r, newRight, l, left)
          }
          if (!newLeft.fastEquals(oldLeft) || !newRight.fastEquals(oldRight)) {
            // If either side changed, count one more generated runtime filter
            filterCounter = filterCounter + 1
          }
        }
      })
      // Return a new join node with the (possibly) rewritten children
      join.withNewChildren(Seq(newLeft, newRight))
  }
}
```
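A tiny plan ADT is enough to show how `transformUp` and `filterCounter` interact. This is a simplified model under stated assumptions. `Plan`, `Scan`, `Join`, `RuntimeFilter` and `InjectDemo` are hypothetical, every key pair is assumed to qualify, and filters always go on the left side. The sketch shows two things. Children are rewritten before their parent. The counter is shared across the whole traversal and stops injection once the threshold is reached.

```scala
// Hypothetical, simplified plan ADT; not Catalyst's classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class RuntimeFilter(key: String, child: Plan) extends Plan
final case class Join(left: Plan, right: Plan, keys: Seq[(String, String)]) extends Plan

object InjectDemo {
  // Bottom-up traversal: rewrite the children first, then offer the node itself to the rule
  def transformUp(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = p match {
      case Join(l, r, k)       => Join(transformUp(l)(rule), transformUp(r)(rule), k)
      case RuntimeFilter(k, c) => RuntimeFilter(k, transformUp(c)(rule))
      case s: Scan             => s
    }
    rule.applyOrElse(withNewChildren, (unchanged: Plan) => unchanged)
  }

  // Assumes every join key pair qualifies and always filters the left side.
  // The counter is shared across the whole traversal, like filterCounter.
  def inject(plan: Plan, threshold: Int): Plan = {
    var filterCounter = 0
    transformUp(plan) {
      case Join(left, right, keys) =>
        var newLeft = left
        keys.foreach { case (leftKey, _) =>
          if (filterCounter < threshold) {
            newLeft = RuntimeFilter(leftKey, newLeft)
            filterCounter += 1
          }
        }
        Join(newLeft, right, keys)
    }
  }
}
```

With a threshold of 2, the inner join receives the first filter. The outer join receives one filter for its first key, and its second key is skipped.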
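Checking That the Application-Side Plan Qualifies
Filter + Scan information is only extracted from the filter (creation) side if the application side and the filter side meet all of the following conditions:
- the attributes referenced by `filterApplicationSideExp` really come from a leaf node of `filterApplicationSide`;
- the current join is a shuffle join, or a broadcast join whose application-side subplan contains a shuffle. The goal is that, once the filter side is broadcast to the application side, the filter has a large enough effect (later filter-pushdown rules can push it close to the Scan operator);
- the estimated scan size of the application side exceeds a configurable threshold, because a runtime filter is only worth generating when the application-side dataset is large enough.

The code comment below lists one more requirement: the creation side must have a selective predicate. That check is performed by `extractSelectiveFilterOverScan`.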
```scala
object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
  /**
   * Extracts the beneficial filter creation plan with the checks shown below:
   * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows
   *   (ie the expression references originate from a single leaf node)
   * - The filter creation side has a selective predicate
   * - The current join is a shuffle join or a broadcast join that has a shuffle below it
   * - The max filterApplicationSide scan size is greater than a configurable threshold
   */
  private def extractBeneficialFilterCreatePlan(
      filterApplicationSide: LogicalPlan,
      filterCreationSide: LogicalPlan,
      filterApplicationSideExp: Expression,
      filterCreationSideExp: Expression,
      hint: JoinHint): Option[LogicalPlan] = {
    if (findExpressionAndTrackLineageDown(
        filterApplicationSideExp, filterApplicationSide).isDefined &&
      (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint) ||
        probablyHasShuffle(filterApplicationSide)) &&
      satisfyByteSizeRequirement(filterApplicationSide)) {
      // findExpressionAndTrackLineageDown verifies that the attributes referenced by
      // filterApplicationSideExp really come from a leaf node of filterApplicationSide;
      // otherwise no filter is generated.
      //
      // isProbablyShuffleJoin(...) || probablyHasShuffle(...) verifies that the join at this level
      // is a shuffle join, or a broadcast join whose application-side subplan contains a shuffle,
      // so that the filter is effective enough once the filter side is broadcast to the
      // application side (later filter-pushdown rules can move it close to the Scan operator).
      //
      // satisfyByteSizeRequirement(filterApplicationSide) ensures the estimated application-side
      // scan size is large enough for the optimization to be worthwhile.
      extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp)
    } else {
      None
    }
  }

  /**
   * Resolves aliases in `exp` and checks whether the resulting attributes come from the output
   * set of a leaf node of the input plan. If so, returns the expression and that leaf node;
   * otherwise returns None.
   */
  def findExpressionAndTrackLineageDown(
      exp: Expression,
      plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
    // A constant references nothing, so there is nothing to track
    if (exp.references.isEmpty) return None

    plan match {
      case p: Project =>
        // Collect all aliases defined by this node
        val aliases = getAliasMap(p)
        // Replace aliases in exp with the expressions they refer to, then continue with the child
        findExpressionAndTrackLineageDown(replaceAlias(exp, aliases), p.child)
      // we can unwrap only if there are row projections, and no aggregation operation
      case a: Aggregate =>
        val aliasMap = getAliasMap(a)
        findExpressionAndTrackLineageDown(replaceAlias(exp, aliasMap), a.child)
      case l: LeafNode if exp.references.subsetOf(l.outputSet) =>
        Some((exp, l))
      case u: Union =>
        // For a Union, exp only matches if it is itself one of the Union's output attributes:
        // find its position and track the column at that position in each child
        val index = u.output.indexWhere(_.semanticEquals(exp))
        if (index > -1) {
          u.children
            .flatMap(child => findExpressionAndTrackLineageDown(child.output(index), child))
            .headOption
        } else {
          None
        }
      case other =>
        // For any other node, hand exp to a child that produces all of its references
        other.children.flatMap {
          child => if (exp.references.subsetOf(child.outputSet)) {
            findExpressionAndTrackLineageDown(exp, child)
          } else {
            None
          }
        }.headOption
    }
  }
}
```
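The lineage check can be reproduced on a toy plan. In this sketch, `Expr`, `Node`, `Leaf`, `Project`, `JoinNode` and `LineageDemo` are hypothetical stand-ins for Catalyst's classes, and a Project is modeled as a map from output name to defining expression. Tracking succeeds only when, after alias substitution, all references of the expression come from a single leaf.

```scala
// Hypothetical stand-ins for Catalyst expressions and plan nodes.
sealed trait Expr { def refs: Set[String] }
final case class Attr(name: String) extends Expr { def refs: Set[String] = Set(name) }
final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
final case class Add(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

sealed trait Node { def output: Set[String] }
final case class Leaf(output: Set[String]) extends Node
// Project modeled as "output name -> defining expression"
final case class Project(list: Map[String, Expr], child: Node) extends Node {
  def output: Set[String] = list.keySet
}
final case class JoinNode(left: Node, right: Node) extends Node {
  def output: Set[String] = left.output ++ right.output
}

object LineageDemo {
  // Replace references to Project outputs with their defining expressions (like replaceAlias)
  def replaceAlias(e: Expr, aliases: Map[String, Expr]): Expr = e match {
    case Attr(n)   => aliases.getOrElse(n, e)
    case Add(l, r) => Add(replaceAlias(l, aliases), replaceAlias(r, aliases))
    case lit: Lit  => lit
  }

  // Returns the rewritten expression and the single leaf that produces all of its references
  def trackDown(e: Expr, plan: Node): Option[(Expr, Node)] =
    if (e.refs.isEmpty) None // a constant: nothing to track
    else plan match {
      case Project(list, child) => trackDown(replaceAlias(e, list), child)
      case l: Leaf              => if (e.refs.subsetOf(l.output)) Some((e, l)) else None
      case JoinNode(left, right) =>
        // Descend only into a child that produces every reference of e
        Seq(left, right)
          .filter(child => e.refs.subsetOf(child.output))
          .flatMap(child => trackDown(e, child))
          .headOption
    }
}
```

An expression that mixes columns from both join inputs, such as `t1.a + t2.c`, cannot be traced to one leaf, so no runtime filter would be placed on it.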
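Extracting Filter + Scan Information from the Creation-Side Plan
If the creation side has no simple filter expression that actually filters data, there is no usable plan, with an estimable result size, from which to build a runtime filter.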
```scala
object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
  /**
   * Following filterCreationSideExp, extracts the Filter and Scan needed to build a complete
   * runtime filter.
   */
  private def extractSelectiveFilterOverScan(
      plan: LogicalPlan,
      filterCreationSideExp: Expression): Option[LogicalPlan] = {
    @tailrec
    def extract(
        p: LogicalPlan,
        predicateReference: AttributeSet, // attributes referenced by the filter predicates seen so far
        hasHitFilter: Boolean, // whether a Filter node was encountered in an earlier step
        hasHitSelectiveFilter: Boolean, // whether an encountered Filter can actually filter data
        currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
      case Project(projectList, child) if hasHitFilter =>
        // We need to make sure all expressions referenced by filter predicates are simple
        // expressions.
        // Reached only for shapes like Filter -> Project: the attributes the Filter references are
        // defined by this Project, so take the project-list expressions they refer to
        // (e.g. `c + d AS a`) and continue downwards with those expressions' references.
        val referencedExprs = projectList.filter(predicateReference.contains)
        if (referencedExprs.forall(isSimpleExpression)) {
          extract(
            child,
            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
            hasHitFilter,
            hasHitSelectiveFilter,
            currentPlan)
        } else {
          None
        }
      case Project(_, child) =>
        // A Project reached before any Filter: no predicate references can have been collected yet
        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
        // Continue with the child
        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
      case Filter(condition, child) if isSimpleExpression(condition) =>
        // Reached only when the Filter condition is a simple expression.
        // Open question: could a condition like `a AND b` be split and handled conjunct by conjunct?
        extract(
          child,
          predicateReference ++ condition.references,
          // A Filter was hit: a Project met later must resolve these references to the
          // expressions that define them
          hasHitFilter = true,
          // Does the condition contain an expression that actually filters rows?
          hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
          currentPlan)
      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
        // the other side of the [[Join]]. It's also OK to use a superset of the join key values
        // (ignore null values) to do the pruning.
        // The key values produced by a child of this nested join are a superset of those the join
        // itself outputs, so the child containing filterCreationSideExp can be used on its own;
        // extraction restarts from that child.
        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
          extract(left, AttributeSet.empty,
            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
          extract(right, AttributeSet.empty,
            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
        } else {
          None
        }
      case _: LeafNode if hasHitSelectiveFilter =>
        // The leaf was reached and a selective filter was found along the way: return the plan
        Some(currentPlan)
      case _ => None
    }

    if (!plan.isStreaming) {
      extract(plan, AttributeSet.empty,
        hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = plan)
    } else {
      None
    }
  }
}
```
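Here is a stripped-down model of `extract`. It keeps only the Project, Filter and leaf cases and the `hasHitSelectiveFilter` flag. It drops the nested-join case and the predicate-reference bookkeeping. The `likelySelective` field is an assumption standing in for `isLikelySelective(condition)`, and all class names are hypothetical.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified plan nodes; not Catalyst's classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
// likelySelective stands in for isLikelySelective(condition)
final case class Filter(condition: String, likelySelective: Boolean, child: Plan) extends Plan
final case class Project(columns: Seq[String], child: Plan) extends Plan
final case class Aggregate(child: Plan) extends Plan // an operator the walk cannot see through

object ExtractDemo {
  // Walk down Project/Filter chains to a Scan; succeed only if a likely-selective Filter was seen
  @tailrec
  private def extract(p: Plan, hasHitSelectiveFilter: Boolean, currentPlan: Plan): Option[Plan] =
    p match {
      case Project(_, child) => extract(child, hasHitSelectiveFilter, currentPlan)
      case Filter(_, selective, child) =>
        extract(child, hasHitSelectiveFilter || selective, currentPlan)
      case _: Scan if hasHitSelectiveFilter => Some(currentPlan)
      case _ => None
    }

  def extractSelectiveFilterOverScan(plan: Plan): Option[Plan] =
    extract(plan, hasHitSelectiveFilter = false, currentPlan = plan)
}
```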
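Building a Logical Plan with a BloomFilter/InSubquery
Spark currently supports two implementations. One is based on a BloomFilter. The other is based on a SEMI JOIN, similar to the dynamic partition pruning implementation. The BloomFilter is usually the more efficient of the two, and, as the code below shows, it takes precedence when both are enabled.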
```scala
object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
  private def injectFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
    require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
    if (conf.runtimeFilterBloomFilterEnabled) {
      injectBloomFilter(
        filterApplicationSideExp,
        filterApplicationSidePlan,
        filterCreationSideExp,
        filterCreationSidePlan
      )
    } else {
      injectInSubqueryFilter(
        filterApplicationSideExp,
        filterApplicationSidePlan,
        filterCreationSideExp,
        filterCreationSidePlan
      )
    }
  }
}
```
Building a New Logical Plan with a BloomFilter
The new plan tree looks like this:
```text
Filter(might_contain(ScalarSubquery(bloomFilterAgg(xxhash64(filterCreationSideExp)) AS bloomFilter),
                     xxhash64(filterApplicationSideExp)))
+- filterApplicationSidePlan
```
```scala
/**
 * Builds the Bloom filter and the new subplan that applies it.
 */
private def injectBloomFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  // Skip if the filter creation side is too big
  if (!conf.runtimeFilterAllowBigFilterCreationSide) {
    // The Bloom filter optimization is enabled, but building a filter from an oversized
    // creation side is not allowed, so return the original plan unchanged.
    if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
      return filterApplicationSidePlan
    }
  }
  // Estimated row count of the creation-side result
  val rowCount = filterCreationSidePlan.stats.rowCount
  // BloomFilterAggregate is a TypedImperativeAggregate implementation: an aggregate function
  // that builds a BloomFilter from its input hash values.
  val bloomFilterAgg =
    if (rowCount.isDefined && rowCount.get.longValue > 0L) {
      // The row count is known and non-zero:
      // - new XxHash64(Seq(filterCreationSideExp)) computes the hash of the join key;
      // - Literal(rowCount.get.longValue) is the expected number of distinct hash values.
      // Together they build a Bloom filter keyed on the hash of filterCreationSideExp,
      // sized (including the number of hash functions) from the row count.
      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
        Literal(rowCount.get.longValue))
    } else {
      // The row count is unknown: fall back to the default Bloom filter settings
      // spark.sql.optimizer.runtime.bloomFilter.expectedNumItems=4000000
      // spark.sql.optimizer.runtime.bloomFilter.numBits=8388608
      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
    }
  // Complete mode: the aggregate is computed directly from its input, without a partial
  // aggregation step.
  val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
  val alias = Alias(aggExp, "bloomFilter")()
  // Aggregate(Nil, Seq(alias), filterCreationSidePlan): Nil means no grouping expressions, so
  // exactly one row is produced; Seq(alias) is the aggregate computed over filterCreationSidePlan.
  // ColumnPruning removes columns of filterCreationSidePlan that are not needed.
  // Why apply ConstantFolding explicitly here? The Bloom filter aggregate produces a single row,
  // so this plan is wrapped in a ScalarSubquery, and later optimizer iterations do not apply
  // ConstantFolding inside a ScalarSubquery's subtree. Constant expressions are folded up front.
  val aggregate =
    ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
  // The Bloom filter is a single value of a complex type, so wrap the plan in a ScalarSubquery;
  // its outer references are Nil, i.e. it references no outer attributes.
  val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
  // BloomFilterMightContain's first argument is the subquery, whose value is already known when
  // the enclosing plan executes; the second hashes filterApplicationSideExp for each
  // application-side row. The resulting Filter prunes data with the Bloom filter, much like a
  // DynamicPruningExpression.
  val filter = BloomFilterMightContain(bloomFilterSubquery,
    new XxHash64(Seq(filterApplicationSideExp)))
  Filter(filter, filterApplicationSidePlan)
}
```
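The row count matters because the size of a Bloom filter and its number of hash functions are derived from the expected number of items. The sketch below uses the classic textbook formulas. For n items at a target false-positive rate p, the filter needs m = -n ln(p) / (ln 2)^2 bits and k = max(1, round(m / n * ln 2)) hash functions. This article does not show exactly how `BloomFilterAggregate` turns `rowCount` into a bit count, so treat this as background rather than Spark's code. `BloomSizing` is a hypothetical name.

```scala
// Classic Bloom filter sizing formulas; BloomSizing is a hypothetical helper, not Spark's code.
object BloomSizing {
  private val Ln2 = math.log(2)

  // m = -n * ln(p) / (ln 2)^2 bits for n expected items at target false-positive rate p
  def optimalNumOfBits(n: Long, p: Double): Long =
    (-n * math.log(p) / (Ln2 * Ln2)).toLong

  // k = max(1, round(m / n * ln 2)) hash functions
  def optimalNumOfHashFunctions(n: Long, m: Long): Int =
    math.max(1, math.round(m.toDouble / n * Ln2).toInt)

  // Expected false-positive rate after inserting n items: (1 - e^(-k * n / m))^k
  def expectedFpp(n: Long, m: Long, k: Int): Double =
    math.pow(1 - math.exp(-k.toDouble * n / m), k)
}
```

For example, 1000000 expected items at a 3% target false-positive rate give roughly 7.3 million bits and 5 hash functions. For scale, 8388608 bits is exactly 1 MiB.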
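Building a New Logical Plan with an IN Expression
Unlike dynamic partition pruning, this path normalizes both operands of the JOIN expression. Each operand is kept as the original value or replaced by its hash, which keeps the dataset to be broadcast as small as possible.
As with a Bloom filter, this kind of filter can produce false positives. These only occur when the key is hashed and two different values collide. According to the author, though, it filters less effectively than a BloomFilter.
The new plan tree looks like this:
```text
Filter(mayWrapWithHash(filterApplicationSideExp) IN (mayWrapWithHash(filterCreationSideExp)))
+- filterApplicationSidePlan
```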
```scala
private def injectInSubqueryFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
  // mayWrapWithHash normalizes the key based on its data type: either the original value or its hash
  val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
  val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
  // Group the creation-side result by the (possibly hashed) join key, which de-duplicates it;
  // the resulting set-like data can then be used to filter rows quickly
  val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
  if (!canBroadcastBySize(aggregate, conf)) {
    // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
    // i.e., the semi-join will be a shuffled join, which is not worthwhile.
    // (An extra shuffle join would cost more than the filter saves.)
    return filterApplicationSidePlan
  }
  // The aggregate can be broadcast, so, as in dynamic partition pruning, build a filter with a
  // subquery and place it on top of the original application-side plan
  val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
    ListQuery(aggregate, childOutputs = aggregate.output))
  Filter(filter, filterApplicationSidePlan)
}

// Wraps `expr` with a hash function if its byte size is larger than an integer.
private def mayWrapWithHash(expr: Expression): Expression = {
  if (expr.dataType.defaultSize > IntegerType.defaultSize) {
    // The type's default size exceeds 32 bits, so use a hash value instead
    new Murmur3Hash(Seq(expr))
  } else {
    expr
  }
}
```
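The effect of `mayWrapWithHash` plus the de-duplicating `Aggregate` can be sketched on plain values. Type widths are passed in as Spark-like `defaultSize` values: 4 for an integer, 20 for a string. Scala's `MurmurHash3.stringHash` stands in for Spark's `Murmur3Hash` expression, which hashes differently, and `InSubqueryDemo` is hypothetical. Keys wider than an Int are compared by their 32-bit hash. A collision can therefore let an unmatched row through, but a matching row is never dropped.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical sketch of mayWrapWithHash + a de-duplicated IN list; not Spark's code.
object InSubqueryDemo {
  // Spark's IntegerType.defaultSize is 4 bytes
  val IntegerDefaultSize = 4

  // Keys wider than an Int are replaced by a 32-bit hash.
  // MurmurHash3.stringHash is only a stand-in for Spark's Murmur3Hash expression.
  def mayWrapWithHash(value: Any, defaultSize: Int): Any =
    if (defaultSize > IntegerDefaultSize) MurmurHash3.stringHash(value.toString) else value

  // Creation side: distinct (possibly hashed) keys, like Aggregate(Seq(alias), Seq(alias), ...).
  // Application side: keep rows whose (possibly hashed) key is in that set.
  def inSubqueryFilter[K](appKeys: Seq[K], creationKeys: Seq[K], defaultSize: Int): Seq[K] = {
    val inSet: Set[Any] = creationKeys.map(k => mayWrapWithHash(k, defaultSize)).toSet
    appKeys.filter(k => inSet.contains(mayWrapWithHash(k, defaultSize)))
  }
}
```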
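Optimizing ScalarSubquery/InSubquery
Later rules optimize these subqueries further. For example, `MergeScalarSubqueries` optimizes `ScalarSubquery`, and `RewritePredicateSubquery` rewrites `InSubquery` into a SEMI/ANTI JOIN (the same flow as in the DPP article).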
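Background
How Subqueries Are Executed
A runtime filter needs its right operand to already be a constant when the expression is evaluated. To keep this data dependency simple, Spark reuses its built-in Subquery mechanism.
A subquery is defined relative to the root plan and prepares for the root plan's execution. So, when the driver triggers the current plan to produce its RDD, Spark first completes all subquery computations in a blocking manner.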
```scala
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  /**
   * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
   * preparations.
   *
   * Concrete implementations of SparkPlan should override `doExecute`.
   */
  // executeQuery wraps the preparation steps that run before this plan executes
  final def execute(): RDD[InternalRow] = executeQuery {
    if (isCanonicalizedPlan) {
      throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
    }
    doExecute()
  }

  /**
   * Executes a query after preparing the query and adding query plan information to created RDDs
   * for visualization.
   */
  protected final def executeQuery[T](query: => T): T = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      // Preparation: mainly collects all executable subqueries, such as
      // execution.InSubqueryExec / execution.ScalarSubquery, and starts their execution
      prepare()
      // Block until every subquery this plan depends on has finished
      waitForSubqueries()
      // Evaluates the by-name argument, i.e. the body passed in by execute(), which calls doExecute()
      query
    }
  }

  /**
   * Blocks the thread until all subqueries finish evaluation and update the results.
   */
  protected def waitForSubqueries(): Unit = synchronized {
    // fill in the result of subqueries
    runningSubqueries.foreach { sub =>
      // Collect this subquery's result in a blocking way and update its state
      sub.updateResult()
    }
    runningSubqueries.clear()
  }
}
```
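The prepare / wait / execute order in `executeQuery` can be mimicked with Scala futures. `ToySubquery` and `ToyPlan` are hypothetical classes, not Spark's subquery execution classes. Subqueries are started asynchronously and then awaited one by one. Only after that does the main body run, and at that point each subquery's value behaves like a constant.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical toy classes mimicking prepare() / waitForSubqueries() / doExecute(); not Spark's.
final class ToySubquery(compute: () => Set[Int]) {
  private var running: Option[Future[Set[Int]]] = None
  @volatile private var result: Option[Set[Int]] = None

  // Like prepare(): start the subquery asynchronously
  def start(): Unit = running = Some(Future(compute()))

  // Like updateResult(): block until the subquery finishes and store its value
  def updateResult(): Unit =
    result = Some(Await.result(running.getOrElse(sys.error("subquery was not started")), 10.seconds))

  def value: Set[Int] = result.getOrElse(sys.error("subquery result is not available yet"))
}

final class ToyPlan(subqueries: Seq[ToySubquery], body: () => Seq[Int]) {
  def execute(): Seq[Int] = {
    subqueries.foreach(_.start())        // prepare(): launch every subquery
    subqueries.foreach(_.updateResult()) // waitForSubqueries(): block on each one
    body()                               // doExecute(): subquery values are now constants
  }
}
```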