RDD优化:缓存和checkpoint机制、数据共享(广播变量、累加器)、RDD的依赖关系、shuffle过程、并行度说明

文章目录

  • 1. 缓存和checkpoint机制
    • 1.1 缓存使用
    • 1.2 checkpoint
    • 1.3 缓存和checkpoint的区别
  • 2. 数据共享
    • 2.1 广播变量
    • 2.2 累加器
  • 3. RDD依赖关系
  • 4.shuffle过程
    • 4.1 shuffle介绍
    • 4.2 spark计算要尽量避免shuffle
  • 5. 并行度

1. 缓存和checkpoint机制

缓存和checkpoint也叫作rdd的持久化将rdd的数据存储在指定位置
作用:

  • 计算容错
  • 提升计算速度

1.1 缓存使用

在这里插入图片描述
缓存是将数据存储在内存或者磁盘上,缓存的特点计算结束时,缓存自动清空

  • 缓存级别
    • 指定缓存的数据位置
    • 默认是缓存到内存上
  • 使用
    • persist使用该方法
    • cache内部调用persist
    • 手动释放 unpersist
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevelsc = SparkContext()
rdd = sc.parallelize(['a', 'b','a','c'])# rdd数据进行转化
rdd_kv  = rdd.map(lambda x: (x,1))
#rdd_kv数据进行缓存
rdd_kv.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
#使用action算子触发
rdd_kv.collect()# 分组处理
rdd_group = rdd_kv.groupByKey()#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x,y:x+y)
# 查看计算结果
res = rdd_reduce.collect()
print(res)

1.2 checkpoint

在这里插入图片描述
checkpoint也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以永久保存,程序结束不会释放
如果需要删除就在HDFS上删除对应的目录文件。

#checkpoint使用
from pyspark import SparkContext
sc = SparkContext()#使用sc对象指定checkpoint存储位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint')rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k'])#rdd数据进行转化
rdd_kv = rdd.map(lambda x: (x, 1))#rdd 数据进行checkpoint
rdd_kv.checkpoint()
#需要使用action算子触发checkpoint
print(rdd_kv.glom().collect())#分组处理
rdd_group = rdd_kv.groupByKey()#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x, y: x + y)#查看计算结果
res = rdd_kv.collect()
print(res)res1 = rdd_group.collect()
print(res1)res2 = rdd_reduce.collect()
print(res2)

1.3 缓存和checkpoint的区别

  • 生命周期
    • 缓存数据,程序计算结束后自动删除
    • checkpoint 程序结束,数据依然保留在HDFS
  • 存储位置
    • 缓存 优先存储在内存上,也可以选存储在本地磁盘,是在计算任务所在的内存和磁盘上。
    • checkpoint存储在HDFS上
  • 依赖关系
    • 缓存数据后,会保留rdd之间依赖关系,缓存临时存储,数据可能会丢失,需要保留依赖,当缓存丢失后可以按照依赖重新计算。
    • checkpoint,数据存储后会断开依赖,数据保存在HDFS,HDFS三副本机制可以保证数据不丢失,所以没有比较保留依赖关系。
      注意:缓存和checkpoint可以作为rdd优化的方案,提升计算速度,一般对经常要使用的rdd进行缓存和checkpoint,对计算比较复杂的rdd进行缓存或checkpoint。

2. 数据共享

2.1 广播变量

在这里插入图片描述
在这里插入图片描述
如果要在分布式计算里面分发大的变量数据,这个都会由driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task都会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor都会拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

减少task线程对应变量的定义,节省内存空间

#广播变量
from pyspark import SparkContextsc= SparkContext()a_obj = sc.broadcast(10)
#生成rdd
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])#转化计算
rdd2 = rdd.map(lambda x:x+a_obj.value)#查看数据
res = rdd2.collect()
print(res)

2.2 累加器

避免资源抢占造成错误

#累加器的使用
from pyspark import SparkContextsc = SparkContext()
#将变量值添加到累加器中
acc_obj = sc.accumulator(0)#rdd操作
rdd = sc.parallelize([1,2,3])#使用累加器进行数据累加
rdd2 = rdd.map(lambda x:acc_obj.add(x))#查看结果
res = rdd.collect()
print(res)res1 = rdd2.collect()
print(res1)#查看累加器的结果
print(acc_obj.value)

