大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD

28.RDD_为什么需要RDD

29.RDD_定义

30.RDD_五大特性总述

31.RDD_五大特性1

32.RDD_五大特性2

33.RDD_五大特性3

34.RDD_五大特性4

35.RDD_五大特性5

36.RDD_五大特性总结

37.RDD_创建概述

38.RDD_并行化创建

演示代码:
// 获取当前 RDD 的分区数
@Since ( "1.6.0" )
final def getNumPartitions : Int =
partitions . length
// 显示出 RDD 被分配到不同分区的信息
/**Return an RDD created by coalescing all
elements within each partition into an
array.*/
def glom (): RDD [ Array [ T ]]
1
2
3
4
5
6
package com . itbaizhan . rdd
//1. 导入 SparkConf 类、 SparkContext
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByParallelize {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象。并设置本地运行和程序的
名称
val conf = new
SparkConf (). setMaster ( "local[2]" ). setAppName
( "CreateRdd1" )
//3. 构建 SparkContext 对象
val sc = new SparkContext ( conf )
//4. 通过并行化创建 RDD 对象:将本地集合 -> 分布式的
RDD 对象
1
2
3
4
5
6
7
8
9
10
11
12
79    
//val rdd: RDD[Int] =
sc.parallelize[Int](List(1, 2, 3, 4, 5, 6,
7, 8))
val rdd : RDD [ Int ] =
sc . parallelize ( List ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ,
8 ), 3 )
//5. 输出默认的分区数
//5.1
setMaster("local[*]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//8, 默认当前系统的
CPU
//5.2
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8))
//println(" 默认分区
数: "+rdd.getNumPartitions)//2
//5.3
setMaster("local[2]")&¶llelize(List(1,
2, 3, 4, 5, 6, 7, 8),3)
println ( " 默认分区
数: " + rdd . getNumPartitions ) //3
//6.collect 方法:将 rdd 对象中每个分区的数据,都
发送到 Driver ,形成一个 Array 对象
val array1 : Array [ Int ] = rdd . collect ()
println ( "rdd.collect()=" + array1 . mkString ( ",
" ))
//7. 显示出 rdd 对象中元素被分布到不同分区的数据信
13
14
15
16
17
18
19
20
21
22
23
24
25
80 运行结果:
实时效果反馈
1. 以下关于并行化创建 RDD 的描述错误的是:
A
通过并行化集合创建,将本地集合对象转分布式 RDD
B
parallelize() 方法必须传递两个参数。
C
parallelize 没有给定分区数 , 默认分区数等于执行程序的当前
服务器 CPU 核数。
答案:
val array2 : Array [ Array [ Int ]] =
rdd . glom (). collect ()
println ( "rdd.glom().collect() 的内容是 :" )
/*for(eleArr<- array2){
println(eleArr.mkString(","))
}*/
array2 . foreach ( eleArr => println ( eleArr . mkStr
ing ( "," )))
}
}
26
27
28
29
30
31
32
33
默认分区数: 3
rdd.collect()=1,2,3,4,5,6,7,8
rdd.glom().collect() 的内容是 :
1,2
3,4,5
6,7,8

39.RDD_读取文件创建RDD

40.RDD_读取小文件创建RDD

扩展 wholeTextFiles 适合读取一堆小文件:
//path 指定小文件的路径目录
//minPartitions 最小分区数 可选参数
def wholeTextFiles ( path :
String , minPartitions : Int =
defaultMinPartitions ): RDD [( String , String )]
1
2
3
85 代码演示:
package com . itbaizhan . rdd
//1. 导入类
import org . apache . spark . rdd . RDD
import org . apache . spark .{ SparkConf ,
SparkContext }
object CreateByWholeTextFiles {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "WholeTextFiles" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 读取指定目录下的小文件
val rdd : RDD [( String , String )] =
sc . wholeTextFiles ( "data/tiny_files" )
//(filePath1, " 内容 1"),(filePath2, " 内容
2"),...,(filePathN, " 内容 N")
val tuples : Array [( String , String )] =
rdd . collect ()
tuples . foreach ( ele => println ( ele . _1 , ele . _2 ))
//6. 获取小文件中的内容
val array : Array [ String ] =
rdd . map ( _ . _2 ). collect ()
println ( "---------------------------" )
println ( array . mkString ( "|" ))
//4. 关闭 sc 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
86 运行输出结果 :
RDD_ 算子概述
定义: 分布式集合 RDD 对象的方法被称为算子
算子分类:
Transformation 转换算子
1
Action 行动算子
2
sc . stop ()
}
}
22
23
24
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file1.txt,hello Linux
hello Zookeper
hello Maven
hello hive
hello spark)
(file:/D:/codes/itbaizhan/sparkdemo/data/tin
y_files/file2.txt,Spark Core
Spark RDD
Spark Sql)
----------------
hello Linux
hello Zookeper
hello Maven
hello hive
hello spark|Spark Core
Spark RDD
Spark Sql

