Kafka 位移

Consumer位移管理机制

Consumer的位移数据作为一条条普通的Kafka消息,提交到__consumer_offsets中。可以这么说,__consumer_offsets的主要作用是保存Kafka消费者的位移信息。使用Kafka主题来保存位移。

消息格式

位移主题就是普通的Kafka主题。也是一个内部主题,但它的消息格式却是Kafka自己定义的KV对(Key和Value分别表示消息的键值和消息体),用户不能修改,Kafka Consumer有API去提交位移,也就是向位移主题写消息。不要自己写个Producer随意向该主题发送消息。

主题消息的Key中应该保存标识Consumer的字段,也就是Consumer GroupGroup ID,标识唯一的Consumer Group,因为Consumer提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key中还应该保存 Consumer要提交位移的分区

总结:位移主题的Key中应该保存3部分内容:<Group ID,主题名,分区号>

还有2种格式:

        1. 用于保存Consumer Group信息的消息,用来注册Consumer Group

        2. tombstone消息,即墓碑消息,也称delete mark:用于删除Group过期位移甚至是删除Group的消息。

位移主题的创建

当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。

分区数是怎么设置的呢?这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是50,因此Kafka会自动创建一个50分区的位移主题。Broker端另一个参数offsets.topic.replication.factor 控制副本数,默认为3。所以:如果位移主题是Kafka自动创建的,那么该主题的分区数是50,副本数是3。

提交位移(Committing Offsets)

Consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。当Consumer发生故障重启之后,就能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍

从用户的角度来说,位移提交分为自动提交手动提交从Consumer端的角度来说,位移提交分为同步提交异步提交

Kafka Consumer提交位移的方式有两种:自动提交位移手动提交位移

手动提交位移

enable.auto.commit 如果值是false,则为手动提交,它能够把控位移提交的时机和频率可以使用Kafka Consumer API的consumer.commitSync等方法,当调用这些方法时,Kafka会向位移主题写入相应的消息。

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}

调用consumer.commitSync()方法的时机,是在处理完了poll()方法返回的所有消息之后。如果过早提交了位移,就可能会出现消费数据丢失的情况。它还也有一个缺陷,就是在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,影响整个应用程序的TPS。

Kafka社区为手动提交位移提供了另一个API方法:KafkaConsumer#commitAsync() ,这是一个异步操作。调用commitAsync()之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。由于它是异步的,Kafka提供了回调函数(callback),在实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用commitAsync()的方法:

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}

commitAsync是否能够替代commitSync呢?

        答案是不能。commitAsync的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过 期”或不是最新值了。因此,异步提交的重试其实没有意义,所以commitAsync是不会重试的。 

将commitSync和commitAsync组合使用

try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch(Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}

对于常规性、阶段性的手动提交,我们调用commitAsync()避免程序阻塞,而在Consumer要关闭前,我们调用commitSync()方法执行同步阻塞式的位移提交,以确保Consumer关闭前能够保存正确的位移数据。将两者结合后,既实现了异步无阻塞式的位移管理,也确保了Consumer位移的正确性。

分批处理(细粒度的位移提交)

        commitSync(Map<TopicPartition, OffsetAndMetadata>)

         commitAsync(Map<TopicPartition, OffsetAndMetadata>)

它们的参数是一个Map对象,键就 是TopicPartition,即消费的分区,而值是一个OffsetAndMetadata对象,保存的主要是位移数据。

例如:如何每处理100条消息就提交一次位移呢?以commitAsync为例,展示一段代码,实际上,commitSync的调用方法和它是一模一样的。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
// 其他操作
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record); // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()) , new OffsetAndMetadata(record.offset() + 1);if(count % 100 == 0)consumer.commitAsync(offsets, null); // 回调处理逻辑是nullcount++;}}
}

程序先是创建了一个Map对象,用于保存Consumer消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。要提交下一条消息的位移,这里构造OffsetAndMetadata对象时,使用当前消息位移加1的原因。代码的最后部分是做位移的提交。这里设置了一个计数器,每累计100条消息就统一提交一次位移。与调用无参的 commitAsync不同,这里调用了带Map对象参数的commitAsync进行细粒度的位移提交。这样,这段代码就能够实现每处理100条消息就提交一次位移,不用再受poll方法返回的消息总数的限制了。 

自动提交位移

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer 定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。但是没法把控Consumer端的位移管理。

 一旦设置了enable.auto.commit为true,Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer每5秒自动提交一次位移。现在,我们假设提交位移之后的3秒发生了Rebalance操作。在Rebalance之后,所有Consumer从上一次提交的位移处继续消费但该位移已经是3秒前的位移数据了,故在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然能够通过减少auto.commit.interval.ms的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。 

