Spark 中 RDD checkpoint 是通过启动两个独立的 Job 完成的。

在 Spark 中,RDD checkpoint 是通过启动两个独立的 Job 完成的。这两个 Job 分别用于生成 checkpoint 数据更新依赖关系。下面从源码角度深入分析这个机制。


1. 为什么需要两个 Job?

当调用 RDD.checkpoint() 后:

  1. 第一个 Job:将 RDD 的每个分区数据计算后,写入到指定的 checkpoint 存储位置(如 HDFS)。这个步骤的目的是将 RDD 数据物化为可靠存储,减少后续计算的成本。
  2. 第二个 Job:在 checkpoint 成功完成后,更新 RDD 的依赖关系,将原始的血缘依赖(lineage)替换为从 checkpoint 存储加载数据的依赖。这个步骤的目的是确保后续的计算直接基于 checkpoint 数据,而不是重新计算血缘链。

这两个 Job 是独立的,且按顺序执行,确保 checkpoint 的一致性。


2. 源码分析

以下是 Spark RDD checkpoint 的源码路径和执行过程分析。

2.1 RDD.checkpoint() 的入口

调用 RDD.checkpoint() 方法时:

def checkpoint(): Unit = {if (!isCheckpointedAndMaterialized) {sc.checkpointFile[RDD类型](this)}
}

此方法会:

  1. 检查是否已经 checkpointed,如果是,直接返回。
  2. 如果没有,则调用 SparkContextcheckpointFile 方法,提交一个任务将数据写入存储。

2.2 SparkContext.checkpointFile()

def checkpointFile[T: ClassTag](rdd: RDD[T]): Unit = {val cpManager = env.checkpointManagercpManager.addCheckpoint(rdd)
}

这里调用了 CheckpointManager 来处理 checkpoint 逻辑。


2.3 CheckpointManager 的作用

CheckpointManager 的核心任务是管理 checkpoint 的执行,分为以下两步:

2.3.1 第一个 Job:生成 checkpoint 数据
  • 提交一个 Job,将 RDD 的每个分区数据写入存储。

代码核心逻辑:

