【Flink系列五】Checkpoint及Barrier原理

本章内容

  • 一致性检查点
  • 从检查点恢复状态
  • 检查点实现算法-barrier
  • 保存点Savepoint
  • 状态后端(state backend)

本文先设置一个前提,流处理的数据都是可回放的(可以理解成消费的kafka的数据)

一致性检查点(checkpoints)

图1

  • checkpoint是Flink故障恢复的核心,全称是应用状态的一致性检查点
  • 有状态流应用的一致性检查点,其实就是所有任务处理完数据的状态,在某个时间点的一份拷贝(一份快照,存储在状态后端),这个时间点,应用是所有任务能恰好处理完一个相同的输入数据的时候

(图1中不考虑时间,假设1、2、3、4、5、6、7为source源,even为偶数6=2+4,odd为奇数求和9=1+3+5,此时5这个数据在所有tasks都处理完成了,每个任务都会提交一份快照给JM,最终这份拓扑结构(source任务状态是5、sum_even状态是6、sum_odd状态是9)称为checkpoint)

从检查点恢复状态

图2

  • 在执行流应用期间,Flink会定期保存状态的一致性检查点
  • 如果发生故障,Flink会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程

(假设处理到7这个数据的时候,sum_even=2+4+6=12,sum_odd在处理7这个数据的时候fail了,应该如果恢复数据呢)

第一步:遇到故障之后,重启受影响的应用,应用重启的之后,所有任务的状态都是空的

图3

第二步:从checkpoint中读取状态,将状态重置,从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同(回到了和图1相同的状态,如果算子设置了并行度,也可以恢复)。恢复后,source任务必须从检查点恢复的结果后开始读取数据(必须从6开始读取数据)

图4

第三步:开始消费并处理检查点到发生故障之间的所有数据。(处理完7后,sum_even=2+4+6=12,sum_odd=1+3+5+7=16, 所有tasks都处理完后,又会提交一个checkpoint)

图5

这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有的算子都会保存检查点并恢复其所有状态,这样依一来所有的输入流就都会被重置到检查点完成时的位置。

检查点的实现算法

基于Chandy-Lamport算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用

思考一个问题:flink如何判断某个数据已经处理完了呢?(比如图1的offset=5的数据)

答案:是否在每个数据后面跟一个标记,当读到这个标记的时候触发task状态的保存

检查点分界线(checkpoint barrier)
  • Flink的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
  • 分界线之前到来的数据导致的状态更改,都会包含在当前分界线所属的检查点中;二基于分界线之后的数据导致的所有更改,就会包含在之后的检查点中

图6

barrier有很多叫法,如检查点屏障等

分析一下barrier的工作流程,假设现在有这样的一个场景:有两个输入流的应用程序,用并行的两个source任务来读取(可以认为kafka的两个分区,source并行度设置为2),如图7所示。barrier也是和watermark一样,都是通过广播的方式传递给下游算子

图7

(source任务的并行度=2,sum任务的并行度也是2,sink任务的并行度也是2。)

如图7,两个流的数据都是1、2、3、4、5、6;蓝色数字圆圈代表最后一个处理的是蓝流里面的数据,黄色数字圆圈代表最后一个处理的是黄流里面的数据。

图8

图8中两条流的情况下,barrier如何传递呢?(watermark是取上游分区的最小值)下面一起来看一下

图9

barrier是怎么产生的?

答:JobManager会向每个source任务(同时发给并行的source任务)发送一条带有检查点ID的消息(蓝色三角形2),通过这种方式来启动检查点。产生barrier的过程中,不会影响下游task的正常工作(图9相比图8黄2和蓝2都sink完成了)图9中barrier(ID=2)插入在stream1的3后面,stream2的4后面

图10

barrier随着数据流动,广播到下游,source任务处理完barrier(ID=2)后,会向状态后端发送checkpoint,记录此时的状态。图10相比图9蓝3和黄4都被sum任务处理了。

  • 数据源将他们的状态写入检查点,并发出一个检查点barrier
  • 状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成

sum_even收到上游所有的barrier之后,才能去做checkpoint状态保存,这就叫做Barrier对齐(分分界线对齐)

图11

  • 分界线对齐:barrier向下游传递,sum任务会等待所有的输入分区的barrier到达
  • 对于barrier已经到达的分区,继续到达的数据会被缓存
  • 而barrier尚未到达的分区,数据会被正常处理

