Spark-Scala语言实战(7)

在之前的文章中,我们学习了如何在IDEA中导入jars包,并做了一道例题,了解了RDD。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(6)-CSDN博客文章浏览阅读695次,点赞15次,收藏24次。今天我会给大家带来如何在IDEA中导入jars包,以及使用SparkRDD,并正确使用它们同时也会给大家讲解一道实训题例。希望在本篇文章中,大家有所收获。也欢迎朋友们到评论区下一起交流学习,共同进步。https://blog.csdn.net/qq_49513817/article/details/137121524?spm=1001.2014.3001.5502

今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的map,sortby,collect三种方法。

目录

一、知识回顾

二、RDD方法

1.map

2.sortby

3.collect

拓展-RDD和DStream

1.RDD和DStream的区别

2.RDD和DStream的联系


一、知识回顾

导入jars包的过程在上一篇文章中以及讲解的很清楚了,图文一步一步带着做。

主要就是进入Libraries 添加java,然后选择spark的jars文件夹即可

如果还有不懂的朋友可以直接评论问我。

在就是文件的这几行代码

import org.apache.spark.{SparkConf, SparkContext}val conf=new SparkConf().setMaster("local").setAppName("123456")val sc=new SparkContext(conf)

这是配置与方法,记住它们的作用。

现在,开始今天的学习吧

二、RDD方法

1.map

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD
  • map()方法是转换操作,不会立即进行计算。
  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

例:

import org.apache.spark.{SparkConf, SparkContext}  // 定义一个名为p1的Scala对象  
object p1 {  // 定义main方法,作为程序的入口点  def main(args: Array[String]): Unit = {  // 创建一个Spark配置对象,并设置运行模式为"local"(本地模式),应用程序名称为"p2"  val conf = new SparkConf().setMaster("local").setAppName("p2")  // 使用Spark配置对象创建一个SparkContext对象,SparkContext是Spark功能的入口点  val sc = new SparkContext(conf)  // 创建一个包含整数的列表,并使用parallelize方法将其转换为RDD  val ppp = sc.parallelize(List(1, 2, 3, 4, 5))  // 使用map操作将RDD中的每个元素乘以2,并返回一个新的RDD  val ppppp = ppp.map(x => x * 2)  //oreach方法遍历并打印每个元素  ppppp.collect().foreach(println)  }  
}

可以看到我们输出的在原列表上*2,达到了代码预期效果

2.sortby

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
  • 1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  • 2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false
  • 3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

例:

import org.apache.spark.{SparkConf, SparkContext}  object p1 {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local").setAppName("p2")  // 使用配置好的conf对象创建一个SparkContext对象sc。   val sc = new SparkContext(conf)  // 使用SparkContext的parallelize方法将包含整数的序列转换成一个RDD。  // 这个RDD现在可以在Spark上并行处理。  val ppp = sc.parallelize(Seq(5, 1, 9, 3, 7))  // 对ppp RDD中的元素进行排序。  // 使用sortBy方法,并传递一个函数x => x作为参数,表示按照元素本身的值进行排序(升序)。  val pppp = ppp.sortBy(x => x)   // 这将返回一个包含RDD所有元素的数组,存储在ppppp中。  val ppppp = pppp.collect()  // 使用foreach方法遍历数组ppppp中的每个元素,并使用println函数打印它们。  ppppp.foreach(println)  }  
}

看下输出可以看到我们的元素已经排序了

3.collect

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

例:

import org.apache.spark.{SparkConf, SparkContext}object p1 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("p2")val sc=new SparkContext(conf)val pp = sc.parallelize(Seq(1, 2, 3, 4, 5))val ppp = pp.collect()ppp.foreach(println)}
}

collect的作用是将RDD中的数据收集到驱动程序中,所以这里运行看不出区别。

拓展-RDD和DStream

在上一篇文章中,我们了解到了RDD,那么DStream是什么呢,我们先来了解一下:

DStream(离散流)是Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream的内部实际上是一系列持续不断产生的RDD,每个RDD包含特定时间间隔的数据。DStream的创建可以通过输入数据源如Kafka、Flume,或者通过对其他DStream应用高阶函数如map、reduce、join、window来实现。

1.RDD和DStream的区别

