大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
    HDFS(已更完)
    MapReduce(已更完)
    Hive(已更完)
    Flume(已更完)
    Sqoop(已更完)
    Zookeeper(已更完)
    HBase(已更完)
    Redis (已更完)
    Kafka(已更完)
    Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • RDD 的创建
  • 从集合创建RDD、从文件创建RDD、从RDD创建RDD
  • RDD操作算子:Transformation 详细解释
    Transformation

RDD的操作算子分为两类:

  • Transformation,用来对RDD进行转换,这个操作时延迟执行的(或者是Lazy),Transformation,返回一个新的RDD
  • Action,用来触发RDD的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回int、double、集合(不会返回新的RDD)

上节完成了Transformation

Action

Action 用来触发RDD的计算,得到相关的计算结果

  • collect()/collectAsMap()
  • stats/count/mean/stdev/max/min
  • reduce(func)/fold(func)/aggregate(func)
  • first() 返回第一个RDD
  • take(n) 从开头拿一定数量的RDD
  • top(n) 按照将需或指定排序规则 返回前num个元素
  • takeSample 返回采样数据
  • froeach / foreachPartition 与 map、mapPartition类似
  • saveAsTextFile/saveAsSequenceFile/saveAsObjectFile

Key-Value RDD

RDD整体上分为 Value类型 和 Key-Value类型
前面介绍是 Value类型的RDD操作,实际上使用更多的是Key-Value类型的RDD,也称为 PairRDD

  • Value类型的RDD操作基本集中在RDD.scala中
  • Key-Value类型的RDD操作集中在PairRDDFunctions.scala中

创建PairRDD

val arr = (1 to 10).toArray
val arr1 = arr.map(x => (x, x*10, x*100))# rdd1 不是 Pair RDD
val rdd1 = sc.makeRDD(arr1)# rdd2 是 Pair RDD
val arr2 = arr.map(x => (x, (x*10, x*100)))
val rdd2 = sc.makeRDD(arr2)

运行查看如下的结果:
在这里插入图片描述

Transformation操作

mapValues

# mapValues代码更简洁
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.mapValues(x => 1 to x)
b.collect

运行结果如下图:
在这里插入图片描述

flatMapValues

将values压平、拍平

val c = a.flatMapValues(x => 1 to x)
c.collect

执行结果如下图所示:
在这里插入图片描述

groupByKey

键值对的key表示图书名称,value表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26),("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25),("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))# 三种写法
rdd.groupByKey().map(x => (x._1, x._2.sum.toDouble/x._2.size)).collectrdd.groupByKey().map{case (k, v) => (k,v.sum.toDouble/v.size)}.collectrdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect

执行结果如下图所示:
在这里插入图片描述

reduceByKey

这种方式也可以
rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).mapValues(x => (x._1.toDouble / x._2)).collect()

foldByKey

rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => {
(x._1+y._1, x._2+y._2)}).mapValues(x=>x._1.toDouble/x._2).collect

执行结果如下图所示:
在这里插入图片描述

sortByKey

根据key来进行排序

val a = sc.parallelize(List("wyp", "iteblog", "com","397090770", "test"))
val b = sc.parallelize(1 to a.count.toInt)
val c = a.zip(b)
c.sortByKey().collect
c.sortByKey(false).collect

执行如下图所示:
在这里插入图片描述

cogroup

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"),(3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"),(6,"冯七")))# join
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect

执行结果如下图所示:
在这里插入图片描述

outerjoin

# 不同的JOIN操作
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect

执行结果如下图所示:
在这里插入图片描述

lookup

rdd1.lookup("1")
rdd1.lookup(3)

执行结果如下图所示:
在这里插入图片描述

文件输入输出

文本文件

  • 数据读取:textFile(String),可指定单个文件,支持通配符
  • 返回值RDD[(String, Sting)],其中Key是文件的名称,Value是文件的内容。
  • 数据保存:saveAsTextFile(String) 指定输出目录

csv文件

读取CSV(Comma-Separated Values)/TSV(Tab-Separaed Values)数据和读取JSON数据相似,都需要先把文件当做普通文件来读取数据,然后通过将每一行进行解析实现对CSV的提取。
CSV/TSV 数据的输出也是需要将结构化RDD通过相关库转换成字符串的RDD,然后使用Spark的文本文件API写出去

JSON文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析
JSON数据的输出主要通过在输出之前将由结构化数据组成的RDD转换为字符串RDD,然后使用Spark的文本文件API写出去
JSON文件的处理使用 SparkSQL 最为简洁。

SequenceFile

SequenceFile文件是Hadoop用存储二进制形式的Key-Value而设计的一种平面文件(FlatFile)。
Spark有专门用来读取SequenceFile的接口,在SparkContext中,可以调用:SequenceFile[keyClass, valueClass];
调用 saveAsSequenceFile(path)保存PairRDD,系统将键和值能够自动转换为Writable类型。

对象文件

对象文件是序列化后保存的文件,采用Java的序列化机制。
通过 objectFile 接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出,因为序列化所以要指定类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/404587.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【联想电脑】:使用拓展坞后转接HDMI,无法识别显示屏

项目场景: 作为一个嵌入式软件开发者,有两个外接屏幕,不足为奇。 但是在今天的使用电脑过程中,出现了接了一个拓展坞上面有HDMI接口,但是HDMI接口接上外接显示屏的时候电脑无法识别到,导致只有电脑直连的HD…

家用小型洗衣机哪款好用?精选内衣洗衣机多维度测评盘点

对于很多都市生活的小伙伴来说,有一台小巧玲珑、功能齐全的内衣洗衣机则成了我们的救星。它不仅方便快捷,还能保持衣物清洁和卫生。然而,市面上的内衣洗衣机品牌五花八门。哪一个最好用、质量又靠谱呢?为了给大家提供更准确的选购…

