【Spark精讲】一文讲透Spark RDD

MapReduce的缺陷

MR虽然在编程接口的种类和丰富程度上已经比较完善了,但这些系统普遍都缺乏操作分布式内存的接口抽象,导致很多应用在性能上非常低效 。 这些应用的共同特点是需要在多个并行操 作之间重用工作数据集 ,典型的场景就是机器学习和图应用中常用的迭代算法 (每一步对数据 执行相似的函数) 。

RDD

RDD是只读的。

RDD五大属性:①分区、②依赖、③计算函数、④分区器、⑤首选运行位置。

RDD 则是直接在编程接口层面提供了一种高度受限的共享内存模型,如图下图所示。 RDD 是 Spark 的核心数据结构,全称是弹性分布式数据集 (Resilient Distributed Dataset),其本质是一种分布式的内存抽象,表示一个只读的数据分区( Partition)集合 。一个 RDD 通常只能通过其他的 RDD转换而创建。 RDD 定义了各种丰富的转换操作(如 map、 join和 filter等),通过这些转换操作,新的 RDD 包含了如何从其他 RDD 衍生所必需的信息,这些信息构成了 RDD 之间的依赖关系( Dependency) 。 依赖具体分为两种, 一种是窄依赖, RDD 之间分区是一一对应的;另一种是宽依赖,下游 RDD 的每个分区与上游 RDD (也称之为父 RDD)的每个分区都有关,是多对多的关系 。 窄依赖中的所有转换操作可以通过类似管道(Pipeline)的方式全部执行,宽依赖意味着数据需要在不同节点之间 Shuffle 传输 。

RDD计算的时候会通过一个 compute函数得到每个分区的数据。 若 RDD是通过已有的文件系统构建的,则 compute 函数读取指定文件系统中的数据;如果 RDD 是通过其他 RDD 转换而来的,则 compute 函数执行转换逻辑,将其他 RDD 的数据进行转换。 RDD 的操作算子包括两 类, 一类是 transformation,用来将 RDD 进行转换,构建 RDD 的依赖关系;另一类称为 action, 用来触发 RDD 的计算,得到 RDD 的相关计算结果或将 RDD 保存到文件系统中。

在 Spark 中, RDD 可以创建为对象 ,通过对象上的各种方法调用来对 RDD 进行转换 。 经过一系列的 transformation逻辑之后,就可以调用 action来触发 RDD 的最终计算。 通常来讲, action 包括多种方式,可以 是 向应用程序返回结果( show、 count 和 collect等),也可以是向存 储系统保存数据(saveAsTextFile等)。 在Spark中,只有遇到 action,才会真正地执行 RDD 的计算(注:这被称为惰性计算,英文为 LazyEvqluation),这样在运行时可以通过管道的方式传输多个转换 。

总结而言,基于 RDD 的计算任务可描述为从稳定的物理存储(如分布式文件系统 HDFS) 中加载记录,记录被传入由一组确定性操作构成的 DAG (有向无环图),然后写回稳定存储。 RDD还可以将数据集缓存到内存中,使得在多个操作之间可以很方便地重用数据集。 总的来讲,RDD 能够很方便地支持 MapReduce 应用、关系型数据处理、流式数据处理(Stream Processing) 和迭代型应用(图计算、机器学习等)。

在容错性方面,基于 RDD 之间的依赖, 一个任务流可以描述为 DAG。 在实际执行的时候, RDD 通过 Lineage 信息(血缘关系)来完成容错,即使出现数据分区丢失,也可以通过 Lineage 信息重建分区。 如果在应用程序中多次使用同一个 RDD,则可以将这个 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据 Lineage 信息得到分区的数据,在后续其他地方用到这个 RDD 的时候,会直接从缓存处读取而不用再根据 Lineage信息计算,通过重用达到提升性能的目的 。 虽然 RDD 的 Lineage 信息可以天然地实现容错(当 RDD 的某个分区数据计算失败或丢 失时,可以通过 Lineage信息重建),但是对于长时间迭代型应用来说,随着迭代的进行,RDD 与 RDD之间的 Lineage信息会越来越长,一旦在后续迭代过程中出错,就需要通过非常长的 Lineage信息去重建,对性能产生很大的影响。 为此,RDD 支持用 checkpoint机制将数据保存到持久化的存储中,这样就可以切断之前的 Lineage信息,因为 checkpoint后的 RDD不再需要知道它的父 RDD ,可以从 checkpoint 处获取数据。

