Flink 批作业如何在 Master 节点出错重启后恢复执行进度?

摘要:本文撰写自阿里云研发工程师李俊睿(昕程),主要介绍 Flink 1.20 版本中引入了批作业在 JM failover 后的进度恢复功能。主要分为以下四个内容:

  1. 背景
  2. 解决思路
  3. 使用效果
  4. 如何启用

一、背景

在 Flink 1.20 版本之前,如果 Flink 的 JobMaster(JM)发生故障导致被终止,将会发生如下两种情况:

  • 如果作业未启用高可用性(HA),作业将失败。

  • 如果作业启用了 HA,JM 会被自动重新拉起 (JM failover)。在这种情况下,流作业将从最后一个成功的检查点恢复。然而,批作业由于缺乏检查点机制,将不得不从头开始运行,导 致之前的所有进度丢失。这对于需要长时间运行的批作业来说,意味着巨大的回退。

为了解决这一问题,我们在 Flink 1.20 版本中引入了批作业在 JM failover 后的进度恢复功能。这一功能的目的是使批作业在 JM failover 后能够尽可能地恢复到出错前的进度,避免重新运行已完成

的任务。

二、解决思路

为了实现这一目标,我们需要能够将 JM 的状态持久化到外部存储,从而在 JM 发生 failover 后,Flink 能够利用这些状态信息恢复作业到之前的运行进度。

我们设计了一种基于事件的 JM 状态恢复机制,在作业正常运行时,JM 会将状态变更事件写入外部持久化存储,以确保在 JM failover 后仍能获得作业的执行进度。此外,我们还需要解决 JM failover 后实际作业状态与状态变更事件可能不一致的问题。例如,某些 TaskManager (TM)在运行过程中意外丢失,可能导致中间数据结果无法访问。因此,Flink 必须从 TM 和 Remote Shuffle Service (RSS)获取中间结果数据的信息,来对作业运行进度的恢复结果进行校准。

该功能的整体流程分为如下几个阶段:

  1. 作业执行时 我们引入了 JobEventStore 组件,该组件负责在作业正常运行期间将 JM 的状态变更事件写入到外部文件系统中。其中需要被写入的状态变更事件分为如下以下几类:

    (1)自适应执行计划优化:Flink 会自适应地优化批作业的执行计划,这些优化结果是基于上游的执行结果来确定的。如果每次都依赖上游的执行结果进行重建,将会产生较大的开销。因此,记录这些优化结果对于任务调度和容错非常重要。

    (2)已经结束的 Task 信息:保存已完成任务的执行进度,以便在恢复作业时能够准确地继续从上次执行的位置开始。

    (3)OperatorCoordinator 状态:OperatorCoordinator 负责协调算子,实行算子之间的通信,其状态与数据一致性密切相关。例如,SourceCoordinator 中包含记录哪些数据分片已经分发的状态信息。重建该组件的状态有助于保证数据的一致性。

    (4)ShuffleMaster 状态:Flink 目前支持 RSS,而 RSS 的 Shuffle Master 可能会保存一些状态信息,如 Shuffle 数据的元数据。为了使新的 JobManager 能够复用这些中间结果,恢复 Shuffle Master 的状态是必不可少的。

  1. JM failover 期间 Flink 批作业在运行过程中,其中间结果数据会保存在 TM 上和 RSS 上。当 JM 发生故障时,TM 和 RSS 将保留与作业相关的中间结果数据,并不断地尝试重新连接到 JM。一旦新的 JM 重新被拉起来后,TM 和 RSS 将重新与 JM 建立连接,然后 TM 和 RSS 会主动上报它们持有的中间结果数据。

  1. JM failover 后的作业进度恢复

一旦 JM 重启,它会与 TM 和 RSS 重新建立连接,利用 JobEventStore 中记录的事件以及 TM 和 RSS 保留的中间结果数据,来重建作业的执行进度。

JM 首先会利用 JobEventStore 中记录的事件,恢复作业各个节点的执行状态。

然后根据 OperatorCoordinator 的状态,JM 会恢复尚未处理的 Source 数据分片,以避免数据丢失或重复。

随后,JM 将根据汇报上来的可用中间数据进一步校正执行进度。如果某个 task 产生的中间数据丢失,但这些数据仍被下游 task 所需要,那么该 task 将被重置并重新执行。

