spark-RDD原理

1.缓存和checkpoint机制

1.1缓存的使用

缓存级别

  • 指定缓存的数据位置

  • 默认是缓存到内存上

StorageLevel.DISK_ONLY # 将数据缓存到磁盘上
StorageLevel.DISK_ONLY_2 # 将数据缓存到磁盘上 保存两份
StorageLevel.DISK_ONLY_3 # 将数据缓存到磁盘上 保存三份
StorageLevel.MEMORY_ONLY # 将数据缓存到内存  默认
StorageLevel.MEMORY_ONLY_2 # 将数据缓存到内存 保存两份
StorageLevel.MEMORY_AND_DISK # 将数据缓存到内存和磁盘  优先将数据缓存到内存上,内存不足可以缓存到磁盘
StorageLevel.MEMORY_AND_DISK_2 = # 将数据缓存到内存和磁盘
StorageLevel.OFF_HEAP # 不使用  缓存在系统管理的内存上   heap jvm的java虚拟机中的heap
StorageLevel.MEMORY_AND_DISK_ESER # 将数据缓存到内存和磁盘  序列化操作,按照二进制存储,节省空间 

使用

  • persist 使用该方法

  • cache 内部调用persist

  • 手动释放 unpersist

# 缓存 实现持久化
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel
sc = SparkContext()# 获取数据
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_line = rdd.map(lambda x:x.split(','))
# 将数据转为kv结构
def func(x)return (x[2],int(x[3]))rdd_kv = rdd_line.map(func)# 进入reduce阶段
# 先对kv数据进行分组
rdd_groupby = rdd_kv.groupByKey()
# 对分组后的结果进行缓存
# # storageLevel 修改缓存级别
rdd_groupby.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
# 触发缓存
rdd_groupby.collect()# 获取kv数据中value部分数据
rdd_count = rdd_groupby.mapValues(lambda x:len(list(x)))# 查看数据
res_count = rdd_count.collect()
print(res_count)

 1.2checkpoint

也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以进行永久保存,程序结束不会释放

# checkpoint 持久化  将数据存储在hdfs上
from pyspark import SparkContext# 创建对象
sc = SparkContext()# 指定checkpoint存储的hdfs位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint')# 生成rdd数据
rdd = sc.parallelize(['hadoop,spark','spark,python'])# 字符串数据切割
rdd_split = rdd.map(lambda x:x.split(','))  # [[hadoop,spark],[spark,python]]# 将二维列表转为一维
def func(x):return xrdd_word = rdd_split.flatMap(func) # [hadoop,spark,spark,python]
# 持久化操作,可以使用缓存或checkpoint
# # 对rdd使用checkpoint
rdd_word.checkpoint()
# rdd_word.persist()
# # # 触发执行
print(rdd_word.getCheckpointFile())# 将数据转为kv
rdd_kv1 = rdd_word.map(lambda x:(x,1))rdd_kv2 = rdd_word.map(lambda x:(x,2))rdd_kv3 = rdd_word.map(lambda x:(x,3))rdd_kv4 = rdd_word.map(lambda x:(x,4))rdd_kv5 = rdd_word.map(lambda x:(x,5))# 查看kv数据
res = rdd_kv1.collect()
print(res)
res2 = rdd_kv2.collect()
print(res2)
res3 = rdd_kv3.collect()
print(res3)
res4 = rdd_kv4.collect()
print(res4)
res5 = rdd_kv5.collect()
print(res5)

2.数据共享

2.1广播变量

        如果要在分布式计算里面分发大的变量数据,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

减少task线程对应变量的定义,节省内存空间

# 广播变量
from pyspark import SparkContextsc  = SparkContext()num = 10
# 将变量定义成广播变量
b_obj = sc.broadcast(num)rdd = sc.parallelize([1,2,3,4])# 转化计算
def func(x):# 广播变量无法修改# b_obj.value=20# 获取广播变量值return x+b_obj.valuerdd_map = rdd.map(func)# 查看数据
res = rdd_map.collect()
print(res)

2.2累加器

避免资源抢占造成计算错误

