spark中的shuffle简述 那些会导致shuffle的算子

shuffle操作说白了就是重分区操作

在Apache Spark中,任务之间的依赖关系主要分为两类:宽依赖(Wide Dependency)和窄依赖(Narrow Dependency)。这两者之间的主要区别在于它们对任务之间数据的依赖性以及执行方式的不同。

窄依赖意味着任务可以在内存中的管道上迭代并行运行,而不需要等待前一阶段的运行结果。相比之下,宽依赖涉及到shuffle操作,需要等待上一阶段的运行结果才能继续执行程序。


理解和掌握宽窄依赖对于优化Spark作业的性能至关重要。在设计Spark作业时,尽量使用窄依赖,以减少Shuffle的开销。通过合理的分区策略和选择适当的转换操作,可以有效地减少宽依赖的出现。

那么现在让我们来介绍一下spark中那些会导致shuffle的算子:
 

🧨
1、repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等

重分区: 一般会shuffle,因为需要在整个集群中,对之前所有的分区的数据进行随机,均匀的打乱,然后把数据放入下游新的指定数量的分区内

2、byKey类的操作:比如reduceByKey、groupByKey、sortByKey等

byKey类的操作:因为你要对一个key,进行聚合操作,那么肯定要保证集群中,所有节点上的,相同的key,一定是到同一个节点上进行处理

3、join类的操作:比如join、cogroup等join类的操作:两个rdd进行join,就必须将相同join key的数据,shuffle到同一个节点上,然后进行相同key的两个rdd数据的笛卡尔乘积

理解了上述内容,现在让我们来上机实践一下:

这里我们使用 toDebugString()  这个方法 返回该RDD及其用于调试的递归依赖项的描述。

代码如下 这里我们先用repartition算子举例:

