摸鱼大数据——Spark Core——RDD的基本介绍和如何构建RDD

1、什么是RDD

RDD:英文全称Resilient Distributed Dataset,叫做弹性分布式数据集,代表一个不可变、可分区、里面的元素可并行计算的分布式的抽象的数据集合。

  • Resilient弹性:RDD的数据可以存储在内存或者磁盘当中,RDD的数据可以分区

  • Distributed分布式:RDD的数据可以分布式存储,可以进行并行计算

  • Dataset数据集:一个用于存放数据的集合

2、RDD的五大特性

1、(必须的)RDD是由一系列分区组成的
2、(必须的)对RDD做计算,相当于对RDD的每个分区做计算
3、(必须的)RDD之间存在着依赖关系(宽依赖和窄依赖)
4、(可选的)对于KV类型的RDD,默认操作的是k,当然我们可以进行自定义分区方案
5、(可选的)移动数据不如移动计算,让计算程序离数据越近越好

3、RDD的五大特点

1、分区:RDD逻辑上是分区的,仅仅是定义分区的规则,并不是直接对数据进行分区操作,因为RDD本身不存储数据。
2、只读:RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
3、依赖:RDD之间存在着依赖关系(宽依赖和窄依赖)
4、cache缓存:如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,后续每次直接从缓存获取即可
5、checkpoint检查点:与缓存类似的,都是可以将中间某一个RDD的结果保存起来,只不过checkpoint支持真正持久化保存

如何构建RDD

构建RDD对象的方式主要有两种:

1、通过 parallelize(data): 通过自定义列表的方式初始化RDD对象。(一般用于测试)
2、通过 textFile(data): 通过读取外部文件的方式来初始化RDD对象,实际工作中经常使用。

1、并行化本地集合方式

黑窗口中实现:

开发工具实现:

# 导包
import os
from pyspark import SparkConf, SparkContext
​
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
​
# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[3]')sc = SparkContext(conf=conf)
​# 2.数据输入# 3.数据处理(切分,转换,分组聚合)d = [1, 2, 3, 4]rdd = sc.parallelize(d,numSlices=1)# 4.数据输出print(rdd.collect())# 6.分区演示# 获取分区数print(rdd.getNumPartitions())# 获取各个分区数据print(rdd.glom().collect())
​# 5.关闭资源sc.stop()
​

相关的API:

# parallelize(参数1,参数2)
使用本地数据构建RDD。参数1:本地数据列表;参数2:可选的,表示有多少个分区
​
# getNumPartitions
查看RDD的分区数量
​
# glom
查看每个分区的数据内容

修改分区数,效果:

1- 默认和setMaster('local[num]')中的num数量有关。如果是*,就是和机器的CPU核数相同。另外可以指定具体的数字,数字是多少,那么分区数就是多少
​
2- parallelize()中第二个参数numSlices可以手动指定RDD的分区数。如果同时设置了local和numSlices,numSlices的优先级高一些

2、读取外部数据源方式

TextFile API的方式实现:

# 导包
import os
from pyspark import SparkConf, SparkContext
​
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
​
# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[3]')sc = SparkContext(conf=conf)
​# 2.数据输入# 3.数据处理(切分,转换,分组聚合)# 注意: 如果要提交到yarn,文件建议使用hdfs路径rdd = sc.textFile('hdfs://node1:8020/source/c1.txt',minPartitions=1)# 4.数据输出print(rdd.collect())# 6.分区演示# 获取分区数print(rdd.getNumPartitions())# 获取各个分区数据print(rdd.glom().collect())
​# 5.关闭资源sc.stop()
​

修改分区数,效果:

到底有多少个分区,一切以getNumPartitions结果为准
​
分区数据量,当调大local[num]中num的值时候,不生效;调小的时候生效
​
同时也受minPartitions影响

3、处理小文件的操作

wholeTextFiles: wholeTextFiles既能直接读取文件,也能读取一个目录下的所有小文件
# 导包
import os
from pyspark import SparkConf, SparkContext# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[10]')sc = SparkContext(conf=conf)# 2.数据输入# 3.数据处理(切分,转换,分组聚合)# 注意: 如果要提交到yarn,文件建议使用hdfs路径# 注意: wholeTextFiles既能直接读取文件,也能读取一个目录下的所有小文件rdd = sc.wholeTextFiles('hdfs://node1:8020/source/c1.txt')# 4.数据输出print(rdd.collect())# 6.分区演示# 获取分区数print(rdd.getNumPartitions())# 获取各个分区数据print(rdd.glom().collect())# 5.关闭资源sc.stop()

修改分区数,效果:

