Spark Optimization: Reducing Shuffle

“Shuffling is the only thing which Nature cannot undo.” — Arthur Eddington

[Image: Shuffle Shuffle Shuffle]

I used to see people playing cards and using the word “shuffle” even before I knew how to play. Shuffling cards plays a very critical role in distributing “power”, adding weight to a player’s hand. It is nothing but adding randomness to the selection. When we deal cards for various games, for example contract bridge, shuffling is the way to create an even (or uneven) distribution across the four hands.

[Image: Sweet hand]

Well, enough of playing cards!

Let us understand how shuffle impacts big data computation. Ah yes, I think I will again use card shuffling to explain it to you. 😀

[Image: Chaos! I love that!]

Look at the above image and answer the questions below.

  • How many black cards are present? ♠️♣️
  • How many of the red cards have numbers greater than 4? ♥️♦️
  • How many high-value cards (showing off my knowledge, eh!) are left in clubs? ♣️

Needless to say, you will tell me, “Yes, I can give you the answers, but let me arrange them first.” Then you will do what is shown here ⬇️

[Image: Ta Da!]

The Shuffle in Big Data World

To answer my questions you must arrange the cards so that cards of the same suit sit together, like in the above image. That means you need to find all cards of the same suit one by one and then order them from A to K, or vice versa. This operation of moving cards (data) around in order to seek and sort them is what is called Shuffle in the big data world.

Imagine a situation where you are processing thousands of GBs of data, joining it with data of a similar magnitude, and answering similar questions at different grains and groupings. In the distributed computing world, exchanging data across machines and across the network creates so much I/O that it slows down the computation. Shuffle alone causes multiple stages in a big data job and delays the outcome.

How does shuffle work in Spark?


In Apache Spark, shuffle describes the data exchange between the map tasks and the reduce tasks. It is considered the costliest operation, and it is implemented differently in Spark compared to Hadoop.

On the map side, each map task in Spark writes out a shuffle file (OS disk buffer) for every reducer; each file corresponds to a logical block in Spark. These files are not intermediary in the sense that Spark does not merge them into larger partitioned ones. Since scheduling overhead in Spark is lower, the number of mappers (M) and reducers (R) is far higher than in Hadoop. Thus, shipping M*R files to the respective reducers could result in significant overhead.

Similar to Hadoop, Spark also provides a setting, spark.shuffle.compress, to compress map outputs; the compression library is chosen through spark.io.compression.codec and can be Snappy (the default in older versions) or LZF, among others. Snappy uses only 33KB of buffer for each opened file and significantly reduces the risk of encountering out-of-memory errors.

On the reduce side, Spark requires all shuffled data to fit into the memory of the corresponding reducer task. This of course happens only in cases where the reducer task demands all shuffled data, for a groupByKey or a reduceByKey operation, for instance. Spark throws an out-of-memory exception in this case, which has been quite a challenge, because when Spark spills over to disk it creates further I/O and read-slowness problems.

Also, unlike Hadoop, Spark has no overlapping copy phase; in Hadoop, mappers push data to the reducers even before the map phase is complete. This means that the shuffle is a pull operation in Spark, compared to a push operation in Hadoop. Each reducer also maintains a network buffer to fetch map outputs. The size of this buffer is specified through the parameter spark.reducer.maxMbInFlight (spark.reducer.maxSizeInFlight in newer versions); by default it is 48MB.
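For reference, here is a minimal PySpark sketch of how these shuffle-related properties can be set when building a session. The property names are the documented Spark ones (spark.reducer.maxSizeInFlight is the newer name for maxMbInFlight); the values shown are only illustrative, and the defaults vary by Spark version.

from pyspark.sql import SparkSession

# Shuffle-related settings applied at session build time (values are illustrative).
spark = (
    SparkSession.builder
    .appName("shuffle-tuning-demo")
    .config("spark.shuffle.compress", "true")          # compress map-side shuffle outputs
    .config("spark.io.compression.codec", "snappy")    # codec used for shuffle/spill compression
    .config("spark.reducer.maxSizeInFlight", "48m")    # per-reducer network buffer for fetching map outputs
    .getOrCreate()
)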

Tuning Spark to reduce shuffle

spark.sql.shuffle.partitions

The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that it is grouped differently across partitions. Based on the volume of data, you might have to reduce or increase the number of partitions of an RDD/DataFrame, either through the spark.sql.shuffle.partitions configuration or through code.

Using this configuration we can control the number of partitions produced by shuffle operations. By default, its value is 200. But 200 partitions makes no sense if we only have a few GB of data, so we should change it according to the amount of data we need to process via Spark SQL.

Let’s see a practical difference. Here I am creating two small dataframes, the classic employee and department example, with two employees: Daniel and me.

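The original screenshot is not reproduced here, so below is a minimal PySpark sketch of the kind of setup described; the column names and values are my assumptions, not the author’s exact code.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-partitions-demo").getOrCreate()

# Two tiny dataframes: employees and departments (columns and values are illustrative).
emp_df = spark.createDataFrame(
    [(1, "Daniel", 10), (2, "Me", 20)],
    ["emp_id", "emp_name", "dept_id"],
)
dept_df = spark.createDataFrame(
    [(10, "Engineering"), (20, "Data")],
    ["dept_id", "dept_name"],
)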

The default value of spark.sql.shuffle.partitions is 200. Let us run with the default and see how much time it takes.

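A sketch of the run with the default setting, continuing the snippet above; the 6060 ms figure below comes from the author’s run, not from this code.

import time

# spark.sql.shuffle.partitions is left at its default of 200.
start = time.time()
emp_df.join(dept_df, on="dept_id", how="inner").collect()  # collect() forces the join and its shuffle to run
print(f"Elapsed: {(time.time() - start) * 1000:.0f} ms")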

Time taken: 6060 ms with spark.sql.shuffle.partitions = 200


Now, let us modify the config: we don’t need 200 shuffle partitions for such a small amount of data, and the job can run faster with fewer. Here I am setting it to 2.

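And the same join again after lowering the setting, as a sketch:

import time

# Use only 2 shuffle partitions for this tiny join.
spark.conf.set("spark.sql.shuffle.partitions", "2")

start = time.time()
emp_df.join(dept_df, on="dept_id", how="inner").collect()
print(f"Elapsed: {(time.time() - start) * 1000:.0f} ms")

Note that in Spark 3.x, adaptive query execution (spark.sql.adaptive.enabled together with spark.sql.adaptive.coalescePartitions.enabled) can coalesce shuffle partitions automatically, which reduces the need to hand-tune this value for every job.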

Time taken: 398 ms with spark.sql.shuffle.partitions = 2

So, you can see that tweaking the shuffle partitions alone made it about 15 times faster.

Reduce dataset size

The classic rule of ETL: filter as much data as possible, as close to the source as possible. This is just as important in Spark. If you are dealing with a lot of data with very fine-grained aggregates and joins, it is pretty obvious there will be shuffles. It is essential to cut down the number of records before you start joins/aggregates so that the data volume is reduced by some percentage. Use appropriate filter predicates in your SQL query so Spark can push them down to the underlying data source. Selective predicates are good; use them as appropriate, and use partition filters if they are applicable.
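As an illustration, here is a hedged PySpark sketch of filtering and pruning before a join; the paths, table names, and columns are hypothetical.

# Hypothetical tables and columns; the point is to cut data volume before any shuffle happens.
orders = (
    spark.read.parquet("/data/orders")             # hypothetical path
    .filter("order_date >= '2024-01-01'")          # selective predicate; can be pushed down or used as a partition filter
    .select("order_id", "customer_id", "amount")   # column pruning
)
customers = spark.table("customers").filter("active = true")  # hypothetical registered table

result = orders.join(customers, on="customer_id", how="inner")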

Broadcast Broadcast Broadcast

When you join two datasets, one large and one small, the best option in Spark is to perform a broadcast join (map-side join). With a broadcast join you can very effectively join a large table (fact) with relatively small tables (dimensions) while avoiding sending all the data of the large table over the network.

You can use the broadcast function to mark a dataset to be broadcast when it is used in a join operator. Spark also uses the spark.sql.autoBroadcastJoinThreshold setting to control the maximum size of a table that will be broadcast automatically to all worker nodes when performing a join.

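In the DataFrame API, the equivalent of the SQL hint shown next is the broadcast function; a minimal sketch with hypothetical fact_df and dim_df dataframes:

from pyspark.sql.functions import broadcast

# Mark the small (dimension) side for broadcast; the large (fact) side is never shuffled.
result = fact_df.join(broadcast(dim_df), on="key", how="inner")

# Tables smaller than this threshold (in bytes) are broadcast automatically; -1 disables auto-broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))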

SELECT /*+ BROADCAST(B) */ * FROM TableA A INNER JOIN TableB B ON A.key = B.key;

This technique will broadcast the entire table B to all the executors and will help Spark avoid a shuffle. The join becomes local to each executor, so no data needs to travel across machines and there is no shuffle.


More Shuffles vs Fewer Shuffles

Sometimes we encounter situations where we are joining multiple datasets, but on different keys. For example, let’s check the SQL below.

SELECT * FROM TableA A INNER JOIN TableB B ON A.key1 = B.key1;
SELECT * FROM TableB B INNER JOIN TableC C ON B.key2 = C.key2;

When we read A and B, the data may or may not be partitioned in a way that supports the second join, so executing the joins without any optimisation technique might cause extra shuffles: key1 and key2 will not be evenly distributed across executors. In such cases we prefer to repartition B (or C) accordingly. Repartitioning can be done on a column with a partition count specified, or simply with a number that is suitable and comparable with the executor-and-core combination.

SELECT /*+ REPARTITION(key2)*/ * FROM TableB B;
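The DataFrame-side equivalent is repartition; a sketch with assumed table names and an illustrative partition count:

# Repartition B on the second join key so the B-C join does not force a further full shuffle of B.
table_b = spark.table("TableB").repartition(200, "key2")  # 200 is illustrative; match it to executors x cores
table_c = spark.table("TableC")

second_join = table_b.join(table_c, on="key2", how="inner")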

There are many other techniques to overcome shuffles, which you will come across as you start dealing with production-level problems. I think the above ones are definitely the most important to start with.


For any type of help regarding career counselling, resume building, discussing designs, or to know more about the latest data engineering trends and technologies, reach out to me at anigos.
