Flink系列知识之:Checkpoint原理

Flink系列知识之:Checkpoint原理

在介绍checkpoint的执行流程之前,需要先明白Flink中状态的存储机制,因为状态对于检查点的持续备份至关重要。

State Backends分类

下图显示了Flink中三个内置的状态存储种类。MemoryStateBackend和FsStateBackend在运行时存储在Java堆中。FsStateBackend仅在执行检查点时才以文件的形式持久地将数据保存到远程存储。RocksDBStateBackend使用RocksDB(一种LSM数据库,结合了内存和磁盘)来存储状态。
在这里插入图片描述

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

下面是执行HeapKeyedStateBackend的方法:

  • 支持异步检查点(默认):存储格式为CopyOnWriteStateMap。
  • 仅支持同步检查点:存储格式为NestedStateMap。
    当在MemoryStateBackend中使用HeapKeyedStateBackend时,默认情况下,基于检查点的数据序列化的最大数据量为5mb。
    对于RocksDBKeyedStateBackend,每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。

对于RocksDBKeyedStateBackend,每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。
在这里插入图片描述

checkpoint执行流程

Flink容错机制的核心部分是绘制分布式数据流和算子状态的一致快照。这些快照充当一致的检查点,系统可以在发生故障时退回到这些检查点。它受到用于分布式快照的标准Chandy-Lamport算法的启发,并专门针对Flink的执行模型进行了定制。

自Flink 1.11以来,检查点可以在对齐或不对齐的情况下进行。在本节中,我们首先描述对齐的检查点。

Checkpoint barrier

Flink分布式快照的一个核心元素是stream barrier。这些barrier会被注入到数据流中,并作为数据流的一部分与记录一起流动。当 Flink 作业设置了检查点时,Flink 会在数据流中插入这些特殊记录,以确保在特定点上所有算子的状态都被一致地保存。barrier永远不会超过记录,它们严格地按顺序流动。barrier将数据流中的记录分隔为进入当前快照的记录集和进入下一个快照的记录集,相当于将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每个barrier都携带着包含了在它前面的记录的快照的ID。barrier不会中断数据流,因此非常轻巧。来自不同快照的多个barrier可以同时在数据流中,这意味着多种快照可能并发发生。整个过程是由 Flink 的执行引擎在运行时负责处理的,通过协调不同操作符之间的信号和状态来实现数据流中的 checkpoint barrier 插入。
在这里插入图片描述

Stream barrier首先会被注入到source流的并行数据流中。快照n的barrier被注入的点(我们称之为Sn)是source源流中快照所能覆盖的数据的位置。例如,在Apache Kafka中,这个位置将是分区中拉取数据的偏移量。这个插入点Sn会被报告给检查点协调器(Flink的JobManager)。

当中间操作符从其所有输入流接收到快照n的barrier时,它会开始执行快照,并将状态写入到State backend中,然后会将快照n的barrier继续向下游流动,发送到其所有传出流中。一旦sink操作符(流DAG的末端)从其所有输入流接收到barrier n,它就向检查点协调器确认快照n。在所有sink算子都确认了快照之后,就认为快照已经完成。

一旦快照n完成,作业就不会再向source算子请求Sn之前的记录,因为此时这些记录已经完整地流过了整个DAG数据拓扑。

checkpoint alignment

Checkpoint alignment 机制是 Apache Flink 中用于确保分布式检查点一致性的一种机制。对于接收多个输入流的算子需要在快照barrier上对齐输入流。如下图所示:
在这里插入图片描述

  • 一旦算子从某个输入流通道中接收到快照barrier n,它就不能处理来自该流的任何一条记录(阻塞),直到它从其他所有输入流通道中都接收到barrier n。因为如果不阻塞的话,算子状态将会混合属于快照n的记录和属于快照n+1的记录。
  • 在对齐的过程中,算子只会继续处理来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。
  • 当从最后一个输入流通道中接收到barrier n时,算子开始执行快照,异步地将状态写入到State Backend中,然后将barrier n继续向下游所有输出通道流动。

比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

需要注意的是,对于具有多个输入流的操作符算子,以及在shuffle后接收多个上游子任务输出流的操作符算子,都需要对齐。

checkpoint执行流程

