分布式 - 消息队列Kafka:Kafka 消费者的消费位移

文章目录

      • 01. Kafka 分区位移
      • 02. Kafka 消费位移
      • 03. kafka 消费位移的作用
      • 04. Kafka 消费位移的提交
      • 05. kafka 消费位移的存储位置
      • 06. Kafka 消费位移与消费者提交的位移
      • 07. kafka 消费位移的提交时机
      • 08. Kafka 维护消费状态跟踪的方法

01. Kafka 分区位移

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。

每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9。

02. Kafka 消费位移

对于kafka中的消费者而言,也有一个offset的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

消费位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。消费者会定期地将已经消费的消息的位移提交到Kafka集群中,以便在下一次启动时从上次消费的位置继续消费。

每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。注意,这和分区位移完全不是一个概念。“分区位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器嘛。另外每个消费者有着自己的消费者位移。

03. kafka 消费位移的作用

消费者位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。它的作用主要有以下几个方面:

① 消费者可以通过记录偏移量来实现断点续传。当消费者下线或者重启时,它可以通过记录的偏移量来恢复之前的消费状态,从而避免重复消费已经处理过的消息。

② Kafka 通过偏移量来保证消息的顺序性。在同一个分区中,消息的顺序是有序的,消费者可以通过记录偏移量来保证消费的顺序性。

③ Kafka 还可以通过偏移量来实现消息的回溯。消费者可以通过指定偏移量来重新消费之前的消息,这在某些场景下非常有用,比如重新处理之前出现的错误。

总之,Kafka 消息偏移量是非常重要的一个概念,它可以帮助消费者实现断点续传、保证消息的顺序性以及实现消息的回溯等功能。

04. Kafka 消费位移的提交

消费者可以通过订阅一个或多个主题来拉取消息。当消费者调用 poll() 方法时,它会从 Kafka 集群中拉取一批消息,这些消息会被缓存在消费者的本地缓存中,等待消费者进一步处理。在消费者处理完这批消息后,它可以再次调用 poll() 方法来拉取下一批消息。如果消费者在处理消息时发生了错误,那么这批消息将会被重新拉取,直到消费者成功地处理它们为止。

因此每次调用poll()方法,它总是会返回还没有被消费者读取过的记录,这意味着我们可以追踪哪些记录是被群组里的哪个消费者读取过的。要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题 __consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

05. kafka 消费位移的存储位置

消费者默认将 offset 保存在Kafka一个内置的 topic 中,该 topic 为 __consumer_offsets。

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --list
__consumer_offsets

消费者会向一个叫作 __consumer_offset 的内置主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。

消费 offset 案例:

① __consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

② 创建主题 haha

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic haha
Created topic test1.

③ 启动生产者生产数据:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic haha
>hello,haha!
>你好,haha!
>

④ 启动消费者消费数据:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic haha --group group-haha --from-beginning
hello,haha!
你好,haha!

⑤ 查看消费者消费主题__consumer_offsets:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning# key是[消费者组,消费的主题,消费的分区],value中已经消费的消息在当前分区的offset+1
[group-haha,haha,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)

06. Kafka 消费位移与消费者提交的位移

如下图,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,不过当前消费者需要提交的消费位移并不是 x,而是 x+1,它表示下一条需要拉取的消息的位置。
在这里插入图片描述
如果使用自动提交或不指定提交的偏移量,那么将默认提交poll()返回的最后一个位置之后的偏移量,即提交比客户端从poll()返回的最后一个位置大1的偏移量。在进行手动提交或需要提交特定的偏移量时,一定要记住这一点。

07. kafka 消费位移的提交时机

当前一次 poll() 操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。
在这里插入图片描述
① 如果最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会丢失:

如图,如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。

② 如果最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理:

如图,如果消费完所有拉取到的消息之后才进行位移提交,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

08. Kafka 维护消费状态跟踪的方法

在Kafka中,消费者组可以通过消费者偏移量(consumer offset)来跟踪它们在分区中消费的消息。消费者偏移量是一个整数,表示消费者已经成功读取的消息的位置。当消费者读取消息时,它会将偏移量保存在内存中,以便在下一次读取消息时能够从正确的位置开始读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/93041.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01- vdom 和模板编译源码

组件渲染的过程 template --> ast --> render --> vDom --> 真实的Dom --> 页面 Runtime-Compiler和Runtime-Only的区别 - 简书 编译步骤 模板编译是Vue中比较核心的一部分。关于 Vue 编译原理这块的整体逻辑主要分三个部分,也可以说是分三步&am…

周期 角频率 频率 振幅 初相角

周期 角频率 频率 振幅 初相角 当我们谈论傅里叶级数或波形分析时,以下术语经常出现: 周期 T T T: 函数在其图形上重复的时间或空间的长度。周期的倒数是频率。 频率 f f f: 周期的倒数,即一秒内波形重复的次数。单位通常为赫兹&#xff…

20、stm32使用FMC驱动SDRAM(IS42S32800G-6BLI)