41.RDD_算子概述

42.RDD_转换算子map

43.RDD_转换算子flatmap

44.RDD_转换算子reducebykey

45.RDD_转换算子filter

46.RDD_转换算子distinct

47.RDD_转换算子glom

48.RDD_转换算子groupby

object RddGroupBy {
def main ( args : Array [ String ]): Unit = {
//2. 构建 SparkConf 对象,并设置本地运行和程序名
val conf : SparkConf = new
SparkConf (). setMaster ( "local[*]" ). setAppName
( "groupBy" )
//3. 使用 conf 对象构建 SparkContet 对象
val sc = new SparkContext ( conf )
//5. 创建 Rdd
val rdd : RDD [( Char , Int )] =
sc . parallelize ( Array (( 'a' , 1 ), ( 'a' , 2 ),
( 'b' , 1 ), ( 'b' , 2 ), ( 'a' , 3 ), ( 'a' , 4 )))
//6. 通过 groupBy 算子对 rdd 对象中的数据进行分组
//groupBy 插入的函数的用意是指定按照谁进行分组
// 分组后的结果是有二元组组成的 RDD
val gbRdd : RDD [( Char , Iterable [( Char ,
Int )])] = rdd . groupBy ( tupEle => tupEle . _1 )
// 收集到 Driver
val result1 : Array [( Char ,
Iterable [( Char , Int )])] = gbRdd . collect ()
//(a,CompactBuffer((a,1), (a,2), (a,3),
(a,4))),(b,CompactBuffer((b,1), (b,2)))
println ( result1 . mkString ( "," ))
//7. 使用 map 转换算子
//(a,List((a,1), (a,2), (a,3), (a,4))),
(b,List((b,1), (b,2)))
val result2 : Array [( Char , List [( Char ,
Int )])] = gbRdd . map ( tup => ( tup . _1 ,
tup . _2 . toList )). collect ()
println ( result2 . mkString ( "," ))
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
104 实时效果反馈
1. 以下关于
rdd.groupBy(tupEle => tupEle._1)
的描述错误的是:
A
groupBy 传入的函数的意思是 : 通过这个函数 , 确定按照谁来
分组。
B
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
数只能为 2
C
groupBy 方法适用于元素为元祖类型的 RDD ,元祖元素的个
>=2
答案:
1=>B

49.RDD_转换算子groupbyKey

50.RDD_转换算子sortby

51.RDD_转换算子sortbyKey

52.RDD_转换算子union并集

53.RDD_转换算子交集和差集

54.RDD_转换算子关联算子

55.RDD_转换算子partitionBy

56.RDD_转换算子mapPatitions

57.RDD_转换算子sample

58.RDD_行动算子foreachPartition

59.RDD_行动算子foreach

60.RDD_行动算子saveAsTestFile

61.RDD_行动算子countByKey

62.RDD_行动算子reduce

63.RDD_行动算子fold

64.RDD_行动算子first_take_count

65.RDD_行动算子top_takeOrderd

66.RDD_行动算子takeSample

二.内核进阶

67.内核进阶_DAG概述

68.内核进阶_血缘关系

69.内核进阶_宽窄依赖关系

70.内核进阶_stage划分

71.内核进阶_任务调度概述

72.内核进阶_管道计算模式上

73.内核进阶_管道计算模式下

74.内核进阶_cache缓存

75.内核进阶_checkpoint检查点

76.内核进阶_cache和checkpoint区别

77.内核进阶_并行度

78.内核进阶_广播变量

79.内核进阶_累加器一

80.内核进阶_累加器二

