【Spark系列1】DAG中Stage和Task的划分全流程

一、整体流程

每个Aciton操作会创建一个JOB,JOB会提交给DAGScheduler,DAGScheduler根据RDD依赖的关系划分为多个Stage,每个Stage又会创建多个TaskSet,每个TaskSet包含多个Task,这个Task就是每个分区的并行计算的任务。DAGScheduler将TaskSet按照顺序提交给TaskScheduler,TaskScheduler将每一个任务去找SchedulerBackend申请执行所需要的资源,获取到资源后,SchedulerBackend将这些Task提交给Executor,Executor负责将这些任务运行起来。

二、JOB提交

2.1、为什么需要action操作

在Spark中,分为transformation操作和action操作。执行用户程序时,transformation操作将一个RDD转换成了新的RDD,并在compute()函数中,记录了如何根据父RDD计算出当前RDD的数据、RDD如何分区等信息,并且能够得出最后一个RDD的数据。 但是RDD中的每个分区中依然是一条一条的分散的数据,那么要对最后一个RDD执行什么操作呢?这就是action操作的作用。

2.2、Job提交

每个action操作都会生成一个Job,这个Job包含了需要计算的RDD对象、需要计算的分区、需要执行什么样的计算。RDD和用户执行的计算都是可以序列化的,RDD序列化之后,在Executor中反序列化之后即可得到该RDD对象,再根据对象compute()函数就可以计算出某个分区的数据。JOB中包含的数据如下所示

2.3、分布式执行

当提交Job以后,就可以将Job划分为多个并行的任务,每个任务计算指定分区的一个分区即可。通过RDD的计算函数即可计算出该分区的数据,今儿计算出分区的结果。

三、Stage划分

3.1、宽依赖和窄依赖

如果一个RDD的每个分区最多只能被一个Child RDD的一个分区所使用, 则称之为窄依赖(Narrow dependency), 如果被多个Child RDD分区依赖, 则称之为宽依赖(wide dependency)

3.2、Stage划分

在用户编写的一系列转换中,多个RDD可能既形成了多次窄依赖,也形成了多次宽依赖,连续的窄依赖可以通过一个任务进行流水线处理,但是如果遇到了宽依赖,就必须先将父RDD的所有数据都进行计算并保存起来,再进行RDD的运算。在一个Job中,action操作知识定义了在最后的RDD中执行何种操作,而最后的RDD会依赖上个RDD,上个RDD又会有其他依赖,这样就形成了一系列的依赖关系。如果为宽依赖的话,就在依赖的地方进行切分,先将宽依赖的父RDD进行计算出来,再计算后续的RDD,按照快依赖被划分的过程,即为Stage划分的过程。

如上图所示,rdd1->rdd2,rdd3->rdd4是窄依赖,rdd2->rdd3,rdd4->rdd5是宽依赖。在发生shuffle的位置,Spark将计算分为两个阶段分别执行,每发生一次shuffle,Spark就将计算划分为先后的两个阶段,如下图

在划分阶段的过程中,对于某个阶段而言其并行的计算任务都完全相同,因此在Job执行的过程中,并行计算就是指每个阶段中任务并行的计算。如在Stage1中,每个分区的数据可以使用一个任务进行计算。10000个分区即可在集群中并行运行10000个任务进行计算。如果集群资源不够,可以将10000个任务依次在集群中运行,直到运行完毕,再进行Stage2的计算。Stage2也会根据分区数启动多个任务并行的加载Stage1生成的数据,完成Stage2的计算。

在一个Job的运行过程中,所有的Stage其实都是为最后一个Stage做准备,因为action操作只需要最后一个RDD的数据。因此最后一个Stage称为ResultStage,之前所有的Stage都是由Shuffle引起的中间计算过程,被称为ShuffleMapStage。其过程如下图

3.3、Spark实现