自动提交位移问题:

自动提交位移,那么就可能存在一个问题:只要Consumer一直启动着,它就会无限期地向位移主题写入消息。

假设Consumer当前消费到了某个主题的最新一条消息,位移是100,之后该主题没有任何新消息产生,故Consumer无消息可消费了,所以位移永远保持在100。由于是自动提交位移位移主题中会不停地写入位移=100的消息。显然Kafka只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

Kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义Compact策略中的过期呢?对于同一个Key的两条消息M1M2,如果M1的发送时间早于 M2,那么M1就是过期消息。Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起

图中位移为0、2和3的消息的Key都是K1。Compact之后,分区只需要保存位移为3的消息,因为它是最新发送的。 

Kafka提供了专门的后台线程定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据。这个后台线程叫LogCleaner

参考:Kafka 核心技术与实战 (geekbang.org)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/363983.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Type-C接口OTG转接器的应用与发展

随着科技的飞速发展&#xff0c;智能移动设备已成为我们生活中不可或缺的一部分。而在这些设备的连接与数据传输中&#xff0c;Type-C接口以其高效、便捷的特性逐渐占据了主导地位。OTG&#xff08;On-The-Go&#xff09;技术则进一步扩展了Type-C接口的功能&#xff0c;使得设…

JavaSE主要内容(全套超完整)

一、为什么选择Java&#xff08;Java的优势&#xff09; 1、应用面广&#xff1a; 相较于其他语言&#xff0c;Java的应用面可谓是非常广&#xff0c;这得益于他的跨平台性和其性能的稳定性。他在服务器后端&#xff0c;Android应用开发&#xff0c;大数据开发&#xf…

鼠尾草(洋苏草)

鼠尾草&#xff08;Salvia japonica Thunb.&#xff09;&#xff0c;又名洋苏草、普通鼠尾草、庭院鼠尾草&#xff0c;属于唇形科鼠尾草属多年生草本植物。鼠尾草以其独特的蓝紫色花序和长而细密的叶片为特点&#xff0c;常用于花坛、庭院和药用植物栽培。 鼠尾草的名字源自于…

25考研:今年初试时间比去年更早了?

过去5年考研初试时间安排如下&#xff1a; 24考研&#xff1a;2023年12月23-24日&#xff08;倒数第二个周末&#xff09; 23考研&#xff1a;2022年12月24-25日&#xff08;倒数第二个周末&#xff09; 22考研&#xff1a;2021年12月25-26日&#xff08;最后一个周末&#xf…

Al+医学,用这个中文多模态医学大模型帮你看胸片

随着人工智能技术的飞速发展&#xff0c;AI 在医学领域的应用已经成为现实。特别是在医学影像诊断方面&#xff0c;AI 大模型技术展现出了巨大的潜力和价值&#xff0c;但目前针对中文领域医学大多模态大模型还较少。 今天马建仓为大家介绍的这款 XrayGLM&#xff0c;就是由澳…

浅谈安科瑞ACRELCLOUD-1200光伏发电系统在建筑节能中的应用

摘要&#xff1a;21世纪以来&#xff0c;随着不可再生能源的逐渐减少&#xff0c;人们越来越重视能源的利用率&#xff0c;不断开发绿色能源。通过光伏发电系统&#xff0c;能够提升能源利用率&#xff0c;减少不可再生能源的开发。同时&#xff0c;也能加强我国建筑节能系统的…

wsl2收缩虚拟磁盘,减少空间占用

一、说明 由于WSL2使用的是虚拟磁盘&#xff0c;当虚拟磁盘的空间变大时&#xff0c;仅仅删除WSL2文件系统中没有用到的大文件&#xff0c;磁盘空间是无法自动收缩回收的。本文介绍了一种回收WSL2虚拟磁盘空间的方法。 二、停止WSL2 在收缩 WSL2 虚拟磁盘之前&#xff0c;需…

Cent0S7 Docker安装 YOLOv8

githup 源码及其作者&#xff1a;ultralytics/ultralytics&#xff1a;新增 - PyTorch 中的 YOLOv8 &#x1f680; > ONNX > OpenVINO > CoreML > TFLite (github.com) yolo是什么&#xff1f; 实时视觉检测技术&#xff0c;通过对不同的角度拍摄的视觉图片进行人…

实现自动化:如何利用阿里云OSS上传文件并自动打标签