上面介绍完checkpoint的相关原理后,本节尝试逐步解释执行检查点的过程。如下图所示,左侧为checkpoint coordinator,中间为Flink job(由两个源节点和一个汇聚节点组成),右侧为persistent storage(大多数场景下由HDFS提供)。

Step 1) Checkpoint coordinator触发checkpoint执行信号到所有输入流操作符算子中。
在这里插入图片描述

Step 2) 源节点向下游广播一个checkpoint barrier。该checkpoint barrier是Chandy-Lamport分布式快照算法的核心。下游任务只有在接收到所有输入流通道的barrier后才执行checkpoint操作。
在这里插入图片描述

Step 3) 源操作符算子完成state状态备份后,向checkpoint coordinator(检查点协调器)发送备份数据地址,即状态句柄。同时,barrier继续流向下游。
这里分为同步和异步(如果开启的话)两个阶段:
在这里插入图片描述

同步阶段:task执行状态快照,并写入外部存储系统(根据状态后端的选择不同有所区别)执行快照的过程:

  • 对 state 做深拷贝
  • 将写操作封装在异步的 FutureTask 中,FutureTask 的作用包括:1)打开输入流;2)写入状态的元数据信息;3)写入状态;4)关闭输入流

异步阶段

  • 执行同步阶段创建的 FutureTask
  • 向 Checkpoint Coordinator 发送 ACK 响应

Step 4) 当下游sink节点接收到上游两个输入通道的barrier后,开始执行本地快照。下图演示了执行RocksDB增量检查点的过程。RocksDB将全部数据刷新到磁盘,如红色三角形所示。然后,Flink为未上传的文件实现持久备份,如紫色三角形所示。
在这里插入图片描述

Step 5) 在执行完sink操作符算子的检查点之后,sink操作符算子将状态句柄(state handle)返回给checkpoint coordinator检查点协调器。
在这里插入图片描述

Step 6) 当接收到所有任务算子的状态句柄(state handle)后,checkpoint coordinator确认全局的checkpoint已经完成,然后将checkpoint元文件备份到持久化存储中。
在这里插入图片描述

Unaligned Checkpoint

上述对齐的chekcpoint基于Chandy-Lamport算法实现了分布式系统下的数据一致性快照。通过上面的原理可以看出,该方案在操作符算子具有多个输入流通道时,需要阻塞地等待所有输入通道的barrier都到达后才会开始执行快照。这在大多数情况下是没有问题的,但当某个输入流通道比其他输入流通道的数据流动更慢时,比如出现了反压、数据倾斜问题。会导致快照的完成时间变长甚至超时。其次,这种方案来说,Barrier对齐的过程本身就可能成为一个反压的源头,影响上游算法的效率,而这在某些情况下是不必要的。

为了解决这个问题,Flink在1.11版本中引入了Unaligned Checkpoint的特性。其基本思想是,只要输入通道中的的数据能成为操作符算子状态的一部分,那么checkpoint barrier就可以超越所有输入/输出通道中的数据。

Checkpointing can also be performed unaligned. The basic idea is that checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state.

如何来理解呢?

在上面对齐的checkpoint的原理介绍中可以发现,快照只包含了操作符算子的状态,而不关心输入/输出通道的数据记录。这是因为barrier对齐的checkpoint将本地快照延迟至所有barrier到达,也就是说当执行快照时,属于当前checkpoint周期内的数据记录都已经对该算子状态产生了影响,因而不必关心输入队列的剩余数据,同时输出队列又携带着barrier继续流向下一个算子的输入队列,因而输出队列的数据也不必关心,从而巧妙地避免了对算子输入/输出队列的状态进行快照。

但实际上,这和Chandy-Lamport算法是有一定出入的。举个例子,假设我们对两个数据流进行 equal-join,输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的 Checkpoint 周期):
在这里插入图片描述

  • 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。
  • 图 b: 算子分别读取 Channel 一个元素,输出 2。随后接收到 Channel 1 的 Barrier,停止处理 Channel 1 后续的数据,只处理 Channel 2 的数据。
  • 图 c: 算子再消费 2 个自 Channel 2 的元素,接收到 Barrier,开始本地快照并输出 Barrier。