查看结果:
在这里插入图片描述

3. RDD依赖关系

  • 窄依赖
    • 每个父RDD的一个partition最多被子RDD的一个partition所使用。
      • map
      • flatMap
      • filter
  • 宽依赖
    • 一个父RDD的partition会被多个子RDD的partition所使用
      • groupByKey
      • reduceByKey
      • sortByKey
    • 在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle
      • 只要是宽依赖必然发生shuffle
      • 在宽依赖进行数据交换时,只有等待所有分区交换完成后,才能进行后续的计算,非常影响计算速度。

那么如何判断是宽依赖还是窄依赖呢?

#判断宽窄依赖
from pyspark import SparkContext
sc = SparkContext()rdd = sc.parallelize([1,2,3,4])
rdd_kv = sc.parallelize([('a',1), ('b',2), ('c',3)])#算子演示
rdd2 = rdd.map(lambda x:x+1)
rdd3 = rdd_kv.groupByKey()#查看结果
# res = rdd2.collect()
# print(res)res = rdd3.collect()
print(res)

DAG 管理维护rdd之间依赖关系,保证代码的执行顺序,


DAG会根据依赖关系划分stage,每个stage都是一个独立的计算步骤,当发生宽依赖时,会单独拆分一个计算步骤(stage),进行相关数据计算,可以保证每个单独的stage可以并行执行

在发生宽依赖进行shuffle时,会独立的方法执行shuffle计算


拆分计算步骤的本质是为了保证数据计算的并行执行

查看spark的计算过程,通过DAG判断算子是宽依赖还是窄依赖

拆分了计算stage是宽依赖,没有拆分是窄依赖

启动spark的历史日志

start-history-server.sh

在这里插入图片描述
在这里插入图片描述

4.shuffle过程

mapreduce的shuffle作用: 将map计算后的数据传递给redue使用。
mapreduce的shuffle过程: 分区(将相同key的数据放在一个分区,采用hash),排序,合并(规约)。
将map计算的数据传递给reduce


spark中也有shuffle
当执行宽依赖的算子就会进行shuffle
将rdd的数据传递给下一个rdd,进行数据交换


无论是spark还是mr,shuffle的本质是传递交换数据。

在这里插入图片描述

4.1 shuffle介绍

  • spark的shuffle的两个部分
    • shuffle write 写
    • shuffle read 读
    • 会进行文件的读写,影响spark的计算速度
  • spark的shuffle方法类
    • 是spark封装好的处理shuffle的方法
    • hashshuffle类
      • 进行的是hash计算
      • spark1.2版本之前主要使用,之后引入了sortshuffle
      • spark2.0之后,删除了hashshuffle,从2.0版本开始使用sortshuffle类
      • 优化的hashshuffle和未优化的ashshuffle
    • sortshuffle类
      • 排序方式将相同key值数据放在一起
      • sortshuffle类使用时,有两个方法实现shuffle
        • bypass模式版本和普通模式版本
        • bypass模式版本不会排序,会进行hash操作
        • 普通模式版本会排序进行shuffle
      • 可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于等于200 采用bypass,task数量超过200个则使用普通模式的方法进行shuffle
      • 一个分区对应一个task,所以task数量由分区数决定

普通模式和bypass模式的主要区别在于如何将相同key值的数据放在一起

  • 排序 普通模式采用的策略
  • 哈希取余 bypass模式采用的策略

4.2 spark计算要尽量避免shuffle

# 优化计算,减少shuffle
from pyspark import SparkContext
sc = SparkContext()rdd = sc.parallelize([('男',20),('男',23),('女',20),('女',22)])
#求不同性别的年龄和
#reduceByKey  是宽依赖算子
rdd2 = rdd.reduceByKey(lambda x,y:x+y)# 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
boy = sc.accumulator(0)
girl = sc.accumulator(0)
def func(x):if x[0]=='男':boy.add(x[1])else :girl.add(x[1])return None
rdd3 = rdd.map(func)rdd3.collect()
print(boy.value)
print(girl.value)

