Kafka Offset 自动提交和手动提交 - 漏消费与重复消费

目录

1. 引言

2. Offset 提交方式概述

2.1 自动提交 Offset

2.2 手动提交 Offset

3. 漏消费与重复消费的问题分析

3.1 自动提交模式下的漏消费和重复消费

漏消费

重复消费

3.2 手动提交模式下的漏消费和重复消费

漏消费

重复消费

4. 自动提交与手动提交的选择

4.1 适用场景

4.2 配置建议

5. 代码示例

5.1 自动提交示例

5.2 手动提交示例

6. 结论

参考文档


1. 引言

Kafka 是当前广泛使用的分布式消息队列系统,其强大的吞吐量和可靠性使其在实时数据流处理中广受欢迎。在 Kafka 消费过程中,Offset 是一个重要的概念,它记录了每个消费组读取消息的进度。本文将详细探讨 Kafka Offset 的自动提交和手动提交模式,并分析它们可能导致的漏消费和重复消费问题。

2. Offset 提交方式概述

2.1 自动提交 Offset

在 Kafka 中,enable.auto.commit 配置项决定是否开启自动提交。当设置为 true 时,Kafka Consumer 会定期(由 auto.commit.interval.ms 配置项指定的时间间隔)自动提交当前的 Offset。自动提交的优点是实现简单,使用方便,但缺点是可能会导致漏消费或重复消费的问题。

2.2 手动提交 Offset

手动提交 Offset 是指由程序员在消费逻辑中显式地调用提交方法(如 commitSync()commitAsync())进行 Offset 提交。手动提交提供了对 Offset 更精细的控制,能够减少漏消费和重复消费的风险,但也增加了实现的复杂性。

3. 漏消费与重复消费的问题分析

3.1 自动提交模式下的漏消费和重复消费

漏消费

在自动提交模式下,Kafka 会按固定的时间间隔提交 Offset,如果在 Offset 自动提交之后但在实际消费消息之前应用崩溃或发生其他错误,可能导致该 Offset 被提交,但实际消息并未消费。这就会造成消息的漏消费。

重复消费

自动提交可能会在消息实际处理完成之前提交 Offset。如果在 Offset 提交之后但消息处理尚未完成时应用崩溃,则在重启后,Kafka 将从已提交的 Offset 开始重新消费,导致部分消息被重复消费。

3.2 手动提交模式下的漏消费和重复消费

漏消费

在手动提交模式下,如果消息处理完成但在手动提交 Offset 之前应用崩溃或发生错误,则会导致该批次消息未被提交 Offset,从而在下次消费时从上一次提交的 Offset 开始重新消费,理论上不会导致漏消费问题。

重复消费

由于手动提交模式通常在消息处理完成后提交 Offset,因此应用崩溃可能导致上一次提交的 Offset 和实际消费的消息之间出现重复,但通过精细控制可以尽量减少重复消费的风险。

4. 自动提交与手动提交的选择

4.1 适用场景

  • 自动提交:适用于对消息偶尔漏消费或重复消费容忍度较高的场景,比如一些日志数据处理,自动提交可以简化代码逻辑。
  • 手动提交:适用于对数据一致性要求较高的场景,比如金融数据处理,手动提交可以更精细地控制消费流程,减少数据误差。

4.2 配置建议

  • 若使用 自动提交,应确保 auto.commit.interval.ms 设置合理,避免过长的提交间隔导致更多的重复消费。
  • 若使用 手动提交,应使用 commitSync() 进行同步提交,确保 Offset 成功提交;或者使用 commitAsync() 提高性能,但要处理可能的失败提交。

5. 代码示例

5.1 自动提交示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}

5.2 手动提交示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");  // 禁用自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 手动同步提交consumer.commitSync();
}

6. 结论

Kafka Offset 的自动提交和手动提交各有优缺点,选择适合的方式需要根据具体的业务场景需求来决定。自动提交适合简单场景,但容易发生漏消费和重复消费,而手动提交提供了更高的灵活性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/476269.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL - 数据库基础 | 数据库操作 | 表操作