# 累加器
from pyspark import SparkContextsc  = SparkContext()num = 10
# 将变量定义成累加器
a_obj = sc.accumulator(num)
# 生成rdd
rdd = sc.parallelize([1,2,3,4])# 对rdd进行计算
def func(x):print(x) # 输出rdd中元素数据# 对累加器的值进行修改 每次加1a_obj.add(1)return (x,1)rdd_map = rdd.map(func)# 查看数据
res = rdd_map.collect()
print(res)# 查看累加器的数据
print(a_obj.value)

3.RDD的依赖关系

  • 窄依赖

    • 每个父RDD的一个Partition最多被子RDD的一个Partition所使用

      • map

      • flatMap

      • filter

  • 宽依赖

    • 一个父RDD的Partition会被多个子RDD的Partition所使用

      • groupbykey

      • reducebykey

      • sortBykey

    • 在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle

      • 只要是宽依赖必然发生shuffle

      • 在宽依赖进行数据交换时,只有等待所有分区数据交换完成后,才能进行后续的计算,非常影响计算速度

DAG 管理维护rdd之间依赖关系,保证代码的执行顺序。

4.shuffle过程

        在 Spark 中,Shuffle 是一种将数据在不同的分区之间重新分布的过程,通常发生在一些特定的操作中,如 groupByKeyreduceByKeyjoin 等。Shuffle 过程涉及到数据的重新分区、排序和聚合等操作,对 Spark 作业的性能有很大的影响。

 

以下是 Spark Shuffle 的大致过程:

 

一、Mapper 阶段(Map 任务)

 
  1. 数据处理:

    • 每个输入分区的数据被分配到一个或多个 Mapper(Map 任务)进行处理。Mapper 会对输入数据进行转换操作,生成中间结果。
    • 例如,在 reduceByKey 操作中,Mapper 会将输入数据中的每个键值对进行处理,生成中间的键值对结果,其中键相同的值会被聚合在一起。
  2. 分区函数:

    • 根据指定的分区函数,将中间结果分配到不同的分区中。分区函数决定了每个键值对应该被分配到哪个分区。
    • 例如,在 HashPartitioner(默认的分区函数)中,根据键的哈希值来确定分区。如果有两个分区,键的哈希值对 2 取模,结果为 0 的键值对分配到一个分区,结果为 1 的键值对分配到另一个分区。
  3. 缓存中间结果:

    • Mapper 会将中间结果缓存在内存中,以便在后续的 Shuffle Write 阶段进行写入。如果内存不足,中间结果可能会被溢出到磁盘上。
 

二、Shuffle Write(混洗写阶段)

 
  1. 数据写入:

    • Mapper 任务将中间结果写入本地磁盘。每个 Mapper 会根据目标分区的数量,将数据写入多个文件,每个文件对应一个目标分区。
    • 这些文件通常是临时文件,包含了要发送到不同 Reducer 的数据。
  2. 索引文件:

    • 同时,Mapper 会为每个输出文件生成一个索引文件,记录了每个分区的数据在输出文件中的偏移量。索引文件用于在 Shuffle Read 阶段快速定位数据。
 

三、Shuffle Read(混洗读阶段)

 
  1. 数据读取:

    • Reducer(Reduce 任务)从各个 Mapper 的本地磁盘读取属于自己的分区数据。Reducer 会根据分区的索引文件,确定要读取哪些文件以及从文件中的哪个位置开始读取。
    • Reducer 会从多个 Mapper 读取数据,然后对相同键的数据进行聚合或其他操作。
  2. 数据合并:

    • Reducer 会将从不同 Mapper 读取的数据进行合并和排序。如果有多个 Mapper 输出了相同键的数据,Reducer 会将这些数据合并在一起,并按照键进行排序。
  3. 聚合操作:

    • 最后,Reducer 对合并后的数据进行聚合操作,生成最终的结果。聚合操作可以是求和、计数、求平均值等。
 