wholeTextFiles: 读取小文件。1-支持本地文件系统和HDFS文件系统。参数minPartitions指定最小的分区数。2-通过该方式读取文件,会尽可能使用少的分区数,可能会将多个小文件的数据放到同一个分区中进行处理。3-一个文件要完整的存放在一个元组中,也就是不能将一个文件分成多个进行读取。文件是不可分割的。4-RDD分区数量既受到minPartitions参数的影响,同时受到小文件个数的影响

4、RDD分区数量如何确定

1- RDD的分区数量,一般设置为机器CPU核数的2-3倍。为了充分利用服务器的硬件资源

2- RDD的分区数据量受到多个因素的影响,例如:机器CPU的核数、调用的算子、算子中参数的设置、集群的类型等。RDD具体有多少个分区,直接通过getNumPartitions查看

3- 当初始化SparkContext对象的时候,其实就确定了一个参数spark.default.parallelism,默认为CPU的核数。如果是本地集群,就取决于local[num]中设置的数字大小;如果是集群,默认至少有2个分区

4- 通过parallelize来构建RDD,如果没有指定分区数,默认就取spark.default.parallelism参数值;如果指定了分区数,也就是numSlices参数,那么numSlices的优先级会更高一些,最终RDD的分区数取该参数的值。

5- 通过textFile来构建RDD
    5.1- 首先确认defaultMinPartition参数的值。该参数的值,如果没有指定textFile的minPartition参数,那么就根据公式min(spark.default.parallelism,2);如果有指定textFile的minPartition参数,那么就取设置的值
    5.2- 再根据读取文件所在的文件系统的不同,来决定最终RDD的分区数:
        5.2.1- 本地文件系统: RDD分区数 = max(本地文件分片数,defaultMinPartition)
        5.2.2- HDFS文件系统: RDD分区数 = max(文件block块的数量,defaultMinPartition)

常规处理小文件的办法: 1- 大数据框架提供的现有的工具或者命令 1.1- 合并hdfs中多个小文件到linux本地: hadoop fs -getmerge 小文件路径 linux输出路径/文件名.后缀名

举例: [root@node1 ~]# hadoop fs -getmerge /data/*.txt /merged_file.txt

1.2- 归档hdfs中多个小文件到hdfs: hadoop archive -archiveName 归档名.har -p 小文件路径 hdfs输出路径

举例: [root@node1 ~]# hadoop archive -archiveName merged_file.har -p /data/ /

2- 可以通过编写自定义的代码,将小文件读取进来,在代码中输出的时候,输出形成大的文件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/367981.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java常用类(3)

目录 一. 正则表达式 二. Math类 三. Random类 四. Date类 五. Calendar类 六. SimpDateFormate类 七. BigInteger类 八. BigDecimal类 一. 正则表达式 正则表达式(Regular Expression)就是用一些特殊的符号去匹配一个字符串是否符合规则,利用String类中的matches()方…

[Leetcode 136][Easy]-只出现一次的数字

目录 题目描述 具体思路 题目描述 原题链接 具体思路 ①首先看到数组中重复的数字,想到快慢指针,但是数组的元素是乱序的不好求。因此先对数组排序。使用了STL库的sort函数,时间复杂度O(nlogn)不符合题目要求,空间复杂度O(1)。…

KEYSIGHT是德科技 E5063A ENA 系列网络分析仪

E5063A ENA 矢量网络分析仪 18GHz 2端口 降低无源射频元器件的测试成本 Keysight E5063A ENA 是一款经济适用的台式矢量网络分析仪,可用于测试简单的无源元器件,例如频率最高达到 18 GHz 的天线、滤波器、电缆或连接器。 作为业界闻名的 ENA 系列…

打卡第一天

今天是参加算法训练营的第一天,希望我能把这个训练营坚持下来,希望我的算法编程题的能力有所提升,不再面试挂了,面试总是挂编程题,记录我leetcode刷题数量: 希望我通过这个训练营能够实现两份工作的无缝衔接…

用720云搭建数字孪生VR智慧安防系统,赋能安防升级!

“安全防范"一直是我国城镇化发展进程中重点关注的工作板块,随着时代发展需求与科技的日新月异,安防行业正在积极融合VR3D数字孪生技术,升级安防数字基础设施和安防产品服务创新。 今年2月,《数字中国建设整体布局规划》的出…

docker容器间网络仿真工具-pumba

docker-tc&pumba docker-tc:docker-tc项目仓库 pumba:pumba项目仓库 这两个项目理论上都可以实现对容器间的网络环境进行各种模拟干预,包括延迟,丢包,带宽限制等。 但是我在实际使用时,发现docker-tc这个工具在进行网络进行模…

如何使用python网络爬虫批量获取公共资源数据教程?

原文链接:如何使用python网络爬虫批量获取公共资源数据教程?https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247608240&idx4&snef281f66727afabfaae2066c6e92f792&chksmfa826657cdf5ef41571115328a09b9d34367d8b11415d5a5781dc4c…