RDDDStream
定义弹性分布式数据集,是Spark中最基本的数据处理模型。离散流,是Spark Streaming提供的一种高级抽象,代表一个持续不断的数据流。
数据结构静态的、不可变的数据集,可以划分为多个分区。动态的、连续的数据流,内部由一系列RDD组成。
数据处理方式批处理,适用于静态数据的处理和分析。流处理,适用于实时数据流的处理和分析。
时间维度无特定的时间维度,主要关注数据的分区和处理。具有时间维度,每个RDD代表一段时间内的数据。
操作方式对整个RDD进行操作,结果生成新的RDD。对DStream进行操作,结果生成新的DStream,底层转换为RDD操作。
应用场景大规模数据的批处理任务,如机器学习、数据挖掘等。实时数据流处理任务,如日志分析、实时监控等。
容错性具有容错性,数据丢失可以自动恢复。继承了RDD的容错性特点。
与Spark的关系Spark的核心组件,用于构建各种数据处理和分析任务。Spark Streaming的核心组件,用于处理实时数据流。

2.RDD和DStream的联系

RDDDStream
基础构建单元RDD是Spark的基本数据处理单元。DStream基于RDD构建,每个时间间隔内的数据对应一个RDD。
计算模型RDD支持分布式计算模型,数据被划分为多个分区进行并行处理。DStream继承了RDD的计算模型,对流数据进行分布式处理。
容错性RDD具有容错性,可以自动恢复丢失的数据。DStream同样具有容错性,因为它基于RDD构建。
操作方式RDD提供了一系列转换操作(如map、reduce)和动作操作(如collect、save)。DStream也提供了类似的操作,这些操作最终会转换为底层RDD的操作。
数据处理能力RDD适用于批处理任务,可以对大规模数据集进行处理和分析。DStream适用于实时流处理任务,可以对连续的数据流进行实时分析和处理。
底层实现DStream内部实际上是由一系列RDD组成的,每个RDD代表一段时间内的数据。DStream的操作最终会转换为RDD的操作,利用RDD的分布式计算能力。
扩展性RDD可以通过自定义操作进行扩展,支持更多的数据处理场景。DStream同样可以通过自定义操作和转换函数进行扩展,以满足特定的实时处理需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/291217.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows下QT如何集成OpenCV

说明 我在windows下使用QT Creator12创建的CMake项目,需要OpenCV的一些功能。由于安装的时候我选择的QT组件都是MInGW的,所以无法使用VS studio版本的dll库。 为什么vs的版本不能用 我安装QT选择的是MinGW版本,本地编译QT工程只能选择MinG…

Triton推理服务器部署YOLOv8实战

课程链接:Triton推理服务器部署YOLOv8实战_在线视频教程-CSDN程序员研修院 Triton Inference Server(Triton 推理服务器)是一个高性能、灵活、可扩展的推理服务器,支持多种机器学习框架(PyTorch、ONNX等)和…

服务器被CC攻击之后怎么办?

1.取消域名绑定取消域名绑定后Web服务器的CPU能够马上恢复正常状态,通过IP进行访问连接一切正常。但是不足之处也很明显,取消或者更改域名对于别人的访问带来了不变,另外,对于针对IP的CC攻击它是无效的,就算更换域名攻…

计算机毕业设计Python+Spark知识图谱高考志愿推荐系统 高考数据分析 高考可视化 高考大数据 大数据毕业设计 机器学习 深度学习 人工智能

学院(全称): 专业(全称): 姓名 学号 年级 班级 设计(论文) 题目 基于Spark的高考志愿推荐系统设计与实现 指导教师姓名 职称 拟…

实时语音识别(Python+HTML实战)

项目下载地址:FunASR 1 安装库文件 项目提示所需要下载的库文件:pip install -U funasr 和 pip install modelscope 运行过程中,我发现还需要下载以下库文件才能正常运行: 下载:pip install websockets,pi…

Excel数据分析-----快捷键

智能拆分 1、先将第一行的数据手动拆分出来。 2、在拆分出来的列上面按住ctrlE,就可以自动向下填充了 自动生成下拉列表 alt向下箭头 插入批注 shiftf2 如何在批注中进行查找 ctrlf打开查找窗口。 快速删除批注 ctrlG 右键删除批注 快速美化表格 ctr…

常见的Nginx+Redis+MQ+DB架构设计

三高,复杂的架构 SQRS CAP 缓存,限流 【Redis,缓存】 cache-aside 缓存cache:数据源的副本 store 1. Read/Write Through Pattern 读写穿透模式 redis:放当前在线用户,热点数据

