Spark Job优化

1 Map端优化

1.1 Map端聚合

map-side预聚合,就是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

RDD的话建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

SparkSQL本身的HashAggregte就会实现本地预聚合+全局聚合。

1.2 读取小文件优化

读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的Task元信息也会给Spark Driver的内存造成压力,带来单点问题。

设置参数:

spark.sql.files.maxPartitionBytes=128MB   默认128m

spark.files.openCostInBytes=4194304     默认4m

参数(单位都是bytes):

  • maxPartitionBytes:一个分区最大字节数。
  • openCostInBytes:打开一个文件的开销。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.map.MapSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解: DataSourceScanExec.createNonBucketedReadRDD()

FilePartition. getFilePartitions()

1)切片大小= Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

计算totalBytes的时候,每个文件都要加上一个open开销

defaultParallelism就是RDD的并行度

2)当(文件1大小+ openCostInBytes)+(文件2大小+ openCostInBytes)+…+(文件n-1大小+ openCostInBytes)+ 文件n <= maxPartitionBytes时,n个文件可以读入同一个分区,即满足: N个小文件总大小 (N-1*openCostInBytes <=  maxPartitionBytes的话。

1.3 增大map溢写时输出流buffer

1mapShuffle Write有一个缓冲区初始阈值5m,超过会尝试增加到2*当前使用内存。如果申请不到内存,则进行溢写。这个参数是internal,指定无效(见下方源码)。也就是说资源足够会自动扩容,所以不需要我们去设置。

2)溢写时使用输出流缓冲区默认32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率。

3Shuffle文件涉及到序列化,是采取的方式读写,默认按照每批次1万条去读写。设置得太低会导致在序列化时过度复制,因为一些序列化器通过增长和复制的方式来翻倍内部数据结构。这个参数是internal,指定无效(见下方源码)。

综合以上分析,我们可以调整的就是输出缓冲区的大小。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.map.MapFileBufferTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解:

2 Reduce端优化

2.1 合理设置Reduce数

过多的cpu资源出现空转浪费,过少影响任务性能。关于并行度、并发度的相关参数介绍,在2.2.1中已经介绍过。

2.2 输出产生小文件优化

1Join后的结果插入新表

join结果插入新表,生成的文件数等于shuffle并行度,默认就是200份文件插入到hdfs上。

解决方式:

1)可以在插入表数据前进行缩小分区操作来解决小文件过多问题,如coalescerepartition算子

2)调整shuffle并行度。根据2.2.2的原则来设置。

2、动态分区插入数据

1)没有Shuffle的情况下。最差的情况下,每个Task中都有表各个分区的记录,那文件数最终文件数将达到  Task数量 * 表分区数。这种情况下是极易产生小文件的。

INSERT overwrite table A partition ( aa ) 

SELECT * FROM B;

2)有Shuffle的情况下,上面的Task数量 就变成了spark.sql.shuffle.partitions(默认值200)。那么最差情况就会有 spark.sql.shuffle.partitions * 表分区数。

当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

最理想的情况是根据分区字段进行shuffle,在上面的sql中加上distribute by aa。把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件, 但是这种情况下也容易出现数据倾斜的问题。

解决思路:

结合第4章解决倾斜的思路,在确定哪个分区键倾斜的情况下,将倾斜的分区键单独拎出来:

将入库的SQL拆成(where 分区 !倾斜分区键 )和 (where 分区 = 倾斜分区键) 几个部分,非倾斜分区键的部分正常distribute by 分区字段,倾斜分区键的部分 distribute by随机数,sql如下:

//1.非倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa != 大key

distribute by aa;

//2.倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa = 大key

distribute by cast(rand() * 5 as int);

案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.DynamicPartitionSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.3 增大reduce缓冲区,减少拉取次数

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。

源码:BlockStoreShuffleReader.read()

2.4 调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3:

2.5 调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s。

综合2.32.42.5,案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.ReduceShuffleTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.6 合理利用bypass

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200)且不需要map端进行合并操作,则shuffle write过程中不会进行排序操作,使用BypassMergeSortShuffleWriter去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果确实不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

源码分析:SortShuffleManager.registerShuffle()

SortShuffleManager.getWriter()

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.BypassTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3 整体优化

3.1 调节数据本地化等待时长

在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 Task 数据本地化的级别,如果大部分都是 PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。

注意过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得 Spark 作业的运行时间反而增加了。

下面几个参数,默认都是3s,可以改成如下:

spark.locality.wait //建议6s、10s

spark.locality.wait.process //建议60s

spark.locality.wait.node //建议30s

spark.locality.wait.rack //建议20s

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.2 使用堆外内存

1、堆外内存参数

