Spark 【分区与并行度】

RDD 并行度和分区

SparkConf

setMaster("local[*]")

我们在创建 SparkContext 对象时通常会指定 SparkConf 参数,它包含了我们运行时的配置信息。如果我们的 setMaster 中的参数是 "local[*]" 时,通常代表使用的CPU核数为当前环境的最大值。

val conf = new SparkConf().setMaster("local[*]").setAppName("test partition")val sc = new SparkContext(conf)val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))rdd.saveAsTextFile("test_par_out")sc.stop()

运行结果:

在设备管理器中查看CPU核数:

 setMaster("local")

这时的使用 CPU 核数的默认值为 1 。

val conf = new SparkConf().setMaster("local").setAppName("test partition")

 

setMaster["local[2]"]

设置使用的 CPU 核数为 2

val conf = new SparkConf().setMaster("local[2]").setAppName("test partition")

创建RDD时指定分区数

我们也可以在创建 RDD 对象时指定切片数 numSlices(切片数就是分区的数量(通常一个分区对应一个Task,一个Task对应一个Excutor(一个CPU核心)))。

   val conf = new SparkConf().setMaster("local[*]").setAppName("test partition")val sc = new SparkContext(conf)//第二个参数用来指定并行度(分区数)val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),1)rdd.saveAsTextFile("test_par_out")sc.stop()

 

conf.set 指定并行度

val conf = new SparkConf().setMaster("local[*]").setAppName("test partition")conf.set("spark.default.parallelism","5")

读取内存数据(集合)的分区规则

核心源码:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {(0 until numSlices).iterator.map { i =>val start = ((i * length) / numSlices).toIntval end = (((i + 1) * length) / numSlices).toInt(start, end)}}

比如我们读取集合数组 List(1,2,3,4,5),我们在创建RDD对象时设置分区数为 3 。

val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5),3)

 当我们保存时,会输出三个文件,文件内容分别是:

  • part-00000:1
  • part-00001:2,3
  • part-000002:4,5

因为此时我们源码的 positions 的参数是 (length:5,numSilces:3),它会返回三个元组(start,end),对应我们数组的下标,并且左闭右开。

  • part-00000:(0,1)
  • part-00001:(1,3)
  • part-000002:(3,5)

读取文件数据的分区规则

我们在通过读取本地文件系统的文件来创建 RDD 时:

val conf = new SparkConf().setMaster("local[*]").setAppName("test partition")conf.set("spark.default.parallelism","5")val sc = new SparkContext(conf)val rdd = sc.textFile("data/1.txt")sc.stop()

默认的分区数量是最小分区数量(2):

//defaultParallelism取决于 setMaster("local[*]") ,如果是 local[*] 代表分区数=CPU核数 但是min方法返回最小值,最小值=2
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

Spark 分区规则和 Hadoop 是一样的,只是切片规则和数据读取规则有差异。

案例-文件a.txt:

1
2
3

Spark 分区数量的计算方式:

源码:

long totalSize = 0
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

 对于我们上面的文件 a.txt:

// totalSize 是文件的总字节数,一个回车占两个字节
totalSize = 7
goalSize = 7 / (2 == 0 ? 1 : 2) = 3 (单位:byte)    //也就是每个分区占用3个字节//分区数= totalSize/gogalSize=2...1 (余数1byte,根据hadoop的规则,如果余数>每个分区的字节数的1.1倍,就要产生新的分区,否则就不会产生新的分区)
//这里余数是 1 , 1/3 = 33.3% > 0.1 所以会产生一个新的分区
//所以分区数 = 3

数据分区的分配

1. Spark 数据分区以行为单位进行读取

2. 数据读取时,以偏移量为单位

以上面的 a.txt 为例(@@代表一个回车)

1@@    => 012            
2@@    => 345
3      => 6

 

3. 数据分区的偏移量范围的计算

//注意: 左右都是闭区间,
//偏移量不会被重复读取   
part-00000    => [0,3]    => 1@@,2@@    //读到3的时候已经到了第二行,要读就读一整行,所以2@@都会被读取
part-00001    => [3,6]    => 3 [3,6]对应的第二行的第1个字节(2)~第3行第1个字节(3),而2已经被读过了,所以只剩3
part-00002    => [6,7]    => 