再Spark实现中,SparkContext将Job提交至DAGScheduler,DAGScheduler获取Job中执行action操作的RDD,将最后执行action操作的RDD划分到最后的ResultStage中,然后遍历该RDD的依赖和所有的父依赖,每遇到宽依赖就将两个RDD划分到两个不同的Stage中,遇到窄依赖就将窄依赖的多个RDD划分到一个Stage中,经过这次操作,一个RDD就划分为有多个依赖关系的Stage。再每个Stage中,所有的RDD之间都是窄依赖的关系,Stage之间的RDD都是宽依赖的关系。DAGScheduler将最初被依赖的Stage提交,计算该Stage中的数据,计算完成后,再将后续的Stage提交,知道最后运行的ResultStage,则整个计算Job完成。ResultStage和ShuffleMapStage结构如下图

在生成ShuffleapStage时,ShuffleDependency起到了承上启下的作用,如果两个RDD之间为宽依赖,子RDD的依赖为ShuffleDependency;在划分Stage的时候,父Stage会保存该ShuffleDependency,以便在执行父Stage的时候,根据ShuffleDependency获取Shuffle的写入器,在子Stage执行的时候,会根据RDD的依赖关系使用相同的ShuffleDependency获取Shuffle的读取器。

在计算过程中,ShuffleMapStage会生成该Stage的结果,为下一个Stage提供数据,计算下一个Stage的RDD的时候,会拉取上一个Stage的计算结果。上一个Stage的计算保存在哪呢?答案是Spark的组件MapOutputTracker。MapOutputTracker也是主从结构,Executor端是MapOutputTrackerWroker,当ShuffleMapStage的任务运行完成后,会通过Executor上的MapOutputTrackerWroker将数据保存的位置发送到Driver上的MapOutputTrackerMaster中。在后续Stage需要上一个Stage的计算结果的时候,就通过MapOutputTrackerMaster询问计算结果的保存位置,进而加载相应的数据。

四、Task划分

DAGScheduler将Job划分为多个Stage之后,下一步就是将Stage划分为多个可以在集群中并行执行的任务,只有将任务并行执行,Stage才能更快的完成。

4.1、任务的个数

由于Stage中都是对RDD的计算,RDD又是分区的,所以在对任务进行划分的时候,每个分区可以启动一个任务进行计算。无论是ResultStage还是ShuffleMapStage,每个阶段能够并行执行的任务数量都取决于该阶段中最后一个Rdd的分区数量

上面已经介绍,在一个Stage中,RDD的依赖关系是窄依赖,所以最后一个RDD的分区数量取决于其依赖的RDD的分区数量,一直依赖到该阶段的开始的RDD的分区。对于第一阶段开始的RDD分为两种情况:

  1. 第一种为初始的RDD,即从数据源加载数据形成的初始RDD,这种情况的分区数量取决于初始RDD的形成分区方式。
  2. 第二种为该阶段的初始RDD为Shuffle阶段的Reduce任务,这种情况下,该RDD的分区数量取决于在Shuffle的Map阶段最后一个RDD的分区器设置的分区数量。

4.2、Task的生成

当确定了每个Stage的分区数量之后,就需要为每个分区生成相应的计算任务,该计算任务就是需要对该阶段的最后一个RDD执行什么操作

在ResultStage中,需要对最后一个RDD的每个分区分别执行用户自定义的action操作,所以在ResultStage中生成的每个Task都包含以下三个部分

  1. 需要对哪个RDD进行操作
  2. 需要对RDD哪个分区进行操作
  3. 需要对分区的内容执行什么样的操作

在ResultStage中划分的Task称为ResultTask,ResultTask中包含了ResultStage中最后一个RDD,即执行action操作的的RDD,需要计算的RDD分区的id和执行action操作的函数。