对于相同的情况,Chandy-Lamport 算法的状态变化如下:
在这里插入图片描述

  • 图 a: 同上。
  • 图 b: 算子分别处理两个 Channel 一个元素,输出结果 2。此后接收到 Channel 1 的 Barrier,算子开始本地快照记录自己的状态,并输出 Barrier。
  • 图 c: 算子继续正常处理两个 Channel 的输入,输出 9。特别的地方是 Channel 2 后续元素会被保存下来,直到 Channel 2 的 Barrier 出现(即 Channel 2 的 9 和 7)。保存的数据会作为 Channel 的状态成为快照的一部分。

两者的差异主要可以总结为两点:

  1. 快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。
  2. 是否需要阻塞已经接收到 Barrier 的 Channel 的计算。

从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面结合 Flink 的定位做了几个改进。

首先,不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态,Flink 的算子有输入和输出两种 Channel,在快照时两者的状态都需要被考虑。

其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的位置,算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定,允许算子优先摄入并优先输出 Barrier。 如此一来,第一个到达 Barrier 会在算子的缓存数据队列(包括输入 Channel 和输出 Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中(图中绿色部分)。

在这里插入图片描述

上图描述了算子是如何处理非对齐的checkpoint barriers的:

  • 当输入队列中接收到第一个chekcpoint barrier时,算子即开始执行相应处理。
  • 它会立即将该barrier跳过前面的输入队列,并将其插入到输出队列的尾部。
  • 算子在执行快照时,会把所有标记了跳过的数据记录(图中绿色部分),并将其一并写入到算子状态中。

此时,算子只需短暂停止处理输入队列以标记缓冲区、转发barrier并创建其状态的快照。

这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞(因为Chandy-Lamport仍然要等到第一个barrier到达算子时才开始触发快照执行,如果算子的处理本身比较慢,数据的流动仍然会很慢),但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。 这可以从很大程度上加快 Barrier 流经整个 DAG 的速度,从而降低 Checkpoint 整体时长。

回到之前的例子,用 Unaligned Checkpoint 来实现,状态变化如下:
在这里插入图片描述

  • 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。输出 Channel 已存在结果数据 1。
  • 图 b: 算子优先处理输入 Channel 1 的 Barrier,开始本地快照记录自己的状态,并将 Barrier 插到输出 Channel 末端。
  • 图 c: 算子继续正常处理两个 Channel 的输入,输出 2、9。同时算子会将 Barrier 越过的数据(即输入 Channel 1 的 2 和输出 Channel 的 1)写入 Checkpoint,并将输入 Channel 2 后续早于 Barrier 的数据(即 2、9、7)持续写入 Checkpoint。

比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint进行快照和输出Barrier时,部分本属于当前Checkpoint的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint的输出数据却落到Barrier之后(因此未反映到下游算子的状态中)。这也正是Unaligned的含义:不同Checkpoint周期的数据没有对齐,包括不同输入Channel之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint恢复时,不对齐的数据并不能由Source端重放的数据计算得出,同时也没有反应到算子状态中,但因为它们会被Checkpoint恢复到对应Channel中,所以依然能够提供只计算一次的准确结果。

Unaligned Checkpoint方案确保barrier可以尽可能快地在数据流中移动。它特别适合至少有一个缓慢移动的数据输入队列的应用,其对齐时间可能达到几个小时。但是,由于它增加了额外的I/O压力,所以当应用写入State Backend的I/O本身就是瓶颈时,非对齐Checkpoint方案并不会有明显帮助。

Exactly Once vs. At Least Once

为了实现EXACTLY ONCE的语义,Flink使用了输入缓存队列来缓存在对齐过程中队列中传入的数据。同时,我们经过上面Checkpoint原理介绍也能清晰地知道,使用对齐的方式来执行快照是能够实现EXACTLY ONCE的语义的。
需要注意的是,这里的EXACTLY ONCE语义并不意味着每个事件将被精确地处理一次,而是意味着每个事件只会影响Flink算子状态一次。同时,EXACTLY ONCE语义并不能实现端到端的数据EXACTLY ONCE,如果需要实现端到端的EXACTLY ONCE语义,需要sink算子能够实现写入的幂等和事务性。

通常,在checkpoint过程中额外的对齐时间延迟大约是几毫秒,但也可能会有一些异常值的延迟明显增加的情况。对于所有记录都需要超低延迟(几毫秒)的应用程序,Flink有一个开关,可以在检查点期间跳过对齐步骤。此时,当算子接收到每个输入队列的checkpoint barrier时不会阻塞,会继续处理barrier之后的数据记录。这就可能会导致本属于下一个checkpoint周期的数据记录影响了当前checkpoint周期的算子状态,从而导致恢复时数据重复消费的情况,因此,这种模式下只能保证At Least Once语义。

Checkpoint Exactly Once和At Least Once语义配置:

// 启用 Checkpoint 每 5 秒 一次,模式为 EXACTLY_ONCE
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);// 启用 Checkpoint 每 5 秒 一次,模式为 At Least Once
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