coalesce 和 repartition

coalesce 和 repartition 分别用于缩减分区节省资源和扩大分区提高并行度。

coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。

当 Spark 程序中,存在过多的小任务的时候,可以使用 coalesce 方法,收缩合并分区,减少分区的个数,减少任务调度成本。

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)val newRDD1 = rdd.coalesce(2)/*coalesce 默认情况下不会将分区内的数据打乱重新组合,这里是直接将三个分区中两个分区合并为一个分区,另外一个仍然是一个分区这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜如果想要数据均衡,可以进行shuffle处理分区结果:part-00000: 1 2part-00001: 3 4 5 6*/val newRDD2 = rdd.coalesce(2,true)/*分区结果:part-0000: 1 4 5part-0001: 2 3 6*/

repartition

repartition 的底层其实就是 coalesce ,为了区分缩减和扩大分区(都可以由coalesce实现),所以分成了两个方法。

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)/*length=6,numSlices=2part-00000:  1 2 3part-00001:  4 5 6*/// 想要扩大分区数量 提高并行度 shuffle 必须为true 因为我们要把2个分区的数据分为3个分区 就必须打乱分区内的数据重新排// 如果不设置shuffle为true是没有意义的 结果还是2个分区val newRdd1 = rdd.coalesce(3,true)/*分区结果:part-00000: 3 5part-00001: 2 4part-00002: 1 6*/// 缩减分区用 coalesce,如果要数据均衡可以采用 shuffle// 扩大分区用 repartition , repartition底层就是 coalesce(numSlices,true)rdd.repartition(2)

repartition 底层代码

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/137664.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

setState是同步还是异步的?

您好,如果喜欢我的文章,可以关注我的公众号「量子前端」,将不定期关注推送前端好文~ 以下内容针对React v18以下版本。 前言 setState到底是同步还是异步?很多人可能面试都被问到过,就好比这道代码输出题&#xff1…

零售超市如何应对消费者需求?非常全面!

随着科技的飞速发展和消费者期望的不断演变,零售行业正经历着一场深刻的革命。传统零售模式逐渐被新零售模式所取代,而其中一个备受关注的元素是自动售货机。 自动售货机不仅在商场、车站和办公楼等高流量地点迅速扩张,还在重新定义我们如何购…

基于SSM的北京集联软件科技有限公司信息管理系统

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…

Meta | 对比解码:进一步提升LLM推理能力

深度学习自然语言处理 原创作者:wkk 为了改进LLM的推理能力,University of California联合Meta AI实验室提出将Contrastive Decoding应用于多种任务的LLM方法。实验表明,所提方法能有效改进LLM的推理能力。让我们走进论文一探究竟吧&#xff…

“百华多功能休闲鞋”走进CCTV移动传媒真购好物

激动的心, 颤抖的手。 百华多功能休闲鞋福利来袭! 坚守匠心之道, 安全是我们的使命。 百华入选产品将在CCTV移动传媒 《真购好物》直播。 9月21日晚10点30分,经过层层选拔,山东百华鞋业有限公司的“百华多功能休…

网络协议学习地图分享

最近在回顾网络知识点的时候,发现华为数通有关报文格式及网络协议地图神仙网站,这里涵盖了各个协议层及每个协议层对应的协议内容,最人性的化的一点是点击每个单独的协议可以跳转到该协议详细报文格式页面,有对应的说明和解释&…

linux下链接

linux下链接用法 ln链接格式与介绍 linux下链接用法一、链接的使用格式二、链接的介绍 一、链接的使用格式 链接: 格式: ln 源文件 链接文件 硬链接 ln -s 源文件 链接文件 软连接 硬链接文件占磁盘空间 但是删除源文件不会影响硬链接文件 软链接文件不…

Android 应用上线注意事项

将 Android 应用上线到 Google Play 商店需要仔细注意一系列问题,以确保应用的质量、安全性和用户体验。以下是一些在 Android 应用上线过程中需要注意的关键问题,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司&…

介绍Spring Security框架,以及如何使用它实现应用程序的安全性

文章目录 什么是 Spring Security?Spring Security 的工作原理如何使用 Spring Security 构建安全的应用程序步骤 1:添加 Spring Security 依赖步骤 2:配置 Spring Security步骤 3:配置安全性规则步骤 4:创建用户和角色…

