kafka:java client使用总结塈seek() VS commitSync()的区别(三)

最近一段日子接触了kafka这个消息系统,主要为了我的开源中间件项目simplemq增加kafka支持(基于kafka-client【java】),如今总算完成,本文是对这个过程中对kafka消息系统的使用总结

线程安全

关于线程安全,kafka-client的代码注释有明确说明,

KafkaProducer是线程安全的

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
– from Java Comment of org.apache.kafka.clients.producer.KafkaProducer

也就是说在工程实践中,KafkaProducer实例可以使用单例模式。不需要为了发送一条消息而频繁创建KafkaProducer实例。

KafkaConsumer不是线程安全的

Multi-threaded Processing
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
making the call. It is the responsibility of the user to ensure that multi-threaded access
is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
– from Java Comment of org.apache.kafka.clients.consumerKafkaConsumer

在工程实践中,如果希望对订阅的主题单独管理,那么对于订阅的每一个主题(topic)必须创建一个单独的KafkaConsumer实例负责接收消息。并且要注意对KafkaConsumer实例的多数方法也只能在消息接收线程中。

分区

KafkaConsumer.poll()方法返回拉取的消息对象迭代对象(Iterable),迭代元素类型为ConsumerRecord,从ConsumerRecord返回的字段可知包括了key,value,offset,partition,partition即为分区。
也就是说,如果topic有多个分区,那么每次摘取的一批消息可能是来自不同分区的。所以不能想当然认为每一批消息都是一个分区的。
每批次拉取的消息同一个分区的消息的消息偏移值都是连续的。即[33,34,35]这样的连续数字,
不同的分区的偏移值没有相关性

手动提交

创建KafkaConsumer实例时如果不指定enable.auto.commit参数为true,默认KafkaConsumer是自动提交的。
自动提交模式没啥好说的,不会存在重复消费和遗漏消息的问题。
如果要使用手动提交模式,调用方就要自己维护分区的偏移,以确保不会出现重复消费和遗漏消息问题。
本节讲述手动提交模式下,设计需要注意的问题

团进团出

团进团出是旅游行业的一个术语,即要求一个旅行团,整团出发入境时是多少人,返程出境时要一个不少的回来
在这里的意思就是手动提交模式下每次KafkaConsumer.poll()方法每次拉取一批消息(数量不等),处理完消息后,就要对这批消息进行手动提交处理。提交完成后,才能继续拉取下一批消息。不能在上一批消息还没有完成提交的时候,就调用KafkaConsumer.poll()方法拉取下一批消息。

所以如果你的项目中消息处理是异步的,那么一定要同步等待当前这批消息被处理完,才能再次执行KafkaConsumer.poll()方法拉取消息。

前面说过如果主题有多个分区,每批拉取的消息可能是来自不同分区的。
为方便举例,我们以如下格式表示收到的一条消息

0-100-true

消息由-号三段数字字母代表,

  • 第一段数字代表分区,
  • 第二段数字为偏移,
  • 最后的true/false代表该消息是否正确处理并提交确认,
    为true的需要提交,
    false则是因为各种原因处理失败不需要提交,希望下一轮拉取消息继续处理。

完整提交

如下面的分区0,如果一批消息中同一个分区的所有消息都被正确处理需要提交,那么它就是完整提交

[0-100-true,0-101-true,0-103-true]

如下调用 KafkaConsumer.commitSync方法就可以了。

/** 分区完整提交,提交偏移为最后一个偏移+1 */
// 分区0
TopicPartition topicPartition = new TopicPartition(topic_name, 0);
long lastTrueOffset = 103;
/** 提交的偏移指向最后偏移量的下一条记录 */
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastTrueOffset+1);
consumer.commitSync(Collections.singletonMap(topicPartition, offsetAndMetadata));

不完整提交

如下面的分区1,如果一批消息中同一个分区的消息有部分消息标记为false不能提交,那么它就是不完整提交。

[1-41-true,1-42-false,1-43-false,1-44-true]

对于不完整提交,我们只能从将第一个false之前的记录下次循环不用再处理,第一个false及之后的消息只能留给下次循环拉取消息再处理。如下使用seek()方法修改分区偏移

/** * 分区不完整提交:* 记录本轮第一个标记为false的记录之后所有提交标记为true的偏移 * 下一轮拉取消息从第一个标记为false的偏移开始*/
// 分区1
TopicPartition topicPartition = new TopicPartition(topic_name, 1);
long firstFalseOffset =42;
consumer.seek(topicPartition,firstFalseOffset);

在不完整提交的状态下,下次执行poll()方法拉取的消息中包含上一批消息为标记为true的消息,所以还需要有机制记录上一轮拉取的消息中不完整提交中标记为true的消息,这些消息不需要再被处理,否则就会出现重复消费问题。

重复消费问题