图11中的sum_even中的蓝4需要被缓存,因为来自上游任务的黄色barrier(ID=2)还未到达。(stream1有可能在同一个slot,stream2和stream1跨slot,可能barrier到达的时间会不一致)

图12

  • 当收到所有分区的barrier时,任务就讲其状态保存到状态后端的检查点中,然后barrier继续向下游广播

图12中,barrier(ID=2)继续向下游广播。此时蓝色4会从缓存中拿出来做接下来的计算

图13

图13中,sum_even处理完4+8=12,以及4+6+8=18,任务开始正常的数据处理

图14

  • sink任务向JobManager确认状态保存到checkpoint完毕
  • 当所有的任务都确认已经成功将状态保存到检查点时,检查点就真正完成了(3-4-8-8拓扑保存完成)

最终JobManager会向所有的任务确认task的状态是否正确,确认完成后任务完成。

保存点

  • Flink还提供了自定义的镜像保存功能,就是保存点(savepoints)
  • 原则上,创建保存点使用的算法与检查点的完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
  • Flink不会自动创建保存点,因此用户(或者外部调度系统)必须明确的触发创建操作
  • 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等

状态后端

Flink 提供了三种可用的状态后端用于在不同情况下进行状态的保存

  • MemoryStateBackend

内存级的状态后端,将监控状态作为内存中的对象进行管理,将他们存储在TM的JVM堆上,而将checkpoint存储在JM的内存中

  • FsStateBackend

将checkpoint存储到远程的持久化系统FileSystem中,而对于本地状态,和MemotyStateBackend一样,也会存储在TM的JVM堆上

  • RocksDBStateBackend

将所有的状态序列化后,存入本地的RocksDB中(注意:RocksDb的支持并不直接包含在Flink中,需要引入依赖),RocksDBStateBackend 是唯一支持增量快照的状态后端。

后续补充具体的代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/212549.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单片机第三季-第四课:STM32下载、MDK和调试器

目录 1,扩展板使用的STM32芯片类型 2,使用普中科技软件下载程序 3,keil介绍 4,JLINK调试器介绍 5,使用普中的调试器进行debug 6,使用Simulator仿真 1,扩展板使用的STM32芯片类型 扩展版…

如何用docker在自己服务器上部署springboot项目

一、将springboot项目打包 1、maven clean项目 2、maven package项目 打包成功之后生成jar文件(在target目录下) 3、为Java创建Dockerfile 引入jdk8的Docker镜像 FROM openjdk:8 为了使运行其余命令时更容易,让我们设置映像的工作目录。这将…

1、初识 llvm源码编译 及virtualbox和ubuntu环境搭建

很久没更新了,最近准备研究逆向和加固,于是跟着看雪hanbing老师学习彻底搞懂ollvm,终于把所有流程跑通了,中间遇到了太多的坑,所以必须记录一下,能避免自己和帮助他人最好。 环境搭建太重要了,…

26、pytest使用allure解读

官方实例 # content of pytest_quick_start_test.py import allurepytestmark [allure.epic("My first epic"), allure.feature("Quick start feature")]allure.id(1) allure.story("Simple story") allure.title("test_allure_simple_te…

LabVIEW使用单板RIO开发远程监控电源信号

LabVIEW使用单板RIO开发远程监控电源信号 设计和构建用于智能电网的本地功耗分析系统,主要服务于领先的电力监控设备设计者和制造商。随着智能电网投资的增加,对于能够有效处理替代电源(如太阳能和风能)间歇性功率水平的技术需求…

vue 提交表单重复点击,重复提交防抖问题

