Spark分布式计算原理

一、Spark WordCount运行原理

二、划分Stage

数据本地化

        移动计算,而不是移动数据

        保证一个Stage内不会发生数据移动

三、Spark Shuffle过程

在分区之间重新分配数据

        父RDD中同一分区中的数据按照算子要求重新进入RDD的不同分区中

        中间结果写入磁盘

        有子RDD拉取数据,而不是由父RDD推送

        默认情况下,shuffle不会改变分区数量

四、RDD的依赖关系

Lineage:血统、依赖

        RDD最重要的特征之一,保存了RDD的依赖关系

        RDD实现了基于Lineage的容错机制

依赖关系

        宽依赖:一个父RDD的分区被子RDD的多个分区使用

        窄依赖:一个父RDD的分区被子RDD的一个分区使用

宽依赖对比窄依赖

宽依赖对应shuffle操作,需要在运行时将同一个父RDD的分区传入到不同的子RDD分区中,不同的分区可能位于不同的节点,就可能涉及多个节点间数据传输

当RDD分区丢失时,Spark会对数据进行重新计算,对于窄依赖只需要重新计算一次子RDD的父RDD分区

补:最多22个元组,元组可以套元组

宽依赖容错图

结论:

相比于宽依赖,窄依赖对优化更有利

练习:判断RDD依赖关系

Map  flatMap  filter  distinct  reduceByKey  groupByKey  sortByKey  union  join

scala> val b = sc.parallelize(Array(("hello",11),("java",12),("scala",13),("kafka",14),("sun",15)))
// b: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24scala> val c = sc.parallelize(Array(("hello",11),("java",12),("scala",13),("kafka",14),("sun",15)))
// c: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24scala> b.glom.collect
// collect   collectAsyncscala> b.glom.collect
// res4: Array[Array[(String, Int)]] = Array(Array((hello,11)), Array((java,12)), Array((scala,13)), Array((kafka,14), (sun,15)))scala> c.glom.collect
// collect   collectAsyncscala> c.glom.collect
// res5: Array[Array[(String, Int)]] = Array(Array((hello,11)), Array((java,12)), Array((scala,13)), Array((kafka,14), (sun,15)))scala> val d = a.jion(b)
// <console>:27: error: value jion is not a member of org.apache.spark.rdd.RDD[(String, Int, Int, Int)]
//        val d = a.jion(b)  //拼写错误^
scala> val d = a.join(b)
// <console>:27: error: value join is not a member of org.apache.spark.rdd.RDD[(String, Int, Int, Int)]
//        val d = a.join(b)  //应该是bc不是ab^
scala> val d = b.join(c)   //Tab成功
// d: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[8] at join at <console>:27scala> d.glom.collect
// collect   collectAsyncscala> d.glom.collect
// res6: Array[Array[(String, (Int, Int))]] = Array(Array((sun,(15,15))), Array(), Array((scala,(13,13)), (hello,(11,11)), (java,(12,12)), (kafka,(14,14))), Array())

五、DAG工作原理

(1)根据RDD之间的依赖关系,形成一个DAG(有向无环图)

(2)DAGScheduler将DAG划分为多个Stage

        划分依据:是否发生宽依赖(Shuffle)

        划分规则:从后往前,遇到宽依赖切割为新的Stage

        每个Stage由一组并行的Task组成的

六、Shuffle实践

最佳实践

        提前部分聚合减少数据移动

        尽量避免Shuffle  

      

七、RDD优化

  • RDD持久化
  • RDD共享变量
  • RDD分区设计
  • 数据倾斜

(一)RDD持久化

(1)RDD缓存机制:缓存数据至内存/磁盘,可大幅度提升Spark应用性能

(2)缓存策略StorageLevel

①RDD存储级别介绍(StorageLevel )

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

②缓存应用场景

从文件加载数据之后,因为重新获取文件成本较高

经过较多的算子变换之后,重新计算成本较高

单个非常消耗资源的算子之后

