20250120 深入了解 Apache Flink 的 Checkpointing

Apache Flink 是一种用于实时流处理和批处理的分布式计算框架。在实时流处理任务中,保证数据的一致性和任务的容错性是至关重要的,而 Flink 的 Checkpointing 机制正是实现这一目标的核心技术。

本文将详细介绍 Flink 的 Checkpointing,包括其概念、原理、配置和实际应用。


什么是 Checkpointing?

Checkpointing 是 Flink 提供的一种用于容错的机制。它会在流处理任务运行过程中,定期将作业的状态流的处理进度保存到外部持久化存储(例如 HDFS 或 S3)中。当任务因故障而中断时,Flink 可以从最近一次成功的 Checkpoint 恢复,继续任务执行,而无需重新处理已经完成的数据。

Checkpointing 的核心功能
  1. 状态保存
    • 保存任务中所有算子的状态,例如窗口聚合、累加器或其他操作的中间结果。
  2. 进度保存
    • 保存流处理中数据源的消费位置(如 Kafka 的偏移量)。
  3. 故障恢复
    • 任务失败时,从最近的 Checkpoint 恢复状态和进度,保证作业的一致性。

Checkpointing 的原理

Flink 的 Checkpointing 采用 两阶段提交协议(Two-Phase Commit Protocol) 来确保状态的一致性。这一过程分为以下几个阶段:

1. 触发 Checkpoint
  • JobManager 定期触发 Checkpoint(由 enableCheckpointing 配置间隔时间),向所有并行任务发送 Checkpoint 触发信号。
2. 保存状态
  • 每个算子将其当前状态保存到本地或远程存储(如 HDFS、S3)。
  • 数据源(如 Kafka)会记录当前消费的偏移量。
3. 提交 Checkpoint
  • 当所有算子成功完成状态保存后,JobManager 将 Checkpoint 标记为成功。
  • 任务的恢复点会更新为该 Checkpoint。
4. 故障恢复
  • 如果任务失败,Flink 会从最近一次成功的 Checkpoint 恢复作业状态和数据流进度,确保任务继续执行。

如何启用 Checkpointing

