关于flink重新提交任务,重复消费kafka的坑

异常现象1

按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据

env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));
env.setStateBackend(new FsStateBackend(PropUtils.getValueStr(Constant.ENV_FLINK_STATEBACKEND_PATH)));

原因

我以为checkpoint和fsbackend要同时设置,其实,1.14.3版本,setCheckpointStorage和stateBackend改成了分着设置

我上边代码这样设置,相当于首先指定了以下checkpoint按照默认的backend存储,然后又指定了按照fsbackend存储,因此首先指定的checkpoint目录没有数据。

正解

env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));

State Backends | Apache Flink

异常现象2

开启checkpoint eos,开启容错,每次任务重新提交都会重新消费kafka已经完成了checkpoint的数据

原因

我以为只要开启这两个配置就可以保证已经checkpoint的kafka数据不会被重复消费,其实不然

checkpoint是flink内部的容错机制,他能保证在设置了失败重启策略之后(setRestartStrategy),如果发生异常导致失败重试之后自动从最新checkpoint恢复。不是手动重启。。。手动重启默认不会进行加载状态数据,所以每次都会从头消费

正解

flink任务 -s 指定恢复点提交,这个恢复点可以是checkpoint也可以时savepoint。

# 启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar# 备份,创建savepoint
/home/cuadmin/flink-1.14.3/bin/flink savepoint 19f4bb5d103ea8695712d4d1a797893f /home/cuadmin/flink-1.14.3/savepoint# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \
/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

异常现象4

这是错误的

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \

/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \

按照上述命令执行,这个地方显示恢复点的加载情况,这里没显示,代表恢复点没有执行成功

原因

-s的位置有问题,我之前以为没有顺序,把-s 放到了命令最后,结果没报错,也没识别。。

正解

-s 位置要正确

# 指定savepoint启动
/home/cuadmin/flink-1.14.3/bin/flink run -d  \
-c cn.flink.ApplicationMaster \
-s  file:/home/cuadmin/flink-1.14.3/savepoint/savepoint-033556-251a9e55ed25  \
/home/cuadmin/portal-flink-2021.0.1-SNAPSHOT-shaded.jar

异常现象3

我记得savepoint和checkpoint是都可以用来flink -s 进行恢复点恢复的。但是每次都提示恢复失败,提示文件找不到,savepoint就可以。。。

原因

cancel job会将 checkpoint的数据删掉。。。

正解

测试的时候,直接stop-cluster,这样checkpoint数据就不会被删除了

保留 Checkpoint 

Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 配置项定义了当作业取消时,对作业 checkpoint 的操作:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。

总结

1、savepoint的数据要比checkpoint更加稳定,比如你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。checkpoint就不可以,因为他有很多相对路径配置。

2、savepoint和checkpoint一般都能作为恢复点使用,例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。

3、任务因为偶然原因内部重启(task级别),通过失败重试机制+checkpoint自动进行重放,任务因重启、断电、死机等外部因素(cluster级别),通过-s 指定checkpoint/savepoint恢复点进行手动重放。这样就可以保证状态数据的稳定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/152840.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[CISCN2019 华北赛区 Day1 Web5]CyberPunk 二次报错注入

buu上 做点 首先就是打开环境 开始信息收集 发现源代码中存在?file 提示我们多半是包含 我原本去试了试 ../../etc/passwd 失败了 直接伪协议上吧 php://filter/readconvert.base64-encode/resourceindex.phpconfirm.phpsearch.phpchange.phpdelete.php 我们通过伪协议全…

C++ 类和对象篇(八) const成员函数和取地址运算符重载

目录 一、const成员函数 1. const成员函数是什么? 2. 为什么有const成员函数? 3. 什么时候需要使用const修饰成员函数? 二、取地址运算符重载 1. 为什么需要重载取地址运算符? 2. 默认取地址运算符重载函数 3. 默认const取地址运…

查看当前目录下文件所占用内存 du -sh

1. du -sh 查看当前目录下文件所占用内存 2.查看当前文件夹下,每个文件所占用内存 du -ah --max-depth1/

Linux命令笔记

终端命令格式: bash command [-options] [parameter] 7个常见Linux命令: 01 ls | list | 查看当前文件夹下的内容 02 pwd | print work directory | 查看当前所在文件夹 03 cd [目录名] | change directory | 切换文件夹 04 touch [文件名] | touc…

基于WTMM算法的图像多重分形谱计算matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、WTMM算法概述 4.2、WTMM算法原理 4.2.1 二维小波变换 4.2.2 模极大值检测 4.2.3 多重分形谱计算 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部…

云原生监控系统Prometheus:基于Prometheus构建智能化监控告警系统

目录 一、理论 1.Promethues简介 2.监控告警系统设计思路 3.Prometheus监控体系 4.Prometheus时间序列数据 5.Prometheus的生态组件 6.Prometheus工作原理 7.Prometheus监控内容 8.部署Prometheus 9.部署Exporters 10.部署Grafana进行展示 二、实验 1.部署Prometh…

项目设计:YOLOv5目标检测+机构光相机(intel d455和d435i)测距