讲到堆外内存,就必须去提一个东西,那就是去yarn申请资源的单位,容器。Spark  on yarn模式,一个容器到底申请多少内存资源。

一个容器最多可以申请多大资源,是由yarn参数yarn.scheduler.maximum-allocation-mb决定, 需要满足:

spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size

≤ yarn.scheduler.maximum-allocation-mb

参数解释:

  • spark.executor.memory:提交任务时指定的堆内内存。
  • spark.executor.memoryOverhead:堆外内存参数,内存额外开销。

默认开启,默认值为spark.executor.memory*0.1并且会与最小值384mb做对比,取最大值。所以spark on yarn任务堆内内存申请1个g,而实际去yarn申请的内存大于1个g的原因。

  • spark.memory.offHeap.size:堆外内存参数,spark中默认关闭,需要将spark.memory.enable.offheap.enable参数设置为true。

注意:很多网上资料说spark.executor.memoryOverhead包含spark.memory.offHeap.size,这是由版本区别的,仅限于spark3.0之前的版本。3.0之后就发生改变,实际去yarn申请的内存资源由三个参数相加。

测试申请容器上限:

yarn.scheduler.maximum-allocation-mb修改为7G,将三个参数设为如下大于7G,会报错:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

spark.memory.offHeap.size修改为1g后再次提交:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2、使用堆外缓存

使用堆外内存可以减轻垃圾回收的工作,也加快了复制的速度。

当需要缓存非常大的数据量时,虚拟机将承受非常大的GC压力,因为虚拟机必须检查每个对象是否可以收集并必须访问所有内存页。本地缓存是最快的,但会给虚拟机带来GC压力,所以,当你需要处理非常多GB的数据量时可以考虑使用堆外内存来进行优化,因为这不会给Java垃圾收集器带来任何压力。让JAVA GC为应用程序完成工作,缓存操作交给堆外。

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.job.OFFHeapCache spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.3 调节连接等待时长

在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。

如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长120s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler反复提交几次task,大大延长了我们的Spark作业的运行时间。

为了避免长时间暂停(如GC)导致的超时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit脚本中进行设置,设置方式可以在提交时指定:

--conf spark.core.connection.ack.wait.timeout=300s

调节连接等待时长后,通常可以避免部分的XX文件拉取失败、XX文件lost等报错。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 1g --conf spark.core.connection.ack.wait.timeout=300s --class com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189743.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端面试题 计算机网络

文章目录 ios 7层协议tcp协议和udp协议的区别tcp协议如何确保数据的可靠http和tcp的关系url输入地址到呈现网页有哪些步骤post和get本质区别&#xff0c;什么时候会触发二次预检GET请求&#xff1a;POST请求&#xff1a;触发二次预检&#xff08;CORS中的预检请求&#xff09;&…

SolidWorks绘制花瓶教程

这个花瓶是我学习solidworks画图以来用时最长的一个图形了&#xff0c;特此记录一下&#xff0c;用了我足足两个早晨才把他给画出来&#xff0c;我这是跟着哔站里的隔壁老王学习的&#xff0c;下面是视频地址&#xff1a;点击我一下看视频教程 下面是我的绘图过程&#xff0c;…

美国材料与试验协会ASTM发布新版玩具安全标准 ASTM F963-23

美国材料与试验协会ASTM发布新版玩具安全标准 ASTM F963-23 2023年10月13日&#xff0c;美国材料与试验协会&#xff08;ASTM&#xff09;发布了新版玩具安全标准ASTM F963-23 ​根据CPSIA的规定&#xff0c;当ASTM将ASTM F963的拟定修订意见通知CPSC时&#xff0c;若CPSC认为…

如何配置Apple推送证书 push证书

转载&#xff1a;如何配置Apple推送证书 push证书 想要制作push证书&#xff0c;就需要使用快捷工具appuploader工具制 作证书&#xff0c;然后使用Apple的推送功能配置push证书&#xff0c;就可以得到了。PS&#xff1a;push没有描述文件&#xff0c;所以不 要问推送选择哪…

【c++随笔12】继承

【c随笔12】继承 一、继承1、继承的概念2、3种继承方式3、父类和子类对象赋值转换4、继承中的作用域——隐藏5、继承与友元6、继承与静态成员 二、继承和子类默认成员函数1、子类构造函数 二、子类拷贝构造函数3、子类的赋值重载4、子类析构函数 三、单继承、多继承、菱形继承1…

视觉大模型DINOv2:自我监督学习的新领域

1 DINOv2 1.1 DINOv2特点 前段时间&#xff0c;Meta AI 高调发布了 Segment Anything&#xff08;SAM&#xff09;&#xff0c;SAM 以交互式方式快速生成 Mask&#xff0c;并可以对从未训练过的图片进行精准分割&#xff0c;可以根据文字提示或使用者点击进而圈出图像中的特定…

