Spark AQE 导致的 Driver OOM问题

背景

最近在做Spark 3.1 升级 Spark 3.5的过程中,遇到了一批SQL在运行的过程中 Driver OOM的情况,排查到是AQE开启导致的问题,再次分析记录一下,顺便了解一下Spark中指标的事件处理情况

结论

SQLAppStatusListener 类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致Driver OOM
解决方法:

  1. 关闭AQE spark.sql.adaptive.enabled false
  2. 合并对应的PR-SPARK-45439

分析

背景知识:对于一个完整链接的sql语句来说(比如说从 读取数据源,到 数据处理操作,再到插入hive表),这可以称其为一个最小的SQL执行单元,这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId来进行跟踪的。
对于一个sql,举例来说 :

insert into  TableA select * from TableB;

在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法,该方法会触发eagerlyExecuteCommands调用,最终会到SQLExecution.withNewExecutionId方法:

  def assertOptimized(): Unit = optimizedPlan...lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}...lazy val optimizedPlan: LogicalPlan = {// We need to materialize the commandExecuted here because optimizedPlan is also tracked under// the optimizing phaseassertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// clone the plan to avoid sharing the plan instance between different stages like analyzing,// optimizing and planning.val plan =sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// We do not want optimized plans to be re-analyzed as literals that have been constant// folded and such can cause issues during analysis. While `clone` should maintain the// `analyzed` state of the LogicalPlan, we set the plan as analyzed here as well out of// paranoia.plan.setAnalyzed()plan}def assertCommandExecuted(): Unit = commandExecuted...private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// Since Command execution will eagerly take place here,// and in most cases be the bulk of time and effort,// with the rest of processing of the root plan being just outputting command results,// for eagerly executed commands we mark this place as beginning of execution.tracker.setReadyForExecution()val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)val name = commandExecutionName(c)val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {SQLExecution.withNewExecutionId(qe, Some(name)) {qe.executedPlan.executeCollect()}}  

SQLExecution.withNewExecutionId主要的作用是设置当前计划的所属的executionId:

    val executionId = SQLExecution.nextExecutionIdsc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

EXECUTION_ID_KEY的值会在JobStart的时候传递给Event,以便记录跟踪整个执行过程中的指标信息。
同时我们在方法中eagerlyExecuteCommands看到qe.executedPlan.executeCollect()这是具体的执行方法,针对于insert into 操作来说,物理计划就是
InsertIntoHadoopFsRelationCommand,这里的run方法最终会流转到DAGScheduler.submitJob方法:

    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleJobSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理,这里有会涉及到指标信息的存储,这里我们截图出dump的内存占用情况:
在这里插入图片描述

可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大,也就是 accumIdsToMetricType占用很大

那在AQE中是怎么回事呢?
我们知道再AQE中,任务会从source节点按照shuffle进行分割,从而形成单独的job,从而生成对应的shuffle指标,具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan中,如下:

      var result = createQueryStages(currentPhysicalPlan)val events = new LinkedBlockingQueue[StageMaterializationEvent]()val errors = new mutable.ArrayBuffer[Throwable]()var stagesToReplace = Seq.empty[QueryStageExec]while (!result.allChildStagesMaterialized) {currentPhysicalPlan = result.newPlanif (result.newStages.nonEmpty) {stagesToReplace = result.newStages ++ stagesToReplaceexecutionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))// SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting// for tasks to be scheduled and leading to broadcast timeout.// This partial fix only guarantees the start of materialization for BroadcastQueryStage// is prior to others, but because the submission of collect job for broadcasting is// running in another thread, the issue is not completely resolved.val reorderedNewStages = result.newStages.sortWith {case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => falsecase (_: BroadcastQueryStageExec, _) => truecase _ => false}// Start materialization of all new stages and fail fast if any stages failed eagerlyreorderedNewStages.foreach { stage =>try {stage.materialize().onComplete { res =>if (res.isSuccess) {events.offer(StageSuccess(stage, res.get))} else {events.offer(StageFailure(stage, res.failed.get))}// explicitly clean up the resources in this stagestage.cleanupResources()}(AdaptiveSparkPlanExec.executionContext)

这里就是得看stage.materialize()这个方法,这两个stage只有两类:BroadcastQueryStageExec 和 ShuffleQueryStageExec
这两个物理计划稍微分析一下如下:

  • BroadcastQueryStageExec
    数据流如下:
    broadcast.submitBroadcastJob||\/
    promise.future||\/
    relationFuture||\/
    child.executeCollectIterator()
    其中 promise的设置在relationFuture方法中,而relationFuture 会被doPrepare调用,而submitBroadcastJob会调用executeQuery,从而调用doPrepare,executeCollectIterator()最终也会发送JobSubmitted事件,分析和上面的一样
  • ShuffleQueryStageExec
     shuffle.submitShuffleJob||\/sparkContext.submitMapStage(shuffleDependency)||\/dagScheduler.submitMapStage

submitMapStage会发送MapStageSubmitted事件:

    eventProcessLoop.post(MapStageSubmitted(jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleMapStageSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理:

  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()...override def onJobStart(event: SparkListenerJobStart): Unit = {val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)if (executionIdString == null) {// This is not a job created by SQLreturn}val executionId = executionIdString.toLongval jobId = event.jobIdval exec = Option(liveExecutions.get(executionId))

该方法会获取事件中的executionId,在AQE中,同一个执行单元的executionId是一样的,所以stageMetrics内存占用会越来越大。
而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan等方法中。

这样整个事件的数据流以及问题的产生原因就应该很清楚了。

其他

为啥AQE以后多个Job还是共享一个executionId呢?因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/314255.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

精酿啤酒:酿造工艺的自动化与智能化发展

随着科技的不断进步,自动化与智能化已成为啤酒酿造工艺的重要发展方向。Fendi Club啤酒紧跟时代潮流,积极推动酿造工艺的自动化与智能化发展,旨在提高生产效率、确保产品品质和满足市场需求。 Fendi Club啤酒引入自动化生产设备。他们采用自动…

[最新]CentOS7设置开机自启动Hadoop集群

安装好Hadoop后我们可以使用开机自启动的方式,节约敲命令的时间。注意是centOS7版本!!!和centOS6版本区别非常大!!! 1、切换到系统目录 [rootmaster ~]# cd /etc/systemd [rootmaster systemd]# ll total 32 -rw-r--r-- 1 root root 720 Jun 30 23:11 bootcha…

线性代数基础2矩阵

矩阵是什么 矩阵就是二维数组,下面是一个 m 乘 n 的矩阵,它有 m 行,n 列,每行每列上面都有元素,每个元素都有行标i 和列标 j, a ij 。简称m n矩阵,记作: 注意a11的索引是 A[0,0]。…

【OceanBase诊断调优】——hpet(高精度时钟源)引起的CPU高问题排查

最近总结一些诊断OCeanBase的一些经验,出一个【OceanBase诊断调优】专题出来,也欢迎大家贡献自己的诊断OceanBase的方法。 1. 前言 昨天在问答区帮忙排查一个用户CPU高的问题,帖子链接:《刚刚新安装的OceanBase集群,…

数据结构与算法解题-20240426

这里写目录标题 面试题 08.04. 幂集367. 有效的完全平方数192. 统计词频747. 至少是其他数字两倍的最大数718. 最长重复子数组 面试题 08.04. 幂集 中等 幂集。编写一种方法,返回某集合的所有子集。集合中不包含重复的元素。 说明:解集不能包含重复的子…

【数据结构】合并两个有序链表

将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 Definition for singly-linked list.struct ListNode {int val;struct ListNode *next;};typedef struct ListNode ListNode; struct ListNode* mergeTwoLists(struct Lis…

信息系统项目管理师0072:集成基础(5信息系统工程—5.3系统集成—5.3.1集成基础)

点击查看专栏目录 文章目录 5.3系统集成5.3.1集成基础5.3系统集成 随着信息技术的发展,系统集成逐步成为信息系统实施中一项重要的工作。此处的系统集成概念专指计算机系统的集成,包括计算机硬件平台、网络系统、系统软件、工具软件、应用软件的集成,围绕这些系统的相应咨询…

稳态视觉诱发电位 (SSVEP) 分类学习系列 (4) :Temporal-Spatial Transformer

稳态视觉诱发电位分类学习系列:Temporal-Spatial Transformer 0. 引言1. 主要贡献2. 提出的方法2.1 解码的主要步骤2.2 网络的主要结构 3. 结果和讨论3.1 在两个数据集下的分类效果3.2 与基线模型的比较3.3 消融实验3.4 t-SNE 可视化 4. 总结欢迎来稿 论文地址:http…

第十五届蓝桥杯省赛第二场C/C++B组E题【遗迹】题解

解题思路 错解 贪心:每次都移动至当前最近的对应方块上。 反例: s s s abxac t t t abac 贪心结果(下标) 0 → 1 → 0 → 4 0 \rightarrow 1 \rightarrow 0 \rightarrow 4 0→1→0→4,答案为 5 5 5。 正确结…

Android Studio的button点击事件

xml添加onClick调用方法 public class MainActivity extends AppCompatActivity {// 创建系统时间的文本控件TextView systemTimeTextView;Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activit…

店匠科技技术产品闪耀,引领新质生产力发展

在科技飞速发展的今天,新质生产力正成为推动社会进步和经济高质量发展的核心力量。店匠科技,作为一家致力于为全球B2C电商提供产品和技术解决方案的领先企业,其技术产品不仅体现了新质生产力的创新特质,更在推动电商行业转型升级中发挥了重要作用。 新质生产力,以创新为主导,摆…

使用代理绕过网站的反爬机制

最近在尝试收集一些网络指标的数据, 所以, 我又开始做爬虫了。 :) 我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么的美好,然而一杯茶的功夫可能就会出现错误…

ElasticSearch 安装(docker)

下载安装包 阿里云链接: elasticSearch.exe https://www.alipan.com/s/3A356NnmWaJ 提取码: 93da 点击链接保存,或者复制本段内容,打开「阿里云盘」APP ,无需下载极速在线查看,视频原画倍速播放。 安装步骤 1、首先…

C++之STL-list+模拟实现

目录 一、list的介绍和基本使用的方法 1.1 list的介绍 1.2 list的基本使用方法 1.2.1 构造方法 1.2.2 迭代器 1.2.3 容量相关的接口 1.2.4 增删查改的相关接口 1.3 关于list迭代器失效的问题 二、模拟实现list 2.1 节点类 2.2 迭代器类 2.3 主类list类 2.3.1 成员变…

Java-字符集和字符编码-roadmap

1 需求 2 接口 3 示例 4 参考资料 「烫烫屯屯锟斤拷」揭秘ASCII、GBK、UTF-8,B站独家,一听就懂_哔哩哔哩_bilibili 非常详细的字符编码讲解,ASCII、GB2312、GBK、Unicode、UTF-8等知识点都有_哔哩哔哩_bilibili 你懂乱码吗?锟斤…

【智能算法】囊状虫群算法(TSA)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2020年,S Kaur等人受到囊状虫群自然行为启发,提出了囊状虫群算法(Tunicate Swarm Algorithm, TSA)。 2.算法原理 2.1算法思想 TSA模拟了囊状虫群在导…

【Linux】:文件查看 stat、cat、more、less、head、tail、uniq、wc

🎥 屿小夏 : 个人主页 🔥个人专栏 : Linux深造日志 🌄 莫道桑榆晚,为霞尚满天! 文章目录 📑前言一、stat(查看文件详细属性信息)1.1 内容解析:1.2…

JavaSE字节缓冲流

欢迎来到 请回答1024 的博客 🍓🍓🍓欢迎来到 请回答1024的博客 关于博主: 我是 请回答1024,一个追求数学与计算的边界、时间与空间的平衡,0与1的延伸的后端开发者。 博客特色: 在我的博客中&a…

HTML中的文档声明

前言 什么是<!DOCTYPE>&#xff1f;是否需要在 HTML5 中使用&#xff1f;什么是严格模式与混杂模式&#xff1f; 文档声明概念 HTML 文档通常以文档声明开始&#xff0c;该声明的作用是帮助浏览器确定其尝试解析和显示的 HTML 文档类型。 <!DOCTYPE html>文档声…

不同交叉工具链编译程序引发的问题及解决思路

目录 一、问题描述二、应用程序使用buildroot的工具链三、lrzsz 移植使用原交叉工具链四、总结 一、问题描述 buildroot 未使用外部交叉编译工具&#xff0c;生成的文件系统运行原先的程序不能启动。解决办法&#xff1a; ①使用 buildroot的工具链 重新编译程序&#xff1b; …