5. 并行度

  • 资源并行度

    • task在指定任务能够使用到的cpu核心数量

    • 多任务 多个进程或多个线程执行任务

      • 两种方式
        • 并行 多个任务同时执行
        • 并发 任务交替执行
        • 和cpu的核心数有关
          • 例如
          • cpu核心是4核 有两个线程任务 两个线程任务可以 并行执行
          • cpu核心是4核 有八个线程任务 并发执行
    • spark中cpu核心数据设置

      • –num-executors=2 设置executors数量 和服务器数量保持一致
      • –executor-cores=2 设置每个executors中的cpu核心数 每个服务器中cpu核心数一致
      spark-submit  --master yarn  --num-executors=3   --executor-cores=2
      

      最大支持的task并行数量是 num-executors* executor-cores =6

      需要按照服务器实际的cpu核心数指定 lscpu

  • 数据并行度

    • 就是task数量,task由分区数决定
    • 为了保证task能充分利用cpu资源,实现并行计算,需要设置分区数应该和资源并行度一致
    • 在实际公司中就要根据公司资源并行度进行设置分区数
    • 有的场景下公司会要求数据并行度大于资源并行度

资源并行度,

按照yarn安装的服务器数量指定excutor数量 3

核心数量按照yarn中的nodemanager中的核心数指定 2
数据并行度指定

官方建议 数据并行度的task数量和资源并行度数量一致

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/444920.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Springboot 整合 Java DL4J 实现企业门禁人脸识别系统

🧑 博主简介:历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程,…

vue后台管理系统从0到1(3)element plus 的三种导入方式

文章目录 vue后台管理系统从0到1(3)element plus 的三种导入方式element plus 引入方式完整引入按需导入手动导入 vue后台管理系统从0到1(3)element plus 的三种导入方式 element plus 引入方式 官方网址:https://el…

windows系统更新升级node指定版本【避坑篇!!!亲测有效】(附带各版本node下载链接)一定看到最后!不用删旧版!

Node.js 是一个开源、跨平台的 JavaScript 运行时环境,广泛应用于服务器端和网络应用的开发。随着 Node.js 版本的不断更新,我们可能需要升级到特定版本以满足项目需求或修复安全漏洞。又或者是学习开发另外一个新项目,新项目对Node版本要求更…

优达学城 Generative AI 课程2:Large Language Models (LLMs) Text Generation

建议先了解一下附录知识。 文章目录 1 官方课程内容自述Lesson 1: 大型语言模型(LLMs)简介Lesson 2: 自然语言处理(NLP)基础Lesson 3: Transformer 和注意力机制Lesson 4: 检索增强生成(RAG)Lesson 5: 为大…

查找企业联系电话的几种方法

在商业合作和销售拓展的过程中,找到企业的联系电话是至关重要的一步。无论是精准营销还是客户开发,拥有有效的联系方式可以大大提高成功率。那么,如何快速有效地查找企业联系电话呢?下面介绍几种常见的方法,以及如何借…

如何解决项目跟进中关键节点难以把控的问题?

在项目跟进的过程中,关键节点的把控常常是一个棘手的问题。如果不能有效地管理这些节点,项目可能会偏离轨道,导致延误、成本超支甚至失败。下面我们来分析一下都有哪些关键节点难以把控以及相应的应对策略。 1、需求变更节点 在项目进行中&a…

快速入门Tomcat服务(业务发布基础技能)

文章目录 1 Tomcat简介 2 安装tomcat 2.1 安装jdk 2.2 安装Tomcat 3 Tomcat目录结构 4 Tomcat重要配置文件 1 Tomcat简介 Tomcat是Sun公司官方推荐的Servlet和JSP容器,在中小型系统和并发访问用户不是很多的场合下,其作为轻量级应用服务…

无刷直流电机工作原理:【图文讲解】

