Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析

序言

最近因为工作需要在阅读flink checkpoint处理机制,学习的过程中记录下来,并分享给大家。也算是学习并记录。

目前公司使用的flink版本为1.11。因此以下的分析都是基于1.11版本来的。

在分享前可以简单对flink checkpoint机制做一个大致的了解。

Flink checkpoint 机制介绍

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

[1] 引用:Flink Checkpoint原理解析 - 知乎

代码分析

Flink checkpoint 的触发是通过CheckpointCoordinator 的定时线程完后。

	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {return timer.scheduleAtFixedRate(new ScheduledTrigger(),initDelay, baseInterval, TimeUnit.MILLISECONDS);}

之后通过snapshotTaskState RPC的调用来实现触发checkpoint的

代码中遍历executions 来触发checkpoint,那么executions是什么东西呢?

Flink 代码中维护了一个叫tasksToTrigger的数组。

这个地方向前追溯,可以一直到jobgrap的生成。从名字和代码就可以看出,这个里面存的是没有inputchannel的节点,source节点没有inputchannel,所以回答上面的问题,executions 中是source节点,也就是做checkpoint 时 checkpointcoordinate 会给source节点发送rpc。

通过一个很长亮度的调用,最后到了SubtaskCheckpointCoordinatorImpl 中的