DAG

顾名思义,DAG 是一种“图”,图计算模型的应用由来已久,早在上个世纪就被应用于数据库系统(Graph databases)的实现中。任何一个图都包含两种基本元素:节点(Vertex)和边(Edge),节点通常用于表示实体,而边则代表实体间的关系。

DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。
基础概念
介绍DAGScheduler中的一些概念,有助于理解后续流程。

  • Job:调用RDD的一个action,如count,即触发一个Job,spark中对应实现为ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob维护Job
  • Stage:代表一个Job的DAG,会在发生shuffle处被切分,切分后每一个部分即为一个Stage,Stage实现分为ShuffleMapStage和ResultStage,一个Job切分的结果是0个或多个ShuffleMapStage加一个ResultStage
  • TaskSet:一组Task
  • Task:最终被发送到Executor执行的任务,和stage的ShuffleMapStage和ResultStage对应,其实现分为ShuffleMapTask和ResultTask

把 DAG 图反向解析成多个阶段,每个阶段中包含多个任务,每个任务会被任务调度器分发给工作节点上的 Executor 上执行。

Web UI上DAG举例 

Checkpoint

RDD的依赖

checkpoint先了解一下RDD的依赖,比如计算wordcount:

scala>  sc.textFile("hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd").flatMap(_.split("\\\t")).map((_,1)).reduceByKey(_+_);
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28scala> res0.toDebugString
res1: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []+-(2) MapPartitionsRDD[3] at map at <console>:28 []|  MapPartitionsRDD[2] at flatMap at <console>:28 []|  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd MapPartitionsRDD[1] at textFile at <console>:28 []|  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd HadoopRDD[0] at textFile at <console>:28 []

1、在textFile读取hdfs的时候就会先创建一个HadoopRDD,其中这个RDD是去读取hdfs的数据key为偏移量value为一行数据,因为通常来讲偏移量没有太大的作用所以然后会将HadoopRDD转化为MapPartitionsRDD,这个RDD只保留了hdfs的数据。
2、flatMap 产生一个RDD MapPartitionsRDD
3、map 产生一个RDD MapPartitionsRDD
4、reduceByKey 产生一个RDD ShuffledRDD

如何建立checkPoint

1、首先需要用sparkContext设置hdfs的checkpoint的目录,如果不设置使用checkpoint会抛出异常:

scala> res0.checkpoint
org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContextscala> sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")

sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")
执行了上面的代码,hdfs里面会创建一个目录:
/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d

2、然后执行checkpoint

scala> res0.checkpoint
1

发现hdfs中还是没有数据,说明checkpoint也是个transformation的算子。

scala> res0.count()
INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://leen:8020/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4, new parent is RDD 5
res5: Long = 73689
1
2
3
hive > dfs -du -h /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4;
147    147    /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/_partitioner
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00000
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00001

但是执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()
rdd.checkpoint()
rdd.collect