Flink SQL 表值聚合函数(Table Aggregate Function)详解

使用场景&#xff1a; 表值聚合函数即 UDTAF&#xff0c;这个函数⽬前只能在 Table API 中使⽤&#xff0c;不能在 SQL API 中使⽤。 函数功能&#xff1a; 在 SQL 表达式中&#xff0c;如果想对数据先分组再进⾏聚合取值&#xff1a; select max(xxx) from source_table gr…

第24章_mysql性能分析工具的使用

文章目录 1. 数据库服务器的优化步骤2.查看系统性能参数3. 统计SQL的查询成本&#xff1a;last_query_cost4. 定位执行慢的 SQL&#xff1a;慢查询日志4.1 开启慢查询日志参数4.2 查看慢查询数目4.3 测试慢sql语句&#xff0c;查看慢日志4.4 系统变量 log_output&#xff0c; l…

用excel计算一个矩阵的转置矩阵

假设我们的原矩阵是一个3*3的矩阵&#xff1a; 125346789 现在求它的转置矩阵&#xff1a; 鼠标点到一个空白的地方&#xff0c;用来存放结果&#xff1a; 插入-》函数&#xff1a; 选择TRANSPOSE&#xff0c;这个就是求转置矩阵的函数&#xff1a; 点击“继续”&#xff1a…

Windows查看端口占用情况

Windows如何查看端口占用情况 方法1. cmd命令行执行netstat命令&#xff0c;查看端口占用情况 netstat -ano 以上命令输出太多信息&#xff0c;不方便查看&#xff0c;通过如下命令搜索具体端口占用情况&#xff0c;例如&#xff1a;8080端口 netstat -ano | findstr "…

初阶JavaEE(17)Linux 基本使用和 web 程序部署

接上次博客&#xff1a;初阶JavaEE&#xff08;16&#xff09;博客系统&#xff08;Markdown编辑器介绍、博客系统功能、博客系统编写&#xff1a;博客列表页 、博客详情页、实现登录、实现强制登录、显示用户信息、退出登录、发布博客&#xff09;-CSDN博客 目录 Linux 基本…

std::any

一、简介 std::any 可以储存任何可拷贝构造和可销毁的类型的对象。 struct test {test(int a,int b){} };int main(int argc, char *argv[]) {std::any a 1;qDebug() << a.type().name();a 3.14;qDebug() << a.type().name();a true;qDebug() << a.type…

从0到0.01入门React | 010.精选 React 面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

开发者测试2023省赛--UnrolledLinkedList测试用例

测试结果 官方提交结果 EclEmma PITest 被测文件UnrolledLinkedList.java /** This source code is placed in the public domain. This means you can use it* without any restrictions.*/package net.mooctest;import java.util.AbstractList; import java.util.Collectio…

Postman模拟上传文件

如图&#xff0c;在F12抓到的上传文件的请求 那要在postman上模拟这种上传&#xff0c;怎么操作呢&#xff0c;如图&#xff0c;选中【Select File】选取文件上传即可

C++进阶篇4---番外-红黑树

一、红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或 Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路 径会比其他路径长出俩倍&#xff0…

19. 深度学习 - 用函数解决问题

文章目录 Hi&#xff0c; 你好。我是茶桁。 上一节课&#xff0c;我们从一个波士顿房价的预测开始写代码&#xff0c;写到了KNN。 之前咱们机器学习课程中有讲到KNN这个算法&#xff0c;分析过其优点和缺点&#xff0c;说起来&#xff0c;KNN这种方法比较低效&#xff0c;在数…

Leetcode刷题详解—— 有效的数独

1. 题目链接&#xff1a;36. 有效的数独 2. 题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的…

U-Mail邮件中继,让海外邮件沟通更顺畅

在海外&#xff0c;电子邮件是人们主要的通信工具&#xff0c;尤其是商务往来沟通&#xff0c;企业邮箱是标配。这主要是因为西方国家互联网发展较早&#xff0c;在互联网早期&#xff0c;电子邮件技术较为成熟&#xff0c;大家都用电子邮件交流&#xff0c;于是这成了一种潮流…

Jenkins简介及Docker Compose部署

Jenkins是一个开源的自动化服务器&#xff0c;用于自动化构建、测试和部署软件项目。它提供了丰富的插件生态系统&#xff0c;支持各种编程语言和工具&#xff0c;使得软件开发流程更加高效和可靠。在本文中&#xff0c;我们将介绍Jenkins的基本概念&#xff0c;并展示如何使用…