Kafka consumer_offsets 主题深度剖析

Kafka consumer_offsets 主题深度剖析

在 Apache Kafka 的消息消费机制中,确保消息被可靠消费是一个核心问题。为了解决这个问题,Kafka 设计了一个特殊的内部主题 consumer_offsets,用于跟踪和管理消费者组的消费进度。

consumer_offsets 的基本概念

consumer_offsets 是 Kafka 的一个内部主题,它具有以下特征:

  1. 默认包含 50 个分区(可通过 offsets.topic.num.partitions 配置)
  2. 使用 3 个副本因子(可通过 offsets.topic.replication.factor 配置)
  3. 采用日志压缩(log compaction)的清理策略
  4. 消息格式为二进制的键值对

这个主题存储了所有消费者组的位移信息。每个消费者组消费某个主题分区时,都会定期将自己的消费位置(offset)提交到这个主题中。当消费者重启或发生再平衡时,可以从这个主题中恢复之前的消费位置,确保消息不会丢失或重复消费。

通过代码来演示如何实现消费者位移的提交和管理:

public class ConsumerOffsetDemo {private final KafkaConsumer<String, String> consumer;private final String topic;private final String groupId;public ConsumerOffsetDemo(String bootstrapServers, String topic, String groupId) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 关闭自动提交,手动控制位移提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");this.consumer = new KafkaConsumer<>(props);this.topic = topic;this.groupId = groupId;}public void consumeAndCommit() {try {consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息processRecord(record);// 手动提交单条消息的位移Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));consumer.commitSync(offsets);}}} finally {consumer.close();}}
}

位移提交机制

位移提交是 consumer_offsets 主题的核心功能。当消费者消费消息时,需要定期将自己的消费进度提交到这个主题。提交的消息包含以下信息:

  1. key:包含 <消费者组ID, 主题名称, 分区号> 的三元组
  2. value:包含 offset(位移)、timestamp(时间戳)等信息

提交方式分为自动提交和手动提交:

  1. 自动提交:由消费者自动定期提交,通过 auto.commit.interval.ms 配置提交间隔
  2. 手动提交:由应用程序控制提交时机,可以选择同步提交或异步提交

下面是一个完整的位移监控实现:

public class OffsetMonitor {private final AdminClient adminClient;private final KafkaConsumer<byte[], byte[]> consumer;public OffsetMonitor(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-monitor");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());this.consumer = new KafkaConsumer<>(props);}public Map<String, ConsumerGroupOffset> getConsumerGroupOffsets(String groupId) {Map<String, ConsumerGroupOffset> result = new HashMap<>();try {// 获取消费者组的位移信息ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupId);Map<TopicPartition, OffsetAndMetadata> offsets = offsetsResult.partitionsToOffsetAndMetadata().get();// 获取主题的结束位移Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());// 计算消费延迟for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {TopicPartition tp = entry.getKey();long committedOffset = entry.getValue().offset();long endOffset = endOffsets.get(tp);long lag = endOffset - committedOffset;result.put(tp.topic(), new ConsumerGroupOffset(committedOffset, endOffset, lag));}} catch (Exception e) {e.printStackTrace();}return result;}
}

位移管理和运维

在实际运维中,我们需要对 consumer_offsets 主题进行管理和监控。主要包括以下几个方面:

  1. 位移重置:当需要重新消费某个主题的消息时,可以重置消费者组的位移
  2. 消费者组管理:包括删除不再使用的消费者组等操作
  3. 监控告警:监控消费延迟,及时发现消费异常

下面是一个位移管理工具的实现:

public class OffsetManager {private final AdminClient adminClient;public OffsetManager(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);}// 重置消费者组位移public void resetOffset(String groupId, String topic, int partition, long offset) {try {TopicPartition tp = new TopicPartition(topic, partition);Map<TopicPartition, OffsetAndMetadata> offsetMap = Collections.singletonMap(tp, new OffsetAndMetadata(offset));adminClient.alterConsumerGroupOffsets(groupId, offsetMap).all().get();System.out.printf("Successfully reset offset for group=%s, topic=%s, " +"partition=%d to %d%n",groupId, topic, partition, offset);} catch (Exception e) {e.printStackTrace();}}// 删除消费者组public void deleteConsumerGroup(String groupId) {try {adminClient.deleteConsumerGroups(Collections.singleton(groupId)).all().get();System.out.printf("Successfully deleted consumer group: %s%n", groupId);} catch (Exception e) {e.printStackTrace();}}// 监控消费延迟public void monitorConsumerLag(String groupId, String topic) {try {TopicPartition tp = new TopicPartition(topic, 0);Map<TopicPartition, OffsetAndMetadata> offsetMap = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();long currentOffset = offsetMap.get(tp).offset();long endOffset = getEndOffset(tp);long lag = endOffset - currentOffset;if (lag > 10000) { // 设置告警阈值System.out.printf("Warning: High lag detected for group=%s, topic=%s: %d%n",groupId, topic, lag);}} catch (Exception e) {e.printStackTrace();}}private long getEndOffset(TopicPartition tp) {try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(new Properties())) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(tp));return endOffsets.get(tp);}}
}