即使如上面所说在程序中有机制记录上次不完整提交中标记为true的消息,在下次循环拉取消息后,对上次已经标记为true的消息不再被重复处理,还是无法完全避免重复消费问题。因为这只是解决当前消费者实例在当前消费循环中的重复消费问题。
在消息循环结束前最后一次拉取消息如果是不完整提交,如果这些不完整提交的数据没有持久化保存,那么在下次创建的消费者实例还是会有已经被确认消费的消息被重复消费的情况。
所以如果要完全解决重复消费问题,需要应用层对不完全提交的消息进行额外处理:

  1. 将确认为false的消息存储到缓冲区或持久化存储中:在处理确认为false的消息时,你可以将这些消息存储到缓冲区或持久化存储中,例如内存队列、数据库或文件系统。这样,下次启动消费者时,可以从缓冲区或存储中加载这些消息,并进行再次处理。
  2. 使用定时任务重新处理消息:你可以设置一个定时任务,定期检查确认为false的消息,并重新进行处理。定时任务可以根据需要从缓冲区或持久化存储中获取这些消息,并重新发送给消费者进行处理。

seek() VS commitSync()

seek()方法和commitSync()方法的作用都是通过更新分区的偏移值,控制拉取消息的位置,但这两个方法肯定是有区别的否则不可能设计两个方法干同样的事儿。

commitAsync()commitSync()方法作用是一样的,区别在于commitAsync()是异步提交
事实上我通过输出日志的方式发现commitAsync()执行结束调用OffsetCommitCallback对象时所在线程与commitAsync()执行在同一线程,也就是说commitAsync()可能也是同步提交

我通过反复的实验,对它们的差别有了初步的判断。但并不太确定。
于是,关于seek()方法和commitSync()方法的区别我问了bito机器人,这是它的回答,证实了我的想法,与我的实验结论是一致的。
在这里插入图片描述

我在机器人回答的基础上再做一些示例补充就是如下完整的说明:
commitSync() 方法管理的是消费者下次启动时获取消息的偏移量。当调用 commitSync() 方法时,消费者会将当前消费的最新偏移量提交给Kafka,并在下次启动时从该偏移量处继续消费。

比如:本次poll拉取了 100,101,102三条消息,commitSync提交偏移101,那么下次一轮执行poll拉取消息会从偏移103开始,此刻如果中止拉取消息,下次再重新启动消费者时拉取偏移为101。

seek() 方法更直接,它会修改当前消费者实例下次循环拉取消息的偏移量。如果你在消费者实例中调用 seek() 方法来设置偏移量,并在之后中止拉取消息,下次再启动消费者实例时,它会从你设置的偏移量处开始拉取消息。

还以上例,本次poll拉取了 100,101,102三条消息,seek修改偏移101,那么下次一轮执行poll拉取消息会从偏移101开始,如果此刻中止拉取消息,下次再重新启动消费者时拉取偏移为100,因为我们没有执行commitSync将偏移量持久化。

因此, commitSync() 方法影响的是下次消费者启动时的偏移量,而 seek() 方法影响的是当前消费者实例下次循环拉取消息的偏移量,并不会影响下次再启动消费者实例时的偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/81291.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java中javamail发送带附件的邮件实现方法

java中javamail发送带附件的邮件实现方法 本文实例讲述了java中javamail发送带附件的邮件实现方法。分享给大家供大家参考。具体分析如下: JavaMail,顾名思义,提供给开发者处理电子邮件相关的编程接口。它是Sun发布的用来处理email的API。它…

Python连接Hive实例教程

一 Python连接hive环境实例 经在网络查询相关的教程,发现有好多的例子,发现连接底层用的的驱动基本都是pyhive和pyhs2两种第三方库的来连接的 hive,下面将简介windows 10 python 3.10 连接hive的驱动程序方式,开发工具:pycharm …

找工作的才是大爷?面试了一个工作4年的测试工程师,一问连自动化基础都不清楚,还反过来怼我....

我们公司也开始大量招人了,我这次是公司招聘的面试官之一,主要负责一些技术上的考核,这段时间还真让我碰到了不少奇葩求职者 昨天公司的HR小席刚跟我吐槽:这几个星期没有哪天不加班的!各种招聘网站上的消息源源不断&a…

MyCat管理及监控——zookeeper及MyCat-web安装

1.MyCat管理 2.MyCat-eye 3.zookeeper安装 第一步:解压 第二部: 切换目录,创建data文件夹 第三步:修改zookeeper配置文件 这样zookeeper安装及配置就完成了 4.MyCat-web安装 注意mycat-web要与zookeeper关联,…

FL Studio低版本怎么免费升级:FL Studio升级要钱吗?

为了更好的服务国内FL Studio用户,FL Studio 官网提供了跨版本升级的服务,用户可以通过缴纳一定的费用,将自己已购买的入门版或其他非完整版的版本,升级为更高的版本,解锁更多的插件,而无需重新购买整套版本…

深入浅出对话系统——闲聊对话系统进阶

引言 本文主要关注生成式闲聊对话系统的进阶技术。 基于Transformer的对话生成模型 本节主要介绍GPT系列文章,这是由OpenAI团队推出的,现在大火的ChatGPT也是它们推出的。 GPT : Improving Language Understanding by Generative Pre-Traini ng 在自…

