Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))

这种情况下,就会只有一个reduce任务运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/281745.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

美食杂志制作秘籍:引领潮流,引领味蕾

美食杂志是一种介绍美食文化、烹饪技巧和美食体验的杂志&#xff0c;通过精美的图片和生动的文字&#xff0c;向读者展示各种美食的魅力。那么&#xff0c;如何制作一本既美观又实用的美食杂志呢&#xff1f; 首先&#xff0c;你需要选择一款适合你的制作软件。比如FLBOOK在线制…

sentinel系统负载自适应流控

系统负载自适应流控 规则配置 规则创建 public class SystemRule extends AbstractRule {private double highestSystemLoad -1;private double highestCpuUsage -1;private double qps -1;private long avgRt -1;private long maxThread -1; }SystemRule类包含了以下几…

Springboot笔记(web开启)-08

有一些日志什么的后续我会补充 1.使用springboot: 创建SpringBoot应用&#xff0c;选中我们需要的模块&#xff1b;SpringBoot已经默认将这些场景配置好了&#xff0c;只需要在配置文件中指定少量配置就可以运行起来自己编写业务代码&#xff1b; 2.SpringBoot对静态资源的映…

c语言基础笔记(1)进制转换以及++a,a++,取地址和解引用

一进制转换 OCT - 八进制 DEC - 十进制 HEX - 十六进制 0520&#xff0c;表示八进制 0x520表示16进制 unsigned 无符号&#xff0c;只有正的 signed 有正有负数 char默认是signed 类型 #include <stdio.h>int main(void) { //字符转换成数字char a 5;int a1 a- 4…

HarmonyOS入门学习

HarmonyOS入门学习 前言快速入门ArkTS组件基础组件Image组件Text组件TextInput 文本输入框Buttonslider 滑动组件 页面布局循环控制ForEach循环创建组件 List自定义组件创建自定义组件Builder 自定义函数 状态管理Prop和LinkProvide和ConsumeObjectLink和Observed ArkUI页面路由…

从后端获取文件数据并导出