consumer_offsets 主题是 Kafka 消息消费机制的核心组件,它通过存储和管理消费位移信息,确保了消息消费的可靠性和可恢复性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/36891.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在 Node.js 中使用 .env 文件管理环境变量 ?

Node.js 应用程序通常依赖于环境变量来管理敏感信息或配置设置。.env 文件已经成为一种流行的本地管理这些变量的方法&#xff0c;而无需在代码存储库中公开它们。本文将探讨 .env 文件为什么重要&#xff0c;以及如何在 Node.js 应用程序中有效的使用它。 为什么使用 .env 文…

《视觉SLAM十四讲》ch13 设计SLAM系统 相机轨迹实现

前言 相信大家在slam学习中&#xff0c;一定会遇到slam系统的性能评估问题。虽然有EVO这样的开源评估工具&#xff0c;我们也需要自己了解系统生成的trajectory.txt的含义&#xff0c;方便我们更好的理解相机的运行跟踪过程。 项目配置如下&#xff1a; 数据解读&#xff1a; …

软考高级信息系统管理工程师通关100题(21-40)附记忆口诀

文章目录 21.常用存储模式的技术与应用对比22.物联网架构23.云计算服务提供的资源层次24.大数据25.区块链26.人工智能27.虚拟现实VR28.IT治理的内涵29.IT 治理活动30.IT治理本质31.IT审计目标32.IT审计方法33.治理系统设计34.数据管理能力成熟度评估模型35.项目管理原则36.管理…

Redisson 分布式锁原理

加锁原理 # 如果锁不存在 if (redis.call(exists, KEYS[1]) 0) then# hash结构,锁名称为key,线程唯一标识为itemKey&#xff0c;itemValue为一个计数器。支持相同客户端线程可重入,每次加锁计数器1.redis.call(hincrby, KEYS[1], ARGV[2], 1);# 设置过期时间redis.call(pexpi…

主流加固方案深度剖析(梆梆/腾讯/阿里)

1. 加固技术演进与核心原理 1.1 移动端加固技术图谱 graph TD A[代码防护] --> A1[混淆] A --> A2[虚拟化] A --> A3[动态加载] B[数据防护] --> B1[资源加密] B --> B2[协议加密] C[运行时防护] --> C1[反调试] C --> C2[环境检测] C --> C…

大模型之蒸馏模型

蒸馏模型&#xff08;Distilled Model&#xff09;是一种通过知识蒸馏&#xff08;Knowledge Distillation&#xff09;技术训练得到的轻量级模型&#xff0c;其核心思想是将一个复杂的大模型&#xff08;称为教师模型&#xff09;的知识“迁移”到一个更小、更高效的模型&…

iPaaS集成平台中的API可视化编排能给企业带来什么作用

随着企业数字化转型的加速&#xff0c;API&#xff08;应用程序接口&#xff09;作为企业数字化资产的核心组成部分&#xff0c;其数量和复杂性不断增加。为了满足业务敏捷化交付的要求&#xff0c;API可视化编排平台应运而生。谷云科技作为这一领域的领先者&#xff0c;其API可…

演员马晓琳正式加入创星演员出道计划,开启演艺事业新篇章

3月19日&#xff0c;演员马晓琳正式加入“创星演员出道计划”&#xff0c;不仅得到参演都市爱情喜剧《和我结婚吧》角色的机会&#xff0c;还获得文旅精品网剧《醉梦灵州》的出演机会&#xff0c;自此开启全新影视之路。对表演抱有极大热情的马晓琳&#xff0c;相信未来可以凭借…

绿盟科技春招面试

