flink 写入数据到 kafka 后,数据过一段时间自动删除

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:
  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/132827.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sqlserver 监控使用磁盘空间情况

最近遇到一个小问题&#xff1a;为了保存以往的一些数据&#xff0c;间了大量临时表&#xff0c;导致SQLserver 数据增长过快&#xff0c;不得不想个办法监控磁盘空间使用情况。 网上一般有几种办法&#xff1a; 一是使用 dm_os_volume_stats函数&#xff0c;缺点是 无法获取非…

Redis缓存更新策略、详解并发条件下数据库与缓存的一致性问题以及消息队列解决方案

0、前言 我们知道&#xff0c;缓存由于在内存中&#xff0c;数据处理速度比直接操作数据库要快很多&#xff0c;因此常常将数据先读到缓存中&#xff0c;再进行查询、更新等操作。 但与之而来的问题就是&#xff0c;内存中的数据不仅没有持久化&#xff0c;而且需要保证…

Dajngo02_第一个Django案例

Dajngo02_第一个Django案例 经过之前学习&#xff0c;我们已经可以创建Django环境 现在开始尝试快速使用Django开发一个案例 案例&#xff1a;利用Django实现一个查看当前时间的web页面。 在django中要提供数据展示给用户,一般情况下我们需要完成3个步骤&#xff1a; 在urls.…

云原生之使用Docker部署Teedy轻量级文档管理系统

云原生之使用Docker部署Teedy轻量级文档管理系统 一、Teedy介绍1.1 Teedy简介1.2 Teedy特点 二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本 四、下载Teedy镜像五、部署Teedy轻量级文…

TDE和数据脱敏功能介绍

TDE(Transparent Data Encryption)和数据脱敏(Data Masking)是两种常见的数据安全技术&#xff0c;它们在保护敏感数据和增强数据隐私方面起着至关重要的作用。接下来&#xff0c;将对这两种技术进行详细的介绍。 TDE&#xff0c;全称透明数据加密(Transparent Data Encryption…

Android 开发小贴士

Android 开发小贴士 应用编译时报错&#xff1a;Unable to merge dex 可能原因&#xff1a; 包引用重复 、方法数超限或者几个库之间有重复代码块(特别是在整理module时容易犯)。 解决办法&#xff1a; app的build.gradle 中 // 1. 添加配置 defaultConfig {......multiDexEn…

Kafka消费者组重平衡(二)

文章目录 概要重平衡通知机制消费组组状态消费端重平衡流程Broker端重平衡流程 概要 上一篇Kafka消费者组重平衡主要介绍了重平衡相关的概念&#xff0c;本篇主要梳理重平衡发生的流程。 为了更好地观察&#xff0c;数据准备如下&#xff1a; kafka版本&#xff1a;kafka_2.1…

lvs负载均衡、LVS集群部署

四&#xff1a;LVS集群部署 lvs给nginx做负载均衡项目 218lvs&#xff08;DR 负载均衡器&#xff09; yum -y install ipvsadm&#xff08;安装这个工具来管理lvs&#xff09; 设置VIP192.168.142.120 创建ipvsadm的文件用来存放lvs的规则 定义策略 ipvsadm -C //清空现有…

Java经典问题解答(9题)

文章目录 1、通关jwt靶场的其中任意两关&#xff08;该题与Java无关&#xff09;启动环境第4关第5关第7关 2、java是如何跨平台通信的3、java为什么需要类名和文件名一致4、main函数的作用是什么5、.class文件和.java是什么关系6、java在编写函数的时候void是什么意思7、java声…

慢查询SQL如何优化

一.什么是慢SQL? 慢SQL指的是Mysql中执行比较慢的SQL,排查慢SQL最常用的方法是通过慢查询日志来查找慢SQL。Mysql的慢查询日志是Mysql提供的一种日志记录&#xff0c;它用来记录Mysql中响应时间超过long_query_time值的sql,long_query_time的默认时间为10s. 二.查看慢SQL是否…

模拟信号电压或电流信号转变频器频率传感器信号隔离变送器0-5V/0-10V/0-20mA/4-20mA转0-5KHz/0-10KHz/1-5KHz

主要特性: 精度等级&#xff1a;0.1 级、0.2 级。产品出厂前已检验校正&#xff0c;用户可以直接使用输 入 &#xff1a;0-5V/0-10V/1-5V,0-10mA/0-20mA/4-20mA 等输出信号&#xff1a;0-5KHz/0-10KHz/1-5KHz 等标准信号辅助电源&#xff1a;5V、9V、12V、15V 或 24V 直流单电…

使用branch and bound分支定界算法选择UTXO

BnB算法原理 分支定界算法始终围绕着一颗搜索树进行的&#xff0c;我们将原问题看作搜索树的根节点&#xff0c;从这里出发&#xff0c;分支的含义就是将大的问题分割成小的问题。 大问题可以看成是搜索树的父节点&#xff0c;那么从大问题分割出来的小问题就是父节点的子节点…

怎么裁剪图片?总结了下面几个方法

怎么裁剪图片&#xff1f;在日常的生活中&#xff0c;图片已经成为了我们不可或缺的一部分。或许你正在整理自己的相册时&#xff0c;或者我们需要向互联网上发布一些图片的时候&#xff0c;总之我们随时都可能会遇到一张需要进行裁剪的图片。比如说&#xff0c;一些图片上存在…

每日一博 - 反向代理、API 网关、负载均衡

文章目录 概述图解 概述 反向代理、API网关和负载均衡是在网络和服务器架构中用于不同目的的重要组件&#xff0c;它们有不同的功能和应用场景。以下是它们之间的区别和联系&#xff1a; 反向代理&#xff08;Reverse Proxy&#xff09;&#xff1a; 功能&#xff1a;反向代理…

Xilinx FPGA未使用管脚上下拉状态配置(ISE和Vivado环境)

文章目录 ISE开发环境Vivado开发环境方式1&#xff1a;XDC文件约束方式2&#xff1a;生成选项配置 ISE开发环境 ISE开发环境&#xff0c;可在如下Bit流文件生成选项中配置。 右键点击Generate Programming File&#xff0c;选择Process Properties&#xff0c; 在弹出的窗口选…

分类预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost多输入分类预测

分类预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost多输入分类预测 目录 分类预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost多输入分类预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于SVM-Adaboost支持向量机结合Ada…

软件开发代码审查(review)工具

软件开发代码审查&#xff08;Code Review&#xff09;是一个重要的质量保证实践&#xff0c;旨在发现和修复潜在的问题、缺陷和安全漏洞。为了进行有效的代码审查&#xff0c;开发团队通常使用各种代码审查工具。以下是一些常见的软件开发代码审查工具及其特点&#xff0c;希望…

数据结构之洗牌算法

洗牌算法 1.买一副牌(生成一副牌)2.洗牌3.揭牌完整代码 1.买一副牌(生成一副牌) 2.洗牌 3.揭牌 完整代码 card中的代码: cardDemo中的代码 测试类代码

【日积月累】SpringBoot启动流程

目录 SpringBoot启动流程 1.前言2.构造一个SpringApplication的实例&#xff0c;完成初始化的工作SpringApplication实例构造完之后调用run方法&#xff0c;启动SpringApplication3.SpringBoot启动代码SpringBootConfigurationComponentScanEnableAutoConfiguration 总结参考…

神经网络-pytorch版本

pytorch神经网络基础 torch简介 torch和numpy import torch import numpy as np np_datanp.arange(6).reshape((2,3)) torch_datatorch.from_numpy(np_data) tensor2arraytorch_data.numpy() print(np_data,"\n",torch_data,"\n",tensor2array)torch的数…