Flink源码之Checkpoint执行流程

checkpoint

Checkpoint完整流程如上图所示:

  1. JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint
  2. SourceTask向下游广播CheckpointBarrier
  3. SouceTask完成状态快照后向JobMaster发送快照结果
  4. 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果
  5. JobMaster保存SubTask快照结果
  6. JobMaster收到所有SubTask快照结果后保存快照信息,想SubTask通知Checkpoint完成

以下对整个流程具体说明。

CheckpointCoordinator

JobMaster将JobGraph转换为ExecutionGraph时,如果开启Checkpoint,会为ExecutionGraph生成一个CheckpointCoordinator

DefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraphDefaultExecutionGraph::newDefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertexDefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopologyDefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinatorDefaultExecutionGraph::createCheckpointPlanCalculator//创建DefaultCheckpointPlanCalculatorCheckpointCoordinator::new 

CheckpointCoordinator封装了StateBackend和CheckpointStorage

StateBackend负责管理状态:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

CheckpointStorage则是负责存储StateBackend管理的状态:

  • JobManagerCheckpointStorage //checkpoint state放入JobManager内存
  • FileSystemCheckpointStorage //配置state.checkpoints.dir时

在为StreamTask构造SubtaskCheckpointCoordinatorImpl时会调用:

CheckpointStorage::createCheckpointStorage

创建CheckpointStorageAccess用于执行Checkpoint时解析状态存储位置

  • MemoryBackendCheckpointStorageAccess //对应JobManagerCheckpointStorage
  • FsCheckpointStorageAccess //对应FileSystemCheckpointStorage

CheckpointCoordinator在执行状态快照时会调用

CheckpointStorageAccess::resolveCheckpointStorageLocation

生成CheckpointStreamFactory用于生成读写状态数据流

  • MemCheckpointStreamFactory //对应JobManagerCheckpointStorage
  • FsCheckpointStreamFactory //对应FileSystemCheckpointStorage

Checkpoint触发流程

JobMaster状态转换为running后,通过CheckpointCoordinator向SourceTask发送TriggerCheckpoint

JobMaster端触发流程

JobMaster::start  //RPCServer启动
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
SchedulerBase::transitionToRunningDefaultExecutionGraph::transitionToRunning //调用ExecutionGraph监听器通知状态变化CheckpointCoordinatorDeActivator::jobStatusChanges//触发checkpointCheckpointCoordinator::startCheckpointSchedulerCheckpointCoordinator::scheduleTriggerWithDelay //定时不断触发CheckpointCheckpointCoordinator::triggerCheckpointCheckpointCoordinator::startTriggeringCheckpointDefaultCheckpointPlanCalculator::calculateCheckpointPlan//Plan中会隔离出SourceTask作为作为Trigger Checkpoint的入口CheckpointCoordinator::createPendingCheckpointCheckpointCoordinator::triggerCheckpointRequestCheckpointCoordinator::triggerTasks Execution::triggerCheckpoint //向每个SourceTask发送TriggerCheckpoint请求Execution::triggerCheckpointHelperTaskManagerGateway::triggerCheckpoint//向TaskExecutor发RPC

StreamTask端执行流程

SourceTask

SourceTask由JobMaster RPC直接触发,执行时先广播CheckpointBarrier,然后对状态执行异步快照

TaskExecutor::triggerCheckpoint
Task::triggerCheckpointBarrier
AbstractInvokable::triggerCheckpointAsync
SourceStreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsyncInMailbox
StreamTask::performCheckpoint
SubtaskCheckpointCoordinatorImpl::checkpointStateOperatorChain.broadcastEvent //广播CheckpointBarrier
CheckpointStorage::createCheckpointStorage//为JobId创建CheckpointStorageAccess
SubtaskCheckpointCoordinatorImpl::takeSnapshotSync
CheckpointStorageWorkerView::resolveCheckpointStorageLocation//CheckpointStorageAccess创建 CheckpointStreamFactoryOperatorChain::snapshotState //对每个OperatorRegularOperatorChain::buildOperatorSnapshotFuturesRegularOperatorChain::checkpointStreamOperatorAbstractStreamOperator::snapshotStateStreamOperatorStateHandler::snapshotState//调用Operator/Keyed Backend的snapshotStateSnapshotContextSynchronousImpl::newAbstractUdfStreamOperator::snapshotState //调用UDF中snapshotState方法,一般用于更新OperatorStateDefaultOperatorStateBackend::snapshotSnapshotStrategyRunner::snapshotDefaultOperatorStateBackendSnapshotStrategy::syncPrepareResources//深copy operator state,便于后续进行异步快照DefaultOperatorStateBackendSnapshotStrategy::asyncSnapshot//异步快照					  	  CheckpointStateOutputStream::closeAndGetHandleOperatorStreamStateHandle::new //包装元信息及数据StreamStateHandleHeapKeyedStateBackend::snapshotSnapshotStrategyRunner::snapshotHeapSnapshotStrategy::syncPrepareResourcesHeapSnapshotStrategy::asyncSnapshot //采用COWSateTable异步快照CheckpointStateOutputStream::closeAndGetHandleKeyGroupsStateHandle::new //包装KeyGroup及数据StreamStateHandle
SubtaskCheckpointCoordinatorImpl::finishAndReportAsync //向JobMaster发送checkpoint的结果AsyncCheckpointRunnable::new AsyncCheckpointRunnable::runAsyncCheckpointRunnable::finalizeNonFinishedSnapshotsOperatorSnapshotFinalizer::new //等待TaskSnapshot状态信息序列化完成AsyncCheckpointRunnable::reportCompletedSnapshotStatesTaskStateManagerImpl::reportTaskStateSnapshotsRpcCheckpointResponder::acknowledgeCheckpoint//向JobMaster发送Ack,带上State信息
非SourceTask