本文将使用安富莱的STM32H743XIH板子驱动SDRAM 引脚连接情况 一、CubeMx配置工程 1、开启调试口 2、开启外部高速时钟 配置时钟树 3、开启串口1 4、配置MPU 按照安富莱的例程配置: /* ********************************************************************…

双链表的插入,删除以及遍历

在上一节我们讲解了单链表的头插法和尾插法 http://t.csdn.cn/RixAu 但是单链表无法反向检索,对于某些情景可能造成不便,所以我们今天学习双链表 目录 1.双链表的初始化 2.双链表的插入 3.双链表的删除 4.遍历双链表 1.双链表的初始化 typedef i…

(二)结构型模式:8、代理模式(Proxy Pattern)(C++示例)

目录 1、代理模式(Proxy Pattern)含义 2、代理模式的UML图学习 3、代理模式的应用场景 4、代理模式的优缺点 5、C实现代理模式的实例 1、代理模式(Proxy Pattern)含义 代理模式(Proxy),为…

离谱的Bug

离谱的 Bug Bug 情况发现 Bug修改 Bug其他感受历史 Bug火星Spirit号Mars Global Surveyor任务 Bug 情况 有一次,我在开发一个网页应用程序时,遇到了一个令人目瞪口呆的Bug。这个Bug出现在一个特定的页面上,当用户点击某个按钮时,…

智安网络|深入比较:Sass系统与源码系统的差异及选择指南

随着前端开发的快速发展,开发人员需要使用更高效和灵活的工具来处理样式表。在这个领域,Sass系统和源码系统是两个备受关注的选项。 Sass系统 Sass(Syntactically Awesome Style Sheets)是一种CSS预处理器,它扩展了CS…

在 OpenCV 中使用深度学习进行年龄检测-附源码

文末附完整源码和模型文件下载链接 在本教程中,我们将了解使用 OpenCV 创建年龄预测器和性别分类器项目的整个过程。 年龄检测 我们的目标是创建一个程序,使用图像来预测人的性别和年龄。但预测年龄可能并不像你想象的那么简单,为什么呢?您可能会认为年龄预测是一个回归问…

【Linux命令详解 | wget命令】 wget命令用于从网络下载文件,支持HTTP、HTTPS和FTP协议

文章标题 简介一,参数列表二,使用介绍1. 基本文件下载2. 递归下载整个网站3. 限制下载速率4. 防止SSL证书校验5. 断点续传6. 指定保存目录7. 自定义保存文件名8. 增量下载9. 使用HTTP代理10. 后台下载 总结 简介 在编程世界中,处理网络资源是…

快速搭建图书商城小程序的简易流程与优势

很多人喜欢阅读电子书,又有很多人依旧喜欢实体书,而实体书店拥有一个图书商城小程序便成为了满足用户需求的理想选择。如果您也想进入这一充满潜力的领域,但担心开发难度和复杂流程,别担心!您能做到快速搭建一个专业、…

CXL 寄存器介绍 (1) - 寄存器分类

🔥点击查看精选 CXL 系列文章🔥 🔥点击进入【芯片设计验证】社区,查看更多精彩内容🔥 📢 声明: 🥭 作者主页:【MangoPapa的CSDN主页】。⚠️ 本文首发于CSDN&#xff0c…

Mac安装opencv后无法导入cv2的解决方法

前提条件:以下两个插件安装成功 pip install opencv-python pip install --user opencv-contrib-python 注:直接用pip install opencv-contrib-python如果报错,就加上“–user" 第一步: 设置–添加python解释器 第二步&am…

机器学习|Softmax 回归的数学理解及代码解析

机器学习|Softmax 回归的数学理解及代码解析 Softmax 回归是一种常用的多类别分类算法,适用于将输入向量映射到多个类别的概率分布。在本文中,我们将深入探讨 Softmax 回归的数学原理,并提供 Python 示例代码帮助读者更好地理解和…

Python入门--关键字

关键字是Python编程语言中具有特殊含义的保留单词,不能用作变量名、函数名、类名或其他标识符。以下是Python 3.9.0版本中的关键字列表: False, None, True, and, as, assert, async, await, break, class, continue, def, del, elif, else, except, f…

Microsoft 图像BERT,基于大规模图文数据的跨模态预训练

视觉语言任务是当今自然语言处理(NLP)和计算机视觉领域的热门话题。大多数现有方法都基于预训练模型,这些模型使用后期融合方法融合下游任务的多模态输入。然而,这种方法通常需要在训练期间进行特定的数据注释,并且对于…

web前端开发基础入门html5+css3+js学习笔记(一)

目录 1.第一个前端程序2.前端工具的选择与安装3.VSCode开发者工具快捷键4.HTML5简介与基础骨架4.1 HTML5的DOCTYPE声明4.2 HTML5基本骨架4.2.1 html标签4.2.2 head标签4.2.3 body标签4.2.4 title标签4.2.5 meta标签 5.标签之标题5.1 快捷键5.1 标题标签位置摆放 6.标签之段落、…

Docker实战专栏简介

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 &a…

星星之火:国产讯飞星火大模型的实际使用体验(与GPT对比)

#AIGC技术内容创作征文|全网寻找AI创作者,快来释放你的创作潜能吧!# 文章目录 1 前言2 测试详情2.1 文案写作2.2 知识写作2.3 阅读理解2.4 语意测试(重点关注)2.5 常识性测试(重点关注)2.6 代码…

深入学习SpringCloud Alibaba微服务架构,揭秘Nacos、Sentinel、Seata等核心技术,助力构建高效系统!

课程链接: 链接: https://pan.baidu.com/s/1hRN0R8VFcwjyCTWCEsz-8Q?pwdj6ej 提取码: j6ej 复制这段内容后打开百度网盘手机App,操作更方便哦 --来自百度网盘超级会员v4的分享 课程介绍: 📚【第01阶段】课程简介:全…

flinksql实时统计程序背压延迟优化

问题: flinkcdcflinksql做实时读取sls日志和实时统计业务指标,今天发现程序背压了,业务延迟了6个小时。解决办法: 1、资源优化 作业并发大时:在作业的高级配置的资源配置中,增加JobManager的资源&#xf…