1.介绍 1.1 Intel D455 Intel D455 是一款基于结构光(Structured Light)技术的深度相机。 与ToF相机不同,结构光相机使用另一种方法来获取物体的深度信息。它通过投射可视光谱中的红外结构光图案,然后从被拍摄物体表面反射回来…

Python为Excel中每一个单元格计算其在多个文件中的平均值

本文介绍基于Python语言,对大量不同的Excel文件加以跨文件、逐单元格平均值计算的方法。 首先,我们来明确一下本文的具体需求。现有一个文件夹,其中有如下所示的大量Excel文件,我们这里就以.csv文件为例来介绍。其中,每…

用 HTTP 提交数据,基本就这 5 种方式

网页开发中,向服务端提交数据是一个基本功能,工作中会大量用 xhr/fetch 的 api 或者 axios 这种封装了一层的库来做。 可能大家都写过很多 http/https 相关的代码,但是又没有梳理下它们有哪几种呢? 其实通过 http/https 向服务端…

HarmonyOS/OpenHarmony原生应用-ArkTS万能卡片组件Span

作为Text组件的子组件,用于显示行内文本的组件。无子组件 一、接口 Span(value: string | Resource) 从API version 9开始,该接口支持在ArkTS卡片中使用。 参数: 参数名 参数类型 必填 参数描述 value string | Resource 是 文本内…

C++ 类和对象篇(三) 空类和6个默认成员函数

目录 一、空类 1. 是什么? 2. 空类中的成员 3. 空类的大小 二、6个默认成员函数 三、 构造函数 1. 构造函数是什么? 2. 为什么C要引入构造函数? 四、析构函数 1. 析构函数是什么? 2. 为什么要有析构函数? 五、拷贝构造…

Windows环境下下载安装Elasticsearch和Kibana

Windows环境下下载安装Elasticsearch和Kibana 首先说明这里选择的版本都是7.17 ,为什么不选择新版本,新版本有很多坑,要去踩,就用7就够了。 Elasticsearch下载 Elasticsearch是一个开源的分布式搜索和分析引擎,最初由…

【FreeRTOS】【STM32】01从零开始的freertos之旅 浏览源码下的文件夹

基于野火以及正点原子 在打开正点原子的资料pdf时,我遇到了pdf无法复制粘贴的问题,这里有个pdf解锁文字复制功能的网址,mark一下。超级pdf 参考资料《STM32F429FreeRTOS开发手册_V1.2》 官方资料 FreeRTOS 的源码和相应的官方书籍均可从官…

cap分布式理论

cap 理论 cap是实现分布式系统的思想。 由3个元素组成。 Consistency(一致性) 在任何对等 server 上读取的数据都是最新版,不会读取出旧数据。比如 zookeeper 集群,从任何一台节点读取出来的数据是一致的。 Availability&…

Go 复合类型之切片类型介绍

Go 复合类型之切片类型 文章目录 Go 复合类型之切片类型一、引入二、切片(Slice)概述2.1 基本介绍2.2 特点2.3 切片与数组的区别 三、 切片声明与初始化3.1 方式一:使用切片字面量初始化3.2 方式二:使用make函数初始化3.3 方式三:基于数组的切…

使用企业订货系统后的效果|软件定制开发|APP小程序搭建

使用企业订货系统后的效果|软件定制开发|APP小程序搭建 企业订货系统是一种高效的采购管理系统,它可以帮助企业更好地管理采购流程,降低采购成本,提高采购效率。 可以帮助企业提高销售效率和降低成本的软件工具。使用该系统后,企业…

《DevOps 精要:业务视角》- 读书笔记(二)

DevOps 精要:业务视角(二) 第2章 基础2.1 精益生产2.1.1 关键事实2.1.2 挑战 2.2 敏捷2.2.1 关键事实2.2.2 挑战 第2章 基础 2.1 精益生产 2.1.1 关键事实 正如1.2节提到的,DevOps非常依赖于精益生产的原则与实践。有些人甚至相信&#xf…

R语言快速实现图片布局(1)

&#xff08;1&#xff09;简单的一排或者对称的多排&#xff0c;使用patchwork即可。/表示分行&#xff0c;|表示分列 library(patchwork) pp1<-ggplot(mtcars) geom_point(aes(mpg, disp)) pp2<-ggplot(mtcars) geom_boxplot(aes(gear, disp, group gear)) pp3 <…

08_selenium实战——学习平台公开数据批量获取

0、:前言 该实战任务是对某视频平台中’标题’、 ‘点赞数量’、 ‘投币数量’、‘收藏数量’、‘播放次数’、以及前五条评论进行爬取。要求1:可以控制爬取视频的主题(爬取主题搜索之后的内容)要求2:可以控制爬取视频的数量要求3:对于评论数不足5条的用0填充评论内容爬虫…

Eclipse iceoryx™ - 真正的零拷贝进程间通信

1 序言 通过一个快速的背景教程&#xff0c;介绍项目范围和安装所需的所有内容以及第一个运行示例。 首先&#xff1a;什么是冰羚&#xff1f; iceoryx是一个用于各种操作系统的进程间通信&#xff08;IPC&#xff09;中间件&#xff08;目前我们支持Linux、macOS、QNX、FreeBS…