public void checkpointState(CheckpointMetaData metadata,CheckpointOptions options,CheckpointMetricsBuilder metrics,OperatorChain<?, ?> operatorChain,Supplier<Boolean> isCanceled) throws Exception {checkNotNull(options);checkNotNull(metrics);// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignmentsif (lastCheckpointId >= metadata.getCheckpointId()) {LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());channelStateWriter.abort(metadata.getCheckpointId(),new CancellationException("checkpoint aborted via notification"),true);checkAndClearAbortedStatus(metadata.getCheckpointId());return;}// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.lastCheckpointId = metadata.getCheckpointId();if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());return;}// if checkpoint has been previously unaligned, but was forced to be aligned (pointwise// connection), revert it here so that it can jump over output dataif (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {options = options.withUnalignedSupported();initInputsCheckpoint(metadata.getCheckpointId(), options);}// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());// Step (2): Send the checkpoint barrier downstreamLOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}",taskName,System.currentTimeMillis(),metadata.getTimestamp(),System.currentTimeMillis() - metadata.getTimestamp());CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());// Step (3): Register alignment timer to timeout aligned barrier to unaligned barrierregisterAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);// Step (4): Prepare to spill the in-flight buffers for input and outputif (options.needsChannelState()) {// output data already written while broadcasting eventchannelStateWriter.finishOutput(metadata.getCheckpointId());}// Step (5): Take the state snapshot. This should be largely asynchronous, to not impact// progress of the// streaming topologyMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());try {if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {finishAndReportAsync(snapshotFutures, metadata, metrics, options);} else {cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));}} catch (Exception ex) {cleanup(snapshotFutures, metadata, metrics, ex);throw ex;}}

代码中可以看到构造了CheckpointBarrier, source将barrier当成数据广播给下游的所有节点。使用的方法就是operatorChain.brodacastEvent()。这里就回到最开始提到的异步屏障快照算法。

下游收到了barrier,如何进行快照处理的?flink同时有多种类型的checkpoint,他们分别的处理时机是啥,后面我会进一步进行代码分析。

CheckpointBarrier checkpointBarrier =new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/316573.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VSCode SSH连接远程主机失败,显示Server status check failed - waiting and retrying

vscode ssh连接远程主机突然连接不上了&#xff0c;终端中显示&#xff1a;Server status check failed - waiting and retrying 但是我用Xshell都可以连接成功&#xff0c;所以不是远程主机的问题&#xff0c;问题出在本地vscode&#xff1b; 现象一&#xff1a; 不停地输入…

软考-信息系统项目管理师-论文技术架构模板(60天备考第26天)

分享一段信息系统项目管理师论文项目技术架构描述的万能模板&#xff0c;供大家参考。距离考试还有二十八天&#xff0c;如果论文写不好的可以加微进论文指导群学习论文写作。 该系统前端基于Vue开发&#xff0c;后端基于java开发&#xff0c;前后端分离部署。整体采用B/S架构&…

STM32 学习13 低功耗模式与唤醒

STM32 学习13 低功耗模式与唤醒 一、介绍1. STM32低功耗模式功能介绍2. 常见的低功耗模式&#xff08;1&#xff09;**睡眠模式 (Sleep Mode)**:&#xff08;2&#xff09;**停止模式 (Stop Mode)**:&#xff08;3&#xff09;**待机模式 (Standby Mode)**: 二、睡眠模式1. 进入…

Golang基础1-基本类型、if、switch、string

基本类型 bool 整数&#xff1a;byte(相当于uint8), rune(相当于int32), int/uint ,int8/uint8 ,int16/uint16 ,int32/uint32 ,int64/uint64 浮点数: float32 ,float64, complex64 ,complex128 array&#xff08;值类型&#xff09;、slice、map、chan&#xff08;引用类型…

西瓜书学习——决策树形状、熵和决策树的本质

文章目录 决策树形状监督学习算法分类与回归 熵信息熵香农熵 (Shannon Entropy) - H(X)联合熵 (Joint Entropy) - H(X, Y)条件熵 (Conditional Entropy) - H(Y|X)互信息 (Mutual Information) - I(X; Y)相对熵 (Relative Entropy) / KL散度 (Kullback-Leibler Divergence) - DK…

小程序使用阿里巴巴矢量图标库

一、登录官网 www.iconfont.cn 二、在搜索框中搜索想要的图标&#xff0c;将鼠标移动到图标上会看到三个标记 可以使用下载&#xff0c;直接使用&#xff1a; 可以使用css文件使用&#xff1a; 首先点击购物车样式的选项&#xff0c;而后点击下图位置&#xff1a; 点击自己创…

【python笔记】datafram的时间动态可视化 pyecharts地图

import pandas as pd# 假设DataFrame是这样的&#xff1a; df pd.DataFrame({ year: [2014, 2015, 2016, 2014, 2015, 2016, 2014, 2015, 2016], province: [广东省, 广东省, 河南省, 湖南省, 北京市, 北京市, 上海市, 新疆维吾尔自治区, 上海市], values: [100, 150, 75…

tomcat 配置支持 ssl 附效果图

1、修改tomcat配置文件server.xml: vim ./conf/server.xml 把配置文件&#xff1a; <Connector port"8088" Server" " protocol"HTTP/1.1"connectionTimeout"20000"redirectPort"8443" URIEncoding"UTF-8" …

可平滑替代FTP的FTP替代解决方案,具有哪些强大功能?

FTP是一种广泛使用的文件传输协议&#xff0c;主要用于在网络上的计算机之间传输文件。具有以下特点&#xff1a; 1.简单易用&#xff1a;FTP协议相对简单&#xff0c;易于设置和使用&#xff0c;许多操作系统和应用程序都内置了对FTP的支持。 2.广泛的客户端支持&#xff1a…

Vue生命周期都有哪些?

定义 Vue的生命周期就是实例从创建到销毁的一个过程&#xff0c;即从创建、初始化数据、编译模板、挂载Dom($el)->渲染、更新->渲染&#xff0c;卸载等一系列的过程。el是挂载点如<div id"app"></div>。 Vue的生命周期分为八个阶段 1.beforeCreate…

Spring Data JPA数据批量插入、批量更新真的用对了吗

Spring Data JPA系列 1、SpringBoot集成JPA及基本使用 2、Spring Data JPA Criteria查询、部分字段查询 3、Spring Data JPA数据批量插入、批量更新真的用对了吗 前言 在前两篇文章已经介绍过&#xff0c;在使用Spring Data JPA时&#xff0c;DAO层的Respository通过继承J…

【基础算法总结】双指针算法二

双指针 1.有效三角形的个数2.和为S的两个数字3. 三数之和4.四数之和 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496; 你的支持是对我最大的鼓励&#xff0c;我们一起努力吧!&#x1f603;&#x1f603; 1.有效三角形的个数 题目…

react实现时钟翻牌效果

需求&#xff1a;随着数字的变动要求有时钟翻动动效 问题&#xff1a;只在加载时有动效 解决方案&#xff1a;通过判断数字改变&#xff08;这里通过新旧数值变动来判断&#xff0c;不贴代码啦&#xff09;&#xff0c;每次变动的时候手动把animationIterationCount设置为inf…

Android --- 网络请求

通常在 Android 中进行网络连接一般使用 Scoket 和HTTP&#xff0c;HTTP 请求方式比 Scoket 多。HTTP 请求一般采用原生的 HttpClient 和 HttpUrlConnection 的两种网络访问方式&#xff08;系统自带的&#xff09;。但是在 Android 5.0 的时候 Google 就不推荐使用 HttpClient…

Python自学篇3-PyCharm开发工具下载、安装及应用

一、Python开发工具 自学篇1中讲到了安装Python之后出现的几个应用程序&#xff0c;其中IDLE、Python.exe都可以用来编写python程序&#xff0c;也可以进行调试&#xff1b;但是比较基础&#xff0c;比较原始&#xff0c;调试不方便&#xff0c;界面也不友好&#xff0c;需要更…

笔记本电脑耗电和发热比较厉害怎么处理

工作中会遇到有同事反馈笔记本电脑耗电和发热比较厉害&#xff0c;主要检查以下几个地方 1、CPU频率 很多人觉得是cpu使用率高就代表电脑跑得快&#xff0c;发热量就大&#xff0c;其实不是的&#xff0c;主要是看的cpu频率&#xff0c;频率越高&#xff0c;电脑发热量越大。如…

Visual Studio中怎样更改Nuget程序包源

场景 Visual Studio 2019 在使用NuGet添加依赖包时&#xff0c;在预览中搜索不到程序包。 排查下NuGet的程序包源为本地。 将程序包源修改下。 实现 在解决方案上右击选择管理解决方案中的NuGet程序包(在 Visual Studio 中打开“工具”>“选项”>“NuGet 包管理器”…

idea上传项目到gitee(码云)

1、打开码云&#xff0c;新建仓库 2、创建 3、这就是创建成功的页面 4、复制仓库地址&#xff0c;后面需要用到 2、打开我们的项目&#xff1a;例如我现在的项目 1、idea创建git仓库 2、选择我们项目文件夹的目录 3、查看文件是否变色&#xff0c;变色表示成功了 4、添加到缓…

STM32的GPIO输入和输出函数详解

系列文章目录 STM32单片机系列专栏 C语言术语和结构总结专栏 文章目录 1. GPIO模式 2. GPIO输出 2.1 RCC 2.2 GPIO 3. 代码示例 3.1 RCC时钟 3.2 GPIO初始化 3.3 GPIO输出函数 3.4 推挽输出和开漏输出 4. GPIO输入 4.1 输入模式 4.2 数据读取函数 5. C语言语法 1…

为什么很多企业都使用OV SSL证书

我们要了解什么是SSL OV证书 SSL OV证书&#xff0c;即组织验证型SSL证书&#xff0c;它要求证书颁发机构对申请证书的组织进行身份验证&#xff0c;确认组织的真实性后&#xff0c;才会发放证书。这种验证方式提高了安全性&#xff0c;因为它确保了证书背后的实体是真实存在的…