在StreamTask启动后调用StreamTask::processInput不断读取数据进行处理, 非SourceTask在收到上游的CheckpointBarrier对齐后触发Checkpoint,

StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理CheckpointedInputGate::pollNextCheckpointedInputGate::handleEventSingleCheckpointBarrierHandler::processBarrierSingleCheckpointBarrierHandler::markCheckpointAlignedAndTransformStateWaitingForFirstBarrier::barrierReceivedAbstractAlignedBarrierHandlerState::barrierReceivedSingleCheckpointBarrierHandler.ControllerImpl::allBarriersReceived//判断对齐AbstractAlignedBarrierHandlerState::triggerGlobalCheckpointSingleCheckpointBarrierHandler.ControllerImpl::triggerGlobalCheckpointSingleCheckpointBarrierHandler::triggerCheckpointCheckpointBarrierHandler::notifyCheckpoint //触发StreamTask CheckpointStreamTask::triggerCheckpointOnBarrierStreamTask::performCheckpoint //后续调用过程与SourceTask一样SubtaskCheckpointCoordinatorImpl::checkpointState   		

根据调用栈看出,非SourceStreamTask执行Checkpoint只是触发时机不同,SourceTask由JobMaster RPC定时不断触发,非SourceTask则是在上游的CheckpointBarrier对齐后触发Checkpoint,最终执行逻辑都是将当前算子的信息写入CheckpointStorage后向JobMaster发送确认信息。

StreamTask向JobMaster ACK信息中包含状态元信息及StreamStateHandle,根据状态存储位置分为:

  • ByteStreamStateHandle //对应JobManagerCheckpointStorage,将状态序列化为byte[]发送给JobMaster
  • FileStateHandle //对应FileSystemCheckpointStorage,将状态写入文件系统后将文件路径发送给JobMaster

JobMaster端完成流程

JobMaster收到StreamTask的acknowledgeCheckpoint后:

JobMaster::acknowledgeCheckpoint
SchedulerBase::acknowledgeCheckpoint
ExecutionGraphHandler::acknowledgeCheckpoint
CheckpointCoordinator::receiveAcknowledgeMessagePendingCheckpoint::acknowledgeTask //某一个Task的确认PendingCheckpoint::updateOperatorState//更新SubTask状态信息CheckpointCoordinator::completePendingCheckpoint//所有Task Ack后PendingCheckpoint::finalizeCheckpointCheckpoints.storeCheckpointMetadata//保存CheckpointMetadataCompletedCheckpoint::newCheckpointCoordinator::sendAcknowledgeMessages//向Task通知Checkpoint完成消息ExecutionVertex::notifyCheckpointCompleteTaskManagerGateway.notifyCheckpointComplete

JobMaster收到所有StreamTask的Checkpoint状态信息后,标志一次Checkpoint完成,这时会通知StreamTask CheckPoint完成消息,便于SubTask监听Checkpoint完成后做后续动作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/106149.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux:权限

目录 一、shell运行原理 二、权限 1.权限的概念 2.文件访问权限的相关设置方法 三、常见的权限问题 1.目录权限 2.umsk(权限掩码) 3.粘滞位 一、shell运行原理 1.为什么我们不是直接访问操作系统? ”人“不善于直接使用操作系统如果让人直接访问操作系统&a…

【网络安全】防火墙知识点全面图解(三)

本系列文章包含: 【网络安全】防火墙知识点全面图解(一)【网络安全】防火墙知识点全面图解(二)【网络安全】防火墙知识点全面图解(三) 防火墙知识点全面图解(三) 39、什…

vscode 与 C++

序 具体流程的话,官方文档里都有的:C programming with Visual Studio Code 浏览器下载一个mingw64,解压,配置环境变量vscode里安装c相关的插件没了 第一步只看文字,可能有点抽象,相关视频: …

git介绍+集成到IDEA中+使用gitee

目录 git介绍 本地工作流程 IDEA集git 添加到暂存区 添加到本地仓库 gitee使用 添加到远程仓库 git介绍 git是一个开源的分布式版本控制工具,效率高。可以记录历史代码,多人代码共享 知识小点: 集中式版本控制:使用中央存…

RabbitMQ集群搭建和测试总结_亲测