四、性能影响因素和优化

 
  1. 数据倾斜:

    • 如果某些键的值在数据集中出现的频率非常高,可能会导致数据倾斜。这意味着某些 Reducer 会处理大量的数据,而其他 Reducer 处理的数据量很少。数据倾斜会严重影响作业的性能,导致某些任务运行时间过长。
    • 解决数据倾斜的方法包括使用更合适的分区函数、对倾斜的键进行特殊处理(如随机前缀)、在 Mapper 阶段进行预聚合等。
  2. 内存管理:

    • Shuffle 过程中需要大量的内存来缓存中间结果和读取的数据。如果内存不足,可能会导致数据溢出到磁盘上,增加磁盘 I/O 开销,降低性能。
    • 可以通过调整 Spark 的内存参数(如 spark.executor.memoryspark.storage.memoryFraction 等)来优化内存使用。此外,也可以使用一些内存优化技术,如序列化、压缩等,减少内存占用。
  3. 网络传输:

    • Shuffle 过程中需要在不同的节点之间传输大量的数据。网络传输的性能会影响作业的整体执行时间。
    • 可以通过优化网络配置、使用高效的序列化格式(如 Kryo 序列化)、增加网络带宽等方式来提高网络传输性能。
  • park的shuffle的两个部分

    • shuffle wirte 写

    • shuffle read 读

    • 会进行文件的读写,影响spark的计算速度

  • spark的shuffle方法类

    • 是spark封装好的处理shuffle的方法

    • hashshuffle 类

      • 进行的是hash计算

      • spark1.2版本前主要使用,之后引入了sortshuffle

      • spark2.0之后,删除了hashshuffle ,从2.0版本开始使用sortshuffle类

      • 优化的hashshufulle和未优化

    • sortshuffle类

      • 排序方式将相同key值数据放在一起

      • sortshuffle类使用时,有两个方法实现shuffle

        • bypass模式版本和普通模式版本

        • bypass模式版本不会排序,会进行hash操作

        • 普通模式版本会排序进行shuffle

      • 可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于等于200 采用bypass,task数量超过200个则使用普通模式的方法进行shuffle

      • 一个分区对应一个task,所以task数量由分区数决定

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/443926.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

理解PID(零)——什么是PID

PID控制器是一种广泛用于各种工业控制场合的控制器,它结构简单,可以根据工程经验整定参数Kp,Ki,Kd. 虽然现在控制专家提出了很多智能的控制算法,比如神经网络,模糊控制等,但是PID仍然被广泛使用。常见的PID控制器有位置…

开源全文搜索(搜索引擎)

吃水不忘挖井人,介绍Doug Cutting大牛是十分有必要的。 最早,接触到搜索引擎,知道有个Nutch(开源搜索引擎),于是开始查看Nutch相关的资料,发现了Nutch的创始人Doug Cutting,随着项目…

初创公司首选HR软件推荐与功能解析

初创企业需HR软件自动化招聘、文书、日常任务及合规管理,提升效率。ZohoPeople是理想选择,性价比高,版本多样,满足不同需求。 1、简化招聘流程 您一直在寻找杰出的人才,以助您的初创企业飞跃发展。您选择的HR软件应该…

Python数字图像处理实战——基于OpenCV实现多种滤波器(附完整代码和结果图)

Python数字图像处理实战——基于OpenCV实现多种滤波器(附完整代码和结果图) 关于作者 作者:小白熊 作者简介:精通python、matlab、c#语言,擅长机器学习,深度学习,机器视觉,目标检测…

极客兔兔Gee-Cache Day7

protobuf配置: 从 Protobuf Releases 下载最先版本的发布包安装。解压后将解压路径下的 bin 目录 加入到环境变量即可。 如果能正常显示版本,则表示安装成功。 $ protoc --version libprotoc 3.11.2在Golang中使用protobuf,还需要protoc-g…

LSTM时间序列模型实战——预测上证指数走势

LSTM时间序列模型实战——预测上证指数走势 关于作者 作者:小白熊 作者简介:精通python、matlab、c#语言,擅长机器学习,深度学习,机器视觉,目标检测,图像分类,姿态识别,…

影刀RPA实战:Excel排序、替换与格式

1.实战目标 今天继续介绍影刀RPA操作Excel的指令,内容替换,数据排序与单元格格式设置,这几个功能在日常工作中使用率还是比较频繁的。我们可以使用影刀来处理这些重复繁琐的工作。 2.内容替换 我们手动替换内容时 打开Excel文件&#xff1…

鸿蒙进入“无人区”:该如何闯关?

按照华为方面的说法,“打造鸿蒙操作系统是三大战役,目前已经完成了底座和体验两大战役,第三大战役则是生态。”生态固然重要,但要让鸿蒙与当今世界主流操作系统抗衡,乃至成为新一代操作系统中的翘楚,其实还…

