RDD算子(四)、血缘关系、持久化

1. foreach

分布式遍历每一个元素,调用指定函数

val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。

RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。

2. 闭包检测

class User {var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{var age : Int = 30
}

或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)

case class User {var age : Int = 30
}

如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化

val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

 再看如下案例:

class Search(query:String) {def isMatch(s:String): Boolean {s.contains(query)}def getMatch1(rdd: RDD[String]): RDD[String] {rdd.filter(isMatch)}def getMatch2(rdd: RDD[String]): RDD[String] {rdd.filter(x => x.contains(query))}
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)

此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:

def isMatch(s:String): Boolean {s.contains(this.query)
}

this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:

def getMatch2(rdd: RDD[String]): RDD[String] {val s = queryrdd.filter(x => x.contains(s))
}

3. 依赖关系

 每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:

val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")

查看依赖关系只需直接将代码中的toDebugString改为dependencies即可

val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")

​​​​​​​

 一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。

4. 阶段和任务划分

窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。

RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。

每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。

5. cache和persist

如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。 

为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.cache()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

放在内存中可能不安全,使用persist方法缓存在磁盘中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.persist(StorageLevel.DISK_ONLY)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。

除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。

另外,使用cache方法会改变RDD的血缘关系:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

 

可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

6. 自定义分区器