在 Flink 程序中,启用 Checkpoint 非常简单,只需在执行环境中调用 enableCheckpointing 方法:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CheckpointExample {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启 Checkpoint,每隔 5000 毫秒触发一次env.enableCheckpointing(5000);// 配置 Checkpoint 的额外参数env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置超时时间env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同时只允许一个 Checkpointenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 两次 Checkpoint 之间的最小间隔// 添加数据源和作业逻辑env.fromElements("hello", "flink", "checkpointing").map(String::toUpperCase).print();// 执行作业env.execute("Flink Checkpoint Example");}
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class CheckpointExample { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启 Checkpoint,每隔 5000 毫秒触发一次 env.enableCheckpointing(5000); // 配置 Checkpoint 的额外参数 env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置超时时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同时只允许一个 Checkpoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 两次 Checkpoint 之间的最小间隔 // 添加数据源和作业逻辑 env.fromElements("hello", "flink", "checkpointing") .map(String::toUpperCase) .print(); // 执行作业 env.execute("Flink Checkpoint Example"); } }

重要配置
  • enableCheckpointing(interval):设置 Checkpoint 的触发间隔,单位为毫秒。
  • setCheckpointTimeout(timeout):设置单个 Checkpoint 的最大超时时间。
  • setMaxConcurrentCheckpoints(n):设置同时允许进行的最大 Checkpoint 数量。
  • setMinPauseBetweenCheckpoints(milliseconds):两次 Checkpoint 之间的最小间隔时间。

Checkpointing 的应用场景

1. Kafka 数据消费

在使用 Kafka 作为数据源时,Checkpoint 会保存 Kafka 的偏移量。当任务重启时,Flink 会从最近的偏移量开始重新消费数据,确保数据不会丢失或重复处理。

2. 窗口操作

对于基于窗口的聚合操作(如实时统计点击量),Checkpoint 保存中间结果。当任务失败后,中间结果可以恢复,不需要重新计算。

3. 用户状态管理

用户自定义的状态(例如计数器、缓存)也可以通过 Checkpoint 保存。通过恢复这些状态,确保任务逻辑的一致性。


Checkpointing 与 Savepoint 的区别

特性CheckpointSavepoint
触发方式自动触发(定期执行)手动触发
用途故障恢复程序升级、迁移、测试
存储生命周期短期(任务失败后自动清理)长期(由用户管理,手动删除)
操作复杂度无需手动操作需要用户显式触发

Checkpointing 的注意事项

1. 存储路径
  • Checkpoint 的数据通常会存储在外部持久化存储中,如 HDFS、S3 或本地文件系统。
  • 配置存储路径:
    env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:port/flink/checkpoints");
    

2. 性能影响
  • Checkpoint 是一个重量级操作,频率过高可能会影响任务性能。
  • 通常设置为每 5-10 秒触发一次,根据任务需求进行调整。
3. 容错机制
  • Checkpoint 默认提供 精确一次(Exactly Once) 的语义。如果对性能要求较高,可以选择 至少一次(At Least Once)

总结

Flink 的 Checkpointing 是流处理容错的核心技术,具备以下特点:

  1. 定期保存任务的状态和进度,确保数据一致性。
  2. 支持任务的快速恢复,避免重新处理已完成的数据。
  3. 与外部存储(如 HDFS、S3)的集成,为分布式任务提供强大的容错能力。

在实际使用中,Checkpointing 是实现 高可用性数据一致性 的基础。通过合理配置 Checkpoint,可以确保 Flink 作业在高负载和分布式环境下的可靠运行。

如果你正在使用 Flink 进行实时流处理任务,Checkpoint 是你必须深入了解和掌握的关键机制! 😊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/4932.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C 语言雏启:擘画代码乾坤,谛观编程奥宇之初瞰

大家好啊,我是小象٩(๑ω๑)۶ 我的博客:Xiao Xiangζั͡ޓއއ 很高兴见到大家,希望能够和大家一起交流学习,共同进步。* 这一课主要是让大家初步了解C语言,了解我们的开发环境,main函数,库…

Java - WebSocket

一、WebSocket 1.1、WebSocket概念 WebSocket是一种协议,用于在Web应用程序和服务器之间建立实时、双向的通信连接。它通过一个单一的TCP连接提供了持久化连接,这使得Web应用程序可以更加实时地传递数据。WebSocket协议最初由W3C开发,并于2…

C语言内存之旅:从静态到动态的跨越

大家好,这里是小编的博客频道 小编的博客:就爱学编程 很高兴在CSDN这个大家庭与大家相识,希望能在这里与大家共同进步,共同收获更好的自己!!! 本文目录 引言正文一 动态内存管理的必要性二 动态…

气膜料仓:工业仓储的高效与安全新选择—轻空间

在工业仓储领域,如何实现高效、安全、环保的存储方式成为企业关注的重点。气膜料仓以其独特的无梁无柱设计和智能化功能,为工业仓储带来了全新的解决方案。 空间利用率高:无障碍的大容量仓储 气膜料仓内部无梁无柱,形成了完全开…

Windows FileZila Server共享电脑文件夹 映射21端口外网连接

我有这样一个使用场景,在外部网络环境下,通过手机便捷地读取存储在电脑上的视频文件。比如在外出旅行、出差,身边没有携带电脑,仅依靠手机设备,就能随时获取电脑里存储的各类视频,无论是学习资料视频、工作…

CentOS部署FastDFS+Nginx并实现远程访问本地服务器中文件

文章目录 前言1. 本地搭建FastDFS文件系统 1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址…

2023年江西省职业院校技能大赛网络系统管理赛项(Linux部分样题)

一、Linux项目任务描述 你作为一个Linux的技术工程师,被指派去构建一个公司的内部网络,要为员工提供便捷、安全稳定内外网络服务。你必须在规定的时间内完成要求的任务,并进行充分的测试,确保设备和应用正常运行。任务所有规划都基于Linux操作系统,请根据网络拓扑、基本配…

【Spring】定义的Bean缺少隐式依赖

问题描述 初学 Spring 时,我们往往不能快速转化思维。例如,在程序开发过程中,有时候,一方面我们把一个类定义成 Bean,同时又觉得这个 Bean 的定义除了加了一些 Spring 注解外,并没有什么不同。所以在后续使…

使用Chrome和Selenium实现对Superset等私域网站的截图

最近遇到了一个问题,因为一些原因,我搭建的一个 Superset 的 Report 功能由于节假日期间不好控制邮件的发送,所以急需一个方案来替换掉 Superset 的 Report 功能 首先我们需要 Chrome 浏览器和 Chrome Driver,这是执行数据抓取的…

vulnhub靶场【IA系列】之Tornado

前言 靶机:IA-Tornado,IP地址为192.168.10.11 攻击:kali,IP地址为192.168.10.2 都采用虚拟机,网卡为桥接模式 本文所用靶场、kali镜像以及相关工具,我放置在网盘中,可以复制后面链接查看 htt…

不用编程即可实现多台PLC的MQTT协议JSON文件发布与订阅的智能网关的配置说明

IGT-SER系列智能网关支持各种PLC的以太网和串口协议,以及Modbus、OPC通讯,通过网关所带的参数配置工具软件,不用编程,即可打包和解析JSON格式的设备数据,通过MQTT、HTTP等协议发布和订阅。相关案例 IGT-SER系列智能网关…

为什么相关性不是因果关系?人工智能中的因果推理探秘

目录 一、背景 (一)聚焦当下人工智能 (二)基于关联框架的人工智能 (三)基于因果框架的人工智能 二、因果推理的基本理论 (一)因果推理基本范式:因果模型&#xff0…

ARCGIS国土超级工具集1.3更新说明

ARCGIS国土超级工具集V1.3版本,功能已增加至49 个。在V1.2的基础上修复了若干使用时发现的BUG,完善了部分已有的功能,新增了“面要素狭长面检测分割”等功能,新工具使用说明如下: 一、勘测定界工具栏更新土地分类面积表…

阿里云 Serverless 助力盟主直播:高并发下的稳定性和成本优化

在直播场景中,阿里云 Serverless 应用引擎 SAE 提供的无缝弹性伸缩与极速部署能力,确保直播间高并发时的流畅体验,降低了我们的运营成本,简化了运维流程。结合阿里云云原生数据库 PolarDB 的 Serverless 能力,实现了数…

网络编程 | UDP组播通信

1、什么是组播 在上一篇博客中,对UDP的广播通信进行了由浅入深的总结梳理,本文继续对UDP的知识体系进行探讨,旨在将UDP的组播通信由浅入深的讲解清楚。 组播是介于单播与广播之间,在一个局域网内,将某些主机添加到组中…

日历热力图,月度数据可视化图表(日活跃图、格子图)vue组件

日历热力图,月度数据可视化图表,vue组件 先看效果👇 在线体验https://www.guetzjb.cn/calanderViewGraph/ 日历图简单划分为近一年时间,开始时间是 上一年的今天,例如2024/01/01 —— 2025/01/01,跨度刚…

使用nginx搭建通用的图片代理服务器,支持http/https/重定向式图片地址

从http切换至https 许多不同ip的图片地址需要统一进行代理 部分图片地址是重定向地址 nginx配置 主站地址:https://192.168.123.100/ 主站nginx配置 server {listen 443 ssl;server_name localhost;#ssl证书ssl_certificate ../ssl/ca.crt; #私钥文件ssl_ce…

WPS数据分析000001

目录 一、表格的新建、保存、协作和分享 新建 保存 协作 二、认识WPS表格界面 三、认识WPS表格选项卡 开始选项卡 插入选项卡 页面布局选项卡 公式选项卡 数据选项卡 审阅选项卡 视图选项卡 会员专享选项卡 一、表格的新建、保存、协作和分享 新建 ctrlN------…

使用 HTML 开发 Portal 页全解析

前言 在当今数字化时代,网站作为企业和个人展示信息、提供服务的重要窗口,其重要性不言而喻。而 Portal 页,作为网站的核心页面之一,承担着引导用户、整合信息等关键任务。那么,如何使用 HTML 开发一个功能齐全、界面…

Spring Boot 项目启动报错 “找不到或无法加载主类” 解决笔记

一、问题描述 在使用 IntelliJ IDEA 开发基于 Spring Boot 框架的 Java 程序时,原本项目能够正常启动。但在后续编写代码并重建项目后,再次尝试运行却出现了 “错误:找不到或无法加载主类 com.example.springboot.SpringbootApplication” 的…