在ShuffleMapStage中,最终需要完成Shuffle过程中的Map阶段的操作,每个分区按照Shuffle中的Map端定义的过程执行数据的分组操作,将分组结果进行保存,并将保存结果位置通知Driver端的MapOutputTrackerMaster,MapOutputTrackerMaster保存着每一个Shuffle中Map输出的位置。在ShuffleMapStage中划分的Task称为ShuffleMapTask。ShuffleMapTask同样由三个重要的部分组成:Stage中最后的RDD、需要计算的分区的id、划分Stage的ShuffleDependency

4.3、Task的最佳运行位置

生成Task时,还会计算Task的最佳运行位置。虽然RDD包含计算RDD的所有信息,可以在任何节点上运行,但是如果通过为Task计算分配最佳的运行位置,可以将Task调度到含有该Task需要的数据的节点,从而实现移动计算而不是移动数据的目的。Spark会根据RDD可能分布的的情况,将Task的运行位置主要分为Host级别和Executor级别当一个RDD被某个Executor缓存,则对该RDD计算时,优先会把计算的Task调度到该Executor中执行。当一个RDD需要的数据存在某个host中时,则会把该Task调度到这个节点的Executor中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/249499.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习鸿蒙基础(2)

arkts是声名式UI DevEcoStudio的右侧预览器可以预览。有个TT的图标可以看布局的大小。和html的布局浏览很像。 上图布局对应的代码: Entry //入口 Component struct Index {State message: string Hello Harmonyos //State 数据改变了也刷新的标签build() {Row()…

PyFlink使用教程,Flink,Python,Java

环境准备 环境要求 Java 11 Python 3.7, 3.8, 3.9 or 3.10文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/ 打开 Anaconda3 Prompt > java -version java version "11.0.22" 2024-01-16 LTS J…

自动驾驶:Apollo如何塑造人类的未来出行

前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家:https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言1. 什么是自定义指令?2. Apollo中的自定义指令2.1 查询中的自定…

7款免费的Midjourney平替平台