Kubernetes-running app on kube

Docker 安装Docker 首先,您需要在Linux机器上安装Docker。如果您不使用Linux,则需要启动一个Linux虚拟机(VM)并在该虚拟机中运行Docker。如果你使用的是Mac或Windows系统,并按照指令安装Docker, Docker将为你建立一个虚拟机,并在…

使用mysql官网软件包安装mysql

确定你的操作系统,我的是Centos myqsl 所有安装包的地址:https://repo.mysql.com/yum/ 如果你是使用rpm安装你可以到对应的版本里面找到对应的包。 mysql 发行包的地址:http://repo.mysql.com/ 在这里你可以找到对应的发布包安装。 这里使用y…

AJAX(一):初识AJAX、http协议、配置环境、发送AJAX请求、请求时的问题

一、什么是AJAX 1.AJAX 就是异步的JS和XML。通过AJAX 可以在浏览器中向服务器发送异步请求,最大的优势:无刷新获取数据。AJAX 不是新的编程语言,而是一种将现有的标准组合在一起使用的新方式。 2.XML 可扩展标记语言。XML被设计用来传输和…

爬虫逆向实战(38)-某空气质量平台(反调试,AES,DES,MD5)

一、数据接口分析 主页地址:某空气质量平台 1、抓包 (1) 反调试 该网站对鼠标右击以及F12进行了监听并拦截 虽然该网站无法打开Chrome控制台,导致我们无法抓包,但是道高一尺魔高一丈。既然我们无法在打开该网站的时候打开Chrome控制台&…

在Windows上交叉编译STM32(环境搭建)

在Windows上交叉编译STM32 Keil 虽然好用,但是是收费的,不想破解怎么办~ 使用交叉编译工具! 交叉编译工具下载 官方交叉编译工具下载连接 下载解压好后将 bin 目录写入 PATH, 使用命令行检测是否安装成功。 Windows 安装 make …

RN在android/ios手机剪切图片的操作

之前写过一个React Native调用摄像头画面及拍照和保存图片到相册全流程但是这个仅限于调用摄像头拍照并保存图片,今天再写一个版本的操作,这个博客目前实现的有三点操作: 调用摄像头拍照对照片进行剪切从相册选取图片 功能上面来说有两点: 点击按钮可以对摄像头进行拍照,拍完照…

Etcd 基本入门

1:什么是 Etcd ? Etcd 是 CoreOS 团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,Etcd基于 Go 语言实现。 名字由来,它源于两个方面,…

#include<初见c语言之指针总结>

第一小节: #include<初见C语言之指针(1)>-CSDN博客 #add<初见C语言之指针(1)>-CSDN博客 第二小节: #include<初见c语言之指针…

redis集群配置(精华版):哨兵模式

哨兵模式 概念单机单个哨兵多哨兵模式 动手实操1、环境准备2、配置sentinel.conf配置文件3、启动哨兵&测试4、SpringBoot测试哨兵模式故障转移功能 概念 主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这…

苹果Find My产品需求增长迅速,伦茨科技ST17H6x芯片供货充足

苹果的Find My功能使得用户可以轻松查找iPhone、Mac、AirPods以及Apple Watch等设备。如今Find My还进入了耳机、充电宝、箱包、电动车、保温杯等多个行业。苹果发布AirTag发布以来,大家都更加注重物品的防丢,苹果的 Find My 就可以查找 iPhone、Mac、Ai…

04-JavaScript函数

函数(重点) 1.为什么使用函数? 用函数来解决代码重用的问题。 2.函数的意义 函数其实就是封装,把可以重复使用的代码放到函数中,如果需要多次使用同一段代码,就可以把封装成一个函数。这样的话,在你需…

大数据-hive,初步了解

1. Hive是什么 Hive是基于Hadoop的数据仓库解决方案。由于Hadoop本身在数据存储和计算方面有很好的可扩展性和高容错性,因此使用Hive构建的数据仓库也秉承了这些特性。 简单来说,Hive就是在Hadoop上架了一层SQL接口,可以将SQL翻译成MapRedu…

docker环境配置过程中的常见问题

1、pull镜像问题 docker pull jenkins/jenkins:lts Using default tag: latest Trying to pull repository docker.io/library/centos ... Get https://registry-1.docker.io/v2/library/centos/manifests/latest: Get https://auth.docker.io/token?scoperepository%3Alibr…