③使用注意事项

Cache() 或persist() 后不能再有其他的算子

Cache() 或persist() 遇到Action算子完成后才生效

④操作

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}object CacheDemo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")val sc: SparkContext = SparkContext.getOrCreate(conf)val rdd: RDD[String] = sc.textFile("in/users.csv")//    stack堆栈      heap栈
//    rdd.cache()   //设置默认缓存Memory_Onlyrdd.persist(StorageLevel.MEMORY_ONLY)val value: RDD[(String, Int)] = rdd.map(x=>(x,1))
//    val start: Long = System.currentTimeMillis()
//    println(value.count())
//    val end: Long = System.currentTimeMillis()
//    println("1:"+(end-start))for (i<- 1 to 10){val start: Long = System.currentTimeMillis()println(value.count())val end: Long = System.currentTimeMillis()println(i+":"+(end-start))Thread.sleep(10)if (i>6){rdd.unpersist()}}}
}

(3)检查点:类似于快照

sc.setCheckpointDir("hdfs:/checkpoint0918")
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.checkpoint
rdd.collect //生成快照
rdd.isCheckpointed
rdd.getCheckpointFile

(4)检查点与缓存的区别

        检查点会删除RDD lineage,而缓存不会

        SparkContext被销毁后,检查点数据不会被删除

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object CheckPointDemo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")val sc: SparkContext = SparkContext.getOrCreate(conf)sc.setCheckpointDir("file://文件路径")val rdd: RDD[Int] = sc.parallelize(1 to 20)rdd.checkpoint()rdd.glom().collect.foreach(x=>println(x.toList))println(rdd.isCheckpointed)println(rdd.getCheckpointFile)}
}

(二)RDD共享变量

(1)广播变量:允许开发者将一个只读变量(Driver)缓存到每个节点(Executor)上,而不是每个任务传递一个副本

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object BroadCastDemo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")val sc: SparkContext = SparkContext.getOrCreate(conf)var arr = Array("hello","hi","Come here")//一维数组var arr2 = Array((1,"hello"),(2,"hello"),(3,"hi"))//二维数组//广播变量val broadcastVar: Broadcast[Array[String]] = sc.broadcast(arr)val broadcastVar2: Broadcast[Array[(Int, String)]] = sc.broadcast(arr2)//普通RDDval rdd: RDD[(Int, String)] = sc.parallelize(Array((1,"leader"),(2,"teamleader"),(3,"worker")))val rdd2: RDD[(Int, String)] = rdd.mapValues(x => {println("value is:" + x)
//      broadcastVar.value(0) + ":" + xbroadcastVar2.value(1)._2 + ":" + x})rdd2.collect.foreach(println)}
}

(2)累加器:只允许added操作,常用于实现计算

(三)RDD分区设计

(1)分区大小限制为2GB

(2)分区太少

        不利于并发

        更容易数据倾斜影响

        groupBy,reduceByKey,sortByKey等内存压力增大

(3)分区过多

        Shuffle开销越大

        创建任务开销越大

(4)经验

        每个分区大约128MB

        如果分区小于但接近2000,则设置为大于2000

(四)数据倾斜

1、指分区中的数据分配不均匀,数据集中在少数分区中

        严重影响性能

        通常发生在groupBy,jion等之后

2、解决方案

        使用新的Hash值(如对key加盐)重新分区

3、实战

[root@kb23 jars]# pwd

/opt/soft/spark312/examples/jars

[root@kb23 jars]# ls

scopt_2.12-3.7.1.jar  spark-examples_2.12-3.1.2.jar

[root@kb23 sbin]# pwd

/opt/soft/spark312/sbin

[root@kb23 sbin]# start-all.sh

[root@kb23 spark312]# ./bin/spark-submit --master spark://192.168.91.11:7077 --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.12-3.1.2.jar 100

Pi is roughly 3.1407527140752713

重新分区(../spark312目录)