电动机 (俗称马达) 是机械能与电能之间转换装置的通称。可以分为电动机和发电机.一般称电机时就是指电动机。这个在日常应用中,比较多见,比如机器人,手机,电动车等。 直流电机:分为有刷直流电机(BDC&#…

HTTP的工作原理

HTTP(Hypertext Transfer Protocol)是一种用于在计算机网络上传输超文本数据的应用层协议。它是构成万维网的基础之一,被广泛用于万维网上的数据通信。(超文本(Hypertext)是用超链接的方法,将各种不同空间的文字信息组…

【MySQL】CRUD增删改查操作

文章目录 CRUD简介一、Creat 新增1.单行数据全列插入2.单行数据全指定列插入3.多行数据指定列插入 二、Retrieve 检索1.全列查询 --练习阶段最简单的查询:(在生产环境最好不要用!!)2.指定列查询3.结果去重查询4.where条…

柒拾伍- AI内容农场生产文章自动发布至公众号 (一)

一、内容农场 X AI 看过很多的新闻说 AI 产生 内容 污染网络,我也想试一下到底能污染成怎样。 然后为了编写爆款的内容,我选用这个 内容农场 的种子是来源于 微博热搜,让生长出来的垃圾文章更加火爆 涉及内容不能放 二、编写代码 关于代…

常用类(一)----包装类的使用和分析

文章目录 1.包装类2.课堂测试题3.包装类方法4.Integer创建机制5.Integer面试题 1.包装类 概念:基本数据类型对应的类就是包装类,就是为了把基本数据类型转换为包装类,使用这个类里面的方法操作数据----装箱的过程; //装箱&#…

springboot查询全部部门流程

前端发送请求后,会请求DeptController的方法list()。 package com.intelligent_learning_aid_system.controller;import com.intelligent_learning_aid_system.pojo.Dept; import com.intelligent_learning_aid_system.pojo.Result; import com.intelligent_learni…

ArcGis JS天地图 暗色地图

方法一&#xff1a;使用css filter 在body下增加svg&#xff0c;并增加需要用到的滤镜&#xff0c;这边用到x-rays <svg id"svgfilters" aria-hidden"true" style"position: absolute; width: 0; height: 0; overflow: hidden"version"…

Kafka-初识

一、Kafka是什么&#xff1f; Kafka是一个高度可扩展、弹性、容错和安全的分布式流处理平台&#xff0c;由服务器和客户端组成&#xff0c;通过高性能TCP网络协议进行通信。它可以像消息队列一样生产和消费数据。可以部署在裸机硬件、虚拟机和容器上&#xff0c;也可以部署在本…

鼠标市场洞察:数据分析揭示消费趋势!

鼠标整体数据分析 一. 概述 本报告基于从淘宝商品搜索接口和淘宝精确月销量接口中提取的数据&#xff0c;分析了前百个品牌在销售额上的占比情况。分析涵盖了销售额和占比的数据&#xff0c;为决策提供了依据。(以上两个接口有需求的可以找我要链接&#xff09; 1. 大盘整体…

基于Python flask的豆瓣电影可视化系统,豆瓣电影爬虫系统

博主介绍&#xff1a;✌Java徐师兄、7年大厂程序员经历。全网粉丝13w、csdn博客专家、掘金/华为云等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb; 不…

ppt压缩文件怎么压缩?压缩PPT文件的多种压缩方法

ppt压缩文件怎么压缩&#xff1f;当文件体积过大时&#xff0c;分享和传输就会变得困难。许多电子邮件服务对附件的大小有限制&#xff0c;而在网络环境不佳时&#xff0c;上传和下载大文件可能耗时较长。此外&#xff0c;在不同设备上播放时&#xff0c;较大的PPT文件还可能导…

基于FPGA的以太网设计(一)

以太网简介 以太网&#xff08;Ethernet&#xff09;是一种计算机局域网技术。IEEE组织的IEEE 802.3标准制定了以太网的技术标准&#xff0c;它规定了包括物理层的连线、电子信号和介质访问控制的内容。以太网是目前应用最普遍的局域网技术&#xff0c;取代了其他局域网标准如…

GA-BP回归预测 | MATLAB实现GA-BP多输入单输出回归预测

回归预测 | MATLAB实现GA-BP多输入单输出回归预测 目录 回归预测 | MATLAB实现GA-BP多输入单输出回归预测预测效果基本介绍模型描述遗传算法神经网络GA-BP网络程序设计学习总结参考资料预测效果 基本介绍 MATLAB实现GA-BP多输入单输出回归预测,输入7个特征,输出1个,优化权重…