消息消费过程

前言

本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。

消费者与消费组

Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。

开发流程

  1. 配置consumer参数并创建consumer实例;
  2. 订阅主题;
  3. 拉取消息并消费;
  4. 提交消费偏移量;
  5. 关闭consumer;
class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap serverprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");// group.id, 如果当前consumer需要加入到某个group中, 否则自成一个groupprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");// 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");// 自动提交offset设置, 样例中为手动提交// props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");// 自动提交offset的时间间隔// props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");// offset reset配置props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");// key和value的deserializer配置props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);boolean running = true;while(running) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records) {// 消费消息}// 消费成功提交offsetconsumer.commitSync();}consumer.close();}
}

主题与分区

每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。

例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。

反序列化

consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。

interface Deserializer {T deserialize(String topic, byte[] data);void close();
}

主题订阅

  1. 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
  2. 订阅多个特定主题, subscribe(collection);
  3. 订阅某种模式的主题, subscribe(pattern);
  4. 订阅某个主题的特定partition, assign(partition);
  5. 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;

消息获取

  1. 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
  2. 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
  3. 拉模式: client主动发起消息请求, 从server端拉取消息;
  4. 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)

内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。

位移提交

位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:
请添加图片描述

  1. 消息是按照partition存储的;
  2. 消息写入partition时, offset单调递增;
  3. 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;

因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;

提交方式说明优缺点
自动提交(默认方式) Kafka Client周期性地提交偏移量优点是简单, 确定是重复消费和丢失风险
手动提交由用户主动提交偏移量优点是可细粒度管理, 缺点是相对复杂

自动提交

自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。
在这里插入图片描述

  1. consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
  2. 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  3. 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  4. 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;

手动提交

也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |

指定offset

消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?

  1. 由参数auto.offset.reset设定默认行为
    | 参数值 | 行为 |
    |----|----|
    | earliest | 从分区第一条消息的offset开始 |
    | latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
    | none | 程序逻辑自定义, 如果未设置则抛出异常 |

  2. 程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。

  3. 由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。

再平衡

再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。

Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。

消费者拦截器

Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。

拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/197286.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

十三、Docker的安装

0.安装Docker Docker 分为 CE 和 EE 两大版本。CE 即社区版&#xff08;免费&#xff0c;支持周期 7 个月&#xff09;&#xff0c;EE 即企业版&#xff0c;强调安全&#xff0c;付费使用&#xff0c;支持周期 24 个月。 Docker CE 分为 stable test 和 nightly 三个更新频道…

CTFhub-RCE-过滤cat

查看当前目录&#xff1a;输入:127.0.0.1|ls 127.0.0.1|cat flag_42211411527984.php 无输出内容 使用单引号绕过 127.0.0.1|cat flag_42211411527984.php|base 64 使用双引号绕过 127.0.0.1|c""at flag_42211411527984.php|base64 使用特殊变量绕过 127.0.0.…

第四篇 《随机点名答题系统》——基础设置详解(类抽奖系统、在线答题系统、线上答题系统、在线点名系统、线上点名系统、在线考试系统、线上考试系统)

目录 1.功能需求 2.数据库设计 3.流程设计 4.关键代码 4.1.设置题库 4.1.1数据请求示意图 4.1.2选择题库&#xff08;index.php&#xff09;数据请求代码 4.1.3取消题库&#xff08;index.php&#xff09;数据请求代码 4.1.4业务处理Service&#xff08;xztk.p…

计算机的发展

硬件的发展 第一台电子数字计算机&#xff1a;ENIAC&#xff08;1946&#xff09;&#xff0c;作者&#xff1a;冯诺依曼&#xff0c;逻辑元件&#xff1a;电子管 bug&#xff1a;小虫子&#xff0c;会影响打点 Intel&#xff1a; 机器字长&#xff1a;计算机一次整数运算所能…

企业计算机服务器中了mallox勒索病毒怎么解决,勒索病毒解密文件恢复

随着科技技术的不断发展&#xff0c;网络技术得到了快速提升&#xff0c;但网络安全威胁也不断增加&#xff0c;近期&#xff0c;云天数据恢复中心陆续接到很多企业的求助信息&#xff0c;企业的计算机服务器遭到了mallox勒索病毒攻击&#xff0c;导致企业的所有业务中断&#…

[nlp] 损失缩放(Loss Scaling)loss sacle

在深度学习中,由于浮点数的精度限制,当模型参数非常大时,会出现数值溢出的问题,这可能会导致模型训练不稳定。为了解决这个问题,损失缩放(Loss Scaling)技术被引入,它通过缩放损失值来解决这个问题。 在深度学习中,损失缩放技术通常是通过将梯度进行缩放来实现的。具…

鸿蒙APP外包开发上线流程

鸿蒙系统的上线流程可能会根据具体的版本和平台要求而略有不同。在进行上线之前&#xff0c;开发人员应该详细了解并遵循鸿蒙生态系统的相关规定和要求。鸿蒙&#xff08;HarmonyOS&#xff09;应用的上线流程通常包括以下步骤&#xff0c;希望对大家有所帮助。北京木奇移动技术…