84 柱状图中最大的矩形

题目 给定 n 个非负整数,用来表示柱状图中各个柱子的高度。每个柱子彼此相邻,且宽度为 1 。 求在该柱状图中,能够勾勒出来的矩形的最大面积。 示例 输入:heights [2,1,5,6,2,3] 输出:10 解释:最大的矩…

从全连接到卷积

一、全连接到卷积 1、卷积具有两个原则: 平移不变性:无论作用在哪个部分,它都要有相同的作用,而不会随着位置的改变而改变 局部性:卷积核作用处,作用域应该是核作用点的周围一小部分而不作用于更大的部分 …

【无需公网IP】在树莓派上搭建Web站点

目录 1.概述 2.使用 Raspberry Pi Imager 安装 Raspberry Pi OS 3.设置 Apache Web 服务器 3.1测试 web 站点 3.2安装静态样例站点 3.3将web站点发布到公网 3.4安装 Cpolar 3.5cpolar进行token认证 3.6生成cpolar随机域名网址 3.7生成cpolar二级子域名 3.8将参数保存…

linux 下,Java由Java8升级到Java11,Java不更新版本号

在ES对接过程,springboot3进行对接,需要将Java升级到11版本。首先下载安装选好的11版本Java,在linux下解压后,配置/etc/profile export JAVA_HOME/root/SJL/jdk-11.0.22 然后保存,执行文件source /etc/profile&#…

Linux多进程和多线程(一)-进程的概念和创建

进程 进程的概念进程的特点如下进程和程序的区别LINUX进程管理 getpid()getppid() 进程的地址空间虚拟地址和物理地址进程状态管理进程相关命令 ps toppstreekill 进程的创建 并发和并行fork() 父子进程执行不同的任务创建多个进程 进程的退出 exit()和_exit() exit()函数让当…

【机器学习】机器学习与电商推荐系统的融合应用与性能优化新探索

文章目录 引言第一章:机器学习在电商推荐系统中的应用1.1 数据预处理1.1.1 数据清洗1.1.2 数据归一化1.1.3 特征工程 1.2 模型选择1.2.1 协同过滤1.2.2 矩阵分解1.2.3 基于内容的推荐1.2.4 混合推荐 1.3 模型训练1.3.1 梯度下降1.3.2 随机梯度下降1.3.3 Adam优化器 …

根据后端返回的省市区重新封装树结构(省市区通过children表示)

对比图(截取部分): 注:先看分步,最后会附上完整代码(如果有用,可以给小编点个赞吗?十分感谢) 1.首先将前端返回相同的省份只展示一次 const obj {}; let keyList []r…

最新简约美观的网址网站引导页HTML源码

最新简约美观的网址网站引导页HTML源码 带一言 随机大图 源码下载:https://download.csdn.net/download/m0_66047725/89487135 更多资源下载:关注我。

分布式数据库HBase:从零开始了解列式存储

在接触过大量的传统关系型数据库后你可能会有一些新的问题: 无法整理成表格的海量数据该如何储存? 在数据非常稀疏的情况下也必须将数据存储成关系型数据库吗? 除了关系型数据库我们是否还有别的选择以应对Web2.0时代的海量数据? 如果你也曾经想到过这些问题, 那么HBase将是…

【教程】lighttpd配置端口反向代理

转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 1、修改配置文件: sudo vim /etc/lighttpd/lighttpd.conf2、先添加mod_proxy: 3、然后添加端口映射: 4、保存&…

24位DAC转换的FPGA设计及将其封装成自定义IP核的方法

在vivado设计中,为了方便的使用Block Desgin进行设计,可以使用vivado软件把自己编写的代码封装成IP核,封装后的IP核和原来的代码具有相同的功能。本文以实现24位DA转换(含并串转换,使用的数模转换器为CL4660)为例,介绍VIVADO封装IP核的方法及调用方法,以及DAC转换的详细…

SpringBoot 通过Knife4j集成API文档 在线调试

介绍 Knife4j 是一款基于 Swagger 构建的增强型 API 文档生成工具&#xff0c;它提供了更多的定制化功能和界面优化&#xff0c;使得生成的 API 文档更加美观和易用。它可以帮助开发者快速生成和管理 API 文档&#xff0c;支持在线调试和交互。 依赖 <!--knife4j--> &…

不用编码构建本地RAG聊天机器人

前言 还记得开发一个智能聊天机器人需要花费数月编码的时间吗&#xff1f; 像 LangChain 这样的框架确实简化了开发&#xff0c;但是对于非程序员来说&#xff0c;数百行代码仍然是一个障碍。⁤ 就在那时&#xff0c;我发现了“Lang Flow”&#xff0c;这是一个基于 Python 版…