【大数据面试知识点】Spark的DAGScheduler

Spark数据本地化是在哪个阶段计算首选位置的?

先看一下DAGScheduler的注释,可以看到DAGScheduler除了Stage和Task的划分外,还做了缓存的跟踪和首选运行位置的计算。

DAGScheduler注释: 

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:

  • Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
  • Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
  • Tasks are individual units of work, each sent to one machine.
  • Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
  • Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
  • Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:

  • All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
  • When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.

DAGScheduler的运行时机

DAGScheduler运行时机:Driver端初始化SparkContext时。DAGScheduler是在整个Spark Application的入口即 SparkContext中声明并实例化的。在实例化DAGScheduler之前,巳经实例化了SchedulerBackend和底层调度器 TaskScheduler。

如果是SQL任务的话,SparkSQL通过Catalyst(Spark SQL的核心是Catalyst优化器)将SQL先翻译成逻辑计划再翻译成物理计划,再转换成RDD的操作。之后运行时再通过DAGScheduler做RDD任务的划分和调度。

DAGScheduler如何划分Stage的?

用户提交的计算任务是一个由RDD依赖构成的DAG,Spark会把RDD的依赖以shuffle依赖为边界划分成多个Stage,这些Stage之间也相互依赖,形成了Stage的DAG。然后,DAGScheduler会按依赖关系顺序执行这些Stage。

要是把RDD依赖构成的DAG看成是逻辑执行计划(logic plan),那么,可以把Stage看成物理执行计划,为了更好的理解这个概念,我们来看一个例子。

下面的代码用来对README.md文件中包含整数值的单词进行计数,并打印RDD之间的依赖关系(Lineage):

scala> val counts = sc.textFile("README.md").flatMap(x=>x.split("\\W+")).filter(_.matches(".*\\d.*")).map(x=>(x,1)).reduceByKey(_+_)// 调用一个action函数,用来触发任务的提交和执行scala> counts.collect()​// 打印RDD的依赖关系(Lineage)scala> counts.toDebugStringres7: String =(2) ShuffledRDD[17] at reduceByKey at <console>:24 []+-(2) MapPartitionsRDD[16] at map at <console>:24 []|  MapPartitionsRDD[15] at filter at <console>:24 []|  MapPartitionsRDD[14] at flatMap at <console>:24 []|  README.md MapPartitionsRDD[13] at textFile at <console>:24 []|  README.md HadoopRDD[12] at textFile at <console>:24 []

DAGScheduler会根据Shuffle划分前后两个Stage:即StageShuffleMapStage和ResultStage

ShuffleMapStage

先看下ShuffleMapStage的注释,核心就是再讲ShuffleMapStage是做ShuffleWrite的Stage,Stage中是算子的pipline。

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, they save map output files that can later be fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage. 

ShuffleMapStages是在DAG执行过程中产生的Stage,用来为Shuffle产生数据。ShuffleMapStages发生在每个Shuffle操作之前,在Shuffle之前可能有多个窄转换操作,比如:map,filter,这些操作可以形成流水线(pipeline)。当执行ShuffleMapStages时,会产生Map的输出文件,这些文件会被随后的Reduce任务使用。

ShuffleMapStages也可以作为Jobs,通过DAGScheduler.submitMapStage函数单独进行提交。对于这样的Stages,会在变量mapStageJobs中跟踪提交它们的ActiveJobs。 要注意的是,可能有多个ActiveJob尝试计算相同的ShuffleMapStages。

它为一个shuffle过程产生map操作的输出文件。它也可能是自适应查询规划/自适应调度工作的最后阶段。

ResultStage

再看ResultStage的注释

ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, `func`, which will be applied to each partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

ResultStage是Job的最后一个Stage,该Stage是基于执行action函数的rdd来创建的。该Stage用来计算一个action操作的结果。该类的声明如下:

 private[spark] class ResultStage(id: Int,rdd: RDD[_],val func: (TaskContext, Iterator[_]) => _,val partitions: Array[Int],parents: List[Stage],   //依赖的父StagefirstJobId: Int,callSite: CallSite)extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

为了计算action操作的结果,ResultStage会在目标RDD的一个或多个分区上使用函数:func。需要计算的分区id集合保存在成员变量:partitions中。但对于有些action操作,比如:first(),take()等,函数:func可能不会在所有分区上使用。

另外,在提交Job时,会先创建ResultStage。但在提交Stage时,会先递归找到该Stage依赖的父级Stage,并先提交父级Stage。如下图所示:

举个例子:

思考题 

如下rdd运算,为什么最终只划分了3个Stage

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))                                                                  
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []|  MapPartitionsRDD[12] at join at <console>:1 []|  MapPartitionsRDD[11] at join at <console>:1 []|  CoGroupedRDD[10] at join at <console>:1 []|  ShuffledRDD[4] at reduceByKey at <console>:1 []+-(3) MapPartitionsRDD[3] at map at <console>:1 []|  MapPartitionsRDD[2] at flatMap at <console>:1 []|  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []|  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []|  ShuffledRDD[9] at reduceByKey at <console>:1 []+-(3) MapPartitionsRDD[8] at map at <console>:1 []|  MapPartitionsRDD[7] at flatMap at <console>:1 []|  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []|  /root/t...

