Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/364850.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2095.删除链表的中间节点

给你一个链表的头节点 head 。删除链表的中间节点 ,并返回修改后的链表的头节点 head。 长度为 n 链表的中间节点是从头数起第 ⌊n / 2⌋ 个节点(下标从 0 开始),其中 ⌊x⌋ 表示小于或等于 x 的最大整数。 对于 n 1、2、3、4 和…

【机器学习】机器学习重要方法——迁移学习:理论、方法与实践

文章目录 迁移学习:理论、方法与实践引言第一章 迁移学习的基本概念1.1 什么是迁移学习1.2 迁移学习的类型1.3 迁移学习的优势 第二章 迁移学习的核心方法2.1 特征重用(Feature Reuse)2.2 微调(Fine-Tuning)2.3 领域适…

matlab仿真 通信信号和系统分析(上)

(内容源自详解MATLAB/SIMULINK 通信系统建模与仿真 刘学勇编著第三章内容,有兴趣的读者请阅读原书) 一、求离散信号卷积和 主要还是使用卷积函数conv,值得注意的是,得到的卷积和长度结果为81&#xff0…

Windows USB设备驱动开发 - 常见概念的解释

我们听到许多 USB 术语几乎交替抛出。 它们都是什么意思?假设我们看到类似 “多亏了 USB 3.0,我可以将 SuperSpeed U 盘连接到电脑的 xHCI 主机控制器,并更快地复制文件。” 让我们了解该句子中的 USB 术语。 USB 3.0、USB 2.0 和 USB 1.0 请…

[深度学习] 自编码器Autoencoder

自编码器(Autoencoder)是一种无监督学习算法,主要用于数据的降维、特征提取和数据重建。自编码器由两个主要部分组成:编码器(Encoder)和解码器(Decoder)。其基本思想是将输入数据映射…

Redis 7.x 系列【9】数据类型之自动排重集合(Set)

有道无术,术尚可求,有术无道,止于术。 本系列Redis 版本 7.2.5 源码地址:https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 前言2. 常用命令2.1 SADD2.2 SCARD2.3 SISMEMBER2.4 SREM2.5 SSCAN2.6 SDIFF2.7 SU…

海云安参编《数字安全蓝皮书 》正式发布并入选《2024中国数字安全新质百强》荣膺“先行者”

近日,国内数字化产业第三方调研与咨询机构数世咨询正式发布了《2024中国数字安全新质百强》(以下简称百强报告)。海云安凭借在开发安全领域的技术创新力及市场影响力入选百强报告“新质百强先行者” 本次报告,数世咨询经过对国内8…

以算筑基,以智赋能 | Gooxi受邀出席2024中国智算中心全栈技术大会

6月25日,2024中国智算中心全栈技术大会暨展览会、第5届中国数据中心绿色能源大会暨第10届中国(上海)国际数据中心产业展览会在上海新国际博览中心隆重召开。Gooxi受邀参与并携最新服务器产品以及解决方案亮相展会,吸引众多行业领袖…

C++学习/复习18----迭代器/反向迭代器及在list/vector中的应用、list与vector模拟实现复习

迭代器是一个对象,可以循环访问 C 标准库容器中的元素,并提供对各个元素的访问。 C 标准库容器全都提供迭代器,以便算法可以采用标准方式访问其元素,而不必考虑用于存储元素的容器类型。 一、反向迭代器类 基于普通迭代器构建反…

亚太杯赛题思路发布(中文版)

导读: 本文将继续修炼回归模型算法,并总结了一些常用的除线性回归模型之外的模型,其中包括一些单模型及集成学习器。 保序回归、多项式回归、多输出回归、多输出K近邻回归、决策树回归、多输出决策树回归、AdaBoost回归、梯度提升决策树回归…

Java进阶-try-with-resources

Java进阶-try-with-resources try-with-resources 是什么传统使用try-catch-finally关闭资源使用try-with-resources什么时候用 try-with-resources 是什么 try-with-resources 是 Java 7 中引入的一个新特性,用于简化资源管理,一般是用于处理实现了 Au…

大模型+多模态合规分析平台,筑牢金融服务安全屏障

随着金融市场的快速发展,金融产品和服务日趋多样化,消费者面临的风险也逐渐增加。 为保护消费者权益,促进金融市场长期健康稳定发展,国家监管机构不断加强金融监管,出台了一系列法律法规和政策文件。对于金融从业机构…

设计模式-状态模式和策略模式

1.状态模式 1.1定义 当一个对象的内在状态改变时允许根据当前状态作出不同的行为; 1.2 适用场景 (1)一个对象的行为取决于它的状态,并且它必须在运行时根据状态来决定其行为. (2)代码中包含了大量的与状态有关的条件语句,例如:一个操作含有庞大的多分值语句(if…

对原生input加上:当前输入字数/最大输入字数

源码: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><style>/* 样式…

商城自动化测试实战 —— 登录+滑块验证

hello大家好&#xff0c;我是你们的小编&#xff01; 本商城测试项目采取PO模型和数据分离式架构&#xff0c;采用pytestseleniumjenkins结合的方式进行脚本编写与运行&#xff0c;项目架构如下&#xff1a; 1、创建项目名称&#xff1a;code_shopping&#xff0c;创建所需项目…

常微分方程算法之编程示例五(阿当姆斯法)

目录 一、研究问题 二、C++代码 三、计算结果 一、研究问题 本节我们采用阿当姆斯法(Adams法)求解算例。 阿当姆斯法的原理及推导请参考: 常微分方程算法之阿当姆斯法(Adams法)_四步四阶adams显格式;三步四阶adams隐格式;四阶adams预估-校正格式-CSDN博客https://blog…

Linux系统编程--进程间通信

目录 1. 介绍 1.1 进程间通信的目的 1.2 进程间通信的分类 2. 管道 2.1 什么是管道 2.2 匿名管道 2.2.1 接口 2.2.2 步骤--以父子进程通信为例 2.2.3 站在文件描述符角度-深度理解 2.2.4 管道代码 2.2.5 读写特征 2.2.6 管道特征 2.3 命名管道 2.3.1 接口 2.3.2…

Nginx中封装的数据结构

Nginx中封装的数据结构 Nginx中封装的数据结构整型ngx_str_t【字符串】ngx_list_t【链表】ngx_table_elt_t【key/value】ngx_buf_tngx_chain_t Nginx中封装的数据结构 整型 typedef intptr_t ngx_int_t; typedef uintptr_t ngx_uint_t;ngx_str_t【字符串】 typ…

【面试题】信息系统安全运维要做什么

信息系统安全运维是确保信息系统稳定、可靠、安全运行的一系列活动和措施。 其主要包括以下几个方面&#xff1a; 1.系统监控&#xff1a; 实时监测信息系统的运行状态&#xff0c;如服务器的性能指标、网络流量、应用程序的运行情况等。通过监控工具&#xff0c;及时发现系统…

What does the error ‘module ‘langchain‘ has no attribute ‘verbose‘ refer to?

题意&#xff1a;错误 module langchain has no attribute verbose 指的是什么意思&#xff1f; 问题背景&#xff1a; Kind of new to Langchain/Qdrant but Im building a recommendation engine to recommend users based on the contents of their associated PDF files, …