./bin/spark-submit \
--master spark://192.168.78.131:7077 
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-examples_2.12-3.1.2.jar \
1000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/154666.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

零代码编程:用ChatGPT批量调整文件名称中的词汇顺序

文件夹里面很多文件&#xff0c;需要批量挑战标题中的一些词组顺序&#xff1a;“Peppa Pig - Kylie Kangaroo (14 episode _ 4 season) [HD].mp4”这个文件名改成“14 episode _ 4 season _ Peppa Pig - Kylie Kangaroo.mp4”&#xff0c;可以在ChatGPT中输入提示词&#xff1…

NSSCTF[SWPUCTF 2021 新生赛]hardrce(无字母RCE)

代码审计&#xff1a; 使用get方式请求给wllm传参 使用preg_match函数正则匹配过滤掉了一些符号 \t,\r,\n,\,\[,\^,\],\",\-,\$,\*,\?,\<,\>,\,\ 以及 [a-zA-Z] 即所有的大小写字母 如果传入内容满足这些条件则会执行eval函数 URL编码取反绕过正则实现RCE&…

第83步 时间序列建模实战:Catboost回归建模

基于WIN10的64位系统演示 一、写在前面 这一期&#xff0c;我们介绍Catboost回归。 同样&#xff0c;这里使用这个数据&#xff1a; 《PLoS One》2015年一篇题目为《Comparison of Two Hybrid Models for Forecasting the Incidence of Hemorrhagic Fever with Renal Syndr…

修改ubuntu服务器fs文件最大打开数

起因 在对项目进行压测的时候&#xff0c;请求异常 java.net.SocketException: socket closed&#xff0c;查看nginx代理服务器的日志。tail -f -n500 /var/log/nginx/error.log 显示 文件打开数太多socket() failed (24: Too many open files) while connecting to upstream …

数据结构:链表(1)

顺序表的优缺点 缺点&#xff1a; 1.插入数据必须移动其他数据&#xff0c;最坏情况下&#xff0c;就是插入到0位置。时间复杂度O(N) 2.删除数据必须移动其他数据&#xff0c;最坏情况下&#xff0c;就是删除0位置。时间复杂度O(N) 3.扩容之后&#xff0c;有可能会浪费空间…

游戏服务端性能测试实战总结

导语&#xff1a;近期经历了一系列的性能测试&#xff0c;涵盖了Web服务器和游戏服务器的领域。在这篇文章中&#xff0c;我将会对游戏服务端所做的测试进行详细整理和记录。需要注意的是&#xff0c;本文着重于记录&#xff0c;而并非深入的编程讨论。在这里&#xff0c;我将与…

mysql面试题35:MySQL有关权限的表有哪些?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:MySQL有关权限的表有哪些? MySQL中与权限相关的表主要包括以下几个: user表:存储MySQL用户的基本信息,包括用户名、密码等。可以使用以下命令…

sqlalchemy 连接池

