2023_Spark_实验十二:Spark高级算子使用

掌握Spark高级算子在代码中的使用
相同点分析
三个函数的共同点,都是Transformation算子。惰性的算子。
不同点分析
map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。
mapPartitions函数是一个partition数据一起处理,也即是说,mapPartitions函数的输入是一个partition的所有数据构成的“迭代器”,然后函数里面可以一条一条的处理,在把所有结果,按迭代器输出。也可以结合yield使用效果更优。
rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。
两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。
mapPartitionsWithIndex函数,其实和mapPartitions函数区别不大,因为mapPartitions背后调的就是mapPartitionsWithIndex函数,只是一个参数被close了。mapPartitionsWithIndex的函数可以或得partition索引号;
假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
mapPartitionsWithIndex则是带上分区下标进行操作。
1、mapPartitionWithIndex

import org.apache.spark.{SparkConf,SparkContext}
object mapPartitionWithIndex {def getPartInfo:(Int,Iterator[Int]) => Iterator[String] = (index:Int,iter:Iterator[Int]) =>{
iter.map(x =>"[ PartId " + index +", elems: " + x + " ]")
}def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddMapPartitionsWithIndexDemo")
val sc = new SparkContext(conf)val rdd1 = sc.parallelize(List(1,2,3,4,5,9,6,7,8),numSlices =3 )
val rdd2 = rdd1.mapPartitionsWithIndex(getPartInfo)
rdd2.collect().foreach(println)}}

2、aggregate
首先我们来创建一个 RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24scala> rdd1.collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)

这个 RDD 仅有 1 个分片,包含 5 个数据: 1, 2, 3, 4, 5 。
 
然后我们来应用一下 aggregate 方法。
 
在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。
scala> 
// Entering paste mode (ctrl-D to finish)
def pfun1(p1: Int, p2: Int): Int = {
p1 * p2
}// Exiting paste mode, now interpreting.
pfun1: (p1: Int, p2: Int)Intscala>scala> 
// Entering paste mode (ctrl-D to finish)
def pfun2(p3: Int, p4: Int): Int = {
p3 + p4
}// Exiting paste mode, now interpreting.
pfun2: (p3: Int, p4: Int)Intscala>
接着是第 2 个函数。就不再解释什么了。
 
然后终于可以开始应用我们的 aggregate 方法了。
scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363scala>
输出结果是 363 !这个结果是怎么算出来的呢?
 
首先我们的 zeroValue 即初值是 3 。然后通过上面小节的介绍,我们知道首先会应用 pfun1 函数,因为我们这个 RDD 只有 1 个分片,所以整个运算过程只会有一次 pfun1 函数调用。它的计算过程如下:
 
首先用初值 3 作为 pfun1 的参数 p1 ,然后再用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后,得到的结果是  3 * 1 * 2 * 3 * 4 * 5 = 360 。这个 pfun1 的应用过程有点像是 “在 RDD 中滑动计算” 。
在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。
 
pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363 

import org.apache.spark.{SparkConf, SparkContext}object RddAggregateDemo {def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddDemos")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List("12","23","345",""),numSlices = 2)
//下面代码等价与rdd1.aggregate("")(x+y) => math.min(x.length,y.length)),(x,y)=>x+y))
val result = rdd1.aggregate("")(func1,func2)
println(result)
}
//(x,y)=>math.min(x.Length,y.length)
def func1:(String,String) => String =(x:String,y:String) =>{
println("<x: " + x + ",x.len: " + x.length + ">,<y: " + y +",y.len: " + y.length + ">")
val ret =math.min(x.length,y.length).toString
println("func1 ret:" +ret)
ret
}
//(x+y) => x+y
def func2:(String,String)=>String=(x:String,y:String) =>{
println("========" +(x+y))
x+y
}
}

