文章目录
- 一、Spark内核原理
- 1、Spark 内核概述
- 1.1 简介
- 1.2 Spark 核心组件
- 1.3 Spark 通用运行流程概述
- 2、Spark 部署模式
- 2.1 YARN Cluster 模式(重点)
- 2.2 YARN Client 模式
- 2.3 Standalone Cluster 模式
- 2.4 Standalone Client 模式
- 3、Spark 通讯架构
- 3.1 Spark 通信架构概述
- 3.2 Spark 通讯架构解析
- 4、Spark 任务调度机制
- 4.1 简介
- 4.2 Spark 任务调度概述
- 4.3 Spark Stage 级调度
- 4.4 Spark Task 级调度
- 1 调度策略
- 2 本地化调度
- 3 失败重试与黑名单机制
- 5、Spark 内存管理
- 5.1 堆内和堆外内存规划
- 5.2 内存空间分配
- 5.3 存储内存管理
- 5.4 执行内存管理
- 二、Spark 的两种核心 Shuffle
- 1、概述
- 1.1 简介
- 1.2 Spark Shuffle
- 1.3 Shuffle 的核心要点
- 2、Hash Shuffle 解析
- 2.1 未优化解析
- 2.1 优化的 HashShuffleManager
- 2.2 基于 Hash 的 Shuffle 机制的优缺点
- 3、SortShuffle 解析
- 3.1 概述
- 3.2 普通运行机制
- 3.2 bypass 运行机制
- 3.3 Tungsten Sort Shuffle 运行机制
- 3.4 基于 Sort 的 Shuffle 机制的优缺点
- 三、Spark 性能调优
- 1、常规性能优化
- 1.1 最优资源配置
- 1.2 并行度调节
- 1.3 Kryo 序列化
- 1.4 RDD 优化
- 1.5 广播大变量
- 1.6 调节本地化等待时长
- 2、算子调优
- 2.1 mapPartitions
- 2.2 foreachPartition 优化数据库操作
- 2.3 filter 与 coalesce 的配合使用
- 2.4 repartition 解决 SparkSQL 低并行度问题
- 2.5 reduceByKey 预聚合
- 3、Shuffle 调优
- 3.1 调节 map 端缓冲区大小
- 3.2 调节 reduce 端拉取数据缓冲区大小
- 3.3 调节 reduce 端拉取数据重试次数
- 3.4 调节 reduce 端拉取数据等待间隔
- 3.5 调节 SortShuffle 排序操作阈值
- 4、JVM 调优
- 4.1 降低 cache 操作的内存占比
- 4.2 调节 Executor 堆外内存
- 4.3 调节连接等待时长
- 四、Spark 数据倾斜
- 1、概述
- 1.1 简介
- 1.2 数据倾斜的表现
- 1.3 定位数据倾斜问题
- 2、聚合原数据
- 2.1 避免 shuffle 过程
- 2.2 缩小 key 粒度
- 2.3 增大 key 粒度
- 3、过滤导致倾斜的 key
- 4、提高 shuffle 操作中的 reduce 并行度
- 4.1 **reduce** **端并行度的设置**
- 4.2 **reduce** **端并行度设置存在的缺陷**
- 5、使用随机 key 实现双重聚合
- 6、将 reduce join 转换为 map join
- 6.1 核心思路
- 6.2 不适用场景分析
- 7、sample 采样对倾斜 key 单独进行 join
- 7.1 适用场景分析
- 7.2 不适用场景分析
- 8、使用随机数扩容进行 join
- 8.1 核心思想
- 8.2 局限性与优化
- 五、Spark 故障排除
- 1、控制 reduce 端缓冲大小以避免 OOM
- 2、JVM GC 导致的shuffle 文件拉取失败
- 3、解决各种序列化导致的报错
- 4、解决算子函数返回 NULL 导致的问题
- 5、解决 YARN-CLIENT 模式导致的网卡流量激增问题
- 6、解决 YARN-CLUSTER 模式的 JVM 栈内存溢出无法执行问题
- 7、解决 SparkSQL 导致的 JVM 栈内存溢出
- 8、持久化与 checkpoint 的使用
一、Spark内核原理
1、Spark 内核概述
1.1 简介
Spark 内核泛指 Spark 的核心运行机制,包括 Spark 核心组件的运行机制、Spark 任务调度机制、Spark 内存管理机制、Spark 核心功能的运行原理等,熟练掌握 Spark 内核原理,能够帮助我们更好地完成 Spark 代码设计,并能够帮助我们准确锁定项目运行过程中出现的问题的症结所在
1.2 Spark 核心组件
Driver是Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(Job)
- 在 Executor 之间调度任务(Task)
- 跟踪Executor 的执行情况
- 通过UI 展示查询运行情况
Executor是负责在 Spark 作业中运行具体任务,任务彼此之间相互独立。Spark 应用启动时,ExecutorBackend 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 ExecutorBackend 节点发生了故障或崩溃,Spark 应用也可以继续执行, 会将出错节点上的任务调度到其他Executor 节点上继续运行,Executor 有两个核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器(Driver)
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存
1.3 Spark 通用运行流程概述
- 任务提交后,都会先启动 Driver 程序;
- 随后Driver 向集群管理器注册应用程序;
- 集群管理器根据此任务的配置文件分配Executor 并启动;
- Driver 开始执行 main 函数,Spark 查询为懒执行,当执行到 Action 算子时开始反向推算,根据宽依赖进行 Stage 的划分,随后每一个 Stage 对应一个Taskset,Taskset 中有多个 Task,查找可用资源Executor 进行调度;
- 根据本地化原则,Task 会被分发到指定的 Executor 去执行,在任务执行的过程中,Executor 也会不断与Driver 进行通信,报告任务运行情况
2、Spark 部署模式
2.1 YARN Cluster 模式(重点)
- 执行脚本提交任务,实际是启动一个 SparkSubmit 的 JVM 进程
- SparkSubmit 类中的 main 方法反射调用 YarnClusterApplication 的 main 方法
- YarnClusterApplication 创建 Yarn 客户端,然后向 Yarn 服务器发送执行指令:bin/java ApplicationMaster
- Yarn 框架收到指令后会在指定的 NM 中启动ApplicationMaster
- ApplicationMaster 启动 Driver 线程,执行用户的作业
- AM 向 RM 注册,申请资源
- 获取资源后 AM 向NM 发送指令:bin/java YarnCoarseGrainedExecutorBackend
- CoarseGrainedExecutorBackend 进程会接收消息,跟 Driver 通信,注册已经启动的Executor;然后启动计算对象 Executor 等待接收任务
- Driver 线程继续执行完成作业的调度和任务的执行
- Driver 分配任务并监控任务的执行
注意:SparkSubmit、ApplicationMaster 和CoarseGrainedExecutorBackend 是独立的进程;Driver是独立的线程;Executor 和 YarnClusterApplication 是对象
2.2 YARN Client 模式
- 执行脚本提交任务,实际是启动一个 SparkSubmit 的 JVM 进程
- SparkSubmit 类中的 main 方法反射调用用户代码的main 方法
- 启动Driver 线程,执行用户的作业,并创建 ScheduleBackend
- YarnClientSchedulerBackend 向RM 发送指令:bin/java ExecutorLauncher
- Yarn 框架收到指令后会在指定的 NM 中启动 ExecutorLauncher(实际上还是调用ApplicationMaster 的 main 方法)
- AM 向 RM 注册,申请资源
- 获取资源后 AM 向NM 发送指令:bin/java CoarseGrainedExecutorBackend
- CoarseGrainedExecutorBackend 进程会接收消息,跟 Driver 通信,注册已经启动的Executor;然后启动计算对象 Executor 等待接收任务
- Driver 分配任务并监控任务的执行
注意:SparkSubmit、ApplicationMaster 和 YarnCoarseGrainedExecutorBackend 是独立的进程;Executor 和Driver 是对象
2.3 Standalone Cluster 模式
在 Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver。Driver 启动后向 Master 注册应用程序,Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向Driver 反向注册,所有的 Executor 注册完成后,Driver 开始执行 main函数,之后执行到Action 算子时,开始划分Stage,每个 Stage 生成对应的 taskSet,之后将Task 分发到各个 Executor 上执行
2.4 Standalone Client 模式
在 Standalone Client 模式下,Driver 在任务提交的本地机器上运行。Driver 启动后向Master 注册应用程序,Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 Stage,每个 Stage 生成对应的 TaskSet,之后将 Task 分发到各个Executor 上执行
3、Spark 通讯架构
3.1 Spark 通信架构概述
- Spark 早期版本中采用 Akka 作为内部通信部件
- Spark1.3 中引入Netty 通信框架,为了解决 Shuffle 的大数据传输问题使用
- Spark1.6 中Akka 和Netty 可以配置使用。Netty 完全实现了 Akka 在 Spark 中的功能
- Spark2 系列中,Spark 抛弃 Akka,使用 Netty。
Spark2.x 版本使用 Netty 通讯框架作为内部通讯组件。Spark 基于 Netty 新的 RPC 框架借
Endpoint(Client/Master/Worker)有 1 个 InBox 和N 个OutBox(N>=1,N 取决于当前 Endpoint与多少其他的Endpoint 进行通信,一个与其通讯的其他Endpoint 对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入 OutBox 并被发送到其他Endpoint 的InBox中
3.2 Spark 通讯架构解析
- RpcEndpoint:RPC 通信终端。Spark 针对每个节点(Client/Master/Worker)都称之为一个 RPC 终端,且都实现 RpcEndpoint 接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher。在 Spark 中,所有的终端都存在生命周期:Constructor/onStart/receive*/onStop
- RpcEnv:RPC 上下文环境,每个 RPC 终端运行时依赖的上下文环境称为 RpcEnv;在把当前 Spark 版本中使用的 NettyRpcEnv
- Dispatcher:消息调度(分发)器,针对于 RPC 终端需要发送远程消息或者从远程 RPC 接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱
- Inbox:指令消息收件箱。一个本地RpcEndpoint 对应一个收件箱,Dispatcher 在每次向Inbox 存入消息时,都将对应 EndpointData 加入内部ReceiverQueue 中,另外 Dispatcher创建时会启动一个单独线程进行轮询 ReceiverQueue,进行收件箱消息消费
- RpcEndpointRef:RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当我们需要向一个具体的RpcEndpoint 发送消息时,一般我们需要获取到该RpcEndpoint 的引用,然后通过该应用发送消息
- OutBox:指令消息发件箱。对于当前 RpcEndpoint 来说,一个目标 RpcEndpoint 对应一个发件箱,如果向多个目标RpcEndpoint 发送信息,则有多个OutBox。当消息放入Outbox 后,紧接着通过 TransportClient 将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行
- RpcAddress:表示远程的RpcEndpointRef 的地址,Host + Port
- TransportClient:Netty 通信客户端,一个 OutBox 对应一个TransportClient,TransportClient不断轮询OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer
- TransportServer:Netty 通信服务端,一个 RpcEndpoint 对应一个TransportServer,接受远程消息后调用 Dispatcher 分发消息至对应收发件箱
4、Spark 任务调度机制
4.1 简介
在生产环境下,Spark 集群的部署方式一般为 YARN-Cluster 模式,之后的内核分析内容中我们默认集群的部署方式为 YARN-Cluster 模式。在上一章中我们讲解了 Spark YARN-Cluster 模式下的任务提交流程,但是我们并没有具体说明 Driver 的工作流程, Driver 线程主要是初始化 SparkContext 对象, 准备运行所需的上下文, 然后一方面保持与ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。
当 ResourceManager 向ApplicationMaster 返回Container 资源时,ApplicationMaster 就尝试在对应的Container 上启动 Executor 进程,Executor 进程起来后,会向Driver 反向注册, 注册成功后保持与 Driver 的心跳,同时等待 Driver 分发任务,当分发的任务执行完毕后, 将任务状态上报给Driver。
4.2 Spark 任务调度概述
当 Driver 起来后,Driver 则会根据用户程序逻辑准备任务,并根据 Executor 资源情况逐步分发任务。在详细阐述任务调度前,首先说明下 Spark 里的几个概念。一个 Spark 应用程序包括 Job、Stage 以及 Task 三个概念:
- Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job
- Stage 是 Job 的子集,以RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分
- Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task
Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总体调度流程如下图所示
Spark RDD 通过其Transactions 操作,形成了RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler 和 TaskScheduler
- DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage打包成 TaskSet 交给 TaskScheduler 调度
- TaskScheduler 负责Task 级的调度,将 DAGScheduler 给过来的TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中SchedulerBackend 有多种实现,分别对接不同的资源管理系统
Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,并启动SchedulerBackend 以及 HeartbeatReceiver。SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor的存活状况,并通知到TaskScheduler。
4.3 Spark Stage 级调度
Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交。Job 由最终的 RDD 和 Action 方法封装而成;SparkContext 将 Job 交给 DAGScheduler 提交,它会根据 RDD 的血缘关系构成的 DAG 进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之间被划分到同一个 Stage 中,可以进行pipeline 式的计算。划分的 Stages 分两类,一类叫做 ResultStage ,为 DAG 最下游的 Stage ,由 Action 方法决定, 另一类叫做ShuffleMapStage,为下游 Stage 准备数据,下面看一个简单的例子 WordCount
Job 由 saveAsTextFile 触发,该 Job 由 RDD-3 和 saveAsTextFile 方法组成,根据 RDD 之间的依赖关系从RDD-3 开始回溯搜索,直到没有依赖的 RDD-0,在回溯搜索过程中,RDD-3 依赖 RDD-2,并且是宽依赖,所以在RDD-2 和 RDD-3 之间划分 Stage,RDD-3 被划到最后一个 Stage,即 ResultStage 中,RDD-2 依赖RDD-1,RDD-1 依赖RDD-0,这些依赖都是窄依赖,所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,形成 pipeline 操作,即ShuffleMapStage 中,实际执行的时候,数据记录会一气呵成地执行 RDD-0 到 RDD-2 的转化。不难看出,其本质上是一个深度优先搜索(Depth First Search)算法
一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型的 Task 失败会在TaskScheduler 的调度过程中重试。相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交Stage 并监控相关状态信息
4.4 Spark Task 级调度
Spark Task 的调度是由 TaskScheduler 来完成,由前文可知,DAGScheduler 将 Stage 打包到交给 TaskScheTaskSetduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到调度队列中,TaskSetManager 结构如下图所示
TaskSetManager 负责监控管理同一个 Stage 中的 Tasks , TaskScheduler 就是以TaskSetManager 为单元来调度任务。TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道, 接收 Executor 的注册信息,并维护 Executor 的状态,所以说 SchedulerBackend 是管"粮食" 的,同时它在启动后会定期地去"询问"TaskScheduler 有没有任务要运行,也就是说,它会定期地"问"TaskScheduler"我有这么余粮,你要不要啊",TaskScheduler 在 SchedulerBackend"问"它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行
1 调度策略
TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。在 TaskScheduler 初始化过程中会实例化 rootPool,表示树的根节点,是 Pool 类型
如果是采用 FIFO 调度策略,则直接简单地将 TaskSetManager 按照先来先到的方式入队,出队时直接拿出最先进队的 TaskSetManager
FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的TaskSetMagager。在 FAIR 模式中,需要先对子Pool 进行排序,再对子Pool 里面的TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因此使用相同的排序算法。排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性:runningTasks 值(正在运行的Task 数)、minShare 值、weight 值,比较时会综合考量runningTasks值,minShare 值以及weight 值。注意,minShare、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相关配置
- 如果A 对象的runningTasks大于它的minShare,B 对象的runningTasks 小于它的minShare,那么B 排在 A 前面;(runningTasks 比 minShare 小的先执行)
- 如果 A、B 对象的 runningTasks 都小于它们的 minShare,那么就比较 runningTasks 与minShare 的比值(minShare 使用率),谁小谁排前面;(minShare 使用率低的先执行)
- 如果 A、B 对象的 runningTasks 都大于它们的 minShare,那么就比较 runningTasks 与weight 的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)
- 如果上述比较均相等,则比较名字。
整体上来说就是通过minShare和weight 这两个参数控制比较过程,可以做到让minShare 使用率和权重使用率少(实际运行 task 比例较少)的先运行。FAIR 模式排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次被取出并发送给Executor 执行。从调度队列中拿到 TaskSetManager 后,由于 TaskSetManager 封装了一个 Stage 的所有Task,并负责管理调度这些 Task,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 Task 给TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor 上执行
2 本地化调度
DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的tasks,submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个需要计算的 task的 preferredLocations,通过调用 getPreferrdeLocations()得到 partition 的优先位置,由于一个partition 对应一个 Task,此 partition 的优先位置就是 task 的优先位置,对于要提交到TaskScheduler 的TaskSet 中的每一个 Task,该 task 优先位置与其对应的partition 对应的优先位置一致。
调度队列中拿到 TaskSetManager 后,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 task 给 TaskScheduler,TaskScheduler 再交给SchedulerBackend 去发到Executor 上执行。前面也提到,TaskSetManager 封装了一个 Stage 的所有Task,并负责管理调度这些Task。根据每个 Task 的优先位置,确定 Task 的 Locality 级别,Locality 一共有五种,优先级由高到低顺序
名称 | 解析 |
---|---|
PROCESS_LOCAL | 进程本地化,task 和数据在同一个 Executor 中,性能最好 |
NODE_LOCAL | 节点本地化,task 和数据在同一个节点中,但是 task 和数据不在同一个 Executor 中,数据需要在进程间进行传输 |
RACK_LOCAL | 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输 |
NO_PREF | 对于 task 来说,从哪里获取都一样,没有好坏之分 |
ANY | task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差 |
在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个task 以X 本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败, 此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能就会有相应的资源去执行此 task,这就在在一定程度上提到了运行性能
3 失败重试与黑名单机制
除了选择合适的 Task 调度运行外,还需要监控Task 的执行状态,前面也提到,与外部打交道的是 SchedulerBackend,Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给SchedulerBackend,SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task的失败与成功状态,对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中,否则整个Application 失败。
在记录 Task 失败次数过程中,会记录它上一次失败所在的 Executor Id 和Host,这样下次再调度这个 Task 时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录 Task 上一次失败所在的Executor Id 和 Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个 Task 了
5、Spark 内存管理
5.1 堆内和堆外内存规划
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
堆 内 内 存 的 大 小 , 由 Spark 应 用 程 序 启 动 时 的 executor-memory
或spark.executor.memory
参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存, 而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间
堆外内存为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled
参数启用, 并由 spark.memory.offHeap.size
参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存
5.2 内存空间分配
在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。Spark1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如图所示
其中最重要的优化在于动态占用机制,其规则如下:
- 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
- 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间;
- 存储内存的空间被对方占用后,无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理
5.3 存储内存管理
- RDD 的持久化机制
- RDD 的缓存过程
- 淘汰与落盘
5.4 执行内存管理
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对RDD 数据重新分区的过程,我们来看 Shuffle 的Write 和Read 两阶段对执行内存的使用:
Shuffle Write
若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
Shuffle Read
在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。
二、Spark 的两种核心 Shuffle
1、概述
1.1 简介
在 MapReduce 框架中, Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序的性能。 Spark 也有 Map 阶段和 Reduce 阶段,因此也会出现 Shuffle
1.2 Spark Shuffle
Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。在 Spark 1.1 之前, Spark 中只实现了一种 Shuffle 方式,即基于 Hash 的 Shuffle 。在 Spark 1.1 版本中引入了基于 Sort 的 Shuffle 实现方式,并且 Spark 1.2 版本之后,默认的实现方式从基于 Hash 的 Shuffle 修改为基于 Sort 的 Shuffle 实现方式,即使用的 ShuffleManager 从默认的 hash 修改为 sort。在 Spark 2.0 版本中, Hash Shuffle 方式己经不再使用。
Spark 之所以一开始就提供基于 Hash 的 Shuffle 实现机制,其主要目的之一就是为了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是将 sort 作为固定步骤,有许多并不需要排序的任务,MapReduce 也会对其进行排序,造成了许多不必要的开销。
在基于 Hash 的 Shuffle 实现方式中,每个 Mapper 阶段的 Task 会为每个 Reduce 阶段的 Task 生成一个文件,通常会产生大量的文件(即对应为 M*R 个中间文件,其中, M 表示 Mapper 阶段的 Task 个数, R 表示 Reduce 阶段的 Task 个数) 伴随大量的随机磁盘 I/O 操作与大量的内存开销。为了缓解上述问题,在 Spark 0.8.1 版本中为基于 Hash 的 Shuffle 实现引入了 Shuffle Consolidate 机制(即文件合并机制),将 Mapper 端生成的中间文件进行合并的处理机制。通过配置属性spark.shuffie.consolidateFiles=true
,减少中间生成的文件数量。通过文件合并,可以将中间文件的生成方式修改为每个执行单位为每个 Reduce 阶段的 Task 生成一个文件。
基于 Hash 的 Shuffle 的实现方式中,生成的中间结果文件的个数都会依赖于 Reduce 阶段的 Task 个数,即 Reduce 端的并行度,因此文件数仍然不可控,无法真正解决问题。为了更好地解决问题,在 Spark1.1 版本引入了基于 Sort 的 Shuffle 实现方式,并且在 Spark 1.2 版本之后,默认的实现方式也从基于 Hash 的 Shuffle,修改为基于 Sort 的 Shuffle 实现方式,即使用的 ShuffleManager 从默认的 hash 修改为 sort。
在基于 Sort 的 Shuffle 中,每个 Mapper 阶段的 Task 不会为每 Reduce 阶段的 Task 生成一个单独的文件,而是全部写到一个数据(Data)文件中,同时生成一个索引(Index)文件, Reduce 阶段的各个 Task 可以通过该索引文件获取相关的数据。避免产生大量文件的直接收益就是降低随机磁盘 I/0 与内存的开销。最终生成的文件个数减少到 2M ,其中 M 表示 Mapper 阶段的 Task 个数,每个 Mapper 阶段的 Task 分别生成两个文件(1 个数据文件、 1 个索引文件),最终的文件个数为 M 个数据文件与 M 个索引文件。因此,最终文件个数是 2M 个。
1.3 Shuffle 的核心要点
在划分 stage 时,最后一个 stage 称为 finalStage,它本质上是一个 ResultStage 对象,前面的所有 stage 被称为 ShuffleMapStage。ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。ResultStage 基本上对应代码中的action 算子,即将一个函数应用在 RDD 的各个partition的数据集上,意味着一个 job 的运行结束
2、Hash Shuffle 解析
2.1 未优化解析
shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“划分”。所谓“划分”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。比如下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件。如果当前 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建 5000 个磁盘文件。由此可见,未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的。
shuffle read 阶段,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 就需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,map task 给下游 stage 的每个 reduce task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 reduce task 只要从上游 stage 的所有 map task 所在节点上,拉取属于自己的那一个磁盘文件即可。
shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。
2.1 优化的 HashShuffleManager
为了优化 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles
,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 Executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。
当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。假设第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(Executor CPU 个数为 1),每个 Executor 执行 5 个 task。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生 500 个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量
,也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。
2.2 基于 Hash 的 Shuffle 机制的优缺点
优点:
- 可以省略不必要的排序开销。
- 避免了排序所需的内存开销。
缺点:
- 生产的文件过多,会对文件系统造成压力。
- 大量小文件的随机读写带来一定的磁盘开销。
- 数据块写入时所需的缓存空间也会随之增加,对内存造成压力
3、SortShuffle 解析
3.1 概述
SortShuffleManager 的运行机制主要分成三种:
- 普通运行机制;
- bypass 运行机制,当 shuffle read task 的数量小于等于
spark.shuffle.sort.bypassMergeThreshold
参数的值时(默认为 200),就会启用 bypass 机制; - Tungsten Sort 运行机制,开启此运行机制需设置配置项
spark.shuffle.manager=tungsten-sort
。开启此项配置也不能保证就一定采用此运行机制
3.2 普通运行机制
在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。
一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。
SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件
3.2 bypass 运行机制
Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort huffle 实现机制提供了一个回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold
设置的个数时,使用带 Hash 风格的回退计划。bypass 运行机制的触发条件如下:
- shuffle map task 数量小于
spark.shuffle.sort.bypassMergeThreshold=200
参数的值。 - 不是聚合类的 shuffle 算子(比如 reduceByKey)
此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。bypass 运行机制的 SortShuffleManager 工作原理如下图所示:
3.3 Tungsten Sort Shuffle 运行机制
基于 Tungsten Sort 的 Shuffle 实现机制主要是借助 Tungsten 项目所做的优化来高效处理 Shuffle。Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:**对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort
方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle
方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。**因此,当设置了 spark.shuffle.manager=tungsten-sort
时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。要实现 Tungsten Sort Shuffle 机制需要满足以下条件:
- Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求
- Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)
- Shuffle 过程中的输出分区个数少于 16777216 个
实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的
3.4 基于 Sort 的 Shuffle 机制的优缺点
优点:
- 小文件的数量大量减少,Mapper 端的内存占用变少;
- Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。
缺点:
- 如果 Mapper 中 Task 的数量过大,依旧会产生很多小文件,此时在 Shuffle 传数据的过程中到 Reducer 端, Reducer 会需要同时大量地记录进行反序列化,导致大量内存消耗和 GC 负担巨大,造成系统缓慢,甚至崩溃;
- 强制了在 Mapper 端必须要排序,即使数据本身并不需要排序;
- 它要基于记录本身进行排序,这就是 Sort-Based Shuffle 最致命的性能消耗。
三、Spark 性能调优
1、常规性能优化
1.1 最优资源配置
# 标准的 Spark 任务提交脚本
bin/spark-submit \
--class com.atguigu.spark.Analysis \
--master yarn
--deploy-mode cluster
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar # 可以进行分配的资源
# --num-executors 配置 Executor 的数量
# --driver-memory 配置 Driver 内存( 影响不大)
# --executor-memory 配置每个 Executor 的内存大小
# --executor-cores 配置每个 Executor 的 CPU core 数量
对于具体资源的分配,我们分别讨论 Spark 的两种Cluster 运行模式:
- 第一种是 Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写 submit 脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有 15 台机器,每台机器为 8G 内存,2 个 CPU core,那么就指定 15 个 Executor,每个 Executor 分配 8G 内存,2 个 CPU core
- 第二种是 Spark Yarn 模式,由于 Yarn 使用资源队列进行资源的分配和调度,在编写submit 脚本的时候,就根据 Spark 作业要提交到的资源队列,进行资源的分配,比如资源队列有 400G 内存,100 个 CPU core,那么指定 50 个 Executor,每个 Executor 分配8G 内存,2 个 CPU core
对各项资源进行了调节后,得到的性能提升会有如下表现
名称 | 解析 |
---|---|
增加 Executor个数 | 在资源允许的情况下,增加 Executor 的个数可以提高执行 task 的并行度。比如有 4 个Executor,每个 Executor 有 2 个 CPU core,那么可以并行执行 8 个 task,如果将 Executor 的个数增加到 8 个( 资源允许的情况下), 那么可以并行执行 16 个 task, 此时的并行能力提升了一倍 |
增加每个 Executor 的 CPU core 个数 | 在资源允许的情况下,增加每个 Executor 的Cpu core 个数, 可以提高执行 task 的并行度。比如有 4 个 Executor,每个 Executor 有 2 个CPU core,那么可以并行执行 8 个 task,如果将每个 Executor 的 CPU core 个数增加到 4 个( 资源允许的情况下),那么可以并行执行 16个 task, 此时的并行能力提升了一倍 |
增加每个 Executor 的内存量 | 在资源允许的情况下,增加每个 Executor 的内存量以后, 对性能的提升有三点: 1. 可以缓存更多的数据( 即对 RDD 进行 cache), 写入磁盘的数据相应减少, 甚至可以不写入磁盘, 减少了可能的磁盘 IO; 2. 可以为 shuffle 操作提供更多内存,即有更多空间来存放 reduce 端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘, 减少了可能的磁盘 IO; 3. 可以为 task 的执行提供更多内存,在 task 的执行过程中可能创建很多对象,内存较小时会引发频繁的 GC, 增加内存后, 可以避免频繁的 GC, 提升整体性能。 |
补充:生产环境 Spark submit 脚本配置
bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn\
--deploy-mode cluster\
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar# 参数配置参考值:
# --num-executors:50~100
# --driver-memory:1G~5G
# --executor-memory:6G~10G
# --executor-cores:3
# --master:实际生产环境一定使用 yarn
1.2 并行度调节
Spark 作业中的并行度指各个 stage 的 task 的数量。如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20 个Executor,每个Executor 分配 3 个CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 分配到的task 个数是 2 个,这就使得每个 Executor 有一个CPU core 空闲,导致资源的浪费。理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下, 并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark 作业的性能和运行速度。
Spark 官方推荐,task 数量应该设置为 Spark 作业总CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与CPU core 总数相等,是因为 task 的执行时间不同,有的task 执行速度快而有的 task 执行速度慢,如果 task 数量与CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为CPU core 总数的 2~3 倍,那么一个task 执行完毕后,CPU core 会立刻执行下一个task,降低了资源的浪费,同时提升了 Spark作业运行的效率。
val conf = new SparkConf().set("spark.default.parallelism", "500")
1.3 Kryo 序列化
默认情况下,Spark 使用 Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现 Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用Kryo 序列化方式了。
public class MyKryoRegistrator implements KryoRegistrator{@Overridepublic void registerClasses(Kryo kryo){kryo.register(StartupReportLogs.class);}
}//配置Kryo 序列化方式的实例代码
//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
1.4 RDD 优化
-
RDD 复用
在对RDD 进行算子时,要避免相同的算子和计算逻辑之下对RDD 进行重复的计算
-
RDD 持久化
- RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中
- 如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对 RDD 数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本, 放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。
-
RDD 尽可能早的 filter 操作
1.5 广播大变量
默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。
在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor 对应的BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点BlockManager 上远程拉取变量的复本,并由本地的BlockManager 进行管理;之后此Executor 的所有 task 都会直接从本地的BlockManager 中获取变量
Spark join 策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用 Broadcast Hash Join,其原理就是先将小表聚合到 driver 端,再广播到各个大表分区中,那么再次进行 join 的时候,就相当于大表的各自分区的数据与小表进行本地 join,从而规避了 shuffle。广播 join 默认值为 10MB,由spark.sql.autoBroadcastJoinThreshold
参数控制
1.6 调节本地化等待时长
Spark 作业运行过程中,Driver 会对每一个 stage 的 task 进行分配。根据 Spark 的task 分配算法,Spark 希望 task 能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task 可能不会被分配到它处理的数据所在的节点, 因为这些节点可用的资源可能已经用尽,此时,Spark 会等待一段时间,默认 3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将 task 分配到比较差的本地化级别所对应的节点上,比如将 task 分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。
当 task 要处理的数据不在 task 所在节点上时,会发生数据的传输。task 会通过所在节点的BlockManager 获取数据,BlockManager 发现数据不在本地时,会通过网络传输组件从数据所在节点的BlockManager 处获取数据。网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的 task 将有机会得到执行,这样就能够改善 Spark 作业的整体性能
名称 | 解析 |
---|---|
PROCESS_LOCAL | 进程本地化,task 和数据在同一个 Executor 中,性能最好 |
NODE_LOCAL | 节点本地化,task 和数据在同一个节点中,但是task和数据不在同一个 Executor 中,数据需要在进程间进行传输 |
RACK_LOCAL | 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输 |
NO_PREF | 对于 task 来说,从哪里获取都一样,没有好坏之分 |
ANY | task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差 |
在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息, 日志信息中有明确的 task 数据本地化的级别, 如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看 task 的本地化级别有没有提升,并观察 Spark 作业的运行时间有没有缩短。
注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark 作业的运行时间反而增加了。
val conf = new SparkConf().set("spark.locality.wait", "6")
2、算子调优
2.1 mapPartitions
普通的 map 算子对RDD 中的每一个元素进行操作,而 mapPartitions 算子对RDD 中每一个分区进行操作。如果是普通的 map 算子,假设一个 partition 有 1 万条数据,那么 map算子中的 function 要执行 1 万次,也就是对每个元素进行操作
如果是 mapPartition 算子,由于一个 task 处理一个 RDD 的 partition,那么一个 task 只会执行一次function,function 一次接收所有的partition 数据,效率比较高。比如,当要把 RDD 中的所有数据通过 JDBC 写入数据,如果使用 map 算子,那么需要对 RDD 中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions 算子,那么针对一个分区的数据,只需要建立一个数据库连接
mapPartitions 算子也存在一些缺点:对于普通的 map 操作,一次处理一条数据,如果在处理了 2000 条数据后内存不足,那么可以将已经处理完的 2000 条数据从内存中垃圾回收掉;但是如果使用mapPartitions 算子,但数据量非常大时,function 一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。因此,mapPartitions 算子适用于数据量不是特别大的时候,此时使用 mapPartitions 算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用 mapPartitions 算子,就会直接OOM)
在项目中,应该首先估算一下 RDD 的数据量、每个 partition 的数据量,以及分配给每个 Executor 的内存资源,如果资源允许,可以考虑使用 mapPartitions 算子代替 map
2.2 foreachPartition 优化数据库操作
在生产环境中,通常使用 foreachPartition 算子来完成数据库的写入,通过 foreachPartition算子的特性,可以优化写数据库的性能。如果使用 foreach 算子完成数据库的操作,由于 foreach 算子是遍历 RDD 的每条数据, 因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition 算子。与 mapPartitions 算子非常相似,foreachPartition 是将 RDD 的每个分区作为遍历对象, 一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接
使用了 foreachPartition 算子后,可以获得以下的性能提升:
- 对于我们写的 function 函数,一次处理一整个分区的数据;
- 对于一个分区内的数据,创建唯一的数据库连接;
- 只需要向数据库发送一次 SQL 语句和多组参数;
在生产环境中,全部都会使用 foreachPartition 算子完成数据库操作。foreachPartition 算子存在一个问题,与 mapPartitions 算子类似,如果一个分区的数据量特别大,可能会造成 OOM, 即内存溢出。
2.3 filter 与 coalesce 的配合使用
在 Spark 任务中我们经常会使用 filter 算子完成 RDD 中数据的过滤,在任务初始阶段, 从各个分区中加载到的数据量是相近的,但是一旦进过 filter 过滤后,每个分区的数据量有可能会存在较大差异
repartition 与 coalesce 都可以用来进行重分区,其中 repartition 只是 coalesce 接口中 shuffle为 true 的简易实现,coalesce 默认情况下不进行 shuffle,但是可以通过参数进行设置。假设我们希望将原本的分区个数A 通过重新分区变为 B,那么有以下几种情况
-
A > B(多数分区合并为少数分区)
- A 与 B 相差值不大,此时使用 coalesce 即可,无需 shuffle 过程。
-
A 与 B 相差值很大,此时可以使用 coalesce 并且不启用 shuffle 过程,但是会导致合并过程性能低下,所以推荐设置 coalesce 的第二个参数为 true,即启动 shuffle 过程
-
A < B(少数分区分解为多数分区)
此时使用 repartition 即可,如果使用 coalesce 需要将 shuffle 设置为 true,否则 coalesce 无效。我们可以在filter 操作之后,使用 coalesce 算子针对每个partition 的数据量各不相同的情况, 压缩 partition 的数量,而且让每个 partition 的数据量尽量均匀紧凑,以便于后面的 task 进行计算操作,在某种程度上能够在一定程度上提升性能。
注意:local 模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。
2.4 repartition 解决 SparkSQL 低并行度问题
并行度的设置对于Spark SQL 是不生效的,用户设置的并行度只对于 Spark SQL 以外的所有 Spark 的 stage 生效。Spark SQL 的并行度不允许用户自己指定,Spark SQL 自己会默认根据 hive 表对应的HDFS 文件的 split 个数自动设置 Spark SQL 所在的那个 stage 的并行度,用户自己通过spark.default.parallelism
参数指定的并行度,只会在 Spark SQL 的 stage 中生效。
由于 Spark SQL 所在 stage 的并行度无法手动设置,如果数据量较大,并且此 stage 中后续的 transformation 操作有着复杂的业务逻辑,而 Spark SQL 自动设置的 task 数量很少, 这就意味着每个 task 要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有 Spark SQL 的 stage 速度很慢,而后续的没有 Spark SQL 的 stage 运行速度非常快。
为了解决 Spark SQL 无法设置并行度和 task 数量的问题,我们可以使用 repartition 算子。Spark SQL 这一步的并行度和 task 数量肯定是没有办法去改变了,但是,对于Spark SQL 查询出来的 RDD, 立即使用 repartition 算子, 去重新进行分区, 这样可以重新分区为多个 partition,从 repartition 之后的 RDD 操作,由于不再设计 SparkSQL,因此 stage 的并行度就会等于你手动设置的值,这样就避免了 Spark SQL 所在的 stage 只能用少量的 task 去处理大量数据并执行复杂的算法逻辑。
2.5 reduceByKey 预聚合
reduceByKey 相较于普通的 shuffle 操作一个显著的特点就是会进行map 端的本地聚合,map 端会先对本地的数据进行 combine 操作,然后将数据写入给下个 stage 的每个 task 创建的文件中,也就是在 map 端,对每一个 key 对应的 value,执行 reduceByKey 算子函数。使用 reduceByKey 对性能的提升如下:
- 本地聚合后,在 map 端的数据量变少,减少了磁盘 IO,也减少了对磁盘空间的占用;
- 本地聚合后,下一个 stage 拉取的数据量变少,减少了网络传输的数据量;
- 本地聚合后,在 reduce 端进行数据缓存的内存占用减少;
- 本地聚合后,在 reduce 端进行聚合的数据量减少。
基于 reduceByKey 的本地聚合特征,我们应该考虑使用 reduceByKey 代替其他的 shuffle 算子。groupByKey 不会进行 map 端的聚合,而是将所有 map 端的数据 shuffle 到reduce 端,然后在 reduce 端进行数据的聚合操作。由于 reduceByKey 有 map 端聚合的特性, 使得网络传输的数据量减小,因此效率要明显高于groupByKey
3、Shuffle 调优
3.1 调节 map 端缓冲区大小
在 Spark 任务运行过程中,如果 shuffle 的 map 端处理的数据量比较大,但是map 端缓冲的大小是固定的,可能会出现 map 端缓冲数据频繁 spill 溢写到磁盘文件中的情况,使得性能非常低下,通过调节 map 端缓冲的大小,可以避免频繁的磁盘 IO 操作,进而提升 Spark 任务的整体性能。
map 端缓冲的默认配置是 32KB,如果每个 task 处理 640KB 的数据,那么会发生 640/32= 20 次溢写,如果每个 task 处理 64000KB 的数据,机会发生 64000/32=2000 此溢写,这对于性能的影响是非常严重的。
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
3.2 调节 reduce 端拉取数据缓冲区大小
Spark Shuffle 过程中,shuffle reduce task 的 buffer 缓冲区大小决定了reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。reduce 端数据拉取缓冲区的大小可以通过 spark.reducer.maxSizeInFlight
参数进行设置,默认为 48MB
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
3.3 调节 reduce 端拉取数据重试次数
Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如 60 次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。
reduce 端拉取数据重试次数可以通过 spark.shuffle.io.maxRetries
参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为 3
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")
3.4 调节 reduce 端拉取数据等待间隔
Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性。reduce 端拉取数据等待间隔可以通过 spark.shuffle.io.retryWait
参数进行设置, 默认值为 5s
val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")
3.5 调节 SortShuffle 排序操作阈值
对于 SortShuffleManager,如果 shuffle reduce task 的数量小于某一阈值则 shuffle write 过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
当你使用 SortShuffleManager 时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于 shuffle read task 的数量,那么此时 map-side 就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此 shuffle write 性能有待提高。SortShuffleManager 排序操作阈值的设置可以通过spark.shuffle.sort. bypassMergeThreshold
这一参数进行设置,默认值为 200
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")
4、JVM 调优
full gc/minor gc都会导致 JVM 的工作线程停止工作, 即 stop the world
4.1 降低 cache 操作的内存占比
- 静态内存管理机制
根据 Spark 静态内存管理机制,堆内存被划分为了两块,Storage 和 Execution。Storage 主要用于缓存RDD 数据和 broadcast 数据,Execution 主要用于缓存在 shuffle 过程中产生的中间数据,Storage 占系统内存的 60%,Execution 占系统内存的 20%,并且两者完全独立。在一般情况下,Storage 的内存都提供给了cache 操作,但是如果在某些情况下 cache 操作内存不是很紧张,而 task 的算子中创建的对象很多,Execution 内存又相对较小,这回导致频繁的 minor gc,甚至于频繁的 full gc,进而导致 Spark 频繁的停止工作,性能影响会很大。在 Spark UI 中可以查看每个 stage 的运行情况,包括每个 task 的运行时间、gc 时间等等,如果发现 gc 太频繁,时间太长,就可以考虑调节 Storage 的内存占比,让 task 执行算子函数式,有更多的内存可以使用。Storage 内存区域可以通过 spark.storage.memoryFraction
参数进行指定,默认为 0.6,60%,可以逐级向下递减
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.4")
- 统一内存管理机制
根据 Spark 统一内存管理机制,堆内存被划分为了两块,Storage 和 Execution。Storage 主要用于缓存数据,Execution 主要用于缓存在 shuffle 过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage 和Execution 各占统一内存的 50%,由于动态占用机制的实现,shuffle 过程需要的内存过大时,会自动占用 Storage 的内存区域,因此无需手动进行调节。
4.2 调节 Executor 堆外内存
Executor 的堆外内存主要用于程序的共享库、Perm Space、 线程 Stack 和一些 Memory mapping 等, 或者类C 方式 allocate object。有时,如果你的 Spark 作业处理的数据量非常大,达到几亿的数据量,此时运行 Spark 作业会时不时地报错,例如 shuffle output file cannot find,executor lost,task lost,out of memory 等,这可能是Executor 的堆外内存不太够用,导致Executor 在运行的过程中内存溢出。
stage 的 task 在运行的时候,可能要从一些 Executor 中去拉取 shuffle map output 文件, 但是Executor 可能已经由于内存溢出挂掉了,其关联的BlockManager 也没有了,这就可能会报出 shuffle output file cannot find,executor lost,task lost,out of memory 等错误,此时, 就可以考虑调节一下 Executor 的堆外内存,也就可以避免报错,与此同时,堆外内存调节的比较大的时候,对于性能来讲,也会带来一定的提升。默认情况下,Executor 堆外内存上限大概为 300 多 MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致 Spark 作业反复崩溃,无法运行,此时就会去调节这个参数,到至少 1G,甚至于 2G、4G
Executor 堆外内存的配置需要在 spark-submit 脚本里配置--conf spark.yarn.executor.memoryOverhead=2048
,以上参数配置完成后,会避免掉某些 JVM OOM 的异常问题,同时,可以提升整体 Spark作业的性能
4.3 调节连接等待时长
在 Spark 作业运行过程中,Executor 优先从自己本地关联的 BlockManager 中获取某份数据,如果本地BlockManager 没有的话,会通过TransferService 远程连接其他节点上Executor 的 BlockManager 来获取数据。
如果 task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的Executor 进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。
在生产环境下,有时会遇到 file not found、file lost 这类错误,在这种情况下,很有可能是 Executor 的BlockManager 在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长 60s 后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致 Spark 作业的崩溃。这种情况也可能会导致 DAGScheduler 反复提交几次 stage,TaskScheduler 返回提交几次 task,大大延长了我们的 Spark 作业的运行时间。此时,可以考虑调节连接的超时时长,连接等待时长需要在 spark-submit 脚本中进行设置--conf spark.core.connection.ack.wait.timeout=300
,调节连接等待时长后,通常可以避免部分的 XX 文件拉取失败、XX 文件 lost 等报错
四、Spark 数据倾斜
1、概述
1.1 简介
Spark 中的数据倾斜问题主要指 shuffle 过程中出现的数据倾斜问题,是由于不同的 key对应的数据量不同导致的不同 task 所处理的数据量不同的问题。
例如,reduce 点一共要处理 100 万条数据,第一个和第二个 task 分别被分配到了 1万条数据,计算 5 分钟内完成,第三个 task 分配到了 98 万数据,此时第三个 task 可能需要 10 个小时完成,这使得整个 Spark 作业需要 10 个小时才能运行完成,这就是数据倾斜所带来的后果。
注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数 task 被分配了绝大多数的数据,因此少数 task 运行缓慢;数据过量是指所有 task 被分配的数据量都很大, 相差不多,所有task 都运行缓慢。
1.2 数据倾斜的表现
- Spark 作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;
- Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM, 反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,作业无法正常运行。
1.3 定位数据倾斜问题
- 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜
- 查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个 stage,对应的 shuffle 算子是哪一个
2、聚合原数据
2.1 避免 shuffle 过程
绝大多数情况下,Spark 作业的数据来源都是 Hive 表,这些 Hive 表基本都是经过 ETL 之后的昨天的数据。为了避免数据倾斜,我们可以考虑避免 shuffle 过程,如果避免了 shuffle 过程,那么从根本上就消除了发生数据倾斜问题的可能。
如果 Spark 作业的数据来源于Hive 表,那么可以先在 Hive 表中对数据进行聚合,例如按照key 进行分组,将同一key 对应的所有value 用一种特殊的格式拼接到一个字符串里去,这样,一个 key 就只有一条数据了;之后,对一个 key 的所有 value 进行处理时,只需要进行map 操作即可,无需再进行任何的 shuffle 操作。通过上述方式就避免了执行 shuffle 操作, 也就不可能会发生任何的数据倾斜问题。
对于 Hive 表中数据的操作,不一定是拼接成一个字符串,也可以是直接对 key 的每一条数据进行累计计算。要区分开,处理的数据量大和数据倾斜的区别。
2.2 缩小 key 粒度
增大数据倾斜可能性,降低每个 task 的数据量,key 的数量增加,可能使数据倾斜更严重
2.3 增大 key 粒度
减小数据倾斜可能性,增大每个 task 的数据量,如果没有办法对每个 key 聚合出来一条数据,在特定场景下,可以考虑扩大 key 的聚合粒度。
例如,目前有 10 万条用户数据,当前key 的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将key 的粒度扩大为(省,城市,日期),这样的话,key 的数量会减少,key 之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)
3、过滤导致倾斜的 key
如果在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行过滤,滤除可能导致数据倾斜的 key 对应的数据,这样,在 Spark 作业中就不会发生数据倾斜了
4、提高 shuffle 操作中的 reduce 并行度
当前面对于数据倾斜的处理没有很好的效果时,可以考虑提高 shuffle 过程中的 reduce 端并行度,reduce 端并行度的提高就增加了 reduce 端 task 的数量,那么每个 task 分配到的数据量就会相应减少,由此缓解数据倾斜问题
4.1 reduce 端并行度的设置
在大部分的 shuffle 算子中,都可以传入一个并行度的设置参数,比如reduceByKey(500), 这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle 操作的时候,就会对应着创建指定数量的reduce task。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等, 需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度, 该值默认是 200,对于很多场景来说都有点过小。
增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个key 分配给多个 task, 从而让每个task 处理比原来更少的数据。举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了shuffle read task 以后,每个 task 就分配到一个key,即每个 task 就处理 10 条数据,那么自然每个 task 的执行时间都会变短了。
4.2 reduce 端并行度设置存在的缺陷
提高 reduce 端并行度并没有从根本上改变数据倾斜的本质和问题(前面方法从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻 shuffle reduce task 的数据压力, 以及数据倾斜的问题,适用于有较多 key 对应的数据量都比较大的情况。
该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
5、使用随机 key 实现双重聚合
当使用了类似于 groupByKey、reduceByKey 这样的算子时,可以考虑使用随机 key 实现双重聚合。首先,通过 map 算子给每个数据的 key 添加随机数前缀,对 key 进行打散,将原先一样的 key 变成不一样的 key,然后进行第一次聚合,这样就可以让原本被一个 task 处理的数据分散到多个task 上去做局部聚合;随后,去除掉每个 key 的前缀,再次进行聚合。
此方法对于由 groupByKey、reduceByKey 这类算子造成的数据倾斜由比较好的效果, 仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案。
6、将 reduce join 转换为 map join
正常情况下,join 操作都会执行 shuffle 过程,并且执行的是 reduce join,也就是先将所有相同的 key 和对应的 value 汇聚到一个 reduce task 中,然后再进行join。普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个RDD 是比较小的,则可以采用广播小 RDD 全量数据+map 算子来实现与join 同样的效果,也就是 map join, 此时就不会发生 shuffle 操作,也就不会发生数据倾斜。(注意,RDD 是并不能进行广播的,只能将 RDD 内部的数据通过 collect 拉取到 Driver 内存然后再进行广播)
6.1 核心思路
不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小 RDD 中的数据直接通过 collect 算子拉取到Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。
根据上述思路,根本不会发生 shuffle 操作,从根本上杜绝了 join 操作可能导致的数据倾斜问题。当 join 操作有数据倾斜问题并且其中一个 RDD 的数据量较小时,可以优先考虑这种方式,效果非常好
6.2 不适用场景分析
由于Spark 的广播变量是在每个Executor 中保存一个副本,如果两个RDD 数据量都比较大, 那么如果将一个数据量比较大的 RDD 做成广播变量,那么很有可能会造成内存溢出
7、sample 采样对倾斜 key 单独进行 join
在 Spark 中,如果某个 RDD 只有一个 key,那么在 shuffle 过程中会默认将此 key 对应的数据打散,由不同的 reduce 端 task 进行处理。
当由单个 key 导致数据倾斜时,可有将发生数据倾斜的 key 单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的 key 组成的 RDD 跟其他 RDD 单独 join,此时,根据Spark 的运行机制,此 RDD 中的数据会在 shuffle 阶段被分散到多个 task 中去进行 join 操作
7.1 适用场景分析
对于RDD 中的数据,可以将其转换为一个中间表,或者是直接使用 countByKey()的方式,看一个这个 RDD 中各个 key 对应的数据量,此时如果你发现整个 RDD 就一个 key 的数据量特别多,那么就可以考虑使用这种方法。当数据量非常大时,可以考虑使用 sample 采样获取 10%的数据,然后分析这 10%的数据中哪个 key 可能会导致数据倾斜,然后将这个 key 对应的数据单独提取出来
7.2 不适用场景分析
如果一个RDD 中导致数据倾斜的 key 很多,那么此方案不适用
8、使用随机数扩容进行 join
如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义,此时就只能使用最后一种方案来解决问题了,对于 join 操作,我们可以考虑对其中一个 RDD 数据进行扩容,另一个RDD 进行稀释后再 join。
我们会将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理后的“不同 key”分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。这一种方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,需要对整个RDD 进行数据扩容,对内存资源要求很高。
8.1 核心思想
选择一个 RDD,使用 flatMap 进行扩容,对每条数据的 key 添加数值前缀(1~N 的数值),将一条数据映射为多条数据;(扩容)选择另外一个 RDD,进行map 映射操作,每条数据的 key 都打上一个随机数作为前缀(1~N 的随机数);(稀释)将两个处理后的RDD,进行 join 操作
8.2 局限性与优化
如果两个RDD 都很大,那么将RDD 进行 N 倍的扩容显然行不通; 使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题。当 RDD 中有几个 key 导致数据倾斜时:
- 对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个key
- 然后将这几个 key 对应的数据从原来的RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上n 以内的随机数作为前缀,而不会导致倾斜的大部分 key 形成另外一个RDD
- 接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀, 不会导致倾斜的大部分 key 也形成另外一个RDD
- 再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了
- 而另外两个普通的RDD 就照常 join 即可。最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果
五、Spark 故障排除
1、控制 reduce 端缓冲大小以避免 OOM
在 Shuffle 过程,reduce 端 task 并不是等到 map 端 task 将其数据全部写入磁盘后再去拉取,而是 map 端写一点数据,reduce 端 task 就会拉取一小部分数据,然后立即进行后面的聚合、算子函数的使用等操作。reduce 端 task 能够拉取多少数据,由 reduce 拉取数据的缓冲区 buffer 来决定,因为拉取过来的数据都是先放在buffer 中,然后再进行后续的处理,buffer 的默认大小为 48MB。
reduce 端 task 会一边拉取一边计算,不一定每次都会拉满 48MB 的数据,可能大多数时候拉取一部分数据就处理掉了。虽然说增大 reduce 端缓冲区大小可以减少拉取次数,提升Shuffle 性能,但是有时 map 端的数据量非常大,写出的速度非常快,此时 reduce 端的所有 task 在拉取的时候,有可能全部达到自己缓冲的最大极限值,即 48MB,此时,再加上 reduce 端执行的聚合函数的代码,可能会创建大量的对象,这可难会导致内存溢出,即 OOM。
如果一旦出现 reduce 端内存溢出的问题,我们可以考虑减小 reduce 端拉取数据缓冲区的大小,例如减少为 12MB。在实际生产环境中是出现过这种问题的,这是典型的以性能换执行的原理。reduce 端拉取数据的缓冲区减小,不容易导致 OOM,但是相应的,reudce 端的拉取次数增加,造成更多的网络传输开销,造成性能的下降。注意,要保证任务能够运行,再考虑性能的优化。
2、JVM GC 导致的shuffle 文件拉取失败
在 Spark 作业中,有时会出现 shuffle file not found 的错误,这是非常常见的一个报错, 有时出现这种错误以后,选择重新执行一遍,就不再报出这种错误。出现上述问题可能的原因是 Shuffle 操作中,后面 stage 的 task 想要去上一个 stage 的task 所在的 Executor 拉取数据,结果对方正在执行GC,执行 GC 会导致 Executor 内所有的工作现场全部停止,比如BlockManager、基于 netty 的网络通信等,这就会导致后面的 task 拉取数据拉取了半天都没有拉取到,就会报出 shuffle file not found 的错误,而第二次再次执行就不会再出现这种错误。
可以通过调整 reduce 端拉取数据重试次数和 reduce 端拉取数据时间间隔这两个参数来对 Shuffle 性能进行调整,增大参数值,使得 reduce 端拉取数据的重试次数增加,并且每次失败后等待的时间间隔加长
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "60").set("spark.shuffle.io.retryWait", "60s")
3、解决各种序列化导致的报错
当 Spark 作业在运行过程中报错,而且报错信息中含有 Serializable 等类似词汇,那么可能是序列化问题导致的报错。序列化问题要注意以下三点:
- 作为RDD 的元素类型的自定义类,必须是可以序列化的;
- 算子函数里可以使用的外部的自定义变量,必须是可以序列化的;
- 不可以在 RDD 的元素类型、算子函数里使用第三方的不支持序列化的类型,例如Connection
4、解决算子函数返回 NULL 导致的问题
在一些算子函数里,需要我们有一个返回值,但是在一些情况下我们不希望有返回值, 此时我们如果直接返回NULL,会报错,例如 Scala.Math(NULL)异常。如果你遇到某些情况,不希望有返回值,那么可以通过下述方式解决:
- 返回特殊值,不返回 NULL,例如“-1”;
- 在通过算子获取到了一个RDD 之后,可以对这个 RDD 执行filter 操作,进行数据过滤, 将数值为-1 的数据给过滤掉;
- 在使用完 filter 算子后,继续调用 coalesce 算子进行优化。
5、解决 YARN-CLIENT 模式导致的网卡流量激增问题
在 YARN-client 模式下,Driver 启动在本地机器上,而 Driver 负责所有的任务调度,需要与 YARN 集群上的多个Executor 进行频繁的通信。
假设有 100 个 Executor, 1000 个 task,那么每个 Executor 分配到 10 个 task,之后,Driver 要频繁地跟Executor 上运行的 1000 个 task 进行通信,通信数据非常多,并且通信品类特别高。这就导致有可能在 Spark 任务运行过程中,由于频繁大量的网络通讯,本地机器的网卡流量会激增。
注意,YARN-client 模式只会在测试环境中使用,而之所以使用 YARN-client 模式,是由于可以看到详细全面的 log 信息,通过查看 log,可以锁定程序中存在的问题,避免在生产环境下发生故障。在生产环境下,使用的一定是 YARN-cluster 模式。在 YARN-cluster 模式下,就不会造成本地机器网卡流量激增问题,如果 YARN-cluster 模式下存在网络通信的问题,需要运维团队进行解决。
6、解决 YARN-CLUSTER 模式的 JVM 栈内存溢出无法执行问题
当 Spark 作业中包含 SparkSQL 的内容时,可能会碰到 YARN-client 模式下可以运行,但是YARN-cluster 模式下无法提交运行(报出 OOM 错误)的情况。YARN-client 模式下,Driver 是运行在本地机器上的,Spark 使用的 JVM 的 PermGen 的配置,是本地机器上的 spark-class 文件,JVM 永久代的大小是 128MB,这个是没有问题的, 但是在 YARN-cluster 模式下,Driver 运行在 YARN 集群的某个节点上,使用的是没有经过配置的默认设置,PermGen 永久代大小为 82MB。
SparkSQL 的内部要进行很复杂的 SQL 的语义解析、语法树转换等等,非常复杂,如果sql 语句本身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特别是对 PermGen的占用会比较大。所以,此时如果 PermGen 的占用好过了 82MB,但是又小于 128MB,就会出现 YARN-client模式下可以运行,YARN-cluster 模式下无法运行的情况。
解决上述问题的方法时增加 PermGen 的容量,需要在 spark-submit 脚本中对相关参数进行设置,设置方法如代码清单所示:--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
通过上述方法就设置了Driver 永久代的大小,默认为 128MB,最大 256MB,这样就可以避免上面所说的问题
7、解决 SparkSQL 导致的 JVM 栈内存溢出
当 SparkSQL 的 sql 语句有成百上千的 or 关键字时,就可能会出现 Driver 端的 JVM 栈内存溢出。JVM 栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了 JVM 栈深度限制的递归。(我们猜测 SparkSQL 有大量 or 语句的时候,在解析 SQL 时, 例如转换为语法树或者进行执行计划的生成的时候,对于 or 的处理是递归,or 非常多时,会发生大量的递归)此时,建议将一条 sql 语句拆分为多条 sql 语句来执行,每条 sql 语句尽量保证 100 个以内的子句。根据实际的生产环境试验,一条 sql 语句的 or 关键字控制在 100 个以内,通常不会导致 JVM 栈内存溢出
8、持久化与 checkpoint 的使用
Spark 持久化在大部分情况下是没有问题的,但是有时数据可能会丢失,如果数据一旦丢失,就需要对丢失的数据重新进行计算,计算完后再缓存和使用,为了避免数据的丢失, 可以选择对这个 RDD 进行 checkpoint,也就是将数据持久化一份到容错的文件系统上(比如 HDFS)。
一个 RDD 缓存并 checkpoint 后,如果一旦发现缓存丢失,就会优先查看 checkpoint 数据存不存在,如果有,就会使用 checkpoint 数据,而不用重新计算。也即是说,checkpoint 可以视为 cache 的保障机制,如果 cache 失败,就使用 checkpoint 的数据。使用 checkpoint 的优点在于提高了 Spark 作业的可靠性,一旦缓存出现问题,不需要重新计算数据,缺点在于,checkpoint 时需要将数据写入 HDFS 等文件系统,对性能的消耗较大