什么是Spark RDD?(RDD的介绍与创建)

什么是Spark RDD?(RDD的介绍与创建)

  • 一、RDD介绍
    • 1、特点
    • 2、RDD的存储和指向
    • 3、RDD与DAG
    • 4、RDD的特性
    • 5、RDD分区
    • 6、RDD操作类型
  • 二、RDD创建
    • 1、引入必要的 Spark 库
    • 2、配置 Spark
    • 3、RDD创建
    • 4、示例代码

一、RDD介绍

RDD: 弹性分布式数据集(Resilient Distributed Datasets)
核心概念:Spark的核心数据抽象。
通过对RDD的理解和使用,可以在分布式计算环境中高效地处理和计算大规模数据

1、特点

  • 分布式数据集:RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上。RDD并不存储真正的数据,只是对数据和操作的描述。
  • 弹性:默认存放在内存中,当内存不足,Spark自动将RDD写入磁盘。
  • 容错性:根据数据血统,可以自动从节点失败中恢复分区。

2、RDD的存储和指向

  • 存储在 (HIVE)HDFS、Cassandra、HBase等
  • 缓存(内存、内存+磁盘、仅磁盘等)
  • 或在故障或缓存收回时重新计算其他RDD分区中的数据

3、RDD与DAG

  • DAG(有向无环图):反映了RDD之间的依赖关系。

  • Stage:RDD和DAG是Spark提供的核心抽象,RDD的操作会生成DAG,DAG会进一步被划分为多个Stage,每个Stage包含多个Task。

    在这里插入图片描述

4、RDD的特性

  1. 分区(Partition):每个任务处理一个分区。
  2. 计算函数(compute):每个分区上都有compute函数,计算该分区中的数据。
  3. 依赖关系:RDD之间有一系列的依赖。
  4. 分区器(Partitioner)
    • 决定数据(key-value)分配至哪个分区。
    • 常见的分区器有Hash Partition和Range Partition。
  5. 优先位置列表:将计算任务分派到其所在处理数据块的存储位置。

5、RDD分区

  • 分区(Partition):是RDD被拆分并发送到节点的不同块之一。
  • 分区越多,并行性越强:我们拥有的分区越多,得到的并行性就越强。
  • 每个分区都是被分发到不同Worker Node的候选者。
  • 每个分区对应一个Task。

6、RDD操作类型

  • Transformation(转换操作)
    • Lazy操作:不会立即执行,只是记录操作,当触发Action时才会真正执行。
    • 例如:map、filter、flatMap等。
  • Actions(动作操作)
    • Non-lazy操作:立即执行,会触发所有相关Transformation的计算。
    • 例如:count、collect、saveAsTextFile等。

二、RDD创建

1、引入必要的 Spark 库

这里用的是scala语言的maven项目

<!-- 导入 spark-core jar 包 -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.2</version>
</dependency>
// 引入 Spark 库
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

2、配置 Spark

setMaster (设置运行模式) 方法的可选方案:

  1. local: 在单核上运行
  2. local[N]: 在指定数量的 N 个核上运行,如 “local[4]”
  3. local[*]: 使用所有可用的核
  4. spark://HOST:PORT: 连接到指定的 Spark standalone cluster
  5. yarn: 连接到 YARN 集群
  6. mesos://HOST:PORT: 连接到 Mesos 集群
val conf = new SparkConf().setAppName("Spark RDD Example")// 设置应用程序名称.setMaster("local[*]")			// 设置运行模式
val sc = new SparkContext(conf)
// sc.setLogLevel()	// 设置日志显示级别

3、RDD创建

  • 从集合创建 RDD,指定分区数

    val rdd: RDD[T] = sc.parallelize(seq: Seq[T], numSlices: Int) // ✔
    val rdd: RDD[T] = sc.makeRDD(seq: Seq[T], numSlices: Int)     // 调用了 parallelize
    
  • 从外部数据源创建 RDD,指定最小分区数

    从文件系统中的单个文件创建 RDD

    • 本地文件系统使用 file:/// 前缀
    • Hadoop 文件系统使用 hdfs:// 前缀
    // 从文件系统创建 RDD,可以通过 minPartitions 指定分区数
    val textRDD: RDD[String] = sc.textFile(filePath, minPartitions:Int)	// 从文件系统创建 RDD
    val rdd: RDD[(String, String)] = sc.wholeTextFiles(dir: String, minPartitions: Int) // 从目录创建 RDD
    

