Spark 之 Aggregate

Aggregate

参考链接:

  • https://github.com/PZXWHU/SparkSQL-Kernel-Profiling

完整的聚合查询的关键字包括 group by、 cube、 grouping sets 和 rollup 4 种 。 分组语句 group by 后面可以是一个或多个分组表达式( groupingExpressions )。

聚合查询还支持 OLAP 场景下的多维分析,包括 rollup、 cube 和 grouping sets 3 种操作 。

逻辑节点 Aggregate

在这里插入图片描述

逻辑算子树节点通过分组表达式列表( groupingExpressions )、聚合表达式列表( aggregateExpressions )和子节点( child )构造而成,
其中分组表达式类型都是 Expression ,而聚合表达式类型都是 NamedExpression ,意味着聚合表达式一般都需要设置名字。
aggregateExpressions 对应聚合函数,而 resultExpressions 则包含了 Select 语句中选择的所有列信息 。

示例之 partial Aggregate 对应 logical plan

在这里插入图片描述
里面的mode 直接也是 Complete

示例之 final Aggregate 对应 logical plan

在这里插入图片描述

NamedExpression (这里对应的是Alias) 里 的child 是 AggregateFunction,里面的mode 直接就是 Complete

case class Alias(child: Expression, name: String)

case class Alias(child: Expression, name: String)(val exprId: ExprId = NamedExpression.newExprId,val qualifier: Seq[String] = Seq.empty,val explicitMetadata: Option[Metadata] = None,val nonInheritableMetadataKeys: Seq[String] = Seq.empty)extends UnaryExpression with NamedExpression {
物理 Aggregate

对于聚合查询,逻辑算子树转换为物理算子树,必不可少的是 Aggregation 转换策略 。 实际上, Aggregation 策略是基于 PhysicalAggregation 的 。 与 PhysicalOperation 类似,PhysicalAggregation 也是一种逻辑算子树的模式,用来匹配逻辑算子树中的 Aggregate 节点并提取该节点中的相关信息 。 PhysicalAggregation 在提取信息时会进行以下转换 。

在这里插入图片描述

select id, count(name) from student group by id

在这里插入图片描述

聚合模式

在 SparkSQL 中,聚合过程有 4 种模式,分别是 Partial 模式、 ParitialMerge 模式、 Final 模式 和 Complete 模式 。

在这里插入图片描述

上述聚合过程
中在 map 阶段的 sum 函数处于 Partial 模式,在 reduce 阶段的 sum 函数处于 Final 模式。

在这里插入图片描述

Complete 模式和Partial/Final 组合方式不一样,不进行局部聚合计算 。

在这里插入图片描述

ParitialMerge 主要应用在 distinct 语句中,如图 、所示 。聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了 4 步聚合操作 。

  • 第 1 步按照( A,C)分组,对 sum 函数进行 Partial 模式聚合计算;
  • 第 2 步是 PartialMerge 模式,对上一步计算之后的聚合缓冲区进行合井,但此时仍然不是最终的结果;
  • 第 3 步分组的列发生变化,再一次进行 Partial 模式的 count 计算;
  • 第 4 步完成 Final 模式的最终计算 。
HashAggregate

常见的聚合查询语句通常采用 HashAggregate 方式,当存在以下几种情况时,会用 SortAggregate 方式来执行 。

  • 查询中存在不支持 Partial 方式的聚合函数:此时会调用 AggUtils 中的 planAggregateWithoutPartial 方法,直接生成 SortAggregateExec 聚合算子节点 。
  • 聚合函数结果不支持 Buffer 方式:如果结果类型不属于(NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType, TimestampType,DecimalType]集合中的任意一种,则需要执行 SortAggregateExec 方式,例如 collect_set和 collect_list 函数。
  • 内存不足:如果在 HashAggregate 执行过程中,内存空间己捕,那么聚合执行会切换到 SortAggregateExec 方式。

注意:
spark 2.2 之后去掉了planAggregateWithoutPartial
参见:
https://issues.apache.org/jira/browse/SPARK-19060
https://github.com/apache/spark/pull/16461

Expand

逻辑计划阶段:
GroupingSets 节点转换为 Aggregate+Expand+Pr付出t3 个节点的组合 。 顾名思义, Expand 表示“扩展”,多维分析在本质上相当于执行多种组合的 group by 操作,因此 Expand 所起的作用就是将一条数据扩展为特定形式的多条数据。

在这里插入图片描述

需要注意的是, Expand 方式执行多维分析虽然能够达到只读一次数据表的效果,但是在某些场景下容易造成中间数据的膨胀。 例如,数据的维度太高, Expand 会产生指数级别的数据量 。 针对这种情况,可以进行相应的优化。

AggregateMode

org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode

sealed trait AggregateMode/*** An [[AggregateFunction]] with [[Partial]] mode is used for partial aggregation.* This function updates the given aggregation buffer with the original input of this* function. When it has processed all input rows, the aggregation buffer is returned.*/
case object Partial extends AggregateMode/*** An [[AggregateFunction]] with [[PartialMerge]] mode is used to merge aggregation buffers* containing intermediate results for this function.* This function updates the given aggregation buffer by merging multiple aggregation buffers.* When it has processed all input rows, the aggregation buffer is returned.*/
case object PartialMerge extends AggregateMode/*** An [[AggregateFunction]] with [[Final]] mode is used to merge aggregation buffers* containing intermediate results for this function and then generate final result.* This function updates the given aggregation buffer by merging multiple aggregation buffers.* When it has processed all input rows, the final result of this function is returned.*/
case object Final extends AggregateMode/*** An [[AggregateFunction]] with [[Complete]] mode is used to evaluate this function directly* from original input rows without any partial aggregation.* This function updates the given aggregation buffer with the original input of this* function. When it has processed all input rows, the final result of this function is returned.*/
case object Complete extends AggregateMode
Aggregate 之 inputBufferOffset

org.apache.spark.sql.execution.aggregate.HashAggregateExec

case class HashAggregateExec(requiredChildDistributionExpressions: Option[Seq[Expression]],isStreaming: Boolean,numShufflePartitions: Option[Int],groupingExpressions: Seq[NamedExpression],aggregateExpressions: Seq[AggregateExpression],aggregateAttributes: Seq[Attribute],initialInputBufferOffset: Int,resultExpressions: Seq[NamedExpression],child: SparkPlan)extends AggregateCodegenSupport {
        val aggregationIterator =new TungstenAggregationIterator(partIndex,groupingExpressions,aggregateExpressions,aggregateAttributes,initialInputBufferOffset,resultExpressions,(expressions, inputSchema) =>MutableProjection.create(expressions, inputSchema),inputAttributes,iter,testFallbackStartsAt,numOutputRows,peakMemory,spillSize,avgHashProbe,numTasksFallBacked)

org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator

  extends AggregationIterator(partIndex,groupingExpressions,originalInputAttributes,aggregateExpressions,aggregateAttributes,initialInputBufferOffset,resultExpressions,newMutableProjection) with Logging {

org.apache.spark.sql.execution.aggregate.AggregationIterator

  protected val aggregateFunctions: Array[AggregateFunction] =initializeAggregateFunctions(aggregateExpressions, initialInputBufferOffset)
    for (expression <- expressions) {val func = expression.aggregateFunctionval funcWithBoundReferences: AggregateFunction = expression.mode match {case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>// We need to create BoundReferences if the function is not an// expression-based aggregate function (it does not support code-gen) and the mode of// this function is Partial or Complete because we will call eval of this// function's children in the update method of this aggregate function.// Those eval calls require BoundReferences to work.BindReferences.bindReference(func, inputAttributeSeq)case _ =>// We only need to set inputBufferOffset for aggregate functions with mode// PartialMerge and Final.val updatedFunc = func match {case function: ImperativeAggregate =>function.withNewInputAggBufferOffset(inputBufferOffset)case function => function}inputBufferOffset += func.aggBufferSchema.lengthupdatedFunc}

可见 inputBufferOffset 对 Partial | Complete 无效

ObjectHashAggregateExec

参考链接:

  • https://dataninjago.com/2022/01/09/spark-sql-query-engine-deep-dive-10-hashaggregateexec-objecthashaggregateexec/
  • https://blog.csdn.net/monkeyboy_tech/article/details/123759074

While the HashAggregateExec, backed by the Tungsten execution engine(基于Tungsten执行引擎), performs well for aggregation operations, it can only support the mutable primitive data type with a fixed size. For the user-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set), they are not supported by the HashAggregateExec. Prior Spark 2.2.0, they have to fall back to the less performant SortAggregateExec. Since Spark 2.2.0, the ObjectHashAggregateExec is released to fill this gap, which enables the performant hash-based aggregations on the data types that are not supported by HashAggregateExec.

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/475968.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【IDEA】解决总是自动导入全部类(.*)问题

文章目录 问题描述解决方法 我是一名立志把细节说清楚的博主&#xff0c;欢迎【关注】&#x1f389; ~ 原创不易&#xff0c; 如果有帮助 &#xff0c;记得【点赞】【收藏】 哦~ ❥(^_-)~ 如有错误、疑惑&#xff0c;欢迎【评论】指正探讨&#xff0c;我会尽可能第一时间回复…

如何快速将Excel数据导入到SQL Server数据库

工作中&#xff0c;我们经常需要将Excel数据导入到数据库&#xff0c;但是对于数据库小白来说&#xff0c;这可能并非易事&#xff1b;对于数据库专家来说&#xff0c;这又可能非常繁琐。 这篇文章将介绍如何帮助您快速的将Excel数据导入到sql server数据库。 准备工作 这里&…

在centos7中安装SqlDeveloper的Oracle可视化工具

1.下载安装包 &#xff08;1&#xff09;在SqlDeveloper官网下载&#xff08;Oracle SQL Developer Release 19.2 - Get Started&#xff09;对应版本的安装包即可&#xff08;安装包和安装命令如下&#xff09;&#xff1a; &#xff08;2&#xff09;执行完上述命令后&#x…

【动手学深度学习Pytorch】4. 神经网络基础

模型构造 回顾一下感知机。 nn.Sequential()&#xff1a;定义了一种特殊的module。 torch.rand()&#xff1a;用于生成具有均匀分布的随机数&#xff0c;这些随机数的范围在[0, 1)之间。它接受一个形状参数&#xff08;shape&#xff09;&#xff0c;返回一个指定形状的张量&am…

Spring Boot + Vue 基于 RSA 的用户身份认证加密机制实现

Spring Boot Vue 基于 RSA 的用户身份认证加密机制实现 什么是RSA&#xff1f;安全需求介绍前后端交互流程前端使用 RSA 加密密码安装 jsencrypt库实现敏感信息加密 服务器端生成RSA的公私钥文件Windows环境 生成rsa的公私钥文件Linux环境 生成rsa的公私钥文件 后端代码实现返…

一键部署 200+ 开源软件的 Websoft9 面板,Github 2k+ 星星

Websoft9面板是一款基于Web的PaaS/Linux面板&#xff0c;可用于在自己的服务器上一键部署200多种热门开源应用&#xff0c;在Github上获得了2k星星。 特点与优势 丰富的开源软件集成&#xff1a;涵盖数据库、Web服务器、企业建站、电商系统、教育系统、中间件、大数据工具等多…

NLP论文速读(MPO)|通过混合偏好优化提高多模态大型语言模型的推理能力

论文速读|Dynamic Rewarding with Prompt Optimization Enables Tuning-free Self-Alignment of Language Models 论文信息&#xff1a; 简介&#xff1a; 本文探讨的背景是多模态大型语言模型&#xff08;MLLMs&#xff09;在多模态推理能力上的局限性&#xff0c;尤其是在链式…

动态规划子数组系列一>等差数列划分

题目&#xff1a; 解析&#xff1a; 代码&#xff1a; public int numberOfArithmeticSlices(int[] nums) {int n nums.length;int[] dp new int[n];int ret 0;for(int i 2; i < n; i){dp[i] nums[i] - nums[i-1] nums[i-1] - nums[i-2] ? dp[i-1]1 : 0;ret dp[i…

用 React18 构建Tic-Tac-Toe(井字棋)游戏

下面是一个完整的 Tic-Tac-Toe&#xff08;井字棋&#xff09;游戏的实现&#xff0c;用 React 构建。包括核心逻辑和组件分离&#xff0c;支持两人对战。 1. 初始化 React 项目&#xff1a; npx create-react-app tic-tac-toe cd tic-tac-toe2.文件结构 src/ ├── App.js…

前端—Cursor编辑器

在当今快速发展的软件开发领域&#xff0c;效率和质量是衡量一个工具是否优秀的两个关键指标。今天&#xff0c;我要向大家推荐一款革命性的代码编辑器——Cursor&#xff0c;它集成了强大的AI功能&#xff0c;旨在提高开发者的编程效率。以下是Cursor编辑器的详细介绍和推荐理…

uniapp页面样式和布局和nvue教程详解

uniapp页面样式和布局和nvue教程 尺寸单位 uni-app 支持的通用 css 单位包括 px、rpx px 即屏幕像素。rpx 即响应式px&#xff0c;一种根据屏幕宽度自适应的动态单位。以750宽的屏幕为基准&#xff0c;750rpx恰好为屏幕宽度。屏幕变宽&#xff0c;rpx 实际显示效果会等比放大…

Kubernetes 安装配置ingress controller

> 对于Kubernetes的Service&#xff0c;无论是Cluster-Ip和NodePort均是四层的负载&#xff0c;集群内的服务如何实现七层的负载均衡&#xff0c;这就需要借助于Ingress&#xff0c;Ingress控制器的实现方式有很多&#xff0c;比如nginx, Contour, Haproxy, trafik, Istio。…

js批量输入地址获取经纬度

使用js调用高德地图的接口批量输入地址获取经纬度。 以下的请求接口的key请换成你的key。 创建key&#xff1a;我的应用 | 高德控制台 &#xff0c;服务平台选择《Web服务》。 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-…

天润融通携手挚达科技:AI技术重塑客户服务体验

业务爆发式增长&#xff0c;但座席服务却跟不上&#xff0c;怎么办&#xff1f; 智能充电领导者的挚达科技就面临过 这样的问题&#xff0c;让我们来看看如何解决。 2010年以来&#xff0c;国内新能源汽车市场进入高速发展期&#xff0c;作为新能源汽车的重要配件&#xff0c…

51c自动驾驶~合集31

我自己的原文哦~ https://blog.51cto.com/whaosoft/12121357 #大语言模型会成为自动驾驶的灵丹妙药吗 人工智能&#xff08;AI&#xff09;在自动驾驶&#xff08;AD&#xff09;研究中起着至关重要的作用&#xff0c;推动其向智能化和高效化发展。目前AD技术的发展主要遵循…

【代码随想录】贪心

455. 分发饼干 题目 随想录 本质&#xff1a; 对于每个孩子&#xff0c;使用可以满足该孩子的最小的饼干。所以对孩子胃口和饼干进行sort排序&#xff0c;依次将大的饼干满足给孩子。 贪心策略&#xff1a; 想一下局部最优&#xff0c;想一下全局最优&#xff0c;如果局部最优…

QWen2.5学习

配置环境 pip install transformers 记得更新一下&#xff1a;typing_extensions pip install --upgrade typing_extensions 安装modelscope modelscope/modelscope: ModelScope: bring the notion of Model-as-a-Service to life. 下载这个仓库的代码上传到服务器解压 推…

存算分离的过去、现在和未来

存算分离架构&#xff0c;作为数据处理领域的一个重要概念&#xff0c;从其最初的雏形到如今广泛应用&#xff0c;经历了多次迭代和变革。雁飞老师在分享中从过去的存算架构&#xff0c;逐步讲述存算分离的演进&#xff0c;现今的存算分离架构的优势及其在 Databend 中的体现&a…

web——upload-labs——第九关——特殊字符::$DATA绕过

特殊字符::$DATA绕过 典型绕过场景 在一些系统中&#xff0c;::$DATA 被用于绕过文件路径的限制。比如&#xff1a; 路径过滤绕过&#xff1a;如果系统有某种机制来检查和限制文件路径&#xff08;例如&#xff0c;禁止访问某些系统目录或敏感文件&#xff09;&#xff0c;通…

图的存储、遍历以及Dijkstra/Floyd/Kruskal/Prim/拓扑排序/关键路径(实验8--作业)

图–数据结构操作与算法全解析 一、引言 图作为一种重要的数据结构&#xff0c;在计算机科学与众多领域中都有着广泛的应用。它能够有效地描述和解决各种复杂的关系问题&#xff0c;如网络拓扑、路径规划、资源分配等。本文将详细介绍图的相关操作和知识点&#xff0c;包括图…