81.内核进阶_累加器之重复计算

82.内核进阶_项目实战PVUV需求分析

83.内核进阶_项目实战PV分析

84.内核进阶_项目实战UV分析

85.内核进阶_二次排序实战

86.内核进阶_分组取topN实战

87.内核进阶_卡口统计项目需求分析

88.内核进阶_卡口统计项目统计正常的卡口

89.内核进阶_卡口统计项目TOP5

90.内核进阶_卡口统计项目统计不同区域同时出现的车辆

91.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹一

92.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹二

93.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹三

94.内核进阶_卡口统计项目统计某卡口下通过的车辆轨迹四

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/12819.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【分布式架构理论3】分布式调用(2):API 网关分析

文章目录 一、API 网关的作用1. 业务层面&#xff1a;简化调用复杂性2. 系统层面&#xff1a;屏蔽客户端调用差异3. 其他方面&#xff1a; 二、API 网关的技术原理1. 协议转换2. 链式处理3. 异步请求机制1. Zuul1&#xff1a;同步阻塞处理2. Zuul2&#xff1a;异步非阻塞处理 三…

3.【BUUCTF】XSS-Lab1

进入题目页面如下 好好好&#xff0c;提示点击图片&#xff0c;点进去页面如下&#xff0c;且url中有传参&#xff0c;有注入点 发现题目给出了源码 查看得到本题的源码 分析一下代码 <!DOCTYPE html><!--STATUS OK--> <!-- 声明文档类型为 HTML5&#xff0c;告…

uniapp小程序自定义中间凸起样式底部tabbar

我自己写的自定义的tabbar效果图 废话少说咱们直接上代码&#xff0c;一步一步来 第一步&#xff1a; 找到根目录下的 pages.json 文件&#xff0c;在 tabBar 中把 custom 设置为 true&#xff0c;默认值是 false。list 中设置自定义的相关信息&#xff0c; pagePath&#x…

105,【5】buuctf web [BJDCTF2020]Easy MD5

进入靶场 先输入试试回显 输入的值成了password的内容 查看源码&#xff0c;尝试得到信息 什么也没得到 抓包&#xff0c;看看请求与响应里有什么信息 响应里得到信息 hint: select * from admin where passwordmd5($pass,true) 此时需要绕过MD5&#xff08;&#xff09;函…

JVM监控和管理工具

基础故障处理工具 jps jps(JVM Process Status Tool)&#xff1a;Java虚拟机进程状态工具 功能 1&#xff1a;列出正在运行的虚拟机进程 2&#xff1a;显示虚拟机执行主类(main()方法所在的类) 3&#xff1a;显示进程ID(PID&#xff0c;Process Identifier) 命令格式 jps […

【大模型】AI 辅助编程操作实战使用详解

目录 一、前言 二、AI 编程介绍 2.1 AI 编程是什么 2.1.1 为什么需要AI辅助编程 2.2 AI 编程主要特点 2.3 AI编程底层核心技术 2.4 AI 编程核心应用场景 三、AI 代码辅助编程解决方案 3.1 AI 大模型平台 3.1.1 AI大模型平台代码生成优缺点 3.2 AI 编码插件 3.3 AI 编…

机器学习--2.多元线性回归

多元线性回归 1、基本概念 1.1、连续值 1.2、离散值 1.3、简单线性回归 1.4、最优解 1.5、多元线性回归 2、正规方程 2.1、最小二乘法 2.2、多元一次方程举例 2.3、矩阵转置公式与求导公式 2.4、推导正规方程0的解 2.5、凸函数判定 成年人最大的自律就是&#xff1a…

2025最新软件测试面试大全(附答案+文档)

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 1、问&#xff1a;你在测试中发现了一个bug&#xff0c;但是开发经理认为这不是一个bug&#xff0c;你应该怎样解决? 首先&#xff0c;将问题提交到缺陷管理库里…

手写MVVM框架-环境搭建

项目使用 webpack 进行进行构建&#xff0c;初始化步骤如下: 1.创建npm项目执行npm init 一直下一步就行 2.安装webpack、webpack-cli、webpack-dev-server&#xff0c;html-webpack-plugin npm i -D webpack webpack-cli webpack-dev-server html-webpack-plugin 3.配置webpac…

如何自定义软件安装路径及Scoop包管理器使用全攻略