在源码中,在checkpoint的时候强烈建议先进行cache,并且当你checkpoint执行成功了,那么前面所有的RDD依赖都会被销毁,如下:

 /*** Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint* directory set with `SparkContext#setCheckpointDir` and all references to its parent* RDDs will be removed. This function must be called before any job has been* executed on this RDD. It is strongly recommended that this RDD is persisted in* memory, otherwise saving it on a file will require recomputation.*/def checkpoint(): Unit = RDDCheckpointData.synchronized {// NOTE: we use a global lock here due to complexities downstream with ensuring// children RDD partitions point to the correct parent partitions. In the future// we should revisit this consideration.if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}}

RDD依赖被销毁

scala> res0.toDebugString
res6: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []|  ReliableCheckpointRDD[5] at count at <console>:30 []

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/226444.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue(一):Vue 入门与 Vue 指令

Vue 01. Vue 快速上手 1.1 Vue 的基本概念 用于 构建用户界面 的 渐进性 框架 构建用户界面&#xff1a;基于数据去渲染用户看到的界面渐进式&#xff1a;不需要学习全部的语法就能完成一些功能&#xff0c;学习是循序渐进的框架&#xff1a;一套完整的项目解决方案&#x…

Django Cookie和Session使用(十一)

一、Cookie Cookie具体指一小段信息&#xff0c;它是服务器发送出来存储在浏览器上的一组键值对&#xff0c;下次访问服务器时浏览器会自动携带这些键值对&#xff0c;以便服务器提取有用信息。 Cookie的特性 1、服务器让浏览器进行设置的 2、保存在浏览器本地&#xff0c;…

linux 网络工具(二)

linux 网络工具 1. ip命令簇4.1 address4.2 link4.3 route4.4 rule 2. 其他常用命令2.1 ifup/ifdown2.2 配置主机名2.3 设置DNS服务器指向2.4 配置域名解析2.5 ss2.6 路由相关配置文件2.7 查看机器可用端口2.8 traceroute2.9 dhclient 1. ip命令簇 Linux的ip命令和ifconfig类似…

微信小程序picker组件扩展选择时间到秒插件

创建插件seldatetime // 插件JS部分 Component({// 一些选项options: {// 样式隔离&#xff1a;apply-shared 父影响子&#xff0c;shared父子相互影响&#xff0c; isolated相互隔离styleIsolation:"isolated",// 允许多个插槽multipleSlots: true},// 组件的对外属…

机器学习三要素与拟合问题

1.如何构建机器学习模型&#xff1f; 机器学习工作流程总结 1.获取数据 2.数据基本处理 3.特征工程 4.机器学习(模型训练) 5.模型评估 结果达到要求&#xff0c;上线服务&#xff0c;没有达到要求&#xff0c;重新上面步骤 我们使用机器学习监督学习分类预测模型的工作流…

SLF4J: Class path contains multiple SLF4J bindings.解决

背景 项目正常运行几年&#xff0c;近期优化调整修复漏洞&#xff0c;依赖升级后cleaninstall 重启发现项目启动失败&#xff0c;访问所有接口都报错404 错误信息 output输出异常信息截图 tomcat 打印异常信息截图 output打印异常信息详情 D:\javaRuanJian\Tomcat\apach…

人工智能的新篇章:深入了解大型语言模型(LLM)的应用与前景

LLM&#xff08;Large Language Model&#xff09;技术是一种基于深度学习的自然语言处理技术&#xff0c;旨在训练能够处理和生成自然语言文本的大型模型。 LLM 技术的核心思想是使用深度神经网络&#xff0c;通过大规模的文本数据预训练模型&#xff0c;并利用这些预训练模型…

linux 防火墙查看放行端口,追加放行端口命令

linux 查看防火墙已经放行端口列表 firewall-cmd --list-ports 运行结果如下&#xff1a; linux 追加防火墙经放行端口&#xff08;如追加443&#xff09; firewall-cmd --zonepublic --add-port443/tcp --permanent 亲测有效&#xff01;

【WPF.NET开发】路由事件

本文内容 先决条件什么是路由事件&#xff1f;路由策略为什么使用路由事件&#xff1f;附加并实现路由事件处理程序类处理程序WPF 中的附加事件XAML 中的限定事件名称WPF 输入事件EventSetter 和 EventTrigger Windows Presentation Foundation (WPF) 应用程序开发人员和组件…

FileZilla的使用主动模式与被动模式

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《产品经理如何画泳道图&流程图》 ⛺️ 越努力 &#xff0c;越幸运 目录 一、FileZilla简介 1、FileZilla是什么&#xff1f; 2、FileZilla的应用场景 二、FileZilla的安装 1、下…

【直播预告】刘军博士:科学研究中的AI计算:何助力团队协作创新

【直播预告】随着数据、算法、算力的融合发展&#xff0c;AI已经成为科学和工程研究的不可或缺的力量&#xff0c;涉足药物设计、天气预测、新材料研发等领域。在AI领域&#xff0c;协作是关键。欢迎大家关注12月28日20:00九章云极资深数据科学家刘军博士的直播&#xff01;刘军…

HLS 2017.4 导出 RTL 报错:ERROR: [IMPL 213-28] Failed to generate IP.

软件版本&#xff1a;HLS 2017.4 在使用 HLS 导出 RTL 的过程中产生如下错误&#xff1a; 参考 Xilinx 解决方案&#xff1a;https://support.xilinx.com/s/article/76960?languageen_US 问题描述 DESCRIPTION As of January 1st 2022, the export_ip command used by Vivad…

【计算机视觉】角点检测(Harris、SIFT)

Harris 角点指的是窗口延任意方向移动&#xff0c;都有很大变化量的点。 用数学公式表示为&#xff1a; E(u,v)反映的移动后窗口的差异&#xff0c;w(x,y)为每个像素的点权值&#xff0c;I(xu,yv)是移动的像素值&#xff0c;I(x,y)是移动前的像素值。 将E(u,v)进行泰勒展开&am…

MySQL进阶之(一)逻辑架构

一、逻辑架构 1.1 逻辑架构剖析1.1.1 连接层1.1.2 服务层01、基础服务组件02、SQL Interface&#xff1a;SQL 接口03、Parser&#xff1a;解析器04、Optimizer&#xff1a;查询优化器05、Caches & Buffers&#xff1a; 查询缓存组件 1.1.3 引擎层1.1.4 存储层1.1.5 总结 1.…

elasticsearch系列九:异地容灾-CCR跨集群复制

概述 起初只在部分业务中采用es存储数据&#xff0c;在主中心搭建了个集群&#xff0c;随着es在我们系统中的地位越来越重要&#xff0c;数据也越来越多&#xff0c;针对它的安全性问题也越发重要&#xff0c;那如何对es做异地容灾呢&#xff1f; 今天咱们就一起看下官方提供的…

25、商城系统(七):商城项目基础功能pom.xml(重要),mybatis分页插件

截止这一章,我们就不把重心放在前端,后台的基础代码,因为后面都是业务层面的crud。 前端直接替换这两个文件夹即可,后台代码也直接复制: 一、重新更新一下所有的pom.xml 这个地方我踩了好多坑,最后得到一个完整的pom.xml,建议大家直接用我的pom.xml替换即可。 1.comm…

大数据与人工智能|万物皆算法(第三节)

要点一&#xff1a;数据与智能的关系 1. 一切的核心都是数据&#xff0c;数据和智能之间是密切相关的。 数据是对客观现实的描述&#xff0c;而信息是数据转化而来的。 例如&#xff0c;24是数据&#xff0c;但说“今天的气温是24摄氏度”是信息&#xff0c;而说“班可以分成24…

How to Develop Word Embeddings in Python with Gensim

https://machinelearningmastery.com/develop-word-embeddings-python-gensim/ 本教程分为 6 个部分;他们是&#xff1a; 词嵌入 Gensim 库 开发 Word2Vec 嵌入 可视化单词嵌入 加载 Google 的 Word2Vec 嵌入 加载斯坦福大学的 GloVe 嵌入 词嵌入 单词嵌入是一种提供单词的…

web自动化(4)——POM设计重构

1. 什么是POM Page Object Model 是ui自动化测试中常见的封装方式。 原理&#xff1a;将页面封装为PO对象&#xff0c;然后通过面向对象的方式实现UI自动化 2. 封装原则 PO无需包含全部UI元素PO应当验证元素PO不应该包含断言PO不应该暴露元素 3. 怎么进行POM封装 面向对象…

IntelliJ IDE 插件开发 | (四)开发一个时间管理大师插件

系列文章 IntelliJ IDE 插件开发 |&#xff08;一&#xff09;快速入门IntelliJ IDE 插件开发 |&#xff08;二&#xff09;UI 界面与数据持久化IntelliJ IDE 插件开发 |&#xff08;三&#xff09;消息通知与事件监听IntelliJ IDE 插件开发 |&#xff08;四&#xff09;开发一…