【Linux系统编程】通过系统调用获取进程标识符 及 创建子进程(fork)

文章目录 1. 通过系统调用获取进程标示符(PID)1.1 进程id(PID)1.2 父进程id(PPID) 2. bash也是一个进程3. 通过系统调用创建进程-fork初识3.1 批量化注释3.2 取消注释3.3 fork创建子进程3.4 fork的返回值3.…

目标检测算法改进系列之Neck添加渐近特征金字塔网络(AFPN模块)

渐近特征金字塔网络(AFPN模块) 在目标检测任务中,多尺度特征对具有尺度差异的目标进行编码具有重要意义。多尺度特征提取的常用策略是采用经典的自顶向下和自底向上的特征金字塔网络。 然而,这些方法存在特征信息丢失或退化的问…

十四、流式编程(2)

本章概要 中间操作 跟踪和调试流元素排序移除元素应用函数到元素在 map() 中组合流 中间操作 中间操作用于从一个流中获取对象,并将对象作为另一个流从后端输出,以连接到其他操作。 跟踪和调试 peek() 操作的目的是帮助调试。它允许你无修改地查看…

Maven3.6.1下载和详细配置

1.下载maven 说明:以下载maven3.6.1为例 1.1网址 Maven – Welcome to Apache Maven 1.2点击下载 1.3点击Maven 3 archives 1.4 点击相应的版本 1.5 点击binaries下载 说明:binaries是二进制的意思 1.6点击zip格式 1.7 蓝奏云获取 说明&#xff1a…

gateway之断言的使用详解

文章目录 gateway产生的背景,为什么要是用gateway什么是网关gateway 带来的好处功能特征gateway在项目中使用的依赖 什么是断言断言分类内置自定义示例 断言和过滤器的不同 gateway产生的背景,为什么要是用gateway 一个系统会被拆分为多个微服务&#x…

软考 -- 计算机学习(2)

文章目录 一、安全性知识1.1 信息安全和信息系统安全1.2 信息安全技术1.3 网络安全技术 二、多媒体技术三、软件工程基础知识3.1 信息系统生命周期3.2 软件过程模型3.3 信息系统开发方法3.4 系统分析和设计概述3.5 结构化开发方法3.6 系统运行与维护 四、项目管理4.1 进度管理4…

说说hashCode() 和 equals() 之间的关系?

每天一道面试题,陪你突击金九银十! 上一篇关于介绍Object类下的几种方法时面试题时,提到equals()和hashCode()方法可能引出关于“hashCode() 和 equals() 之间的关系?”的面试题,本篇来解析一下这道基础面试题。 先祭一…

ESP-IDF学习——1.环境安装与hello-world

ESP-IDF学习——1.环境安装与hello-world 0.前言一、环境搭建1.官方IDE工具2.vscode图形化配置 二、示例工程三、自定义工程四、点灯五、总结 0.前言 最近在学习freertos,但由于买的书还没到,所以先捣鼓捣鼓ESP-IDF,因为这个比Arduino更接近底…

『贪吃蛇』AI 算法简易实现(中秋特别版)

前言 一年一度的中秋节就快到了,平台也有各种各样的中秋发文活动,正在翻阅时偶然间我看到了这篇文章:《兔饼大作战》:吃月饼、见月亮,还能咬自己?| 欢庆中秋特制版 - 掘金 (juejin.cn) 大家肯定比较熟悉了…

python处理CSV文件

CSV库还有其他处理CSV的方法,这里只是介绍几个常用的,后面如果用到别的会进行更新 目录 1 生成一个新的csv文件,并向其中写一点东西 2 单纯往里面写几行 3 读取csv文件 1 生成一个新的csv文件,并向其中写一点东西 import…

Mybatis学习笔记11 缓存相关

Mybatis学习笔记10 高级映射及延迟加载_biubiubiu0706的博客-CSDN博客 缓存:cache 缓存的作用:通过减少IO的方式,来提高程序的执行效率 Mybatis的缓存:将select语句的查询结果放到缓存(内存)当中,下一次还是这条select语句的话,直接从缓存中取,不再查数据库.一方面是减少了I…