文章目录 1、数据库基础1.1为什么要有数据库1.2主流的数据库1.3连接MySQL1.4服务器、数据库、表的关系1.5 MySQL框架1.6 SQL分类1.7储存引擎 2.数据库操作2.1创建数据库2.2字符集和校验规则2.3删除数据库2.4修改数据库2.5备份与恢复2.6查看连接情况 3.表的操作3.1创建表3.2查看…

通过vite+vue3+pinia从0到1搭建一个uniapp应用

最近项目上要做一个app&#xff0c;选择了用uniapp作为开发框架&#xff1b;我大概看了一下uniapp的文档&#xff0c;根据文档从0到1搭了一个uniapp应用供大家参考。 因为本人习惯使用了WebStorm编译器&#xff0c;但是uniapp官方推荐使用HBuilder搭建&#xff0c;如果和我一样…

学习路之phpstudy--安装mysql5.7后在my.ini文件中无法修改sql_mode

windows环境下使用phpstudy安装mysql5.7后需要修改mysql中的sql_mode配置&#xff0c;但是在phpstudy中打开mysql配置文件my.ini后&#xff0c; 通过查找找不到sql_mode或sql-mode&#xff0c; 此时无法在my.ini文件中直接进行修改&#xff0c;可以使用mysql命令进行修改&#…

IDEA:2023版远程服务器debug

很简单&#xff0c;但是很多文档没有写清楚&#xff0c;wocao 一、首先新建一个远程jvm 二、配置 三、把上面的参数复制出来 -agentlib:jdwptransportdt_socket,servery,suspendn,address5005 四、然后把这串代码放到服务器中&#xff08;这里的0.0.0.0意思是所有IP都能访问&a…

ts: 定义一个对象接收后端返回对象数据,但是报错了有红色的红线为什么

问&#xff1a; const backendProgressData ref<object>&#xff08;{}&#xff09; 这是我的代码&#xff0c;但是当我进行使用的时候&#xff1a; backendProgressData.value xxxx接口返回数据progressData:{percentage:123,text:"文字"} 在template中{{…

解决Docker环境变量的配置的通用方法

我们部署的很多服务都是以Docker容器的形式存在的。 在运行Docker容器前&#xff0c;除了设置网络、数据卷之外&#xff0c;还需要设置各种各样的环境变量。 有时候&#xff0c;由于容器版本的问题&#xff0c;一些文档没有及时更新&#xff0c;可能同时存在多个新旧版本的环…

【腾讯云产品最佳实践】腾讯云CVM入门技术与实践:通过腾讯云快速构建云上应用

目录 前言 什么是腾讯云CVM&#xff1f; 腾讯云CVM的技术优势 基于最佳技术实践&#xff0c;使用腾讯云CVM搭建应用 1. 开通CVM实例 2. 连接CVM实例 3. 配置Web环境 4. 部署PHP应用 腾讯云CVM行业应用案例&#xff1a;电商平台的双十一攻略 1. 弹性伸缩解决高并发问题…

51c嵌入式~IO合集2

我自己的原文哦~ https://blog.51cto.com/whaosoft/11697814 一、STM32串口通信基本原理 通信接口背景知识 设备之间通信的方式 一般情况下&#xff0c;设备之间的通信方式可以分成并行通信和串行通信两种。并行与串行通信的区别如下表所示。 串行通信的分类 1、按照数据传…

七、电机三环控制

电机三环控制指的是&#xff0c;直流有刷电机三环&#xff08;电流环速度环位置环&#xff09;PID 控制。 1、三环PID控制原理 三环 PID 控制就是将三个 PID 控制系统&#xff08;例如&#xff1a;电流环、速度环以及位置环&#xff09;串联起来&#xff0c;然后对前一个系统…

NLP论文速读(多伦多大学)|利用人类偏好校准来调整机器翻译的元指标