3、aggregateByKey
通过scala集合以并行化方式创建一个RDD
scala> val pairRdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)
pairRdd 这个RDD有两个区,一个区中存放的是:
(“cat”,2),(“cat”,5),(“mouse”,4)
另一个分区中存放的是:
(“cat”,12),(“dog”,12),(“mouse”,2)
然后,执行下面的语句
scala > pairRdd.aggregateByKey(100)(math.max(_ , _), _ + _ ).collect
结果:
res0: Array[(String,Int)] = Array((dog,100),(cat,200),(mouse,200)
下面是以上语句执行的原理详解:
aggregateByKey的意思是:按照key进行聚合
第一步:将每个分区内key相同数据放到一起
分区一
(“cat”,(2,5)),(“mouse”,4)
分区二
(“cat”,12),(“dog”,12),(“mouse”,2)
第二步:局部求最大值
对每个分区应用传入的第一个函数,math.max(_ , _),这个函数的功能是求每个分区中每个key的最大值
这个时候要特别注意,aggregateByKe(100)(math.max(_ , _),_+_)里面的那个100,其实是个初始值
在分区一中求最大值的时候,100会被加到每个key的值中,这个时候每个分区就会变成下面的样子
分区一
(“cat”,(2,5,100)),(“mouse”,(4,100))
然后求最大值后变成:
(“cat”,100), (“mouse”,100)
分区二
(“cat”,(12,100)),(“dog”,(12.100)),(“mouse”,(2,100))
求最大值后变成:
(“cat”,100),(“dog”,100),(“mouse”,100)
第三步:整体聚合
将上一步的结果进一步的合成,这个时候100不会再参与进来
最后结果就是:
(dog,100),(cat,200),(mouse,200)
对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。
import org.apache.spark.{SparkConf, SparkContext}object RddAggregateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("RddAggregateByKeyDemo")
val sc = new SparkContext(conf)
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),numSlices = 2)
val rdd = pairRDD.aggregateByKey(100)(func2,func2)
val resultArray = rdd.collect
resultArray.foreach(println)
sc.stop()}
def func2(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {
iter.map(x=>"[partID:" + index + ",val: " +x +"]")
}//(x,y)=>math.max(x+y)
def func1:(Int,Int) => Int =(x:Int,y:Int) =>{
println("<x: " + x + "," +",y:"+ y + ">")
val ret =math.max(x,y)
println("func1 max:" +ret)
ret
}
//(x+y) => x+y
def func2:(Int,Int)=>Int=(x:Int,y:Int) =>{
println("========func2 x :" + x + ",y:"+y)
println("========func2 ret =====" +(x+y))
x+y
}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/137577.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

公开研讨会|智能制造中生产管理挑战与解决方案(9月29日)

随着新能源行业的规模化发展&#xff0c;企业增效降本成为迫切需求。生产制造能力成为关键因素&#xff0c;其发展必将是在大规模生产的前提下&#xff0c;追寻极简制造、极限制造以及智能制造。然而在这个发展过程中&#xff0c;企业依旧面临着诸多挑战&#xff1a; 产品设计…

DAZ To UMA⭐一.DAZ使用简介 / 设置DAZ导出的内容 / 获取模型纹理贴图

文章目录 🟥 DAZ快捷键🟧 DAZ界面介绍🟩 设置DAZ导出的内容1️⃣ 找到要导出的参数名称2️⃣ 打开导出面板3️⃣ 设置导出规则举例 : 导出身体Assets🟦 获取模型纹理贴图🟥 DAZ快捷键 移动物体:ctrl+alt+鼠标左键 旋转物体:ctrl+alt+鼠标右键 导入模型:双击左侧模型…

大数据-玩转数据-Flink SQL编程

一、概念 1.1 Apache Flink 两种关系型 API Apache Flink 有两种关系型 API 来做流批统一处理&#xff1a;Table API 和 SQL。 Table API 是用于 Scala 和 Java 语言的查询API&#xff0c;它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。 Flink SQL 是…

23种设计模式汇总详解

设计原则 中文名称英文名称含义解释单一职责原则Single Responsibility Principle(SRP)任何一个软件模块都应该只对某一类行为者负责一个类只干一件事&#xff0c;实现类要单一开闭原则Open-Close Principle(OCP)软件实体&#xff08;类、模块、函数等&#xff09;应该是可以扩…

七绝 . 秋寒

题记 拜读署名“淡定人生D”近日发表在“ 今日头条 ”上的古体诗《七绝 . 凉》&#xff0c;本老朽在由衷赞叹该女子才貌双全之时&#xff0c;也对自己寄居养老的成都崇州街子古镇今日下午的秋寒突至天气&#xff0c;情怀涌动&#xff0c;思绪万千&#xff0c;亦作《七绝 . 秋寒…

图像处理之《基于语义对象轮廓自动生成的生成隐写术》论文精读

一、相关知识 首先我们需要了解传统隐写和生成式隐写的基本过程和区别。传统隐写需要选定一幅封面图像&#xff0c;然后使用某种隐写算法比如LSB、PVD、DCT等对像素进行修改将秘密嵌入到封面图像中得到含密图像&#xff0c;通过信道传输后再利用算法的逆过程提出秘密信息。而生…

IPV6真的神

ipv6 地址短缺的现实&#xff0c;万物互联的未来<全局可达性> 1、路由表更小。地址分配遵循聚类原则&#xff0c;路由表用Entry的路由表示一片子网。 2、更强的组播以及流控制。为媒体服务质量QoS。控制提供了良好的网络平台。 3、DHCPv6,自动配置地址。使得网&#xff0…

算法基础:图

图论 图论〔Graph Theory〕是数学的一个分支。它以图为研究对象。图论中的图是由若干给定的点及连接两点的线所构成的图形&#xff0c;这种图形通常用来描述某些事物之间的某种特定关系&#xff0c;用点代表事物&#xff0c;用连接两点的线表示相应两个事物间具有这种关系。 …

openGauss天津用户组招募正式启动,欢迎报名

openGauss天津用户组招募正式启动&#xff0c;欢迎报名&#xff01; openGauss用户组(openGauss User Group&#xff0c;简称oGUG)是一个让openGauss用户就技术特性、最佳实践、运营进展等方向交流的开放性本地社区。oGUG致力于构建一个开放、多元、包容的 openGauss城市用户交…

Java高级-注解

注解 1.介绍2.元注解3.注解的解析4.注解的应用场景 1.介绍 注解 Annotation 就是Java代码里的特殊标记&#xff0c;作用是让其他程序根据注解信息来决定什么是执行该程序注解&#xff1a;注解可以在类上、构造器上、方法上、成员变量上、参数上等位置 自定义注解 /*** 自定…

拼多多商品详情API接口实时数据,获取到指定商品的详细信息,例如价格、标题、图片、描述、所属类目等信息

1.获取拼多多开发者账号 在使用拼多多 API 之前&#xff0c;需要先注册账号。注册成功后可以获取到相应的key 和Secret 用于调用 API。 2.了解拼多多商品详情 API 拼多多商品详情 API 提供了多种接口可以使用&#xff0c;其中最常用的是 pdd.ddk.goods.detail 接口。此接口可…

CLIP 基础模型:从自然语言监督中学习可转移的视觉模型

一、说明 在本文中&#xff0c;我们将介绍CLIP背后的论文&#xff08;Contrastive Language-I mage Pre-Training&#xff09;。我们将提取关键概念并分解它们以使其易于理解。此外&#xff0c;还对图像和数据图表进行了注释以澄清疑问。 图片来源&#xff1a; 论文&#xff1a…

epoll的并发服务器(TCP服务器与客户端通信)

服务器&#xff1a; #include<myhead.h> #define IP "192.168.250.100" #define PORT 8888 /* typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64; } epoll_data_t;struct epoll_event {uint32_t events; …

x86架构基础汇编知识

​ 通用寄存器 EAX 32位 函数返回值 AX 低16位 AH 高八位 AL 低八位 EBX 32位 ECX 32位 循环次数&#xff0c;this指针 EDX 32位 EBP 32位 栈底寄存器 ESP 32位 栈顶寄存器 ESI 源索引寄存器 EDI 目标索引寄存器 EIP 无法直接通过汇编操作 例子 mov al&#xff0c;0xff …

Windows虚拟机访问网页证书错误问题

问题&#xff1a; 显示证书错误&#xff0c;图片加载不出来&#xff0c;看着很别扭&#xff0c;如下&#xff1a; 方法: 1.先导出可用的证书&#xff1a; 可以将自己正常环境的证书导出来&#xff08;google浏览器为例&#xff09; 浏览器右上角三个竖点——设置——隐私设…

Python计算机二级知识点整理模拟考试

1. 循环队列是队列的一种顺序存储结构&#xff0c;用队尾指针 rear 指向队列中的队尾元素&#xff0c;用排头指针 front 指向排头元素的前一个位置。因此&#xff0c;从排头指针 front 指向的后一个位置直到队尾指针 rear 指向的位置之间所有的元素均为队列中的元素。 2&…

Python灰帽编程——网页信息爬取

文章目录 网页信息爬取1. 相关模块1.1 requests 模块1.1.1 模块中的请求方法1.1.2 请求方法中的参数1.1.3 响应对象中属性 1.2 RE 模块1.2.1 匹配单个字符1.2.2 匹配一组字符1.2.3 其他元字符1.2.4 核心函数 2. 网页信息爬取2.1 获取网页HTML 源代码2.2 提取图片地址2.3 下载图…

项目开发流程

最近在工作中总是觉得项目开发周期不足,和领导以及同时沟通后,发现自己在评估开发时间的时候,评估不准确,缺少了某些环节的时间评估。例如,没有把需求反串讲,方案讨论和制定以及自测的时间评估在内。所以大致理了一下整个项目的开发周期包含的工作量,这些工作量都需要在给出人力…

多线程的学习第二篇

多线程 线程是为了解决并发编程引入的机制. 线程相比于进程来说,更轻量 ~~ 更轻量的体现: 创建线程比创建进程,开销更小销毁线程比销毁进程,开销更小调度线程比调度进程,开销更小 进程是包含线程的. 同一个进程里的若干线程之间,共享着内存资源和文件描述符表 每个线程被独…

ORB-SLAM2_RGBD_DENSE_MAP编译、问题解决、离线加载TUM数据和在线加载D435i相机数据生成稠密地图

文章目录 0 引言1 安装依赖1.1 其他库安装1.2 pcl库安装 2 编译ORB-SLAM2_RGBD_DENSE_MAP2.1 build.sh2.2 build_ros.sh 3 运行ORB-SLAM2_RGBD_DENSE_MAP3.1 build.sh编译版本3.2 build_ros.sh编译版本 0 引言 ORB-SLAM2_RGBD_DENSE_MAP是基于ORB-SLAM2框架的一种RGB-D稠密地图…