参考:DAGScheduler-Stage的划分与提交 - 知乎Spark SQL 源码分析之Physical Plan 到 RDD的具体实现_physicalplan到rdd的具体实现-CSDN博客

一文搞定Spark的DAG调度器(DAGScheduler)_spark dagscheduler-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/228954.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习核心技术与实践之自然语言处理篇

非书中全部内容&#xff0c;只是写了些自认为有收获的部分。 自然语言处理简介 NLP的难点 &#xff08;1&#xff09;语言有很多复杂的情况&#xff0c;比如歧义、省略、指代、重复、更正、倒序、反语等 &#xff08;2&#xff09;歧义至少有如下几种&#xff1a; …

策略模式示例,Lambda表达式

这样会有很多代码冗余 以上代码使用策略模式优化 优化方案1 那么现在可以这样 优化方案二 原先定义了接口,还需要一个个写实现类,其实完全没必要,用匿名内部类方式就可以 优化方案三:用lambda方式简写 优化方案四:不需要定义接口 使用Stream API

从0到1浅析Redis服务器反弹Shell那些事

文章目录 前言Redis服务1.1 特点与应用1.2 安装与使用1.3 语法和配置1.4 未授权访问 反弹Shell2.1 Web服务写入Webshell2.2 Linux定时任务反弹shell2.3 /etc/profile.d->反弹shell2.4 写入ssh公钥登录服务器2.5 利用Redis主从复制RCE2.6 SSRF漏洞组合拳->RCE 总结 前言 …

SDG大数据平台简介

联合国可持续发展目标&#xff08;Sustainable Development Goals&#xff09;缩写SDGs&#xff0c;是联合国制定的17个全球发展目标&#xff0c;在2000-2015年千年发展目标&#xff08;MDGs&#xff09;到期之后继续指导2015-2030年的全球发展工作。&#xff08;摘自百度&…

Windows电脑引导损坏?按照这个教程能修复

前言 Windows系统的引导一般情况下是不会坏的&#xff0c;小伙伴们可以不用担心。发布这个帖子是因为要给接下来的文章做点铺垫。 关注小白很久的小伙伴应该都知道&#xff0c;小白的文章都讲得比较细。而且文章与文章之间的关联度其实还是蛮高的。在文章中&#xff0c;你会遇…

AI-ChatGPTCopilot

ChatGPT chatGPT免费网站列表&#xff1a;GitHub - LiLittleCat/awesome-free-chatgpt: &#x1f193;免费的 ChatGPT 镜像网站列表&#xff0c;持续更新。List of free ChatGPT mirror sites, continuously updated. Copilot 智能生成代码工具 安装步骤 - 登录 github&am…

微信小程序自定义步骤条效果

微信小程序自定义一个步骤条组件&#xff0c;自定义文字在下面&#xff0c;已完成和未完成和当前进度都不一样的样式&#xff0c;可点击上一步和下一步切换流程状态&#xff0c;效果如下。 这是视频效果&#xff1a; 前端实现步骤条效果 下面我们一步步实现编码&#xff0c;自定…

Spring Cloud + Vue前后端分离-第10章 基于阿里云OSS的文件上传

源代码在GitHub - 629y/course: Spring Cloud Vue前后端分离-在线课程 Spring Cloud Vue前后端分离-第10章 基于阿里云OSS的文件上传 前面介绍的文件上传是基于本地文件服务器的文件上传&#xff0c;但是自己搭文件服务器会有很多运维的问题&#xff0c;比如磁盘满了要扩容…

无监督关键词提取算法:TF-IDF、TextRank、RAKE、YAKE、 keyBERT

TF-IDF TF-IDF是一种经典的基于统计的方法&#xff0c;TF(Term frequency)是指一个单词在一个文档中出现的次数&#xff0c;通常一个单词在一个文档中出现的次数越多说明该词越重要。IDF(Inverse document frequency)是所有文档数比上出现某单词的个数&#xff0c;通常一个单词…