from pyspark import SparkContext#  repartition 算子
data = [2, 3, 1, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
repartitioned_rdd = rdd.repartition(5)
print(repartitioned_rdd.glom().collect())
print(repartitioned_rdd.toDebugString().decode())

我们来看看运行结果:

可以发现这段程序经历了shuffle

现在让我们将上述算子都运行一下,看看是否都经历了shuffle 

代码如下,各位读者可自行实验:

from pyspark import SparkContextsc = SparkContext("local", "apple1")# 那些会导致shuffle的算子# =======================================================================================# 1、分区类算子
# TODO
#  repartition 算子
data = [2, 3, 1, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
repartitioned_rdd = rdd.repartition(5)
print(repartitioned_rdd.glom().collect())
print(repartitioned_rdd.toDebugString().decode())# TODO
#  repartitionAndSortWithinPartitions 它会在每个分区内对数据进行排序 分区先排序 提高性能嘛
# data = [(1, "apple"), (6, "banana"), (3, "banana"),  (2, "orange"), (4, "grape")]
# rdd = sc.parallelize(data, 2)  # 创建一个有2个分区的键值对RDD
#
# repartitioned_sorted_rdd = rdd.repartitionAndSortWithinPartitions(numPartitions=2)
# # print(repartitioned_sorted_rdd.glom().collect())
# print(repartitioned_sorted_rdd.toDebugString().decode())# TODO
#  coalesce 减少分区 可以设置是否进行shuffle
# data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# rdd = sc.parallelize(data, 5)  # 创建一个有5个分区的RDD
#
# coalesced_rdd = rdd.coalesce(2, shuffle=True)  # 将RDD合并到2个分区
# # print(coalesced_rdd.glom().collect())
# print(coalesced_rdd.toDebugString().decode())# =======================================================================================# TODO
#  reduceByKey
# data = ["apple", "banana", "orange", "banana", "grape", "apple", "orange"]
# rdd = sc.parallelize(data)
# rdd = rdd.map(lambda x: (x, 1))
# rdd2 = rdd.reduceByKey(lambda x, y: x+y)
# # print(rdd2.collect())
# print(rdd2.toDebugString().decode() + '\n')# TODO
#  groupByKey
# rdd3 = rdd.groupByKey()
# print(rdd3.toDebugString().decode() + '\n')# TODO
#  sortByKey 当分区大于1时才有shuffle 因为sortbykey涉及重分区 按照key分组 然后排序
# 如果 RDD 使用了默认的分区器(即 HashPartitioner),并且你要求对 key 进行排序,那么就会发生 shuffle。
# 这是因为默认情况下,HashPartitioner 使用 key 的 hash 值来确定数据所在的分区,
# 这可能导致相同 key 的数据散布在不同的分区中,而进行排序时需要将相同 key 的数据聚合在一起。
# rdd = sc.parallelize(data, 2)
#
# rdd = rdd.map(lambda x: (x, 1))
# print(rdd.glom().collect())
#
# rdd4 = rdd.sortByKey()
# print(rdd4.glom().collect())
# print(rdd4.toDebugString().decode() + '\n')# =======================================================================================# TODO
#  join 操作
# rdd1 = sc.parallelize([(1, "apple"), (2, "banana"), (3, "orange")])
# rdd2 = sc.parallelize([(1, 5), (2, 3), (3, 8), (1, 6)])
#
# joined_rdd = rdd1.join(rdd2)
# # print(joined_rdd.collect())
# print(joined_rdd.toDebugString().decode() + '\n')# TODO
#  cogroup
# rdd1 = sc.parallelize([(1, "apple"), (2, "banana"), (3, "orange")])
# rdd2 = sc.parallelize([(1, 5), (2, 3), (3, 8), (1, 9)])
#
# cogrouped_rdd = rdd1.cogroup(rdd2)
# result = cogrouped_rdd.collect()
# # map 函数: map 是 Python 内置函数,用于对一个可迭代对象的每个元素应用一个指定的函数。在这里,map 的目标是 list 函数。
# # list 函数: list 是 Python 内置函数,用于将一个可迭代对象转换为列表。
# for k, v in result:
#     print(k, tuple(map(list, v)))
# print(cogrouped_rdd.toDebugString().decode() + '\n')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/157701.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

02 stm32-hal库 timer 基本定时器设定

1.配置始终时钟参数 >2. 初始化 MX_TIM3_Init();/* USER CODE BEGIN 2 */HAL_TIM_Base_Start_IT(&htim3);> 3.增加回调函数 4 中断服务函数 void TIM3_IRQHandler(void) {/* USER CODE BEGIN TIM3_IRQn 0 *//* USER CODE END TIM3_IRQn 0 */HAL_TIM_IRQHandler(&…

vue项目打包,使用externals抽离公共的第三方库

封装了一个插件,用来vue打包抽离公共的第三方库,使用unplugin进行插件开发,vite对应的功能使用了vite-plugin-externals进行二次开发 github地址 npm地址 hfex-auto-externals-plugin 自动注入插件,使用 unplugin 和 html-webpack-plugin进…

虚幻引擎5:增强输入的使用方法

一、基本配置 1.创建一个输入映射上下文(映射表) 2.创建自己需要的操作映射或者轴映射 3.创建完成之后进入这个映射,来设置类型,共有4个类型 1.Digital:是旧版操作映射类型,一般是按下抬起来使用,像跳跃…

RabbitMQ常见的交换机类型

RabbitMQ安装 pom.xml里导入相关的依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency> application.properties配置文件 spring.rabbitmq.hos…

JAXB 使用记录 bean转xml xml转bean 数组 继承 CDATA(转义问题)

JAXB 使用记录 部分内容引自 https://blog.csdn.net/gengzhy/article/details/127564536 基础介绍 JAXBContext类&#xff1a;是应用的入口&#xff0c;用于管理XML/Java绑定信息 Marshaller接口&#xff1a;将Java对象序列化为XML数据 Unmarshaller接口&#xff1a;将XML数…

计算机网络第四层 运输层

一&#xff0c;运输层引入的目的 1&#xff0c;网络通信主体标识 网络通信的本质是运行的主机上的进程之间的通信 同一个主机上有多个进程在工作&#xff0c;进程如何加以区分标识&#xff08;PID&#xff09;---本地主机 网络上的主机需要一个统一的进程标识分配机制 逻辑…

【奇葩问题】微信小程序 We分析 访问来源Top10的总比例为什么不止100%

今天有朋友在小程序后台开访问来源数据的时候发现三个渠道来源的比例超过了100% 搜了很多文章最终在官方社区找到了官方回复&#xff1a; 超过100%&#xff0c;是因为可能有用户&#xff0c;在当日通过多个场景&#xff0c;打开过你的小程序 比如用户A&#xff0c;上午通过【…

matlab第三方硬件支持包下载和安装

1、在使用matlab内部的附加功能安装时&#xff0c;由于matlab会验证是否正版无法打开 2、在matlab官网直接找到对应的硬件支持包下载&#xff0c;但是是下图的安装程序 可以直接在matlab中跳转到该程序所在的文件夹双击安装&#xff0c;但是安装到最后出错了 3.根据出错时mala…

Django Test

Django--Laboratory drug management and early warning system-CSDN博客 创建项目doinglms django-admin startproject doinglms python manage.py runserver 运行开发服务器(Development Server) 创建一个自定义 App,名称为 lms: python manage.py startapp lms

时间序列分析基础篇

**时间序列分析&#xff08;time series analysis&#xff09;是量化投资中的一门基本技术。时间序列是指在一定时间内按时间顺序测量的某个变量的取值序列。**比如变量是股票价格&#xff0c;那么它随时间的变化就是一个时间序列&#xff1b;同样的&#xff0c;如果变量是股票…

Vue 的响应式数据 ref的使用

ref 是 vue 提供给我们用于创建响应式数据的方法。 ref 常用于创建基本数据&#xff0c;例如&#xff1a;string、number、boolean 等。 ref 还是通过 Object.defineProperty 的 get 与 set 方法&#xff0c;实现的响应式数据。 ref 创建基本数据&#xff1a; <template…

一文带你快速上手MySQL8窗口函数,实现更高效的数据处理

文章目录 MySQL8窗口函数前言窗口函数相关概念介绍窗口函数分区介绍 窗口函数的使用语法介绍实战演练示例一&#xff1a;聚合函数示例二&#xff1a;排名函数示例三&#xff1a;偏移函数示例四&#xff1a;分布函数示例五&#xff1a;首尾函数示例六&#xff1a;其它函数 总结 …

ubuntu下yolov6 tensorrt模型部署

文章目录 ubuntu下yolov6 tensorrt模型部署一、Ubuntu18.04环境配置1.1 安装工具链和opencv1.2 安装Nvidia相关库1.2.1 安装Nvidia显卡驱动1.2.2 安装 cuda11.31.2.3 安装 cudnn8.21.2.4 下载 tensorrt8.4.2.41.2.5 下载仓库TensorRT-Alpha并设置 二、从yolov6源码中导出onnx文…

最近公共祖先

一、题目 将一棵无穷大满二叉树的结点按根结点一层一层地从左往右编号&#xff0c;根结点编号为1。现给定a&#xff0c;b为两个结点。设计一个算法&#xff0c;返回a、b最近的公共祖先的编号。注意其祖先也可能是结点本身。 二、代码 class LCA { public:int getLCA(int a, i…

Eclipse插件安装版本不兼容问题解决方案——Papyrus插件为例

项目场景: Eclipse Papyrus安装后,没有新建Papyrus工程选项,也没有新建Papyrus Model的选项。 打开Papyrus Model会报错 问题描述 同样的,安装其他插件也是。可能某个插件之前安装是好用的,结果Eclipse的版本更新了,就再也安装不好用了 原因分析: 根本原因是因为包之…

数字孪生技术:新零售的未来之路

随着科技的不断进步&#xff0c;新零售产业正经历着巨大的变革。数字孪生作为一种新兴技术正在加速这一变革的进程。它不仅为新零售企业带来了更高效的运营方式&#xff0c;还为消费者提供了更个性化、便捷的购物体验。那么&#xff0c;数字孪生技术究竟如何在新零售产业中发挥…

415. 字符串相加

415. 字符串相加 class Solution { public:string addStrings(string num1, string num2){//i j分别指向当前字符串的最后一位int i num1.length() - 1;int j num2.length() - 1;int add 0;string s "";//不要忽略两个串都遍历完了 但是还有一个进位while (i …

十七、【渐变工具组】

文章目录 渐变工具油漆桶工具 渐变工具 渐变样式有5种&#xff0c;分别是线性渐变&#xff0c;径向渐变&#xff0c;角度渐变&#xff0c;对称渐变&#xff0c;菱形渐变 另外渐变工具的颜色可以进行编辑&#xff0c;需要先打开渐变编辑工具&#xff1a; 如何使用渐变编辑工…

MVVM 与 MVC区别和应用场景?

MVVM 和 MVC 1. MVC2. MVVM 1. MVC MVC 是 Model View Controller 的缩写 Model&#xff1a;模型层&#xff0c;是应用程序中用于处理应用程序数据逻辑的部分。通常模型对象负责在数据库中存取数据。View&#xff1a;视图层&#xff0c;用户界面渲染逻辑&#xff0c;通常视图…

Elasticsearch 分片内部原理—使文本可被搜索、动态更新索引

目录 一、使文本可被搜索 不变性 二、动态更新索引 删除和更新 一、使文本可被搜索 必须解决的第一个挑战是如何使文本可被搜索。 传统的数据库每个字段存储单个值&#xff0c;但这对全文检索并不够。文本字段中的每个单词需要被搜索&#xff0c;对数据库意味着需要单个字…