Spark UI中Shuffle dataSize 和shuffle bytes written 指标区别

背景

本文基于Spark 3.1.1
目前在做一些知识回顾的时候,发现了一些很有意思的事情,就是Spark UI中ShuffleExchangeExec 的dataSize和shuffle bytes written指标是不一样的,
那么在AQE阶段的时候,是以哪个指标来作为每个Task分区大小的参考呢

结论

先说结论 dataSzie指标是 是存在内存中的UnsafeRow 的大小的总和,AQE阶段(规则OptimizeSkewedJoin/CoalesceShufflePartitions)用到判断分区是否倾斜或者合并分区的依据是来自于这个值,
shuffle bytes written指的是写入文件的字节数,会区分压缩和非压缩,如果在开启了压缩(也就是spark.shuffle.compress true)和未开启压缩的情况下,该值的大小是不一样的。
开启压缩如下:
在这里插入图片描述
未开启压缩如下:
在这里插入图片描述

先说杂谈

这两个指标的值都在 ShuffleExchangeExec中:

case class ShuffleExchangeExec(override val outputPartitioning: Partitioning,child: SparkPlan,shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)extends ShuffleExchangeLike {private lazy val writeMetrics =SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)private[sql] lazy val readMetrics =SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)override lazy val metrics = Map("dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) ++ readMetrics ++ writeMetrics

dataSize指标来自于哪里

涉及到datasize的数据流是怎么样的如下,一切还是得从ShuffleMapTask这个shuffle的起始操作讲起:

ShuffleMapTask||\/
runTask||\/
dep.shuffleWriterProcessor.write //这里的shuffleWriterProcessor是来自于 ShuffleExchangeExec中的createShuffleWriteProcessor||\/
writer.write()  //这里是writer 是 UnsafeShuffleWriter类型的实例||\/
insertRecordIntoSorter||\/
UnsafeRowSerializerInstance.writeValue||\/
dataSize.add(row.getSizeInBytes)

这里的 rowUnsafeRow的实例,这样就获取到了实际内存中的每个分区的大小,
而ShuffleMapTask runTask 方法最终返回的是MapStatus,而该MapStatus最终是在UnsafeShuffleWriter的closeAndWriteOutput方法中被赋值的:

void closeAndWriteOutput() throws IOException {assert(sorter != null);updatePeakMemoryUsed();serBuffer = null;serOutputStream = null;final SpillInfo[] spills = sorter.closeAndGetSpills();sorter = null;final long[] partitionLengths;try {partitionLengths = mergeSpills(spills);} finally {for (SpillInfo spill : spills) {if (spill.file.exists() && !spill.file.delete()) {logger.error("Error while deleting spill file {}", spill.file.getPath());}}}mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);}

shuffle bytes written指标来自哪里

基本流程和dataSize 一样,还是来自于ShuffleMapTask

ShuffleMapTask||\/
runTask||\/
dep.shuffleWriterProcessor.write //这里的shuffleWriterProcessor是来自于 ShuffleExchangeExec中的createShuffleWriteProcessor||\/
writer.write()  //这里是writer 是 UnsafeShuffleWriter类型的实例||\/
closeAndWriteOutput||\/
sorter.closeAndGetSpills() ->  writeSortedFile -> writer.commitAndGet -> writeMetrics.incBytesWritten(committedPosition - reportedPosition) -> serializerManager.wrapStream(blockId, mcs) // 这里进行了压缩||\/
mergeSpills||\/
mergeSpillsUsingStandardWriter||\/
mergeSpillsWithFileStream -> writeMetrics.incBytesWritten(numBytesWritten)||\/
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/172013.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis实现消息队列

使用Redis中的list实现消息队列 list是Redis的一种数据结构,可以把它理解成双向链表 可以从头部插入数据然后从尾部取出数据,从而实现消息队列的效果 利用命令 LPUSH和RPOP (从左边插入数据从右边取出数据) lpush l1 e1 e2rpo…

【1.2】神经网络:神经元与激活函数

✅作者简介:大家好,我是 Meteors., 向往着更加简洁高效的代码写法与编程方式,持续分享Java技术内容。 🍎个人主页:Meteors.的博客 💞当前专栏: 神经网络(随缘更新) ✨特色…

@TableField(fill = FieldFill.INSERT)这个注解的作用

TableField 是 MyBatis-Plus提供的一个注解,用于标注实体类的属性与数据库表的字段之间的映射关系。当你在一个实体类的属性上使用 TableField(fill FieldFill.INSERT) 注解时,你告诉 MyBatis-Plus 在插入记录时自动填充这个字段。 FieldFill.INSERT 是一…

Lvs +keepalivede : 高可用集群

keepalived为Ivs应运而生的高可用服务。Ivs的调度器无法做高可用,于是keepalived这个软件。 实现的是调度器的高可用。 但是: keepalived不是专为Ivs集群服务的,也可以做其他代理服务器的高可用。 lvs的高可用集群:主调度器和备调度器&…

轻松合并多个TXT文本,实现一键文件整理!