def checkpointData[T](rdd: RDD[T]): Unit = {if (!rdd.isCheckpointed) {val newRDD = rdd.materialize() // 触发 RDD 的计算和数据写入rdd.updateCheckpointData(newRDD)}
}

关键点:

  1. 调用 materialize() 触发 Job 提交:
    • 每个分区的数据会被写入到 checkpoint 目录中(例如 HDFS)。
    • 使用的存储格式通常是 Sequence File。
  2. 数据写入存储后,生成一个新的 RDD。

2.3.2 第二个 Job:更新 RDD 的依赖关系

在 checkpoint 数据写入成功后,RDD 的依赖关系会被替换为从 checkpoint 文件加载数据的依赖。

def updateCheckpointData[T](rdd: RDD[T]): Unit = {rdd.dependencies.clear() // 清除原始的血缘依赖rdd.dependencies += new FileDependency(rdd.checkpointFile)
}

核心逻辑:

  1. 清除原来的 RDD 血缘关系。
  2. 为 RDD 添加一个新的文件依赖 FileDependency,确保后续任务直接读取 checkpoint 数据文件,而不是重新计算 lineage。

2.4 为什么需要分成两个 Job?

Spark 使用两个 Job 的原因是分离两种任务的目的:

  1. 第一个 Job 物化数据:确保所有 RDD 的分区数据被安全地保存到 checkpoint 目录。
  2. 第二个 Job 更新依赖关系:确保原 RDD 的 lineage 被替换为 checkpoint 数据的直接引用。

这种设计实现了:

  • 容错性:即使第一个 Job 出现问题,原始 RDD 的血缘依赖仍然存在。
  • 灵活性:两个 Job 分离后,可以分别处理物化和依赖更新的逻辑。

3. 示例说明

以下代码展示了两个 Job 的触发过程:

代码

val rdd = sc.parallelize(1 to 10).map(x => x * x)
rdd.checkpoint()// 触发 checkpoint 计算
println(rdd.collect().mkString(","))

运行过程

  1. 第一个 Job

    • 提交一个任务,计算 RDD 的每个分区数据,并将结果写入 checkpoint 存储。
    • 假设有两个分区,Job 会生成类似以下文件:
      hdfs://checkpointDir/rdd_1/part-00000
      hdfs://checkpointDir/rdd_1/part-00001
      
  2. 第二个 Job

    • 更新 RDD 的依赖关系。
    • 重新定义 RDD 的血缘链,指向 checkpoint 文件,而不是原始计算逻辑。

4. 性能与优化建议

4.1 小文件问题

如果 RDD 分区过多,checkpoint 会在存储中产生大量小文件,增加存储和读取成本。建议:

  • 合理设置分区数(coalesce()repartition())。
  • 优化存储系统(如 HDFS 的 block size)。

4.2 持久化与 checkpoint 配合

由于 checkpoint 需要在计算过程中生成数据,可以结合 persist() 使用,避免重复计算:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.checkpoint()

4.3 避免不必要的 checkpoint

不要对不重要的 RDD 或生命周期较短的 RDD 设置 checkpoint,避免浪费计算资源。


5. 总结

在 Spark 中,RDD checkpoint 会启动两个 Job:

  1. 第一个 Job:物化 RDD 数据,将分区数据写入 checkpoint 存储。
  2. 第二个 Job:更新 RDD 的依赖,将 lineage 替换为对 checkpoint 文件的引用。

这种设计保证了容错性和灵活性,但也引入了一定的性能开销。合理使用 checkpoint 是优化 Spark 应用性能的重要手段。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474768.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++为函数提供的型特性——缺省参数与函数重载

目录 一、缺省参数 二、函数重载 一、缺省参数 C为函数提供了一项新的特性——缺省参数。缺省参数指的是当前函数调用中省略了实参自动使用的一个值。这极大地提高了函数的灵活性 缺省参数是声明或定义函数时为函数的参数指定⼀个缺省值 。在调⽤该函数时,如果没有…

android 使用MediaPlayer实现音乐播放--权限请求

在Android应用中,获取本地音乐文件的权限是实现音乐扫描功能的关键步骤之一。随着Android版本的不断更新,从Android 6.0(API级别23)开始,应用需要动态请求权限,而到了android 13以上需要的权限又做了进一步…

go-zero(一) 介绍和使用

go-zero 介绍和使用 一、什么是 go-zero? go-zero 是一个基于 Go 语言的微服务框架,提供了高效、简单并易于扩展的 API 设计和开发模式。它主要目的是为开发者提供一种简单的方式来构建和管理云原生应用。 1.go-zero 的核心特性 高性能: g…

Orcad 输出有链接属性的PDF

安装adobe pdf安装Ghostscript修改C:\Cadence\SPB_16.6\tools\capture\tclscripts\capUtils\capPdfUtil.tcl ​ 设置默认打印机为 Adobe PDF ​ 将Ghostscript的路径修改正确 打开cadence Orcad ,accessories->candece Tcl/Tk Utilities-> Utilities->PD…

.NET6 WebApi第1讲:VSCode开发.NET项目、区别.NET5框架【两个框架启动流程详解】

一、使用VSCode开发.NET项目 1、创建文件夹,使用VSCode打开 2、安装扩展工具 1>C# 2>安装NuGet包管理工具,外部dll包依靠它来加载 法1》:NuGet Gallery,注意要启动科学的工具 法2》NuGet Package Manager GUl&#xff0c…

#define定义宏(3)

大家好,今天来给大家介绍一下宏实现的原理以及缺点,还有宏和函数的一些区别(下一期给大家详细介绍宏和函数的区别),那么话不多说,我们现在开始。 1.宏和参数不是计算之后传进去,而是替换进去的…

AJAX笔记 (速通精华版)

AJAX(Asynchronous Javascript And Xml) 此笔记来自于动力节点最美老杜 传统请求及缺点 传统的请求都有哪些? 直接在浏览器地址栏上输入URL。点击超链接提交 form 表单使用 JS 代码发送请求 window.open(url)document.location.href urlwi…

3D Gaussian Splatting 代码层理解之Part2

现在让我们来谈谈高斯分布。我们已经在Part1介绍了如何根据相机的位置获取 3D 点并将其转换为 2D。在本文中,我们将继续处理高斯泼溅的高斯部分,这里用到的是代码库 GitHub 中part2。 我们在这里要做的一个小改动是,我们将使用透视投影,它利用与上一篇文章中所示的内参矩阵…

【YOLOv8】安卓端部署-2-项目实战

文章目录 1 准备Android项目文件1.1 解压文件1.2 放置ncnn模型文件1.3 放置ncnn和opencv的android文件1.4 修改CMakeLists.txt文件 2 手机连接电脑并编译软件2.1 编译软件2.2 更新配置及布局2.3 编译2.4 连接手机 3 自己数据集训练模型的部署4 参考 1 准备Android项目文件 1.1…

进程其他知识点

/* #include <stdlib.h> void exit(int status); #include <unistd.h> void _exit(int status); status 参数&#xff1a;是进程退出时的一个状态信息。父进程回收子进程资源的时候可以获取到。 */ #include <stdio.h> #include <stdlib.h> #include &…

深度解析FastDFS:构建高效分布式文件存储的实战指南(上)

文章目录 一、FastDFS简介1.1 概述1.2 特性 二、FastDFS原理架构2.1 FastDFS角色2.2 存储策略2.3 上传过程2.4 文件同步2.5 下载过程 三、FastDFS适用场景四、同类中间件对比4.1 FastDFS和集中存储方式对比4.2 FastDFS与其他文件系统的对比 五、FastDFS部署5.1 单机部署5.1.1 使…

hhdb数据库介绍(9-21)

计算节点参数说明 checkClusterBeforeDnSwitch 参数说明&#xff1a; PropertyValue参数值checkClusterBeforeDnSwitch是否可见否参数说明集群模式下触发数据节点高可用切换时&#xff0c;是否先判断集群所有成员正常再进行数据节点切换默认值falseReload是否生效是 参数设…

每日一练:【动态规划算法】斐波那契数列模型之第 N 个泰波那契数(easy)

1. 第 N 个泰波那契数&#xff08;easy&#xff09; 1. 题目链接&#xff1a;1137. 第 N 个泰波那契数 2. 题目描述 3.题目分析 这题我们要求第n个泰波那契Tn的值&#xff0c;很明显的使用动态规划算法。 4.动态规划算法流程 1. 状态表示&#xff1a; 根据题目的要求及公…

网页抓取API,让数据获取更简单

网页抓取的过程通常分为以下步骤&#xff0c;尤其是在面对静态网页时&#xff1a; 获取页面 HTML&#xff1a;使用 HTTP 客户端下载目标页面的 HTML 内容。解析 HTML&#xff1a;将下载的 HTML 输入解析器&#xff0c;准备提取内容。提取数据&#xff1a;利用解析器功能&#…

D3中颜色的表示方法大全

d3-color 是 D3.js 库中的一个模块&#xff0c;用于处理颜色。它提供了多种方式来表示和操作颜色。下面是一些常见的颜色表示方法及示例代码&#xff1a; 1. CSS颜色关键字 CSS 颜色关键字是一种简单的方式来指定颜色。例如&#xff1a; const color d3.color("steelbl…

IDEA2023 创建SpringBoot项目(一)

一、Spring Boot是由Pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置。 二、快速开发 1.打开IDEA选择 File->New->Project 2、…

下一代以区域为导向的电子/电气架构

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 所有人的看法和评价都是暂时的&#xff0c;只有自己的经历是伴随一生的&#xff0c;几乎所有的担忧和畏惧…

详细解析STM32 GPIO引脚的8种模式

目录 一、输入浮空&#xff08;Floating Input&#xff09;&#xff1a;GPIO引脚不连接任何上拉或下拉电阻&#xff0c;处于高阻态 1.浮空输入的定义 2.浮空输入的特点 3.浮空输入的应用场景 4.浮空输入的缺点 5.典型配置方式 6.注意事项 二、输入上拉&#xff08;Inpu…

第8章 硬件维护-8.6 产品变更管理(PCN)

8.6 产品变更管理&#xff08;PCN&#xff09; PCN是Product Change Notice&#xff08;产品变更管理&#xff09;的缩写。PCN是厂商为了提高质量、降低成本主动向客户发起的产品变更。一般涉及如下变更的&#xff0c;需要发布PCN公告。 &#xff08;1&#xff09;生产地址变更…

关于安卓模拟器或手机设置了BurpSuite代理和安装证书后仍然抓取不到APP数据包的解决办法

免责申明 本文仅是用于学习研究安卓系统设置代理后抓取不到App数据包实验,请勿用在非法途径上,若将其用于非法目的,所造成的一切后果由您自行承担,产生的一切风险和后果与笔者无关;本文开始前请认真详细学习《‌中华人民共和国网络安全法》【学法时习之丨网络安全在身边一…