【深度学习】pytorch快速得到mobilenet_v2 pth 和onnx

在linux执行这个程序&#xff1a; import torch import torch.onnx from torchvision import transforms, models from PIL import Image import os# Load MobileNetV2 model model models.mobilenet_v2(pretrainedTrue) model.eval()# Download an example image from the P…

安卓中轻量级数据存储方案分析探讨

轻量级数据存储功能通常用于保存应用的一些常用配置信息&#xff0c;并不适合需要存储大量数据和频繁改变数据的场景。应用的数据保存在文件中&#xff0c;这些文件可以持久化地存储在设备上。需要注意的是&#xff0c;应用访问的实例包含文件所有数据&#xff0c;这些数据会一…

Qt6版使用Qt5中的类遇到的问题解决方案

如果有需要请关注下面微信公众号&#xff0c;会有更多收获&#xff01; 1.QLinkedList 是 Qt 中的一个双向链表类。它提供了高效的插入和删除操作&#xff0c;尤其是在中间插入和删除元素时&#xff0c;比 QVector 更加优秀。下面是使用 QLinkedList 的一些基本方法&#xff1a…

微服务学习 | Eureka注册中心

微服务远程调用 在order-service的OrderApplication中注册RestTemplate 在查询订单信息时&#xff0c;需要同时返回订单用户的信息&#xff0c;但是由于微服务的关系&#xff0c;用户信息需要在用户的微服务中去查询&#xff0c;故需要用到上面的RestTemplate来让订单的这个微…

JVM虚拟机:通过日志学习PS+PO垃圾回收器

我们刚才设置参数的时候看到了-XXPrintGCDetails表示输出详细的GC处理日志&#xff0c;那么我们如何理解这个日志呢&#xff1f;日志是有规则的&#xff0c;我们需要按照这个规则来理解日志中的内容&#xff0c;它有两个格式&#xff0c;一个格式是GC的格式&#xff08;新生代&…

YOLOv8/YOLOv7/YOLOv5/YOLOv4/Faster-rcnn系列算法改进【NO.79】改进损失函数为VariFocal Loss

前言 作为当前先进的深度学习目标检测算法YOLOv8&#xff0c;已经集合了大量的trick&#xff0c;但是还是有提高和改进的空间&#xff0c;针对具体应用场景下的检测难点&#xff0c;可以不同的改进方法。此后的系列文章&#xff0c;将重点对YOLOv8的如何改进进行详细的介绍&…

Egress-TLS-Origination

目录 文章目录 目录本节实战1、出口网关TLS发起2、通过 egress 网关发起双向 TLS 连接关于我最后 本节实战 实战名称&#x1f6a9; 实战&#xff1a;Egress TLS Origination-2023.11.19(failed)&#x1f6a9; 实战&#xff1a;通过 egress 网关发起双向 TLS 连接-2023.11.19(测…

CDN是什么,能起到什么作用

随着互联网的快速发展&#xff0c;用户对于快速、稳定、高效的互联网体验的需求日益增长。为了满足这一需求&#xff0c;内容分发网络&#xff08;CDN&#xff09;应运而生&#xff0c;并在近年来得到了广泛应用。CDN通过在全球范围内部署大量的服务器和网络节点&#xff0c;实…

excel怎么能锁住行 和/或 列的自增长,保证粘贴公式的时候不自增长或者只有部分自增长

例如在C4单元格中输入了公式&#xff1a; 现在如果把C4拷贝到C5&#xff0c;D3会自增长为D4&#xff1a; 现在如果想拷贝的时候不自增长&#xff0c;可以先把光标放到C4单元格&#xff0c;然后按F4键&#xff0c;行和列的前面加上了$符号&#xff0c;锁定了&#xff1a; …

QEMU显示虚拟化的几种选项

QEMU可以通过通过命令行"-vga type"选择为客户机模拟的VGA卡的类别,可选择的类型有多个: -vga typeSelect type of VGA card to emulate. Valid values for type arecirrusCirrus Logic GD5446 Video card. All Windows versions starting from Windows 95 should …

深度学习YOLO图像视频足球和人体检测 - python opencv 计算机竞赛

文章目录 0 前言1 课题背景2 实现效果3 卷积神经网络4 Yolov5算法5 数据集6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习YOLO图像视频足球和人体检测 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非…

Vatee万腾未来科技之航:Vatee创新引领的新纪元

在当今数字化时代&#xff0c;Vatee万腾科技正在开创一段引领未来的全新征程。以其卓越的创新能力和领导地位&#xff0c;Vatee万腾成为数字化领域的引领者。其未来科技之航展现了一种独特的数字化愿景&#xff0c;引领着科技创新进入新的纪元。 Vatee万腾在数字科技领域展现出…

html-网站菜单-点击显示导航栏

一、效果图 1.点击显示菜单栏&#xff0c;点击x号关闭&#xff1b; 2.点击一级菜单&#xff0c;展开显示二级&#xff0c;并且加号变为减号&#xff1b; 3.点击其他一级导航&#xff0c;自动收起展开的导航。 二、代码实现 <!DOCTYPE html> <html><head>&…