Flink Checkpoint 流程解析
Checkpoint 流程解析
- Flink Checkpoint 流程解析
- Checkpint 流程概括
- Checkpoint 触发流程解析 (Flink 1.20)
- 任务启动后 JobManager 开始定期对任务执行 Checkpoint
- JobManager 使用 CheckpointCoordinator 触发 Checkpoint
- CheckpointCoordinator 初始化 Checkpoint 所需要的信息
- 触发所有 OperatorCoordinator Checkpoint
- 触发 MastersHooks 状态快照
- CheckpointCoordinator 通知子任务开始 Checkpoint
- 子任务开始触发 Checkpoint
- MailBoxProcessor 异步执行 Checkpoint 事件
- 初始化输入端状态
- 触发 StreamOperator 状态快照
- 下游算子接收到 CheckpointBarrier 后开始 Checkpoint
- CheckpointBarrierHandler 处理 Barrier
Checkpint 流程概括
任务运行后 JobMaster 定时执行 Checkpoint,JobMaster 会通过调用 CheckpointCoordinator 对作业进行 Checkpoint。
CheckpointCoordinator 开始进行 Checkpoint,它首先会先创建 PendingCheckpoint,然后开始给 Checkpoint 计时,再关闭网关开始触发 OperatorCoordinator 的 Checkpoint。
如果是 SourceOperatorCoordinator,则这时会调用 Source 的 getSplitSerializer,获取分片序列化器,然后将 SplitAssignmentTracker 中任务运行时分配的分片序列化创建 Snapshot,再将 Snapshot 放入 PendingCheckpoint 中。
OperatorCoordinator 状态触发完后,开始触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFStreamOperator 内部的实现 WithMasterCheckpointHook 接口的 Function 创建,用于在 Master 触发 Checkpoint 时,Function 需要进行的操作。
MasterHooks 调用完后,CheckpointCoordinator 将给子任务 TaskManager 发送请求,通知它们开始 Checkpoint。
TaskExecutor 获取相应的任务 Task,Task 调用 StreamTask 开始进行 Checkpoint,StreamTask 调用 Mailbox 执行 Checkpoint 事件,Mailbox 执行 Checkpoint 事件时, Source 将不会从数据源读取数据。
Checkpoint 事件开始执行,如果 Checkpoint 需要强制对齐,那么需要异步创建 Channel 和结果分区的数据快照, 随后在执行传播 Barrier 前,SubtaskCheckpointCoordinatorImpl 会调用 OperatorChain 让 Operator 进行 Barrier 前的准备操作,然后开始往下游传播 Barrier。
SubtaskCheckpointCoordinatorImpl 创建 CheckpointBarrier 并将 CheckpointBarrier 发送给 RecordWriterOutput 将 Barrier 传输给下游任务,然后注册 Barrier 对齐超时计时器。
Barrier 传播完后,如果之前创建了 Channel 状态快照 ,那么还需要异步完成 Channel Output 的数据快照。
最后 SubtaskCheckpointCoordinatorImpl 开始对当前子任务的所有算子进行 Checkpoint,这时会进行算子创建快照时的操作,算子状态是存储在 OperatorStateBackend 和 KeyedStateBackend 中的, SubtaskCheckpointCoordinator 将会创建 OperatorStateBackend 和 KeyedStateBackend 的状态快照。
下游任务这时是正常处理上游发送过来的数据的,但是上游正在进行 Checkpoint,数据也是被发送过来的 CheckpointBarrier 分割开了,处理到后面会接收到上游的 CheckpointBarrier,也就表示着当前 Checkpoint 上游快照数据已经处理完,下游也开始进行 Checkpoint 了,下游进行 Checkpoint 的过程也是和上面的一样,继续调用 SubtaskCheckpointCoordinatorImpl 开始进行 Checkpoint。
总的来说,Checkpoint 将创建 Coordinator 状态、托管键值状态、托管算子状态、未处理的键值状态、未处理的算子状态、输入通道状态和结果分区状态的快照。
Checkpoint 触发流程解析 (Flink 1.20)
任务启动后 JobManager 开始定期对任务执行 Checkpoint
-
Task 任务恢复
Task#restoreAndInvoke
-
…
-
更新任务状态为 RUNNING 状态,TaskExecutor 通知 JobMaster 任务状态更新
TaskManagerActions#updateTaskExecutionState
TaskExecutor.TaskManagerActionsImpl#updateTaskExecutionState
JobMasterGateway#updateTaskExecutionState
-
…
-
-
JobMaster 调用 SchedulerBase、DefaultExecutionGraph 更新任务状态,定期触发 Checkpoint
JobMaster#updateTaskExecutionState
SchedulerBase#updateTaskExecutionState
DefaultExecutionGraph#updateState
DefaultExecutionGraph#updateStateInternal
-
[CheckpointCoordinator 开始定期执行 Checkpoint](#JobManager 使用 CheckpointCoordinator 触发 Checkpoint)
CheckpointCoordinator#startCheckpointScheduler
-
JobManager 使用 CheckpointCoordinator 触发 Checkpoint
-
JobMaster 触发 Checkpoint
JobMaster#triggerCheckpoint
-
调度器触发 Checkpoint
SchedulerNG#triggerCheckpoint
-
从 ExecutionGraph 中获取 CheckpointCoordinator,创建 CheckpointTriggerRequest,并使用 CheckpointCoordinator 通过 CheckpointRequestDecider 决定需要处理的 Checkpoint 请求触发 Checkpoint
CheckpointCoordinator#triggerCheckpoint
CheckpointRequestDecider#chooseRequestToExecute
CheckpointCoordinator#startTriggeringCheckpoint
-
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
-
触发和通知所有 OperatorCoordinator 开始 Checkpoint
OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
-
触发 MasterHooks 状态快照
CheckpointCoordinator#snapshotMasterState
MasterTriggerRestoreHook#triggerCheckpoint
-
CheckpointCoordinator 通知子任务开始 Checkpoint
-
-
-
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
-
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
- 计算 Checkpoint 执行计划
CheckpointPlanCalculator#calculateCheckpointPlan
-
校验所有任务是否已经初始化
-
如果有任务已经完成,那么创建所有任务完成后计算检查点的计划
DefaultCheckpointPlanCalculator#calculateAfterTasksFinished
-
如果没有任务完成,那么创建当所有任务都在运行时计算检查点的计划,该计划为所有任务都将标记为需要触发 Checkpoint,并将所有任务标记为需要等待和提交
DefaultCheckpointPlanCalculator#calculateWithAllTasksRunning
-
校验所有任务是否都在运行中
-
Checkpoint 计数加一
-
创建待处理的的 Checkpoint
CheckpointCoordinator#createPendingCheckpoint
-
追溯待处理的 Checkpoint 状态
CheckpointCoordinator#trackPendingCheckpointStats
-
创建一个新的挂起检查点跟踪器
CheckpointStatsTracker#reportPendingCheckpoint
-
报告单个子任务的统计信息
CheckpointCoordinator#reportFinishedTasks
-
-
创建待处理的的 Checkpoint(PendingCheckpoint)
-
开始 Checkpoint 计时,时间超时则取消 Checkpoint
-
返回待处理的的 Checkpoint
-
-
初始化 Checkpoint 地址
CheckpointCoordinator#initializeCheckpointLocation
-
如果该 Checkpoint 类型为 Savepoint,则初始化 Savepoint 地址
CheckpointStorageCoordinatorView#initializeLocationForSavepoint
-
否则,先初始化 Checkpoint Base 地址,再开始初始化地址
CheckpointStorageCoordinatorView#initializeBaseLocationsForCheckpoint
CheckpointStorageCoordinatorView#initializeLocationForCheckpoint
-
返回 Checkpoint 地址
-
触发所有 OperatorCoordinator Checkpoint
-
触发和通知所有 OperatorCoordinator 开始 Checkpoint
OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
-
触发 OperatorCoordinator Checkpoint
OperatorCoordinatorCheckpoints#triggerAllCoordinatorCheckpoints
-
关闭网关,获取并等待所有事件完成
OperatorCoordinatorHolder#closeGateways
IncompleteFuturesTracker#getCurrentIncompleteAndReset
-
网关标记当前的 Checkpoint
-
OperatorCoordinator 触发 Checkpoint
OperatorCoordinator#checkpointCoordinator
-
根据 Coordinator 的 Checkpoint 后的状态创建并返回 CoordinatorSnapshot
-
-
通知所有 CheckpointCoordinator Checkpoint 结果
OperatorCoordinatorCheckpoints#acknowledgeAllCoordinators
-
触发 MastersHooks 状态快照
-
触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFOperator 内部的实现 WithMasterCheckpointHook 接口的 UDF 创建,表示在 Master 触发 Checkpoint 时,UDF 可以做什么。
CheckpointCoordinator#snapshotMasterState
MasterTriggerRestoreHook#triggerCheckpoint
CheckpointCoordinator 通知子任务开始 Checkpoint
-
CheckpointCoordinator 给子任务发送 Checkpoint 请求
CheckpointCoordinator#triggerCheckpointRequest
-
发送任务 Checkpoint 请求
CheckpointCoordinator#triggerTasks
-
向所有的 Exeuction 对应的 Taskmanager 网关发送 Checkpoint 请求,子任务接收到请求后会开始触发 Checkpoint
Execution#triggerCheckpointHelper
TaskManagerGateway#triggerCheckpoint
-
-
任务 Checkpoint 请求发送完后取消定时器
-
子任务开始触发 Checkpoint
-
TaskManager 触发指定子任务的 Checkpoint
TaskExecutor#triggerCheckpoint
Task#triggerCheckpointBarrier
-
创建 Checkpoint 元数据 CheckpointMetaData
-
算子 Mailbox 异步执行 Checkpoint,因为 Checkpoint 在 MailboxProcessor 执行,所以这时将不会有数据传入
CheckpointableTask#triggerCheckpointAsync
StreamTask#triggerCheckpointAsync
-
如果 InputGateway 分区数据未处理完成,则触发未完成的数据通道 Checkpoint
StreamTask#triggerUnfinishedChannelsCheckpoint
这这情况是考虑已完成任务的 Checkpoint ,如果非 Source 任务成为新的主任务,则可能会通过 RPC 触发检查点。在这种情况下,他们将通知该检查点的 CheckpointBarrierHandle。
-
创建一个 CheckpointBarrier,并通知所有未完成的 Channel 处理该 Barrier,并尝试触发 Checkpoint
CheckpointBarrierHandler#processBarrier
-
-
如果 InputGateway 分区数据已经处理完成,则直接开始触发 Checkpoint
StreamTask#triggerCheckpointAsyncInMailbox
-
初始化输入端 Channel 状态
SubtaskCheckpointCoordinator#initInputsCheckpoint
-
SubtaskCheckpointCoordinatorImpl 开始执行 Checkpoint
StreamTask#performCheckpoint
SubtaskCheckpointCoordinator#checkpointState
-
-
-
MailBoxProcessor 异步执行 Checkpoint 事件
-
算子调用 SubtaskCheckpointCoordinator 执行 Checkpoint
SubtaskCheckpointCoordinatorImpl#checkpointState
-
如果当前 Checkpint 被终止了,那么向下游发送 CancelCheckpointMarker事件,以防下游背压,并结束当前 Checkpoint。
-
如果 Checkpoint 之前没有对齐过,并且 Checkpoint 配置的对齐类型是强制对齐,那么首先将当前 Checkpoint 类型设置为不再需要对齐了,然后初始化输入端的状态,可见初始化输入端状态
CheckpointOptions#withUnalignedSupported
SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
-
准备 Checkpoint,算子执行 Snapshot 和 发送 Barrier 前的操作
OperatorChain#prepareSnapshotPreBarrier
-
创建 CheckpointBarrier,并往下游发送 CheckpointBarrier 事件,开始 Barrier 对齐操作
OperatorChain#broadcastEvent
-
注册对齐计时器以在超时时对齐未对齐的 barrier
SubtaskCheckpointCoordinator#registerAlignmentTimer
-
如果前面进行了 Channel Checkpoint,那么在这里完成状态通道 Writer 快照
ChannelStateWriter#finishOutput
-
SubtaskCheckpointCoordinator 同步获取算子的所有的状态快照
SubtaskCheckpointCoordinator#takeSnapshotSync
-
如果 Checkpoint 是可超时和可不对齐的,则从 ChannelStateWriter 中获取通道状态写结果(ChannelStateWriteResult)
-
解析 Checkpoint 存储地址
SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#resolveCheckpointStorageLocation
-
触发 OpeartorChain 状态快照
OperatorChain#snapshotState
-
如果是 RegularOperatorChain,则获取所有算子,并触发所有算子的状态快照
RegularOperatorChain#buildOperatorSnapshotFutures
-
**构建 StreamOpeartor 算子状态快照 Future **
StreamOperator#snapshotState
-
如果算子是主算子或者是尾算子,那么将通道和结果分区的状态快照结果 Future 设置到AsyncCheckpointRunnable 中
-
-
如果是FinishedOperatorChain,则只将通道和结果分区的状态快照结果 Future 设置到 OperatorSnapshotFutures 中
-
向 CheckpointCoordinator 发送已接收 Checkpoint 事件
OperatorChain#sendAcknowledgeCheckpointEvent
-
-
清理 Checkpoint 缓存
SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#clearCacheFor
-
设置 Checkpoint 持续时间的指标
CheckpointMetricsBuilder#setSyncDurationMillis
-
-
如果获取 SnapShot 成功,则异步完成 Checkpoint
SubtaskCheckpointCoordinator#finishAndReportAsync
-
创建并异步执行 AsyncCheckpointRunnable
AsyncCheckpointRunnable#start
-
开始状态快照,并等待所有 SnapshotFuture 完成
AsyncCheckpointRunnable#finalizedFinishedSnapshots
AsyncCheckpointRunnable#finalizeNonFinishedSnapshots
-
计算 Channel 和分区对齐时的状态大小,并设置相关指标
-
-
-
否则,清理 SubtaskCheckpointCoordinator
SubtaskCheckpointCoordinator#cleanup
-
初始化输入端状态
-
子任务初始化 Checkpoint
SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
-
如果 Checkpoint 可不需要对齐
-
初始化写状态通道
ChannelStateWriter#start
-
创建CheckpointStartRequest,并将请求分发到 Writer
ChannelStateWriteRequestDispatcher#dispatch
-
分发器处理 CheckpointStartRequest
ChannelStateWriteRequestDispatcherImpl#handleCheckpointStartRequest
-
为该子任务 Writer 注册 ChannelStateWriteResult,用于收集 Checkpoint 过程中传输过来的数据
ChannelStateCheckpointWriter#registerSubtaskResult
-
-
-
-
准备正在传输中的数据快照,等待输入端的数据达到 Barrier
SubtaskCheckpointCoordinatorImpl#prepareInflightDataSnapshot
-
准备输入端快照
StreamTask#prepareInputSnapshot
StreamTaskInput#prepareSnapshot
-
网络输入端准备快照
StreamTaskNetworkInput#prepareSnapshot
-
获取所有还未处理的 Buffer,并添加到状态写状态通道中
ChannelStateWriter#addInputData
-
返回所有 Barriers 屏障接受 Future
-
-
-
等所有 Barriers 屏障接受后,完成对给定检查点 id 的通道状态数据的写入
-
将 CheckpointInProgressRequest 请求提交到通道状态写请求执行器(ChannelStateWriteRequestExecutor)中
-
通道状态写请求执行器执行对应请求
ChannelStateCheckpointWriter#completeInput
-
完成状态写入,写入的状态存放在 ChannelStateWriteResult 中,里面存放着写入的状态柄 InputChannelStateHandle 和 ResultSubpartitionStateHandle
ChannelStateCheckpointWriter#finishWriteAndResult
-
-
-
-
-
如果 Checkpoint 是可超时的,那么除了上面准备输入端快照那一步骤外,其他步骤都需要进行
-
触发 StreamOperator 状态快照
-
触发 StreamOperator 的 Checkpoint
RegularOperatorChain#checkpointStreamOperator
-
StreamOperatorStateHandler 创建快照
StreamOperatorStateHandler#snapshotState
-
创建算子快照环境和算子快照 Futures
-
真正的触发算子快照,该步操作可以通过算子自定义
StreamOperatorStateHandler.CheckpointedStreamOperator#snapshotState
-
算子和 Keyd 状态后端触发快照
Snapshotable#snapshot
-
-
下游算子接收到 CheckpointBarrier 后开始 Checkpoint
-
下游算子处理上游发送过来的事件
CheckpointedInputGate#handleEvent
-
如果接收到的事件为 CheckpointBarrier 事件,则开始处理 Barrier,尝试开始 Checkpoint
CheckpointBarrierHandler#processBarrier
-
CheckpointBarrierHandler 处理 Barrier
-
CheckpointBarrierHandler 处理 Barrier
CheckpointBarrierHandler#processBarrier
-
如果该 Barrier Id 大于上一次 PendingCheckpoint 的 Id 并且当前开启的 Channel 只有一个,标记对齐开始和结束,并通知开始 Checkpoint,然后结束该次处理
CheckpointBarrierHandler#markAlignmentStartAndEnd
CheckpointBarrierHandler#notifyCheckpoint
StreamTask#triggerCheckpointOnBarrier
SubtaskCheckpointCoordinator#checkpointState
-
否则尝试从等待的 Checkpoint 队列中寻找该 CheckpointBarrier
-
如果找到了,则说明 Barrier 已经对齐,标记已经完成对齐,并开始触发 Checkpoint,可见[MailBoxProcessor 异步执行 Checkpoint 事件](#MailBoxProcessor 异步执行 Checkpoint 事件)
CheckpointBarrierTracker#triggerCheckpointOnAligned
CheckpointBarrierHandler#notifyCheckpoint
StreamTask#triggerCheckpointOnBarrier
SubtaskCheckpointCoordinator#checkpointState
-
否则将该 Barrier 添加到Checkpoint 队列中,开始对齐
-
-
参考:
Flink Stateful Stream Processing:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/stateful-stream-processing/