最后作业将从恢复出来的进度继续执行。

三、使用效果

以下是一个 JM 出错重启后进度恢复的效果示例。

该批作业的拓扑结构为 Source -> Map -> Sink ,当作业运行到 Map 节点时,因为外部服务的原因导致 JM 所在机器下线,从而造成了 JM failover。

随后,高可用服务将会自动拉起新的 JM 进程,作业将进入 RECONCILING 状态,表示作业进入了恢复运行进展的阶段。

当作业恢复完成后,将进入 RUNNING 状态。

点进作业详情页后,可以观察到作业已经恢复到 JM failover 前到进展了。

四、如何启用

要使用 Flink 批作业的状态恢复功能,用户需要:

  1. 确保已启用集群高可用:目前 Flink 提供了基于 Zookeeper 和 Kubernetes 的两种高可用服务,更多细节详见官方文档。
  2. 配置 execution.batch.job-recovery.enabled: true

所有 new source 都支持批处理作业在 JM 出错后进行进度恢复。然而,为了实现细粒度的进度恢复,new source的 SplitEnumerator 需要实现 SupportsBatchSnapshot 接口,否则只有在该 source 的所有并发任务完成后,才能在 JM 出错恢复后避免重新执行这个 source 的 task。当前,FileSource 和 HiveSource 已经实现了该接口。详细信息请参见官方文档。

考虑到不同集群和作业的差异,为了让批作业在 job master failover 后能够尽可能的恢复出错前的进度,避免重新运行已完成的任务,用户可以参考此文档进行配置项调优。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/446025.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react antd redux 全局状态管理 解决修改菜单状态 同步刷新左侧菜单