导出文件的公共方法 export const download (res, tools) > {const { message, hide } tools;const fileReader: any new FileReader();console.log(fileReader-res>>>, res);fileReader.onload (e) > {if (res?.data?.type application/json) {try {co…

数字孪生与智慧城市:重塑城市生活的新模式

随着信息技术的迅猛发展&#xff0c;数字孪生作为一种新兴的技术理念&#xff0c;正在逐渐改变城市建设和管理的传统模式。智慧城市作为数字孪生技术应用的重要领域&#xff0c;正在以其独特的优势和潜力&#xff0c;重塑着城市生活的方方面面。本文将从数字孪生的概念、智慧城…

Java-SSM电影购票系统

Java-SSM电影购票系统 1.服务承诺&#xff1a; 包安装运行&#xff0c;如有需要欢迎联系&#xff08;VX:yuanchengruanjian&#xff09;。 2.项目所用框架: 前端:JSP、layui、bootstrap等。 后端:SSM,即Spring、SpringMvc、Mybatis等。 3.项目功能点: 3-1.后端功能: 1.用户管…

解决GNURadio自定义C++ OOT块-导入块时报错问题

文章目录 前言一、问题描述二、解决方法1、安装依赖2、配置环境变量3、重新编译及安装三、结果1、添加结果2、运行结果前言 本文记录在 GNURadio 自定义 C++ OOT 块后导入块时报错 AttributeError: module myModule has no attribute multDivSelect。 一、问题描述 参考官方教…

作品展示ETL

1、ETL 作业定义、作业导入、控件拖拽、执行、监控、稽核、告警、报告导出、定时设定 欧洲某国电信系统数据割接作业定义中文页面&#xff08;作业顶层&#xff0c;可切英文&#xff0c;按F1弹当前页面帮助&#xff09; 涉及文件拆分、文件到mysql、库到库、数据清洗、数据转…

银行量子金融系统应用架构设计

量子金融&#xff08;即Financial-Quantum&#xff0c;简称Fin-Q&#xff09;&#xff0c;特指量子科技在金融行业中的应用。 目前&#xff0c;量子科技中以量子保密通信、量子随机数和量子计算发展进度较快&#xff0c;取得了诸多阶段性重大技术突破和商用成果&#xff0c;这…

【FLOOD FILL专题】【蓝桥杯备考训练】:扫雷、动态网格、走迷宫、画图、山峰和山谷【已更新完成】

目录 1、扫雷&#xff08;Google Kickstart2014 Round C Problem A&#xff09; 2、动态网格&#xff08;Google Kickstart2015 Round D Problem A&#xff09; 3、走迷宫&#xff08;模板&#xff09; 4、画图&#xff08;第六次CCF计算机软件能力认证&#xff09; 5、山…

【蓝桥杯】RMQ(Range Minimum/Maximum Query)

一.概述 RMQ问题&#xff0c;是求区间最大值或最小值&#xff0c;即范围最值问题。 暴力解法是对每个询问区间循环求解&#xff0c;设区间长度n&#xff0c;询问次数m&#xff0c;则复杂度是O ( nm )。 一般还可以使用线段树求解&#xff0c;复杂度是O(mlogn)。 但还有一种…

Postgresql数据库入门简介

Postgresql入门 1.Postgresql数据库简介 PostgresQL是一个功能强大的开源数据库系统。经过长达15年以上的积极开发和不断改进&#xff0c;PostgreSQL已在可靠性、稳定性、数据一致性等获得了业内极高的声誉。目前PostgreSql可以运行在所有主流操作系统上&#xff0c;包括Linux…

会员项目定价卡css3特效

会员项目定价卡css3特效&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面 下载地址 会员项目定价卡css3特效代码

【爬虫】web自动化和接口自动化

专栏文章索引&#xff1a;爬虫 目录 一、介绍 二、推荐 1.接口自动化 2.Web自动化 一、介绍 爬虫技术一般可以分为两种类型&#xff1a;接口自动化和web自动化。下面是它们的简要介绍&#xff1a; 1.接口自动化 接口自动化技术的主要目的是通过模拟HTTP请求来实现自动化…

Zama:链上隐私新标准

1. 引言 揭示 Web3 中全同态加密的潜在用例&#xff0c;并深入研究 Zama 的四种主要开源产品&#xff1a; TFHE-rsConcreteConcrete MLfhEVM 众所周知&#xff0c;在当今时代&#xff0c;数据隐私问题与互联网诞生以来一样普遍。仅 Yahoo!、Equifax 和 Marriott 的数据泄露就…

java动态规划学习笔记

学习笔记目录&#xff0c;这里记录个大纲&#xff0c;详情点链接 背包问题 01背包问题综述 01背包问题&#xff08;二维数组&#xff09;https://blog.csdn.net/m0_73065928/article/details/136794406?spm1001.2014.3001.5501 01背包问题&#xff08;滚动数组&#xff09…

LeetCode 热题 100 | 堆(一)

目录 1 什么是堆排序 1.1 什么是堆 1.2 如何构建堆 1.3 举例说明 2 215. 数组中的第 K 个最大元素 2.1 子树大根化 2.2 遍历所有子树 2.3 弹出栈顶元素 2.4 完整代码 菜鸟做题&#xff0c;语言是 C 1 什么是堆排序 1.1 什么是堆 堆的定义和分类&#xff…

ECharts5 概念篇1

图表容器及大小 初始化 在 HTML 中定义有宽度和高度的父容器&#xff08;推荐&#xff09; 通常来说&#xff0c;需要在 HTML 中先定义一个 <div> 节点&#xff0c;并且通过 CSS 使得该节点具有宽度和高度。初始化的时候&#xff0c;传入该节点&#xff0c;图表的大小默认…