每次执行@Test方法前都执行一次DB初始化(SpringBoot Test + JUnit5环境)

引言 在执行单元测试时,可以使用诸如H2内存数据库替代线上的Mysql数据库等,如此在执行单元测试时就能尽可能模拟真实环境的SQL执行,同时也无需依赖线上数据库,增加了测试用例执行环境的可移植性。而使用H2数据库时,通…

两个多选框(select)之间值的左右上下移动

<!DOCTYPE html> <html> <head><meta charset"utf-8"><title>两个多选框(select)之间值的左右上下移动</title> </head> <script src"https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>&…

看重ARM?苹果、三星、英伟达等知名企业纷纷表示加大投资

根据日经亚洲的报道&#xff0c;芯片设计公司Arm计划进行首次公开募股并在纳斯达克上市。苹果、三星电子、英伟达、英特尔等知名企业计划在Arm美股上市后投资该公司。 据悉&#xff0c;Arm将于9月份上市&#xff0c;预计估值将达到至少600亿美元&#xff08;约合4314亿元人民币…

Spring Boot多级缓存实现方案

1.背景 缓存&#xff0c;就是让数据更接近使用者&#xff0c;让访问速度加快&#xff0c;从而提升系统性能。工作机制大概是先从缓存中加载数据&#xff0c;如果没有&#xff0c;再从慢速设备(eg:数据库)中加载数据并同步到缓存中。 所谓多级缓存&#xff0c;是指在整个系统架…

Jmeter录制HTTPS脚本

Jmeter录制HTTPS脚本 文章目录 添加“HTTP代理服务器”设置浏览器代理证书导入存在问题 添加“HTTP代理服务器” 设置浏览器代理 保持端口一致 证书导入 点击一下启动让jmeter自动生成证书&#xff0c;放在bin目录下&#xff1a; 打开jmeter的SSL管理器选择刚刚生成的证书&…

Linux root用户执行修改密码命令,提示 Permission denied

问题 linux系统中&#xff08;ubuntu20&#xff09;&#xff0c;root用户下执行passwd命令&#xff0c;提示 passwd: Permission denied &#xff0c;如下图&#xff1a; 排查 1.执行 ll /usr/bin/passwd &#xff0c;查看文件权限是否正确&#xff0c;正常情况是 -rwsr-xr…

20230807通过ffmpeg将DTS编码的AUDIO音频转换为AAC编码

20230807通过ffmpeg将DTS编码的AUDIO音频转换为AAC编码 2023/8/7 20:04 ffmpeg dts 转AAC 缘起&#xff1a;由于网上找的电影没有中文字幕&#xff0c;有内置的英文字幕&#xff0c;但是还是通过剪映/RP2023识别一份英文字幕备用&#xff01; I:\Downloads\2005[红眼航班]Red E…

一、MySql前置知识

文章目录 一、什么是数据库&#xff08;一&#xff09;存储数据用文件就可以了&#xff0c;为什么还要弄个数据库?&#xff08;二&#xff09;数据库存储介质&#xff1a;&#xff08;三&#xff09;主流数据库 二、数据库基本操作&#xff08;一&#xff09;连接服务器&#…

基于Spring Boot的医院预约挂号网站设计与实现(Java+spring boot+MySQL)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于Spring Boot的医院预约挂号网站设计与实现&#xff08;Javaspring bootMySQL&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;Java spring…

Linux 远程登录

Linux 远程登录 Linux 一般作为服务器使用&#xff0c;而服务器一般放在机房&#xff0c;你不可能在机房操作你的 Linux 服务器。 这时我们就需要远程登录到Linux服务器来管理维护系统。 Linux 系统中是通过 ssh 服务实现的远程登录功能&#xff0c;默认 ssh 服务端口号为 2…

机器学习深度学习——从全连接层到卷积

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——非NVIDIA显卡怎么做深度学习&#xff08;坑点排查&#xff09; &#x1f4da;订阅专栏&#xff1a;机器…

流量、日志分析

流量分析 知识点&#xff1a; 流量包分析简介 - CTF Wiki (ctf-wiki.org) Wireshark 基本语法&#xff0c;基本使用方法&#xff0c;及包过虑规则_wireshark语法_竹痕的博客-CSDN博客 MISC&#xff1a;流量包取证&#xff08;pcap文件修复、协议分析、数据提取&#xff09;…

【linux-keepalive】keepalive避免单点故障,高可用配置

keepalive: [rootproxy ~]# yum install -y keepalived [rootproxy ~]# vim /etc/keepalived/keepalived.conf global_defs {router_id proxy1 //设置路由ID号vrrp_iptables //不添加任何防火墙规则 } vrrp_instance V…

AI Chat 设计模式:13. 代理模式

本文是该系列的第十三篇&#xff0c;采用问答式的方式展开&#xff0c;和前面的文章有一些不同&#xff0c;我不再进行提问了&#xff0c;改为由 GPT 1 号提问&#xff0c;GPT 2 号作答&#xff0c;每一节的小标题是我从 GPT 1 号的提问中总结出来的。我现在是完完全全的旁观者…