【C++杂货铺】C++11新特性——可变参数模板

文章目录 一、可变模板参数相关概念的引入二、获取参数包中参数的个数三、递归函数方式展开参数包四、逗号表达式展开参数包五、可变模板参数的实际应用——emplace相关接口5.1 回顾一下 push_back 的三种用法5.2 emplace_back 使用方法介绍5.3 听说 emplace_back 可以提高效率…

0101包冲突导致安装docker失败-docker-云原生

文章目录 1 前言2 报错3 解决结语 1 前言 最近在学习k8s&#xff0c;前置条件就是要安装指定版本的docker&#xff0c;命令如下 yum install -y docker-ce-20.10.7 docker-ce-cli-20.10.7 containerd.io-1.4.62 报错 file /usr/libexec/docker/cli-plugins/docker-buildx fr…

云原生|对象存储|minio分布式集群的搭建和初步使用(可用于生产)

前言&#xff1a; minio作为轻量级的对象存储服务安装还是比较简单的&#xff0c;但分布式集群可以大大提高存储的安全性&#xff0c;可靠性。分布式集群是在单实例的基础上扩展而来的 minio的分布式集群有如下要求&#xff1a; 所有运行分布式 MinIO 的节点需要具有相同的访…

【小沐学NLP】Python实现K-Means聚类算法(nltk、sklearn)

文章目录 1、简介1.1 机器学习1.2 K 均值聚类1.2.1 聚类定义1.2.2 K-Means定义1.2.3 K-Means优缺点1.2.4 K-Means算法步骤 2、测试2.1 K-Means&#xff08;Python&#xff09;2.2 K-Means&#xff08;Sklearn&#xff09;2.2.1 例子1&#xff1a;数组分类2.2.2 例子2&#xff1…

简写英语单词

题目&#xff1a; 思路&#xff1a; 这段代码的主要思路是读取一个字符串&#xff0c;然后将其中每个单词的首字母大写输出。具体来说&#xff0c;程序首先使用 fgets 函数读取一个字符串&#xff0c;然后遍历该字符串中的每个字符。当程序遇到一个字母时&#xff0c;如果此时…

职场小白培养项目管理能力的6个小技巧

有很多职场新人会碰到这样一个场景&#xff1a;入职一段时间&#xff0c;领导突然将一个重要项目的其中一个模块分配给你负责&#xff0c;但你之前并没有接触过任何项目。 这时你可能会焦躁无措&#xff0c;不知如何往下规划和开展工作&#xff0c;在推进一段时间后领导开始时…

消息中间件常见知识点

一&#xff1a;消息队列的主要作用是什么&#xff1f; 1.消息队列的特性&#xff1a; 业务无关&#xff0c;一个具有普适性质的消息队列组件不需要考虑上层的业务模型&#xff0c;只做好消息的分发就可以了&#xff0c;上层业务的不同模块反而需要依赖消息队列所定义的规范进行…

浅谈Acrel-2000MG微电网能量管理系统在新能源储能行业中的设计与应用-安科瑞 蒋静

概述:在新型电力系统中新能源装机容量逐年提高&#xff0c;但是新能源比如光伏发电、风力发电是不稳定的能源&#xff0c;所以要维持电网稳定&#xff0c;促进新能源发电的消纳&#xff0c;储能将成为至关重要的一环&#xff0c;是分布式光伏、风电等新能源消纳以及电网安全的必…

亚信安慧AntDB数据并行加载工具的实现(一)

1.概述 数据加载速度是评判数据库性能的重要指标&#xff0c;能否提高数据加载速度&#xff0c;对文件数据进行并行解析&#xff0c;直接影响数据库运维管理效率。基于此&#xff0c;AntDB分布式数据库提供了两种数据加载方式&#xff1a; 一是类似于PostgreSQL的Copy命令&am…

计算机网络物理层 习题答案及解析

2-1 下列选项中&#xff0c;不属于物理层接口规范定义范畴的是&#xff08; D &#xff09;。 A. 引脚功能 B. 接口形状 C. 信号电平 D. 传输媒体 【答案】D 【解析】 2-2 某网络在物理层规定&#xff0c;信号的电平范围为- 15V~15V &#xff0c; 电线长…

2024年AI领域的突破性进展预测

&#x1f989; AI新闻 &#x1f680; 2024年AI领域的突破性进展预测 摘要&#xff1a;23年被誉为生成式AI之年&#xff0c;24年AI有哪些新突破&#xff1f;GPT-5发布后&#xff0c;LLM在本质上仍然有限&#xff0c;基本的AGI也不足以实现。然而&#xff0c;英伟达高级科学家和…