亲爱的读者们,您是否曾经需要将多个TXT文本文件合并成一个文件,却苦于无从下手?现在,我们向您介绍一个全新的TXT文本合并工具,让您轻松实现一键文件整理! 首先,在首助编辑高手的主页面板块栏里…

通过Vue自带服务器实现Ajax请求跨域(vue-cli)

通过Vue自带服务器实现Ajax请求跨域(vue-cli) 跨域 原理:从A页面访问到B页面,并且要获取到B页面上的数据,而两个页面所在的端口、协议和域名中哪怕有一个不对等,那么这种行为就叫跨域。注意:类…

漏洞复现--用友 畅捷通T+ .net反序列化RCE

免责声明: 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…

关于 @Transactional 注解的类中使用 this 调用问题

在一个类中打断点的时候报了一个异常,这个异常,但是一直找不到,直到我在类中调用另外一个方法的时候,看到该方法里面用了 this 调用了 mybatis-plus 的 Api 去操作数据库,而最外层的方法却添加了 Transactional(rollba…

接口自动化测试要做什么?一文3个步骤带你成功学会!

先了解下接口测试流程: 1、需求分析 2、Api文档分析与评审 3、测试计划编写 4、用例设计与评审 5、环境搭建(工具) 6、执行用例 7、缺陷管理 8、测试报告 了解了接口测试的工作流程,那"接口自动化测试"怎么弄&#xff1…

人工智能基础_机器学习007_高斯分布_概率计算_最小二乘法推导_得出损失函数---人工智能工作笔记0047

这个不分也是挺难的,但是之前有详细的,解释了,之前的文章中有, 那么这里会简单提一下,然后,继续向下学习 首先我们要知道高斯分布,也就是,正太分布, 这个可以预测x在多少的时候,概率最大 要知道在概率分布这个,高斯分布公式中,u代表平均值,然后西格玛代表标准差,知道了 这两个…

【psychopy】【脑与认知科学】认知过程中的面孔识别加工

目录 实验描述 实验思路 python实现 实验描述 现有的文献认为,人们对倒置的面孔、模糊的面孔等可能会出现加工时长增加、准确率下降的问题,现请你设计一个相关实验,判断不同的面孔是否会出现上述现象。请按照认知科学要求,画…

【Docker】Docker Swarm介绍与环境搭建

为什么不建议在生产环境中使用Docker Compose 多机器如何管理?如何跨机器做scale横向扩展?容器失败退出时如何新建容器确保服务正常运行?如何确保零宕机时间?如何管理密码,Key等敏感数据? Docker Swarm介…

论文阅读——InstructGPT

论文:Training_language_models_to_follow_instructions_with_human_feedback.pdf (openai.com) github:GitHub - openai/following-instructions-human-feedback 将语言模型做得更大并不能从本质上使它们更好地遵循用户的意图。例如,大型语…

基于Pytest+Requests+Allure实现接口自动化测试!

一、整体结构 框架组成:pytestrequestsallure设计模式: 关键字驱动项目结构: 工具层:api_keyword/参数层:params/用例层:case/数据驱动:data_driver/数据层:data/逻辑层&#xff1a…

PLC 学习day03 PLC软件安装 PLC软件的介绍和对应的知识

1.资料来源 链接:7.PLC编程学习入门视频教程全集-三菱GX-Works2编程软件安装_哔哩哔哩_bilibili 链接:8.三菱plc视频教程全集之编程语言及软元件介绍_哔哩哔哩_bilibili 2. PLC软件的安装 三菱的PLC软件安装视屏的链接: 7.PLC编程学习入门视频…

当线性规划与算法相遇:揭秘单纯形法(Simplex)的独特魅力

传统的解决线性规划问题的方法是图形法、代数法求解,但是图形法解题有极大的局限性,因为一旦变量超过3个,基本上就无法通过图形解决,而代数法虽然可以解题,但对于复杂的问题可能效果较差甚至无法求解! 相比…

python DevOps

在云原生中,python扮演的角色是什么? 在云原生环境中,Python 作为一种高级编程语言,在多个方面扮演着重要角色。云原生是指利用云计算的各种优势(如弹性、可扩展性和自动化),构建和运行应用程序…

Transformer英语-法语机器翻译实例

依照Transformer结构来实例化编码器-解码器模型。在这里,指定Transformer编码器和解码器都是2层,都使用4头注意力。为了进行序列到序列的学习,我们在英语-法语机器翻译数据集上训练Transformer模型,如图11.2所示。 da…

【Linux】部署单体项目以及前后端分离项目(项目部署)

一、简介 以下就是Linux部署单机项目和前后端分离项目的优缺点,希望对你有所帮助。 1、Linux部署单机项目: 优点: 简化了系统管理:由于所有服务都在同一台机器上运行,因此可以简化系统管理和维护。提高了性能&#x…

Spring Boot集成RESTful API

在Spring Boot中集成一个RESTful API是我们在实际开发中较为常见的一种开发任务,以下通过一个小的案例来展示在Spring Boot中创建RESTful API来编写一个单元测试。 本节使用到的注解: Controller:修饰class,用来创建处理http请求的…