另外,Aligned过程只发生在具有多个输入队列(连接)的算子以及具有多个输出队列的算子(比如在重新分区/shuffle之后)。正因为如此,只有单并行度的操作算子(map(), flatMap(), filter(),…)的数据流实际上及时被设置为At Least Once语义,也能实现Exactly once语义(实际上就是单输入流的算子不需要barrier对齐)。

参考

Flink 1.11 Unaligned Checkpoint 解析
Stateful Stream Processing
Flink Checkpoints Principles and Practices: Flink Advanced Tutorials

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/429801.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二叉搜索树(BSTree)原理及应用场景

目录 引言 二叉搜索树的基本概念 常见算法 插入节点 查找节点 删除节点 二叉搜索树的应用场景 1. 数据库索引 2. 符号表 3. 字典和词汇表 4. 动态集合 结论 引言 二叉搜索树(Binary Search Tree, BST)是一种特殊的二叉树,其每个节…

JavaEE: 深入探索TCP网络编程的奇妙世界(五)

文章目录 TCP核心机制TCP核心机制六: 拥塞控制为什么要有拥塞控制?动态调整的拥塞控制拥塞控制中,窗口大小具体的变化过程 TCP核心机制七: 延时应答TCP核心机制八: 捎带应答 TCP核心机制 前一篇文章 JavaEE: 深入探索TCP网络编程的奇妙世界(四) 书接上文~ TCP核心机制六: 拥…

Ubuntu20.04 搜索不到任何蓝牙设备

电脑信息 联想扬天YangTianT4900k 问题描述 打开蓝牙之后,一直转圈,搜索不到任何蓝牙设备 排查 dmesg | grep -i blue 有如下错误: Bluetooth: hci0: RTL: unknown IC info, lmp subver 8852, hci rev 000b, hci ver 000b lsusb 芯片型号如…

spark读取数据性能提升