问题:用户点击保存时,可能会多次点击。导致生成重复数据。 目标:多次点击时,1s内只允许提交一次数据。 解决方案: 1.在utils文件夹创建文件preventReClick.js export default {install (Vue) {// 防止按钮重复点击V…

lv11 嵌入式开发 IIC(上) 19

目录 1 IIC总线简介 1.1 串行、半双工(同时只能1收或者1发) 1.2 IIC总线通信过程 2 IIC总线信号实现 2.1 IIC总线寻址方式 2.2 起始信号和停止信号 2.3 字节传送与应答 2.4 同步信号 2.5 典型IIC时序 3 练习 1 IIC总线简介 1.1 串行、半双工&a…

2.5D封装与3D IC封装主流产品介绍

2.5D封装和3D IC封装都是新兴的半导体封装技术,它们都可以实现芯片间的高速、高密度互连,从而提高系统的性能和集成度。但是它们之间也存在一些差异和异同点。 1、3D 结构与 2.5D 有何不同? 首先,2.5D封装和3D IC封装的互连方式有…

Chrome浏览器禁止更新策略

在做爬虫过程中,需要用到Selenium驱动浏览器去做动态爬虫 这里我一般用到的是Chrome谷歌浏览器进行爬虫 但是,目前python和Chrome浏览器适配最好的是110.版本 尽管我用了很多种方法 去取消浏览器自动更新 但是 过一段时间 浏览器总是会自动更新到最新…

Linux Docker 安装Nginx

1.21、查看可用的Nginx版本 访问Nginx镜像库地址:https://hub.docker.com/_/nginx 2、拉取指定版本的Nginx镜像 docker pull nginx:latest #安装最新版 docker pull nginx:1.25.3 #安装指定版本的Nginx 3、查看本地镜像 docker images 4、根据镜像创建并运行…

TCP单聊和UDP群聊

TCP协议单聊 服务端: import java.awt.BorderLayout; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.V…

geemap学习笔记022:如何找出一年中最绿的一天?

前言 这虽然是一个问题,但是解决这个问题之后,就会学习到很多的内容。包括如何计算NDVI、如何进行镶嵌、如何获取影像的时间等等。 1 导入库并显示地图 import ee import geemapee.Initialize() Map geemap.Map() Map2 定义一个感兴趣区(…

新书推荐——《Copilot和ChatGPT编程体验:挑战24个正则表达式难题》

《Copilot和ChatGPT编程体验:挑战24个正则表达式难题》呈现了两方竞争的格局。一方是专业程序员David Q. Mertz,是网络上最受欢迎的正则表达式教程的作者。另一方则是强大的AI编程工具OpenAI ChatGPT和GitHub Copilot。 比赛规则如下:David编…

【Flink系列二】如何计算Job并行度及slots数量

接上文的问题 并行的任务,需要占用多少slot ?一个流处理程序,需要包含多少个任务 首先明确一下概念 slot:TM上分配资源的最小单元,它代表的是资源(比如1G内存,而非线程的概念,好多…

PCL 点云最小二乘法拟合二维圆

文章目录 一、原理概述二、实现代码三、实现效果参考资料一、原理概述 二、实现代码 // 标准文件 #include <iostream>// PCL #include <pcl/io/pcd_io.h>

Python中的并发编程(2)线程的实现

Python中线程的实现 1. 线程 在Python中&#xff0c;threading 库提供了线程的接口。我们通过threading 中提供的接口创建、启动、同步线程。 例1. 使用线程旋转指针 想象一个场景&#xff1a;程序执行了一个耗时较长的操作&#xff0c;如复制一个大文件&#xff0c;我们希…

如何加快网络攻击发现速度

网络攻击可能会摧毁受害者。例如&#xff0c;米高梅度假村 (MGM Resorts) 预计将因 9 月份的网络攻击而遭受 1 亿美元的损失。 鲜为人知的是&#xff0c;在许多情况下&#xff0c;借助网络攻击发现可以预防网络攻击或将其消灭在萌芽状态。 威胁行为者变得越来越复杂&#xff…

【计算机网络笔记】物理层——频带传输基础

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

文件同步及实现简单监控

1. 软件简介 rsync rsync 是一款开源的、快速的、多功能的、可实现全量及增量的本地或远程 数据同步备份的优秀工具。在同步备份数据时&#xff0c;默认情况下&#xff0c;Rsync 通过其 独特的“quick check”算法&#xff0c;它仅同步大小或者最后修改时间发生变化的文 件或…

Linux_CentOS_7.9配置oracle sqlplus、rman实现上下按键切换历史命令等便捷效率功能之简易记录

配置oracle sqlplus以及rman可以上下按键切换历史命令等便捷效率功能 设置前提是已经yum安装了rlwrap软件具体软件下载及配置参考文章http://t.csdnimg.cn/iXuVK su - oracleVim .bash_profile ## 文件中增加如下的别名设置 ---------------- alias sqlplusrlwrap sqlplus…