npm i react-redux1.src新建两个文件 globalState.js 全局状态定义 store.js 全局存储定义 2.globalState.js import { createSlice } from "reduxjs/toolkit";export const globalState createSlice({name: "globalState",initialState: { data: {} },r…

rpa批量发送邮件如何通过编辑器编发邮件?

rpa批量发送邮件的技巧?怎么使用rpa邮箱群发助手? 手动发送邮件变得越来越繁琐且效率低下。为了解决这一问题,越来越多的企业开始采用RPA技术来批量发送邮件。AokSend将详细探讨如何通过编辑器来实现rpa批量发送邮件的功能,从而提…

微信小程序处理交易投诉管理,支持多小程序,一键授权模式

大家好,我是小悟 1、问题背景 玩过微信小程序生态的,或许就有这种感受,如果收到投诉单,不会及时通知到手机端,而是每天早上10:00向小程序的管理员及运营者推送通知。通知内容为截至前一天24时该小程序账号内待处理的交…

计算机视觉之YOLO算法基本原理和应用场景

YOLO算法基本原理 整体流程 YOLO 将目标检测问题转化为一个回归问题。它将输入图像划分成多个网格单元,每个网格单元负责预测中心点落在该网格内的目标。对于每个网格单元,YOLO 预测多个边界框以及这些边界框中包含目标的类别概率。边界框通常由中心点坐…

前端开发笔记--css 黑马程序员1

文章目录 1. css 语法规范2.css的书写风格3.基础选择器选择器的分类标签选择器类选择器类选择器的特殊使用--多类名 id 选择器 字体属性常见字体字体大小字体粗细字体倾斜字体的复合简写字体属性总结 文本属性文本颜色文本对齐装饰文本文本缩进文本间距文本属性总结 css的引入方…

【机器学习】知识总结1(人工智能、机器学习、深度学习、贝叶斯、回归分析)

目录 一、机器学习、深度学习 1.人工智能 1.1人工智能概念 1.2人工智能的主要研究内容与应用领域 1.2.1主要研究内容: 1.2.2应用领域 2.机器学习 2.1机器学习的概念 2.2机器学习的基本思路 2.3机器学习的分类 3.深度学习 3.1深度学习的概念 3.2人工智能…

Java体系中的泛型

1. 泛型 一般的类和方法,只能够使用基本类型,要么是自定义的类,如果要编写可以应用于多种数据类型的代码,这种刻板的限制对代码的约束就会很大,那么如何实现可应用于多种数据类型的代码,而不局限于单一一种…

服务器数据恢复—EMC存储RAID5磁盘阵列数据恢复案例

服务器数据恢复环境: 一台EMC某型号存储设备,该存储中有一组由12块(包括2块热备盘)STAT硬盘组建的raid5阵列。 服务器故障: 该存储在运行过程中突然崩溃,raid瘫痪。数据恢复工程师到达现场对故障存储设备进…

肺结节分割与提取系统(基于传统图像处理方法)

Matlab肺结节分割(肺结节提取)源程序,GUI人机界面版本。使用传统图像分割方法,非深度学习方法。使用LIDC-IDRI数据集。 工作如下: 1、读取图像。读取原始dicom格式的CT图像,并显示,绘制灰度直方图; 2、图像…

欧科云链研究院深掘链上数据:洞察未来Web3的隐秘价值

目前链上数据正处于迈向下一个爆发的重要时刻。 随着Web3行业发展,公链数量呈现爆发式的增长,链上积聚的财富效应,特别是由行业热点话题引领的链上交互行为爆发式增长带来了巨量的链上数据,这些数据构筑了一个行为透明但与物理世…

extern “C“ 的作用、C++ 和 C 编译的不同、C++ 编译过程的五个主要阶段

在 C 中,如果需要从 C 语言导入函数或与 C 代码交互,需要使用 extern "C" 关键字。这是因为 C 和 C 在编译过程中的 符号命名机制(即 "名称修饰" 或 "name mangling")不同。 1. extern "C&qu…

MokeJs使用实例

文章目录 MokeJs使用实例介绍使用安装配置文件导入配置到main.js使用 axios 发送网络请求测试(如果不会axios,具体可以见上篇文章axios)启动示例 MokeJs使用实例 介绍 使用 安装 npm install mockjs --save-dev # 或者 yarn add mockj…

【超详细】基于YOLOv11的PCB缺陷检测

主要内容如下: 1、数据集介绍 2、下载PCB数据集 3、不同格式数据集预处理(Json/xml),制作YOLO格式训练集 4、模型训练及可视化 5、Onnxruntime推理 运行环境:Python3.8(要求>3.8)&#xff…

matlab不小心删除怎么撤回

预设项——>删除文件——>移动至临时文件夹 tem临时文件夹下

【RabbitMQ】初识 RabbitMQ

🥰🥰🥰来都来了,不妨点个关注叭! 👉博客主页:欢迎各位大佬!👈 文章目录 1. MQ 是什么?1.1 MQ 本质1.2 系统间通信 2. MQ的作用是什么?2.1 异步解耦2.2 流量削…

【ProtoBuf】ProtoBuf基础与安装

本篇文章介绍 C 使用方向 文章目录 ProtoBuf简介ProtoBuf安装WindowsLinux ProtoBuf简介 ProtoBuf(全称为 Protocol Buffer)是一种序列化结构数据的方法 序列化是将对象转换为可存储的或传输的格式的过程,通常用于数据交换或持久化存储。我们在C/Java中编写的类不…

2.13寸电子墨水屏HINK-E0213+esp8266

记录好数字 2.13寸电子墨水屏HINK-E0213esp8266 声明:大部分资料来源于微雪电子http://微雪电子-官网 https://www.waveshare.net/ 前言 很久以前买的一块电子墨水屏,运气很好,这个型号HINK-E0213资料很全,而且微雪官网也有相关电路资料http://2.13inch e-Paper HAT - Waves…

【GaussDB】产品简介

产品定位 GaussDB 200是一款具备分析及混合负载能力的分布式数据库,支持x86和Kunpeng硬件架构,支持行存储与列存储,提供PB(Petabyte)级数据分析能力、多模分析能力和实时处理能力,用于数据仓库、数据集市、实时分析、实时决策和混…

【UI】将 naive ui 的 message 封装进axios 中,关于naiveui的message相关的用法

文章目录 前言在setup外进行使用直接包裹使用vue 单文件中使用 参考文章: 关于naiveui的message相关的用法 前言 最近新建了一个vite vu3 的项目,完全是从0 到1 ,封装到request 的时候 想对axios 请求做一个全局的处理,但发现…

【尚硅谷】FreeRTOS学笔记(更新中更新时间2024.10.12)

在网上看到的一段很形象的描述,放在这里给大家娱乐一下。 裸机开发:n个人拉屎,先进去一个拉完,下一个再来。看门狗:如果有人拉完屎还占着,茅坑刷视频,把他拖出去中断系统:n个人拉屎&…