在当前数字化时代&#xff0c;数据管理变得愈发重要&#xff0c;特别是对于需要大规模存储和管理文件的场景。阿里云对象存储服务&#xff08;OSS&#xff09;作为业界领先的解决方案&#xff0c;不仅提供了稳定可靠的云存储&#xff0c;还支持丰富的扩展功能&#xff0c;如文件…

DNF手游鬼剑士攻略:全面解析流光星陨刀的获取与升级!云手机强力辅助!

《地下城与勇士》&#xff08;DNF&#xff09;手游是一款广受欢迎的多人在线角色扮演游戏&#xff0c;其中鬼剑士作为一个经典职业&#xff0c;因其强大的输出能力和炫酷的技能特效&#xff0c;吸引了众多玩家的青睐。在这篇攻略中&#xff0c;我们将详细介绍鬼剑士的一把重要武…

【Flink metric(1)】Flink指标系统的系统性知识:获取metric以及注册自己的metric

文章目录 一. Registering metrics&#xff1a;向flink注册新自己的metrics1. 注册metrics2. Metric types:指标类型2.1. Counter2.2. Gauge2.3. Histogram(ing)2.4. Meter 二. Scope:指标作用域1. User Scope2. System Scope ing3. User Variables 三. Reporter ing四. System…

基于AT89C52单片机的超声波测距设计—数码管显示

点击链接获取Keil源码与Project Backups仿真图: https://download.csdn.net/download/qq_64505944/89456475?spm=1001.2014.3001.5503 C 源码+仿真图+毕业设计+实物制作步骤+10 在这里插入图片描述 题 目: 基于52的超声波测距汽车防撞系统 学生姓名 [姓名] 学 号 [学号…

接口自动化测试关联token的方法?

引言&#xff1a; 在接口自动化测试中&#xff0c;有时候我们需要关联token来进行身份验证或权限管理。本文将从零开始&#xff0c;介绍如何详细且规范地实现接口自动化测试中token的关联。 步骤一&#xff1a;准备工作 在开始之前&#xff0c;我们需要确保以下准备工作已完成…

【股指期权投教】一手股指期权大概多少钱?

一手股指期权的权利金大概在几千人民币左右&#xff0c;如果是作为期权卖方还需要另外缴纳保证金的。国内的股指期权有三种&#xff0c;沪深300、上证50、中证1000股指期权&#xff0c;每点合约人民币100 元。 期权合约的价值计算可以通过此公式得出&#xff1a;权利金的支付或…

excel实现下拉筛选(超简单)

excel实现下拉筛选 引言1、需求&#xff1a;预警状态下的列 实现下拉筛选2、实现2.1、数据验证2.2、下拉筛选内容2.3、去掉预警状态单元格的下拉筛选 引言 通常&#xff0c;我们会单独新建一张sheet表 专门存每个列的下拉内容。下面我将专门建立一张名为代码表的sheet表来存放…

1/7精确到100位,1000位,100000位怎么算?

双精度 Console.WriteLine("1/7的值是" (double)1 / 7);结果:0.14285714285714285 即使使用双精度浮点数&#xff0c;精确的位数也是有限的&#xff0c;如果想精确到小数点后100位&#xff0c;1000位&#xff0c;甚至更高哪&#xff1f; 朴素的除法 除数 余数 商…

Vue--》从零开始打造交互体验一流的电商平台(四)完结篇

今天开始使用 vue3 + ts 搭建一个电商项目平台,因为文章会将项目的每处代码的书写都会讲解到,所以本项目会分成好几篇文章进行讲解,我会在最后一篇文章中会将项目代码开源到我的github上,大家可以自行去进行下载运行,希望本文章对有帮助的朋友们能多多关注本专栏,学习更多…

二刷算法训练营Day45 | 动态规划(7/17)

目录 详细布置&#xff1a; 1. 139. 单词拆分 2. 多重背包理论基础 3. 背包总结 3.1 背包递推公式 3.2 遍历顺序 01背包 完全背包 详细布置&#xff1a; 1. 139. 单词拆分 给你一个字符串 s 和一个字符串列表 wordDict 作为字典。如果可以利用字典中出现的一个或多个单…

STM32使用PWM驱动WS2812_RGB灯珠

项目场景&#xff1a; 使用STM32标准库产生PWM实现RGB灯珠控制。 芯片型号&#xff1a;stm32f405rgt6 设计优点&#xff1a;不需要使用定时器中断资源&#xff0c;可以使得STM32在驱动RGB灯珠的同时能够执行其他任务。 RGB灯珠简介 项目所使用的RGB灯珠如下所示&#xff0c;封装…