class MyPartitioner extends Partitioner {override def numPartitions : Int = n  //自定义,可以写死override def getPartition(key : Any) : Int = {key match {case "xxx" => 0case _ => 1}}}

分区器传给RDD:

val partitionRDD = rdd.partitionBy(new MyPartitioner)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/298458.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51之定时器与中断系统

目录 1.定时器与中断系统简介 1.1中断系统 1.2定时器 1.2.1定时器简介 1.2.2定时器大致原理及其配置 1.2.3定时器所需的所有配置总介 2.定时器0实现LED闪烁 3.使用软件生成定时器初始化程序 1.定时器与中断系统简介 1.1中断系统 首先,我们需要来了解一下什么…

Vue项目中引入html页面(vue.js中引入echarts数据大屏html [静态非数据传递!] )

在项目原有vue(例如首页)基础上引入html页面 1、存放位置 vue3原有public文件夹下 我这边是新建一个static文件夹 专门存放要用到的html文件 复制拖拽过来 index为html的首页 2、更改路径引入到vue中 这里用到的是 iframe 方法 不同于vue的 component…

python爬虫获取豆瓣前top250的标题(简单)

今天是简略的一篇,简单小实验 import requests from bs4 import BeautifulSoup# 模拟浏览器的构成(请求头) headers {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Ch…

从零开始搭建后端信息管理系统(新手小白比如)

如果你是新手小白,首先我们要进行一些准备工作,安装一些基础软件, 备注一下:这里安装的vue环境的后台管理系统,不同的后台管理系统,需要安装不同的插件 准备工作: 安装 Visual Studio Code …

如何使用 Midjourney?2024年最新更新

一:基础篇 1:注册 首先,你需要注册一个 Discord 账号,然后加入 Midjourney 的 Discord 服务器。或者去 Midjourney 的官网点击右下角的 Join the Beta: ​ 2:在 Discord 公共服务器里使用 注册并进入到…

Unix 网络编程, Socket 以及bind(), listen(), accept(), connect(), read()write()五大函数简介

Unix网络编程是针对类Unix操作系统(包括Linux、BSD以及其他遵循POSIX标准的操作系统)进行网络通信开发的技术领域。网络编程涉及创建和管理网络连接、交换数据以及处理不同层次网络协议栈上的各种网络事件。在Unix环境中,网络编程通常涉及到以…

kubectl explain资源文档命令

学习并使用了一段时间的kubernetes,发现对k8s还是了解甚少,于是利用上下班通勤的时间又去B站看一些大佬的视频,又来重学巩固一遍知识,并做些记录。 之前在学习使用过程中未成了解过explain这个命令,因为自己部署的版本…

【开发、测试】接口规范与测试

接口测试基础 url 是互联网标准资源地址,称为统一资源定位符 组成:协议,服务器地址,端口号 HTTP协议 HTTP:超文本传输协议,基于请求与响应的应用层协议 作用:规定了客户端和服务器之间的信…

NoSQL之Redis配置

文章目录 NoSQL之Redis配置一、关系数据库和非关系数据库1、关系型数据库2、非关系型数据库3、非关系型数据库产生背景4、关系型数据库和非关系型数据库的区别4.1 数据存储方式不同4.2 扩展方式不同4.3 对事务性的支持不同 5、总结5.1 关系型数据库5.2 非关系型数据库 二、Redi…

Anaconda/Python快速安装jieba 【win/mac】

一、直接上命令 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba 我是在PyCharm里面的终端输进去。 之后就很快速的看到成功的下图。 二、官网 官网下载的速度太慢了——这是官网地址https://pypi.org/project/jieba/#files 点进去之后点击下载&#xff0c…

Star GAN论文解析

论文地址:https://arxiv.org/pdf/1912.01865v1.pdf https://openaccess.thecvf.com/content_cvpr_2018/papers/Choi_StarGAN_Unified_Generative_CVPR_2018_paper.pdf 源码:stargan项目实战及源码解读-CSDN博客 1. 概述 在传统方法中&#x…

练习14 Web [极客大挑战 2019]Upload

phtml格式绕过,burp修改content-type绕过,常见的文件上传存放目录名 题目就叫upload,打开靶机 直接上传一个图片格式的一句话木马,返回如下: 提交练习5和9中的两种可以执行图片格式php代码的文件,修改con…

全面解析找不到msvcr110.dll,无法继续执行代码的解决方法

MSVCR110.dll的丢失可能导致某些应用程序无法启动。当用户试图打开依赖于该特定版本DLL文件的软件时,可能会遭遇“找不到指定模块”的错误提示,使得程序启动进程戛然而止。这种突如其来的故障不仅打断了用户的正常工作流程,也可能导致重要数据…

[中级]软考_软件设计_计算机组成与体系结构_08_输入输出技术

输入输出技术 前言控制方式考点往年真题 前言 输入输出技术就是IO技术 控制方式 程序控制(查询)方式:分为无条件传送和程序查询方式两种。 方法简单,硬件开销小,但I/O能力不高,严重影响CPU的利用率。 程序中断方式&#xff1…

极简云验证 download.php 文件读取漏洞复现

0x01 产品简介 极简云验证是一款开源的网络验证系统,支持多应用卡密生成:卡密生成 单码卡密 次数卡密 会员卡密 积分卡密、卡密管理 卡密长度 卡密封禁 批量生成 批量导出 自定义卡密前缀等;支持多应用多用户管理:应用备注 应用版…

数学建模-最优包衣厚度终点判别法(主成分分析)

💞💞 前言 hello hello~ ,这里是viperrrrrrr~💖💖 ,欢迎大家点赞🥳🥳关注💥💥收藏🌹🌹🌹 💥个人主页&#xff…

armlinux-外部中断

s3c2440的中断框图 如果我们单纯配置一个按键的外部中断,就不存在子中断与优先级的问题。 由于是按键的外部中断,通过引脚的高低电平来触发。所以我们要先配置引脚的功能。 我们使用按键1,终端源为EINT8,对应引脚GPG0 通过用户手…

DBU-Net:用于乳腺超声图像中肿瘤分割的双分支U形网络

DBU-Net:用于乳腺超声图像中肿瘤分割的双分支U形网络 摘要引言材料和方法概述所提出的方法 DBU-Net Dual branch U-Net for tumor segmentation in breast ultrasound images 摘要 乳腺超声医学图像通常具有低成像质量沿着不清楚的目标边界。这些问题使得医生在诊断…

JS继承与原型、原型链

在 JavaScript 中,继承是实现代码复用和构建对象关系的重要概念。本文将讨论原型链继承、构造函数继承以及组合继承等几种常见的继承方式,并提供相应的示例代码,并分析它们的特点、优缺点以及适用场景。 在开始讲解 JavaScript 的继承方式之…

[java]网络编程

网络编程概述 计算机网络: 把分布在不同地理区域的具有独立功能的计算机,通过通信设备与线路连接起来,由功能完善的软件实现资源共享和信息传递的系统。 Java是 Internet 上的语言,它从语言级上提供了对网络应用程序的支持,程序…