上市四天暴涨又暴跌,扫描全能王背后公司坐上“过山车”

股价四天涨五倍,遇到回调跌一半,扫描全能王母公司——合合信息,一上市就坐上了“过山车”。 合合信息其实早在2021年就向科创板申请上市,并在2023年成功过会,但直到9月13日才开启申购,IPO之路一走就是三年…

springboot-网站开发-thymeleaf引擎报错找不到指定的页面模板文件

springboot-网站开发-thymeleaf引擎报错找不到指定的页面模板文件! 这种错误的情况,发生,一般都是因为,我们自己的html模板文件,存档位置并不是在默认的templates下面。而是我们自己新建的一个子目录里面。然后&#x…

又被Transformer秀到了!结合小样本学习发A会!

在有限的数据资源下,为了训练出高性能的机器学习模型,我们常会考虑Transformer小样本学习。 这是因为Transformer能从有限的数据中提取更多有用的信息,这样与小样本学习结合,可以更有效的帮助我们提高模型的性能,加速…

Vue84 vue3项目结构分析

打开main.js文件,发现和vue2不同 //引入的不再是Vue构造函数了,引入的是一个名为createApp的工厂函数 import { createApp } from vue import App from ./App.vue//创建应用实例对象——app(类似于之前Vue2中的vm,但app比vm更“轻”) const …

Chrome(谷歌)浏览器 数据JSON格式美化 2024显示插件安装和使用

文章目录 目录 文章目录 安装流程 小结 概要安装流程技术细节小结 概要 没有美化的格式浏览器展示 美化之后效果图 安装流程 下载地址 https://github.com/gildas-lormeau/JSONVue 点击下载 下载成功,如图所示 解压文件 添加成功,如图所示 通过浏览器…

Python测试框架--Allure

严格意义上讲 Allure 不算是测试框架,但是它是生成漂亮测试报告的开源工具,搭配 Pytest 测试框架食用更搭。 也就是说 Allure 是在 Pytest 执行完生成的测试数据的基础上,对测试数据进行处理统计,生成格式统一、美观的测试报告。 …

C语言函数栈帧的创建与销毁(32)

文章目录 前言一、什么是函数栈帧?二、理解函数栈帧能解决什么问题?三、函数栈帧的创建和销毁解析什么是栈?认识相关寄存器和汇编指令 四、解析函数栈帧的创建和销毁预备知识函数的调用堆栈准备环境转到反汇编函数栈帧的创建函数栈帧的销毁 五…

FreeRTOS学习总结

背景:在裸机开发上,有时候我们需要等待某个信号或者需要延迟时,CPU的运算是白白浪费掉了的,CPU的利用率并不高,我们希望当一个函数在等待的时候,可以去执行其他内容,提高CPU的效率,同…

视频格式不支持播放怎么办?几招教你转换成mp4格式

视频已成为我们生活中不可或缺的一部分,无论是学习、娱乐还是工作交流,视频都扮演着重要角色。然而,在享受视频带来的便利时,我们时常会遇到一个令人头疼的问题——视频格式不支持播放。不同设备、平台和软件对视频格式的支持各不…

什么是组态软件?Web组态软件又是什么?

从事相关工作的对“组态软件”应该都不陌生,那Web组态软件又是什么呢?本文将对Web组态可视化软件(下称“Web组态软件”)做简单介绍,可视化编辑器是Web组态软件中的一个重要功能模块。除了编辑器,还有哪些功能模块?又…

leetcode---素数,最小质因子,最大公约数

1 判断一个数是不是质数(素数) 方法1&#xff1a;依次判断能否被n整除即可&#xff0c;能够整除则不是质数&#xff0c;否则是质数 方法2&#xff1a;假如n是合数&#xff0c;必然存在非1的两个约数p1和p2&#xff0c;其中p1<sqrt(n)&#xff0c;p2>sqrt(n)。 方法3&…

医院管理新思维:Spring Boot技术应用

5系统详细实现 5.1 医生模块的实现 5.1.1 病床信息管理 医院管理系统的医生可以管理病床信息&#xff0c;可以对病床信息添加修改删除操作。具体界面的展示如图5.1所示。 图5.1 病床信息管理界面 5.1.2 药房信息管理 医生可以对药房信息进行添加&#xff0c;修改&#xff0c;…