1. 背景 spark默认的jdbc只会用单task读取数据,读取大数据量时,效率低。 2. 解决方案 根据分区字段,如日期进行划分,增加task数量提升效率。 /*** 返回每个task按时间段划分的过滤语句* param startDate* param endDate* param …

每日学习一个数据结构-Trie树(字典树)

文章目录 定义节点结构根节点插入操作查找操作删除操作特点应用示例 “Trie”树,又称为前缀树或字典树,是一种专门用于存储字符串的数据结构。它在许多应用程序中都非常有用,特别是在那些需要高效查找、插入和删除字符串的应用场景中。下面是…

[项目:微服务即时通讯系统客户端(基于C++QT)]三,左侧界面搭建

三,左侧界面搭建 一,导入 先把MainWidget类做成“单例类” 采用的是单例模式,让某一个类,在指定进程中只有唯一的实例 先看一下MainWidget的框架 QWidget//这部分是头文件保护宏,确保该头文件只被包含一次&#x…

低级编程语言和高级编程语言

一.区分低级编程语言和高级编程语言的方法 1.低级编程语言 低级编程语言,并不是简单的编程语言,而是写起来很费事的编程语言,如所有编程语言的"祖宗":汇编语言,写起来极其麻烦,说不定一个 int a1; 它就得写好几行,甚至十几行 这样麻烦的编程语言为什么还没消失那,因…

基于微信小程序的家教信息管理系统的设计与实现(论文+源码)_kaic

摘 要 随着互联网时代的来临,使得传统的家教模式已不复存在,亟需一种方便、快捷的在线教学平台。因此,利用Java语言作为支撑和MySQL数据库存储数据,结合微信小程序的便利性,为用户开发出了一个更加人性化、方便的家庭…

超越sora,最新文生视频CogVideoX-5b模型分享

CogVideoX-5B是由智谱 AI 开源的一款先进的文本到视频生成模型,它是 CogVideoX 系列中的更大尺寸版本,旨在提供更高质量的视频生成效果。 CogVideoX-5B 采用了 3D 因果变分自编码器(3D causal VAE)技术,通过在空间和时…

ps证件照蓝底换白底

ps证件照蓝底换白底 1、打开 Photoshop,导入需要处理的照片。 2、左侧工具栏中选择“魔棒工具”,点击证件照的背景区域进行选择。 3、使用快捷键 Shift F5 或者从顶部菜单选择“编辑” -> “填充”,在弹出的对话框中选择“填充内容”中…

【全网最全】2024年华为杯研究生数学建模A题成品论文

您的点赞收藏是我继续更新的最大动力! 一定要点击如下的卡片,那是获取资料的入口! 点击链接获取群聊【2024华为杯研赛资料汇总】:https://qm.qq.com/q/yB6JDUTaWAhttps://qm.qq.com/q/yB6JDUTaWAA题第一问是关于如何建立一个低复杂度模型&a…

【M-LOAM学习】

M-LOAM(INITIALIZATION) Article Analysis Scan-Based Motion Estimation 通过在consecutive frame (each LiDAR)(因为omp parallel)中寻找correspondences然后通过最小化所有考虑feature之间residual error的transformation between frame to frame 针…

通过解预测和机器学习促进蚁群优化

文章目录 Abstract1. Introduction2. Background and related work2.1 定向越野问题2.2 ACO优化3. 基于预测的蚁群优化算法3.1 构建训练集3.2 训练与解预测3.3 将预测解融入蚁群优化Abstract ML - ACO 算法的第一阶段,使用一组已知最优解的小定向越野问题实例训练一个 ML 模型…

tornado

Tornado通过使用非阻塞网络1/0,可以扩展到数以万计的开放链接,非常适合 长时间轮询,WebSockets和其他需要与每个用户建立长期连接的应用程序。 特点 注重性能优越,速度快解决高并发异步非阻塞websockets 长连接内嵌了HTTP服务器…

Linux 一些快捷键使用操作技巧

ctrl c : 强制停止 如图仅输入tail命令时程序会卡住,这时就需要强制停止 ctrl d : 退出或者登出 history : 查看历史输入命令 !命令 :自动执行上一次匹配前缀的命令 (注意不要用这个命令执行太过久远的,容易执行错误…

AWS 管理控制台

目录 控制台主页 AWS 账户信息 AWS 区域 AWS 服务选择器 AWS 搜索 AWS CloudShell AWS 控制面板小部件 控制台主页 注册新的 AWS 账户并登录后,您将看到控制台控制面板。这是与各种 AWS 服务以及其他重要控制台组件进行交互的起点。控制面板由页面顶部的导航…

C语言 | Leetcode C语言题解之第423题从英文中重建数字

题目&#xff1a; 题解&#xff1a; char * originalDigits(char * s) {int lenstrlen(s);int arr[26]{0},num[10]{0},cot0;for(int i 0; i < len; i)arr[s[i] - a];num[0] arr[z-a];num[2] arr[w-a];num[4] arr[u-a];num[6] arr[x-a];num[8] arr[g-a];num[1] arr[o…

nginx upstream转发连接错误情况研究

本次测试用到3台服务器&#xff1a; 192.168.10.115&#xff1a;转发服务器A 192.168.10.209&#xff1a;upstream下服务器1 192.168.10.210&#xff1a;upstream下服务器2 1台客户端&#xff1a;192.168.10.112 服务器A中nginx主要配置如下&#xff1a; log_format main…

双向链表:实现、操作与分析【算法 17】

双向链表&#xff1a;实现、操作与分析 引言 双向链表&#xff08;Doubly Linked List&#xff09;是链表数据结构的一种重要形式&#xff0c;它允许节点从两个方向进行遍历。与单向链表相比&#xff0c;双向链表中的每个节点不仅包含指向下一个节点的指针&#xff08;或引用&…

C语言 | Leetcode C语言题解之第429题N叉树的层序遍历

题目&#xff1a; 题解&#xff1a; #define MAX_LEVE_SIZE 1000 #define MAX_NODE_SIZE 10000int** levelOrder(struct Node* root, int* returnSize, int** returnColumnSizes) {int ** ans (int **)malloc(sizeof(int *) * MAX_LEVE_SIZE);*returnColumnSizes (int *)mal…