【FPGA数字信号处理】- 数字信号处理如何入门?

​数字信号处理(Digital Signal Processing,简称DSP)是一种利用计算机或专用数字硬件对信号进行处理的技术,在通信、音频、视频、雷达等领域发挥着越来越重要的作用,也是FPGA主要应用领域之一。 本文将详细介绍数字信…

YOLOv5改进 | 融合改进 | C3融合ContextGuided增强分割效果

秋招面试专栏推荐 :深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 专栏目录: 《YOLOv5入门 改…

模糊控制——创建与添加自定义的隶属函数

关键字:模糊控制;隶属函数;Matlab。 系列文章目录 模糊控制——(一)理论基础 模糊控制——(二)设计流程 模糊控制——(三)模糊洗衣机 模糊控制——(四&#…

SQL— DDL语句学习【后端 9】

SQL— DDL语句学习 在数据管理的广阔领域中,SQL(Structured Query Language)作为操作关系型数据库的编程语言,扮演着举足轻重的角色。它不仅定义了操作所有关系型数据库的统一标准,还为我们提供了强大的工具来管理、查…

jenkins最佳实践(二):Pipeline流水线部署springCloud微服务项目

各位小伙伴们大家好呀,我是小金,本篇文章我们将介绍如何使用Pipeline流水线部署我们自己的微服务项目,之前没怎么搞过部署相关的,以至于构建流水线的过程中中也遇到了很多自己以前没有考虑过的问题,特写此篇&#xff0…

【Redis】数据类型详解及其应用场景

目录 Redis 常⻅数据类型预备知识基本全局命令小结 数据结构和内部编码单线程架构引出单线程模型为什么单线程还能这么快 Redis 常⻅数据类型 Redis 提供了 5 种数据结构,理解每种数据结构的特点对于 Redis 开发运维⾮常重要,同时掌握每种数据结构的常⻅…

Postman接口测试项目实战

第 1 章 什么是接口测试 1.1、为什么要进行接口测试 目前除了特别Low的公司外,开发都是前后端分离的,就是说前端有前端的工程师进行编码,后端有后端的工程师进行编码,前后端进行数据基本都是通过接口进行交互的。 1.2、接口测…

zookeeper源码分析之事务请求处理

一.参考 zookeeper启动和选举的源码分析参考之前的帖子. 二.源码 1.职责链模式. 每次经过的processor都是异步处理,加入当前processor的队列,然后新的线程从队列里面取出数据处理. PrepRequestProcessor 检查ACL权限,创建ChangeRecord. SyncRequest…

ArcGIS空间自相关Moran‘s I——探究人口空间格局的20年变迁

先了解什么是莫兰指数? 莫兰指数(Morans I)是一种用于衡量空间自相关性的统计量,即它可以帮助我们了解一个地理区域内的观测值是否彼此相关以及这种相关性的强度和方向。 莫兰指数分类: 全局莫兰指数 (Global Moran…

聊聊如何利用ingress-nginx实现应用层容灾

前言 容灾是一种主动的风险管理策略,旨在通过构建和维护异地的冗余系统,确保在面临灾难性事件时,关键业务能够持续运作,数据能够得到保护,从而最大限度地减少对组织运营的影响和潜在经济损失。因此容灾的重要性不言而…

zabbix实战-磁盘空间告警

1.创建监控项 选择&#xff1a;键值&#xff1a;vfs.fs.size[fs,<mode>] 。 直接写 vfs.fs.size[fs,<mode>]是不出数据的。我们要写具体的值 &#xff1a;vfs.fs.size[/,free] &#xff0c;这个表示查看根的剩余空间。 2.创建图形 为磁盘剩余空间监控项创建图形&am…

redis 遍渐进式历

1.scan cursor [match pattern] [coutn] [type]:以渐进式的方式进行建的遍历 cursor:是光标 指向当前遍历的位置 设置成0表示当前从0开始获取 math parttern &#xff1a;和keys命令一样的 keys * count: 限制一次遍历能够获取到多少个 元素默认是10 type :这次遍历只想获取…

[Python学习日记-10] Python中的流程控制(if...else...)

[Python学习日记-10] Python中的流程控制&#xff08;if...else...&#xff09; 简介 缩进 单分支 双分支 多分支 练习 简介 假如把写程序比做走路&#xff0c;那我们到现在为止&#xff0c;一直走的都是直路&#xff0c;还没遇到过分叉口&#xff0c;想象现实中&#x…

【python】Python实现XGBoost算法的详细理论讲解与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

Python编码系列—Python数据可视化:Matplotlib与Seaborn的实战应用

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

Ps:首选项 - 单位与标尺

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性&#xff0c;尤其是在跨设备或跨平台…

专题--自底向上的计算机网络(物理层)

目录 计算机网络概述 物理层 数据链路层 网络层 运输层 应用层 网络安全 详细见http://t.csdnimg.cn/MY5aI http://t.csdnimg.cn/8Ipa4 http://t.csdnimg.cn/uvMxS 信道复用技术不仅在物理层有运用&#xff0c;在数据链路层也确实需要信道复用技术。‌ 数据链路层是…

第N8周:使用Word2vec实现文本分类

本文为365天深度学习训练营 中的学习记录博客原作者&#xff1a;K同学啊 一、数据预处理 任务说明: 本次将加入Word2vec使用PyTorch实现中文文本分类&#xff0c;Word2Vec 则是其中的一种词嵌入方法&#xff0c;是一种用于生成词向量的浅层神经网络模型&#xff0c;由Tomas M…