《网安面试指南》https://mp.weixin.qq.com/s/RIVYDmxI9g_TgGrpbdDKtA?token1860256701&langzh_CN 5000篇网安资料库https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247486065&idx2&snb30ade8200e842743339d428f414475e&chksmc0e4732df793fa3bf39…

双碳战略下的电能质量革命:解码电力系统的健康密码

安科瑞顾强 在能源结构转型的深水区&#xff0c;电能质量正成为制约产业升级的隐形门槛。国家能源局数据显示&#xff0c;我国工业企业每年因电能质量问题造成的经济损失高达3000亿元&#xff0c;而新能源项目因并网质量问题导致的发电效率损失超过15%。在这场关乎能源安全的攻…

Microsoft Edge浏览器的取证分析(基于Chromium)

概述 早在2019年&#xff0c;微软就用Chromium替换了EdgeHTML浏览器引擎&#xff0c;这是微软支持谷歌Chrome浏览器的一个开源项目。通过切换到Chromium&#xff0c;Edge与Chrome浏览器共享一个共同的架构&#xff0c;这意味着用于Chrome浏览器调查的取证技术也适用于Edge。 …

python学智能算法(八)|决策树

【1】引言 前序学习进程中&#xff0c;已经对KNN邻近算法有了探索&#xff0c;相关文章链接为&#xff1a; python学智能算法&#xff08;七&#xff09;|KNN邻近算法-CSDN博客 但KNN邻近算法有一个特点是&#xff1a;它在分类的时候&#xff0c;不能知晓每个类别内事物的具…

RTSP/Onvif安防监控系统EasyNVR级联视频上云系统EasyNVS报错“Login error”的原因排查与解决

EasyNVR安防视频云平台是旭帆科技TSINGSEE青犀旗下支持RTSP/Onvif协议接入的安防监控流媒体视频云平台。平台具备视频实时监控直播、云端录像、云存储、录像检索与回看、告警等视频能力&#xff0c;能对接入的视频流进行处理与多端分发&#xff0c;包括RTSP、RTMP、HTTP-FLV、W…

通信网络安全防护定级备案需要材料汇总

通信网络安全防护定级备案工作需要到指定的系统上先写基本信息&#xff0c;然后上传对应的材料&#xff0c;提交后会流转到地方通管局或部里审核。对于第一次使用该系统的朋友来说&#xff0c;通信网络安全防护定级备案需要什么材料是目前比较关注的问题。下面calm13就结合以往…

15:00面试,15:06就出来了,问的问题有点变态。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到8月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40%…

ORACLE 19.8版本数据库环境EXPDP导数据的报错处理

近期用户在做EXPDP导出时&#xff0c;报错异常termination终止;EXPDP本身是简单的功能并且这个环境也是经常做导出的&#xff0c;到底是什么原因导致了这个问题呢&#xff1f; 导出脚本报错&#xff1a; 分析导出日志&#xff0c;当时系统资源充足但是进程启动失败&#xff0c;…

【Editor】动态添加/移除宏定义

ProjectSetting中OtherSettings页签 执行工具指令 using UnityEditor; using UnityEngine; using System.Linq;public class Tools : Editor {//在菜单栏中点击自动添加[MenuItem("Tools/AddScriptingSymbols")]private static void AddScriptingSymbols(){//获取当…

Web-Machine-N7靶机实战攻略

1.安装并开启靶机 下载VirtualBox&#xff1a;https://www.virtualbox.org 导入虚拟机 设置为桥接模式 2.获取靶机IP Kali设为桥接模式 3.访问靶机 4.获取敏感目录文件和端口 gobuster dir -u http://172.16.2.68 -w /usr/share/wordlists/dirbuster/directory-list-2.3-me…

C语言实验:数组,指针实现问题求解

实验目的&#xff1a;掌握数组&#xff0c;指针的使用 实验内容&#xff1a; 1直接选择排序 2字符串运算 3交换数字 流程图&#xff1a; 1直接选择排序 2字符串运算 3交换数字 程序调试 1直接选择排序 1-1出现问题&#xff08;贴图并说明&#xff09; 错误原因&#xf…

【HarmonyOS Next】鸿蒙中App、HAP、HAR、HSP概念详解

【HarmonyOS Next】鸿蒙中App、HAP、HAR、HSP概念详解 &#xff08;图1-1&#xff09; 一、鸿蒙中App、HAP、HAR、HSP是什么&#xff1f; &#xff08;1&#xff09;App Pack&#xff08;Application Package&#xff09; 是应用发布的形态&#xff0c;上架应用市场是以App Pa…