RabbiMQ简介 RabbitMQ是用Erlang开发的,集群非常方便,因为Erlang天生就是一门分布式语言,但其本身并不支持负载均衡。 RabbitMQ模式 RabbitMQ模式大概分为以下三种: (1)单一模式。 (2)普通模式(默认的集群模式)。 (3)镜像模式(把需要的队列…

【力扣每日一题】2023.8.26 汇总区间

目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 题目给我们一个有序数组,让我们把数组内的元素汇总区间,也就是说有一串数字是连续的,比如是 1 2 3 4…

空时自适应处理用于机载雷达——元素空间空时自适应处理(Matla代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Spring Cloud Alibaba-Sentinel-Sentinel入门

1 什么是Sentinel Sentinel (分布式系统的流量防卫兵) 是阿里开源的一套用于服务容错的综合性解决方案。它以流量为切入点, 从流量控制、熔断降级、系统负载保护等多个维度来保护服务的稳定性。Sentinel 具有以下特征: 丰富的应用场景:Sentinel 承接了阿里…

使用Java开发Jmeter自定义取样器(Sampler)插件

文章目录 1、Jmeter自定义取样器扩展类2、SpringBoot服务器端http测试例子3、自定义取样器实现3.1、默认界面的AbstractJavaSamplerClient扩展实现3.2、自定义界面的AbstractSamplerGui扩展实现 3、自定义取样器运行效果3.1、AbstractJavaSamplerClient运行效果3.2、AbstractSa…

工厂生产作业流程合规检测

工厂生产作业流程合规检测系统通过yolov7网络模型算法,工厂生产作业流程合规检测对作业人员的操作行为进行全面监测,通过图像识别算法和数据分析,对人员的操作动作、工具使用、安全防护等方面进行检测和评估,能够实时监测工人的操…

9.2 互补功率放大电路

目前使用最广泛的是无输出变压器的功率放大电路(OTL 电路)和无输出电容的功率放大电路(OCL 电路)。 一、OCL 电路的组成及工作原理 为了消除图9.1.5所示的基本 OCL 电路所产生的交越失真,应当设置合适的静态工作点&a…

YOLOv5算法改进(7)— 添加SimAM注意力机制

前言:Hello大家好,我是小哥谈。SimAM(Similarity-based Attention Mechanism)是一种基于相似度的注意力机制,它的原理是通过计算查询向量与每个键向量之间的相似度,从而确定每个键向量对于查询向量的重要性…

开始MySQL之路——MySQL约束概述详解

MySQL约束 create table [if not exists] 表名(字段名1 类型[(宽度)] [约束条件] [comment 字段说明],字段名2 类型[(宽度)] [约束条件] [comment 字段说明],字段名3 类型[(宽度)] [约束条件] [comment 字段说明] )[表的一些设置]; 概念 约束英文:constraint 约束实…

vscode | 开发神器vscode自定义用户代码片段

目录 一、增加二、删除三、语法四、变量 一、增加 点击:左下角设置齿轮按钮——>用户代码片段 点击:新建全局代码片段文件 输入文件名 会出现如下界面 配置以下语句 "cls": {"scope": "javascript,typescript",…

Linux学习之Ubuntu 20.04在github下载源码安装Openresty 1.19.3.1

参考的博文:《在 Ubuntu 上使用源码安装 OpenResty》 《OpenResty 安装安装详解-Ubuntu》 《Linux学习之CentOS 7源码安装openresty》 https://openresty.org/en/download.html是官网下载网址,页面往下拉有下载的链接。 https://github.com/openresty…

研磨设计模式day09原型模式

目录 场景 代码实现 有何问题 解决方案 代码改造 模式讲解 原型与new 原型实例与克隆出来的实例 浅度克隆和深度克隆 原型模式的优缺点 思考 何时选用? 相关模式 场景 代码实现 定义订单接口 package com.zsp.bike.day08原型模式;/*** 订单的接口*…

06-Numpy基础-线性代数

线性代数(如矩阵乘法、矩阵分解、行列式以及其他方阵数学等)是任何数组库的重要组成部分。 NumPy提供了一个用于矩阵乘法的dot函数(既是一个数组方法也是numpy命名空间中的一个函数) x.dot(y)等价于np.dot(x, y) 符(…

【C++设计模式】用简单工厂模式实现按汽车重量输出汽车类型

2023年8月24日&#xff0c;周四凌晨 #include<iostream>class CarType{ public:virtual std::string getType()0; };class MiniCar:public CarType{ public:std::string getType() override{return "小型车";}; };class MidSizeCar:public CarType{ public:std…

管家婆往来分析功能介绍

往来分析是企业管理的重要工具之一&#xff0c;主要用于监控和查询与往来单位的业务往来情况&#xff0c;包括进货金额、付款金额、销售金额、回款情况、此前应收应付、应收应付余额、应收应付限额及其超限余额等。通过往来分析&#xff0c;企业可以更好地了解和控制与往来单位…

mysql 默认的4个数据库 介绍

mysql 存储MySQL的用户账号和权限信息&#xff0c;一些存储过程、事件的定义信息 一些运行过程中产生的日志信息&#xff0c;一些帮助信息以及时区信息等 information_schema 存储Mysql服务器 维护的所有其它数据库的信息&#xff0c;比如有哪些表、哪些视图、哪些触发器、哪…