如何自定义软件安装路径及Scoop包管理器使用全攻略 一、为什么无法通过WingetUI自定义安装路径&#xff1f; 问题背景&#xff1a; WingetUI是Windows包管理器Winget的图形化工具&#xff0c;但无法直接修改软件的默认安装路径。原因如下&#xff1a; Winget设计限制&#xf…

数据结构实战之线性表(三)

目录 1.顺序表释放 2.顺序表增加空间 3.合并顺序表 4.线性表之链表实现 1.项目结构以及初始代码 2.初始化链表(不带头结点) 3.链表尾部插入数据并显示 4.链表头部插入数据 5.初始化链表&#xff08;带头结点&#xff09; 6.带头结点的链表头部插入数据并显示 7.带头结…

5.6 Mybatis代码生成器Mybatis Generator (MBG)实战详解

文章目录 前言一、Mybatis Generator简介二、Maven插件运行方式三、生成配置 generatorConfig.xml MyBatis3Simple风格MyBatis3风格MyBatis3DynamicSql风格 四、Java代码运行方式五、MGB生成全部表六、增加Ext包七、Git提交总结 前言 本文我们主要实战Mybatis官方的代码生成器…

DeepSeek:全栈开发者视角下的AI革命者

目录​​​​​​​ DeepSeek&#xff1a;全栈开发者视角下的AI革命者 写在前面 一、DeepSeek的诞生与定位 二、DeepSeek技术架构的颠覆性突破 1、解构算力霸权&#xff1a;从MoE架构到内存革命 2、多模态扩展的技术纵深 3、算法范式的升维重构 4、重构AI竞争规则 三、…

(篇一)基于PyDracula搭建一个深度学习的界面之添加启动界面

文章目录 基于PyDracula搭建一个深度学习的界面插入一个启动界面1启动页面的资源如何加载与管理&#xff1f;2启动界面的代码如何写&#xff1f; 基于PyDracula搭建一个深度学习的界面 插入一个启动界面 1启动页面的资源如何加载与管理&#xff1f; 1. 问题一 启动界面包含一…

无人机图传模块 wfb-ng openipc-fpv,4G

openipc 的定位是为各种模块提供底层的驱动和linux最小系统&#xff0c;openipc 是采用buildroot系统编译而成&#xff0c;因此二次开发能力有点麻烦。为啥openipc 会用于无人机图传呢&#xff1f;因为openipc可以将现有的网络摄像头ip-camera模块直接利用起来&#xff0c;从而…

拍照对比,X70 PRO与X90 PRO+的细节差异

以下是局部截图&#xff08;上X70P下X90PP&#xff09; 对比1 这里看不出差异。 对比2 X90PP的字明显更清楚。 对比3 中下的字&#xff0c;X90PP显然更清楚。

深度探索 C 语言操作符:从基础到实战应用

前言&#xff1a; 在 C 语言的编程体系中&#xff0c;操作符就像是一个个精密的齿轮&#xff0c;相互配合驱动着程序的运转。熟练掌握操作符的使用&#xff0c;不仅能编写出高效、简洁的代码&#xff0c;还能深入理解程序运行的底层逻辑。接下来&#xff0c;让我们一同深入探索…

从零开始实现一个双向循环链表:C语言实战

文章目录 1链表的再次介绍2为什么选择双向循环链表&#xff1f;3代码实现&#xff1a;从初始化到销毁1. 定义链表节点2. 初始化链表3. 插入和删除节点4. 链表的其他操作5. 打印链表和判断链表是否为空6. 销毁链表 4测试代码5链表种类介绍6链表与顺序表的区别7存储金字塔L0: 寄存…

简单本地部署deepseek(软件版)

Download Ollama on Windows 下载 下载安装 winr 输入 cmd 然后输入ollama -v&#xff0c;出现ollama版本&#xff0c;安装成功 deepseek-r1 选择1.5b 输入 cmd 下面代码 ollama run deepseek-r1:1.5b 删除deepseek的代码如下&#xff1a; ollama rm deepseek-r1:1.5b 使用…

21.2.1 基本操作

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 Excel的基本操作步骤&#xff1a; 1、打开Excel&#xff1a;定义了一个Application对象&#xff1a; Microsoft.Office.Interop.E…