运行架构
Spark 框架的核心是一个计算引擎,整体来说,它采用了标准的 master-slave 结构。上图中的 Driver 表示 master ,负责管理整个集群中的作业任务调度;Executor 则是 slave,负责实际执行任务;
核心组件
Driver
Spark 驱动器节点,用于执行 Spark任务中的 main 方法,负责实际代码的执行工作;Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(job)
- 在 Executor 之间调度任务(task)
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
Work
集群从节点,负责启动 Executor 或 Driver ;
Executor
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行;
Executor 的两个核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给 Driver 进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算;
ClusterManager
standalone 模式种即为 Master 节点,控制整个集群;yarn 模式种为 ResourceManager;
核心概念
Executor 与 Core
Spark Executor 是集群中运行在 Worker 中的一个 JVM 进程,是整个集群中专门用于计算的节点。在提交应用时可以提供参数指定计算节点的个数,以及对应应的资源。这个资源一般是指 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量;
应用程序相关启动参数如下:
名称 | 说明 |
---|---|
- -num-executors | 配置 Executor 的数量 |
- -executor-memory | 配置 Executor 的内存大小 |
- -executor-cores | 配置 Executor 的虚拟 CPU Core 数量 |
并行度(Parallelism)
在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算过,所以能够真正地实现多任务并行执行,我们将整个集群并行执行任务的数量称之为并行度;
RDD
弹性分布式数据集,是 Spark 的核心结构,可以通过一系列算子进行操作;
特性:
- 弹性:
存储弹性:内存与磁盘自动切换;
容错弹性:数据丢失可以自动恢复;
计算弹性:计算出错有重试机制;
分片弹性:可以根据需要重新分片; - 分布式:数据存储在大数据集群不同节点上;
- 数据集:RDD 封装了计算逻辑,并不保存数据,所以对 RDD 的操作并不会改变数据本身,改变的只是 RDD 提供的数据副本;
- 数据抽象:RDD 是一个抽象类,需要子类具体实现;
- 不可变:RDD 可以类似看作 String ,是不可改变的,只能产生新的 RDD;
- 可分区、并行计算;
有向无环图(DAG)
有向无环图并不是真正意义的图形,而是由 Spark 程序直接映射成的数据流的高级抽象模型,描述了 RDD 的依赖关系;
当 RDD 遇到 Action 算子时,会将之前的所有算子形成一个 DAG ,也就是 RDD Graph,再在 Spark 中转化为 Job ,提交到集群执行。一个 APP 中可以包含多个 Job;
Job
一个 RDD Graph 触发的作业,往往由 Spark Action 算子触发,在 SparkContext 中通过 runJob() 方法向 Spark 提交 Job;
Stage
每个 Job 会根据 RDD 的宽依赖关系被切分成多个 Stage,每个 Stage 中包含一组相同的 Task,这一组 Task 也叫 TaskSet;
Task
一个分区对应一个 Task,Task 执行 RDD 中对应 Stage 中包含的算子。Task 被封装好后放入 Executor 的线程池中执行;
一个 Job 包含多个 Stage,一个 Stage 包含多个 Task;
程序提交运行流程
Yarn client 模式:
1、在 Yarn client 模式下,通过 spark-submit 提交程序后,会在 client 服务器运行 main() 函数,启动 Dirver 进程;
2、Driver 开始构建并初始化 SparkContext ;
3、SparkContext 向 ClusterManager(ResourceManager)注册,并申请运行 Executor 的资源(内核和内存);
4、ClusterManager 根据 SparkContext 提出的申请和 Worker(NodeManager) 的心跳报告,来决定在哪个 Worker 上启动 Executor;
5、Worker 节点收到请求后会启动 Executor;
6、Executor 向 SparkContext 注册,这样 Driver 就知道哪些 Executor 运行该应用;
7、SparkContext 构建 DAG 图,DAG Scheduler 将 DAG 图分解成多个 Stage ,并把每个 Stage 的 TaskSet 发送给 Task Scheduler ;
8、Executor 向 SparkContext 申请 Task ,Task Secheduler 将 Task发送给 Executor,同时 SparkContext 将程序代码发送给 Executor;
9、Task 在 Executor 上运行,把运行结果反馈给 Task Scheduler,然后再反馈给 DAG Scheduler,运行完毕后写入数据;
10、SparkContext 向 ClusterManager 注销并释放所有资源;
Yarn cluster 模式:
1、在 Yarn cluster 模式下,通过 spark-submit 提交任务后,会启动一个临时进程;
2、临时进程向 ClusterManager(ResourceManager) 通信申请启动 ApplicationMaster(Driver);
3、 ClusterManager分配 container,并通知 NodeManager 启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver;
4、NodeManager 启动 Driver;
5、Driver 启动后开始运行用户 main() 函数;
6、Driver开始构建 SparkContext;
7、SparkContext 向 ClusterManager注册 Application 并申请运行 Executor 的资源;
8、ClusterManager收到 Driver 的资源申请后会分配合适的 Worker 节点;
9、Worker 节点启动 Executor 进程;
10、Executor 进程启动后会向 SparkContext 反向注册;
11、SparkContext 构建 DAG 图,DAG Scheduler 将 DAG 图分解成多个 Stage ,并把每个 Stage 的 TaskSet 发送给 Task Scheduler ;
12、Executor 向 SparkContext 申请 Task ,Task Secheduler 将 Task发送给 Executor,同时 SparkContext 将程序代码发送给 Executor;
13、Task 在 Executor 上运行,把运行结果反馈给 Task Scheduler,然后再反馈给 DAG Scheduler,运行完毕后写入数据;
114、SparkContext 向 ClusterManager 注销并释放所有资源;