4、示例代码

附加单词次数统计

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SparkRDDExample {def main(args: Array[String]): Unit = {// 配置 Spark  val conf = new SparkConf().setAppName("Spark RDD Example").setMaster("local[*]")val sc = new SparkContext(conf)// 从集合创建 RDD,指定分区数val data = Seq(1, 2, 3, 4, 5)val rdd: RDD[Int] = sc.parallelize(data, numSlices = 2)rdd.collect().foreach(println)// 从外部数据源创建 RDD,指定最小分区数val filePath = "file:///F:\\sparkRDD\\spark01\\data\\story.txt"val textRDD: RDD[String] = sc.textFile(filePath, minPartitions = 4)textRDD.collect().foreach(println)// 将文本文件中的每行数据拆分为单词并统计每个单词的出现次数val wordCountRDD = textRDD.mapPartitions {_.flatMap {_.split("[^a-zA-Z]+") // 按非字母字符拆分字符串.map(word => (word, 1)) // 将每个单词转换为 (单词, 1) 的元组}}.reduceByKey(_+_)// 显示单词计数结果println("Word count from textFile:")wordCountRDD.collect().foreach(println)// 停止 SparkContextsc.stop()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/341650.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go微服务: 基于rocketmq:5.2.0搭建RocketMQ环境,以及示例参考

概述 参考最新官方文档&#xff1a;https://rocketmq.apache.org/zh/docs/quickStart/03quickstartWithDockercompose以及&#xff1a;https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard综合以上两个文档来搭建环境 搭建RocketMQ环境 1 ) 基于 docker-c…

K8S==ingress配置自签名证书

安装openssl Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions 生成证书 openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout example.local.key -out example.local.crt -subj "/CNexample.local/Oexample.local"创建K8S secr…

【JVM】已验鼎真,鉴定为:妈妈加载的(双亲委派模型)

【JVM】已验鼎真&#xff0c;鉴定为&#xff1a;妈妈加载的&#xff08;双亲委派模型&#xff09; 在Java的世界中&#xff0c;类加载器&#xff08;ClassLoader&#xff09;是Java虚拟机&#xff08;JVM&#xff09;用来动态加载类的基础组件。双亲委派模型&#xff08;Paren…

Java基础27,28(多线程,ThreadMethod ,线程安全问题,线程状态,线程池)

目录 一、多线程 1. 概述 2. 进程与线程 2.1 程序 2.2 进程 2.3 线程 2.4 进程与线程的区别 3. 线程基本概念 4.并发与并行 5. 线程的创建方式 方式一&#xff1a;继承Thread类 方式二&#xff1a;实现Runable接口 方式三&#xff1a;实现Callable接口 方式四&…

C#操作MySQL从入门到精通(10)——对查询数据进行通配符过滤

前言 我们有时候需要查询数据,并且这个数据包含某个字符串,这时候我们再使用where就无法实现了,所以mysql中提供了一种模糊查询机制,通过Like关键字来实现,下面进行详细介绍: 本次查询的表中数据如下: 1、使用(%)通配符 %通配符的作用是,表示任意字符出现任意次数…

【简单讲解TalkingData的数据统计】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

Python04:python代码设置作者/创建时间/文件名称

我们新建一个py文件时&#xff0c;如果希望文件开头有固定的内容&#xff0c;怎么设置呢&#xff1f; 比如代码作者、文件创建时间等。。。 1、点击左上角【Python】–>【Settings】设置 2、在弹出的新窗口找到【File and Code Templates】–>【Python Script】–>在右…

鸿蒙小案例-音乐播放器

之前参加鸿蒙比赛的音乐播放器 效果展示 HF音乐效果展示 功能列 有一些功能没写上去&#xff0c;自行发掘 说明&#xff1a; 1.API:网易云接口&#xff0c;QQ个人接口&#xff0c; 需要请看gitee 2.本地关系型数据由bug,提的工单已确认&#xff0c;建议使用API11,12,9的不稳…

java代码审计之fastjson反序列化漏洞

fastjson反序列化漏洞分析 Fastjson 是一个 Java 库&#xff0c;可以将 Java 对象转换为 JSON 格式&#xff0c;当然它也可以将 JSON 字符串转换为 Java 对象。Fastjson 可以操作任何 Java 对象&#xff0c;即使是一些预先存在的没有源码的对象。该产品主要提供了两个接口&…

创新入门|营销中的视频内容:不可或缺的策略

视频在营销中日益重要。你是否也发现,视频内容最近似乎无处不在?它占据着社交媒体的推文、网站首页,甚至电子邮件中的位置。事实上,并不是你一个人有这样的感受。在过去十年中,视频作为一种营销手段日益成熟和强大。这是因为,人类天生就是视觉动物。我们大脑处理视觉信息的速度…

Priority_queue

一、priority_queue的介绍和使用 1.1 priority_queue的介绍 1.优先队列是一种容器适配器&#xff0c;根据严格的弱排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大的。 2.优先队列类似于堆&#xff0c; 在堆中可以随时插入元素&#xff0c; 并且只能检索最大堆…

硕士课程 可穿戴设备之作业一

作业一 第一个代码使用的方法是出自于[1]。 框架结构 如下图&#xff0c;不过根据对代码的解读&#xff0c;发现作者在代码中省去了对SSR部件的实现&#xff0c;下文再说。 Troika框架由三个关键部件组成&#xff1a;信号分解&#xff0c;SSR和光谱峰值跟踪。&#xff08;粗…

word 无法自动检测拼写

word 有时候不能分辨是哪种语言,比如把英语错认为法语 。 例如&#xff1a;Interlaayer spacace,发现误认为是法语。 1、选中Interlaayer spacace 2、点击语言下拉按钮 选择设置校对语言 发现校对语言为法语 3、手动修改校对语言为英语&#xff0c;并点击确认。 4、发现现…

升级鸿蒙4.2新变化,新增 WLAN 网络自动连接开关!

手机已经成为现代人生活中不可或缺的一部分&#xff0c;手机里的功能可以满足大部分人的生活场景&#xff0c;但是最依赖的应该就是手机网络&#xff0c;手机网络突然变差怎么办——消息发不出去&#xff1f;刷新闻速度变慢&#xff1f;仔细检查后&#xff0c;发现其实不是手机…

【一步一步了解Java系列】:重磅多态

看到这句话的时候证明&#xff1a;此刻你我都在努力 加油陌生人 个人主页&#xff1a;Gu Gu Study专栏&#xff1a;一步一步了解Java 喜欢的一句话&#xff1a; 常常会回顾努力的自己&#xff0c;所以要为自己的努力留下足迹 喜欢的话可以点个赞谢谢了。 作者&#xff1a;小闭…

E10:流程主表表单字段值变化触发事件

效果– //window.WeFormSDK.showMessage("这是一个E10的提示", 3, 2); const onClickCreate () > console.log("create"); const onClickSave () > console.log("save"); const onClickCancel () > dialogComponent?.destroy();/…

Python量化交易学习——Part4:基于基本面的单因子选股策略

技术分析与基本面分析是股票价格分析最基础也是最经典的两个部分。技术分析是针对交易曲线及成交量等指标进行分析,基本面分析是基于公司的基本素质进行分析。 一般来说选股要先选行业,在选个股,之后根据技术分析选择买卖节点,因此针对行业及个股的基本面分析是选股的基础。…

排序算法集合

1. 冒泡排序 排序的过程分为多趟&#xff0c;在每一趟中&#xff0c;从前向后遍历数组的无序部分&#xff0c;通过交换相邻两数位置的方式&#xff0c;将无序元素中最大的元素移动到无序部分的末尾&#xff08;第一趟中&#xff0c;将最大的元素移动到数组倒数第一的位置&…

【scikit-learn010】sklearn算法模型清单实战及经验总结(已更新)

1.一直以来想写下基于scikit-learn训练AI算法的系列文章,作为较火的机器学习框架,也是日常项目开发中常用的一款工具,最近刚好挤时间梳理、总结下这块儿的知识体系。 2.熟悉、梳理、总结下scikit-learn框架模型算法包相关技术点及经验。 3.欢迎批评指正,欢迎互三,跪谢一键…

代码随想录算法训练营day41

题目&#xff1a;01背包理论基础、416. 分割等和子集 参考链接&#xff1a;代码随想录 动态规划&#xff1a;01背包理论基础 思路&#xff1a;01背包是所有背包问题的基础&#xff0c;第一次看到比较懵&#xff0c;完全不知道dp数据怎么设置。具体分析还是dp五部曲&#xff…