报错 sqlalchemy.exc.TimeoutError: QueuePool limit of size 100 overflow 10 reached, connection timed out, timeout 30 (Background on this error at: http://sqlalche.me/e/3o7r) 查看数据库未活动超时时间 show variables like "interactive_timeout";一般…

CAN和CANFD通信介绍

CAN&#xff08;Controller Area Network&#xff0c;控制器局域网&#xff09;是一种串行通信技术&#xff0c;专门用于在汽车电子控制单元&#xff08;ECU&#xff09;之间实现可靠的数据交换。 CAN协议介绍 电子化 汽车近年来的发展呈现出以电子化为主的特点。电子化的主…

虚幻引擎:如何才能对音波(声音资产)进行逻辑设置和操作

案列&#xff1a;调整背景音乐大小 1.创建一个SoundCue 2.进入创建的SoundCue文件 3. 创建音效类和音效类混合 4.进入SoundCue选择需要的音效类 5.然后音效类混合选择相同的音效类 6.然后蓝图中通过节点进行控制音量大小

C#实现五子棋小游戏:简单、有趣的编程项目

目录 引言什么是五子棋游戏规则开发环境准备安装C#开发环境选择合适的集成开发环境(IDE)游戏设计与功能分析游戏界面设计实现棋盘的绘制与操作实现落子功能实现输赢判断说明引言 什么是五子棋 五子棋是一种源于中国的传统棋类游戏,常见于中国、日本、韩国等亚洲国家,是亚洲…

Maven创建父子工程详解

引言 在微服务盛行的当下&#xff0c;我们创建的工程基本都是父子工程&#xff0c;我们通过父工程来引入jar&#xff0c;定义统一的版本号等&#xff0c;这样我们在子工程中就可以直接引用后使用了&#xff0c;而不需要去重复的声明版本号等&#xff0c;这样会更方便对整个项目…

测试老鸟整理,Pytest自动化测试框架的一些关键点,一文贯通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Pytest自动化框架&…

机器学习、深度学习相关的项目集合【自行选择即可】

【基于YOLOv5的瓷砖瑕疵检测系统】 YOLOv5是一种目标检测算法&#xff0c;它是YOLO&#xff08;You Only Look Once&#xff09;系列模型的进化版本。YOLOv5是由Ultralytics开发的&#xff0c;基于一阶段目标检测的概念。其目标是在保持高准确率的同时提高目标检测的速度和效率…

Flink开发环境搭建与提交运行Flink应用程序

Flink开发环境搭建与提交运行Flink应用程序 Flink概述环境 Flink程序开发项目构建添加依赖安装Netcat实现经典的词频统计批处理示例流处理示例 Flink Web UI 命令行提交作业编写Flink程序打包上传Jar提交作业查看任务测试 Web UI提交作业提交作业测试 Flink 概述 Apache Flink…

MinIO的安装与使用

文章目录 1.MINIO是什么&#xff1f;2.MINIO安装3.启动脚本4.打开MINIO页面5.MC命令6.MINIO备份脚本 1.MINIO是什么&#xff1f; MinIO 是一款高性能、分布式的对象存储系统. 它是一款软件产品, 可以100%的运行在标准硬件。即X86等低成本机器也能够很好的运行MinIO。 MinIO与…

alsa音频pcm设备之i2c调试

i2cdetect 列举 I2C bus i2cdetect -l ls /dev/i2c* 列出I2C bus i2c-7 上面连接的所有设备,并得到i2c设备地址 i2cdetect -y 7 发现i2c设备的位置显示为UU或表示设备地址的数值,UU表示设备在driver中被使用. I2cdump i2c设备大量register的值 i2cdump -y 7 0x40 I2cset设置…

ICPC 2019-2020 North-Western Russia Regional Contest

A (codeforces.com) 这题在移动不被挡板挡住以及不超过边界的情况下&#xff0c;每次走的越多那么次数就越少 只要两个每次都走b-a步&#xff08;已经是不被挡板挡住走的最多了&#xff09;&#xff0c;就不用考虑被挡板挡住的情况&#xff0c;只用单独考虑了&#xff0c;如果…

微服务09-Sentinel的入门

文章目录 微服务中的雪崩现象解决办法&#xff1a;1. 超时处理2. 舱壁模式3. 熔断降级4.流量控制 Sentinel1.介绍2.使用操作3.限流规则4.实战&#xff1a;流量监控5.高级选项功能的使用1.关联模式2.链路模式3.总结 流控效果1.预热模式2.排队等待模式3.总结4.热点参数限流5.实战…

【业务功能篇 131】23种设计模式介绍

第一章 设计模式概述 1.1 代码质量好坏如何评价? 要想学习设计模式呢 我们就必须搞清楚设计模式到底在我们的编程过程中起到了怎样的作用,在编程世界中它处在一个什么样的位置,它到底是一种抽象的设计思想,还是一套具体的落地方案. 在学习设计模式之前呢 我们需要了解一下 代…