AI艺术生成器正在改变设计和内容的制作方式。像Midjourney这样的工具已经将困难的想法转化为令人惊叹的视觉效果,改变了创造力的运作方式。但是,AI艺术涵盖了许多风格和需求。这就是Midjourney替代方案变得重要的原因(特别是免费的替代方案&a…

2024年,AI 掀起数据与分析市场的新风暴

2024 年伊始,Kyligence 联合创始人兼 CEO 韩卿在其公司内部的飞书订阅号发表了多篇 Rethink Data & Analytics 的内部信,分享了对数据与分析行业的一些战略思考,尤其是 AI 带来的各种变化和革命,是如何深刻地影响这个行业乃至…

【极数系列】Flink环境搭建Linux版本 (03)

文章目录 引言01 Linux部署JDK11版本1.下载Linux版本的JDK112.创建目录3.上传并解压4.配置环境变量5.刷新环境变量6.检查jdk安装是否成功 02 Linux部署Flink1.18.0版本1.下载Flink1.18.0版本包2.上传压缩包到服务器3.修改flink-config.yaml配置4.启动服务5.浏览器访问6.停止服务…

python爬虫-多线程-数据库——WB用户

数据库database的包: Python操作Mysql数据库-CSDN博客 效果: 控制台输出: 数据库记录: 全部代码: import json import os import threading import tracebackimport requests import urllib.request from utils im…

【数据结构 08】红黑树

一、概述 红黑树,是一种二叉搜索树,每一个节点上有一个存储位表示节点的颜色,可以是Red或Black。 通过对任何一条从根到叶子的路径上各个节点着色方式的限制,红黑树确保没有一条路径会比其他路径长上两倍,因而是接进…

-1- Python环境安装

1、Python安装 1.1、Windows安装Python 进入python官网:Welcome to Python.org点击 download——>all releases;建议选择3.7.2版本(网页链接:Python Release Python 3.7.2 | Python.org);下拉&#xf…

Unity 自动轮播、滑动轮播

如图所示,可设置轮播间隔,可左右滑动进行轮播 1.在UGUI创建个Image,添加自动水平组件 2.添加并配置脚本 3.代码如下,都有注释 using UnityEngine; using UnityEngine.UI;public class IndicatorManager : MonoBehaviour {public …

【RTP】webrtc 学习2: webrtc对h264的rtp打包

切片只是拷贝帧的split的各个部分到新的rtp 包的封装中。并没有在rtp包本身标记是否为关键帧FU-A 切片 输入的H.264 数据进行split :SplitNalu SplitNalu : 按照最大1200字节进行切分 切分后会返回一个数组 对于FU-A :split的数据总大小是 去掉一个字节的nalu header size …

C语言指针学习(1)

前言 指针是C语言中一个重要概念,也是C语言的一个重要特色,正确而灵活地运用指针可以使程序简洁、紧凑、高效。每一个学习和使用C语言的人都应当深入的学习和掌握指针,也可以说不掌握指针就没有掌握C语言的精华。 一、什么是指针 想弄清楚什…

Apache Commons Collection3.2.1反序列化分析(CC1)

Commons Collections简介 Commons Collections是Apache软件基金会的一个开源项目,它提供了一组可复用的数据结构和算法的实现,旨在扩展和增强Java集合框架,以便更好地满足不同类型应用的需求。该项目包含了多种不同类型的集合类、迭代器、队…

Linux逻辑卷(LV)扩容

Linux逻辑卷(LV)扩容 1、准备物理磁盘(分区和不分区都行),可以使用lsblk命令查看新增的磁盘,如下图sde就是我们新增磁盘,容量为600G。 2、将新磁盘变成物理卷(PV) pvcr…

解释性人工智能(XAI)

引言 解释性人工智能(XAI)是指一类旨在使人能够理解和解释机器学习模型的方法和技术。XAI的目标是提高AI系统的透明度和可理解性,让人们能够理解机器学习模型的决策过程、推理方式和结果。这对于社会应用和用户信任非常重要,因为A…

【八大排序】直接插入排序 | 希尔排序 + 图文详解!!

📷 江池俊: 个人主页 🔥个人专栏: ✅数据结构冒险记 ✅C语言进阶之路 🌅 有航道的人,再渺小也不会迷途。 文章目录 一、排序的概念二、直接插入排序2.1 基本思想2.2 适用说明2.3 过程图示2.4 代码实现2.…

华为配置ARP安全综合功能实验

配置ARP安全综合功能示例 组网图形 图1 配置ARP安全功能组网图 ARP安全简介配置注意事项组网需求配置思路操作步骤配置文件 ARP安全简介 ARP(Address Resolution Protocol)安全是针对ARP攻击的一种安全特性,它通过一系列对ARP表项学习和A…

Docker 基础篇

目录 一、Docker 简介 1. Docker 2. Linux 容器 3. 传统虚拟机和容器的对比 4. Docker 的作用 5. Docker 的基本组成(Docker 三要素) 6. Docker 工作原理 7. Docker 架构 8. Docker 下载 二、Docker 安装 1. CentOS Docker 安装 2. CentOS8 …

beep蜂鸣器驱动实验-创建蜂鸣器的设备节点

一. 简介 前面我借助 pinctrl 和 gpio 子系统编写了 LED 灯驱动。 I.MX6U-ALPHA 开发板上还有一个蜂鸣器,从软件的角度考虑,蜂鸣器驱动和 LED 灯驱动其实是相同的,都是控制 IO 输出高低电平。接下来我们就来学习编写蜂鸣器的 Linux 驱动。…

5、应急响应-拒绝服务钓鱼识别DDOS压力测试邮件反制分析应用日志

目录 前言: 1、#内网应急-日志分析-爆破&横向&数据库 2、#红队APT-钓鱼邮件识别-内容&发信人&附件 3、#拒绝服务攻击-DDOS&CC-代理&防火墙防御 用途:个人学习笔记,欢迎指正! 前言: 了解和…