论文速读|MetaMetrics-MT: Tuning Meta-Metrics for Machine Translation via Human Preference Calibration 论文信息&#xff1a; 简介&#xff1a; 本文的背景是机器翻译&#xff08;MT&#xff09;任务的评估。在机器翻译领域&#xff0c;由于不同场景和语言对的需求差异&a…

【Vue】Vue指令

目录 概念 作用 分类 内容渲染指令 属性绑定指令 事件绑定指令 条件渲染指令 v-if/v-else-if/v-else多分支渲染 v-show和v-if的区别 列表渲染指令 v-for中的key属性 双向绑定指令 示例&#xff1a;图片切换 示例&#xff1a;可折叠面板 示例&#xff1a;书架…

C++:类和对象(三)

1.深入了解构造函数 1.1初始化列表 引入&#xff1a;我们首先要知道&#xff0c;在类中我们是声明变量&#xff0c;在实例化的时候才是开空间&#xff0c;在调用构造函数的时候又分两段&#xff0c;一段是定义&#xff08;所有的成员变量进行定义&#xff09;&#xff0c;一段…

北京申请中级职称流程(2024年)

想找个完整详细点的申请流程资料真不容易&#xff0c;做个分享送给需要的人吧。 不清楚为什么说文章过度宣传&#xff0c;把链接和页面去掉了&#xff0c;网上自己找一下。 最好用windows自带的EDGE浏览器打开申请网站&#xff0c;只有在开始申请的时间内才可以进行网上申报&…

好用的js组件库

lodash https://www.lodashjs.com/https://www.lodashjs.com/ uuid 用于生成随机数&#xff0c;常用于生成id标识 GitHub - uuidjs/uuid: Generate RFC-compliant UUIDs in JavaScripthttps://github.com/uuidjs/uuid dayjs 常用于时间的处理 安装 | Day.js中文网 (fenxi…

php:使用socket函数创建WebSocket服务

一、前言 闲来无事&#xff0c;最近捣鼓了下websocket&#xff0c;但是不希望安装第三方类库&#xff0c;所以打算用socket基础函数创建个服务。 二、构建websocket服务端 <?phpclass SocketService {// 默认的监听地址和端口private $address 0.0.0.0;private $port 8…

Tailscale 自建 Derp 中转服务器(全程无 Docker + 无域名纯 IP 版本)

文章目录 整体大纲目的&#xff1a;为什么要建立 Derp 中转服务器云服务器安装 DerpDerp 中转服务器介绍安装 Go 环境通过 Go 安装 Derp处理证书文件自签一个域名给 Derp验证 Derp 是否启动 云服务器安装并登录 Tailscale第一种组合&#xff1a; 官方 Tailscale 账号 自己的 D…

shell--第一次作业

1.接收用户部署的服务名称 # 脚本入口 read -p "请输入要部署的服务名称&#xff1a;" service_name 2.判断服务是否安装 # 判断服务是否安装 if rpm -q "$service_name" &>/dev/null; then echo "服务 $service_name 已安装。" 已…

项目部署问题bug记录(长期更新)

一、编译相关 1.submodules/simple-knn/simple_knn.cu(90): error: identifier "FLT_MAX" is undefined me.minn { FLT_MAX, FLT_MAX, FLT_MAX }; 部署photoreg工程&#xff0c;在编译simple_knn的时候&#xff0c;报错&#xff1a; (photoreg) leelee…

ant-design-vue中table组件多列排序

antD中table组件多列排序 使用前注意实现效果图实现的功能点及相关代码1. 默认按某几个字段排序2. 点击排序按钮可同时对多个字段进行排序3. 点击重置按钮可恢复默认排序状态。 功能实现完整的关键代码 使用前注意 先要确认你使用的antD版本是否支持多列排序&#xff0c;我这里…

影视后期学习Ⅰ~

1.DV是光盘 磁带 2.序列就是我们要制作的一个视频。 打开界面显示&#xff1a; 一号面板放的是素材&#xff0c;二号面板叫源监视器面板&#xff08;它的名字需要记住&#xff09;在一号面板点击文件之后&#xff0c;进入二号面板&#xff0c;在二号面板预览没问题后&#xf…