kafka学习-消费者

目录

1、消费者、消费组

2、心跳机制

3、消费者常见参数配置

4、订阅

5、反序列化

基本概念

自定义反序列化器

6、位移提交

6.1、自动提交

6.2、手动提交

同步提交

异步提交

7、再均衡

7.1、定义与基本概念

7.2、缺陷

7.3、如何避免再均衡

7.4、如何进行组内分区分配

7.5、谁来执行再均衡和消费组管理

8、消费者拦截器

作用

自定义消费者拦截器


1、消费者、消费组

  • 消费者从订阅的主题消费消息,消费消息的偏移量保存在kafka中的__consumer_offsets的主题中。
  • 多个消费同一个主题的消费者,可以通过group.id配置,加入到同一个消费组中。消费组均衡地给消费者分配分区,每个分区只由消费组中的一个消费者消费,防止重复消费。
  • 同一个消费组里:一个分区只会对应一个消费者,但一个消费者可以消费多个分区。
  • group_id一半设置为应用或者业务的逻辑名称。

2、心跳机制

消费者4宕机,重新分配分区3的消费者
分区3所在broker宕机,重选分区3的leader分区

  • 消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区;
  • broker宕机,分区3重选leader副本,出发再平衡,重新分配分区3消息。

        心跳机制,就是consumer和broker之间的健康检查。consumer和broker之间保持长连接,通过心跳机制检测对方是否健康。心跳检测相关参数如下所示:

        在broker端,可配置sessionTimeoutMs参数,如果consumer心跳超期,broker会把消费者从消费组中移除,并触发再平衡,重新分配分区;

        在consumer端,可配置sessionTimeoutMs和rebalanceTimeoutMs参数,如果broker心跳超期,consumer则会告知broker主动退出消费组,并触发再平衡。

3、消费者常见参数配置

4、订阅

主题、分区(leader和follower分区)、消费者、消费组、订阅。

  • 主题:topic,用于分类管理消息的逻辑单元,可以用于区分业务类型;
  • 分区:partition,同一个topic的消息,会被分散到多个分区中,不同分区通常在不同broker上,方便水平扩展。分区可分为leader分区和follower分区,leader分区用于与生产者/消费者通信,follower分区用于备份leader分区的数据;
  • 消费者:与分区长连接,用于消费分区中的消息;
  • 消费组:消费组中可能会有多个消费者,保证一个消费组获取到特定主题的全部消息。消费组可以保证一个主题的分区只会被消费组中的一个消费者消费;
  • 订阅:消费者订阅主题,并将消费者加入到消费组中,采用pull模式,从broker分区中读取消息。kafka的消费者只有pull模式,该模式下消费者可以自主控制消费消息的速率。

5、反序列化

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息者接收消息后,需要将消息反序列化为指定的数据格式进行处理。
  • 消费者通过key.deserializer和value.deserializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Deserializer<T>接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArrayDeserializer、ByteBufferDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、StringDeserializer、LongDeserializer、ShortDeserializer。

自定义反序列化器

        实现org.apache.kafka.common.serialization.Deserializer<T>接口,并实现其中的deserializer方法。

public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer allocate = ByteBuffer.allocate(data.length);allocate.put(data);allocate.flip();int userId = allocate.getInt();int length = allocate.getInt();System.out.println(length);String username = new String(data, 8, length);return new User(userId, username);}@Overridepublic void close() {}
}

6、位移提交

  • 位移 = kafka分区消息的偏移量。
  • kafka中有一个主题,专门用于保存消费者的偏移量。
  • 消费者与分区一一对应,消费者在消费分区消息时,需要向kafka提交自己的位移(偏移量)信息,kafka只记录该消费者在对应分区的偏移量信息。
  • 消费者向kafka提交偏移量的过程,叫做位移提交。
  • 位移提交,分为自动提交和手动提交;也分为同步提交和异步提交。

6.1、自动提交

  • 开启⾃动提交: enable.auto.commit=true,kafka默认为自动提交。
  • 配置⾃动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s。
        自动提交模式下,Kafka会保证在开始调⽤ poll ⽅法时,提交上次 poll 返回的所有消息,因此⾃动提交不会出现消息丢失,但会重复消费,比如:
  1. Consumer 5s 提交一次offset
  2. 假设提交 offset 后的 3s 发⽣了 Rebalance
  3. Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费
  4. 因此 Rebalance 发⽣前 3s 的消息会被重复消费

6.2、手动提交

同步提交

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}
  • 使⽤ KafkaConsumer#commitSync(),会提交 KafkaConsumer#poll() 返回的最新 offset
  • ⼿动同步提交可以控制offset提交的时机和频率
  • 调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
  • 会影响 TPS
  • 如果提交间隔过长,consumer重启后,会有更多的消息被重复消费。

异步提交

while (true) {ConsumerRecords<String, String> records = consumer.poll(3_000);process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null) {handle(exception);}});
}
  • 使⽤ KafkaConsumer#commitAsync():会提交 KafkaConsumer#poll() 返回的最新 offset
  • commitAsync出现问题不会⾃动重试,可通过异步提交与同步提交相结合的方式解决。

7、再均衡

7.1、定义与基本概念

        也叫做重平衡,主要是为了让消费组下的消费者来重新分配主题下的每一个分区。再均衡的触发条件有如下三个:

  1. 消费组内成员变更(增加和减少消费者),⽐如消费者宕机退出消费组,或者新增一个消费者。
  2. 主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发再均衡。
  3. 订阅的主题发⽣变化,比如消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发再均衡。

7.2、缺陷

再均衡过程中,消费者无法从kafka消费消息。如果kafka节点过多,再均衡过程会及其耗时(数分钟甚至小时),过程中kafka基本处于不可用状态。

7.3、如何避免再均衡

完全避免,那不可能,因为你无法保证消费者不会故障。但是我们可以通过避免增加分区、增加订阅的主题、增加消费者这几种情况,减少再均衡的触发。
但有时候,kafka会错误地认为一个正常的消费者已经挂掉,从而触发再均衡。我们要做的,就是避免这种情况。
消费者和kafka之间通过心跳机制来做健康检查。当消费者宕机、网络阻塞或是消费者因负载过重没来得及发送心跳时,kafka都会认为消费者挂掉了。所以,设置合理的健康检查参数可以有效减少再均衡的发生。比较重要的参数如下:
  1. session.timout.ms:控制⼼跳超时时间,推荐设置为6s
  2. heartbeat.interval.ms:控制⼼跳发送频率,频率越高越不容易误判,但也会消耗更多资源,推荐设置为2s
  3. max.poll.interval.ms:控制poll的间隔,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。推荐为消费者处理消息最长耗时 + 1分钟。

7.4、如何进行组内分区分配

有三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。

7.5、谁来执行再均衡和消费组管理

        kafka里有一个角色,叫做Group Coordinator,用于执行消费组的管理。
        Group Coordinator——每个消费组分配一个消费组协调器⽤于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

8、消费者拦截器

作用

  1. 消费者在拉取了分区消息后,会先通过反序列化对key和value进行处理;
  2. 然后可通过设置消费者拦截器对消息进行处理,允许更改消费者接收到的消息,或者做一些监控、日志处理
  3. 应用程序处理消费者拉取的分区消息;

自定义消费者拦截器

        ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。

        自定义消费者拦截器需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口,并实现其中的configure()、onConsume()、onCommit()、close()方法,其中:

  • onConsume():该方法在poll方法返回之前调用,调用结束后poll方法就返回消息了。可通过该方法修改消费者消息,返回新的消息。
  • onCommit():当消费者提交偏移量时,调用该方法。
  • close():用于关闭该拦截器用到的资源,如打开的文件、连接的数据库等。
  • configure():用于获取消费者的参数配置。
public class MyInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// poll方法返回结果之前最后要调用的方法System.out.println("MyInterceptor -- 开始");// 消息不做处理,直接返回return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {// 消费者提交偏移量的时候,经过该方法System.out.println("MyInterceptor -- 结束");}@Overridepublic void close() {// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等}@Overridepublic void configure(Map<String, ?> configs) {// 用于获取消费者的设置参数configs.forEach((k, v) -> {System.out.println(k + "\t" + v);});}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128749.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电脑同时连接有线和无线网络怎么设置网络的优先级

电脑同时连接有线和无线网络怎么设置网络的优先级&#xff1a; 我们知道在 笔记本电脑系统 中&#xff0c;可以通过有线或无线网络进行联网。如果电脑在有线网络和无线网络同时存在的情况&#xff0c;应该怎么设置有线网络优先连接呢?对此我们提供下面的方法可以让电脑在有Wi…

链路追踪Skywalking快速入门

目录 1 Skywalking概述1.1 微服务系统监控三要素1.2 什么是链路追踪1.2.1 链路追踪1.2.2 OpenTracing1、数据模型&#xff1a;2、核心接口语义 1.3 常见APM系统1.4 Skywalking介绍1、SkyWalking 核心功能&#xff1a;2、SkyWalking 特点&#xff1a;3、Skywalking架构图&#x…

【C++】string类模拟实现上篇(附完整源码)

目录 前言1. string的基本结构2. 构造函数、析构函数2.1 构造函数的实现2.1.1带参构造函数 2.2析构函数2.3无参构造函数2.4无参和带参构造函数合并 3. string的遍历3.1 operator[ ]3.2迭代器模拟实现 (简单实现&#xff09;3.3 const迭代器模拟实现 4. 数据的增删查改4.1 reser…

基于TensorFlow 2.3.0 的手势识别系统设计

一、开发环境 Windows 10PyCharm 2021.3.2Python 3.7TensorFlow 2.3.0 二、制作数据集&#xff0c;作者使用了10个类别的手势数集据 三、开始训练模型&#xff0c;作者使用自己开发的软件进行训练模型&#xff0c;方便快捷。软件介绍及下载地址&#xff1a; 手把手教你使用T…

原生js之dom表单改变和鼠标常用事件

那么好,本次我们聊聊表单改变时如何利用onchange方法来触发input改变事件以及鼠标常用的滑入滑出,点击down和点击up事件. 关于onchange方法 onchange方法在鼠标输入完后点击任何非输入框位置时触发.触发时即可改变原有输入框的值. out 、leave、over、down、up鼠标方法 当用…

KMP超高效匹配算法

简介&#xff1a; KMP算法是一种改进的字符串匹配算法&#xff0c;其中&#xff0c;KMP算法的运用核心是利用匹配失败后的信息&#xff0c;最大进度的减少模式串与目标串的匹配次数以达到快速匹配的效果。算法与暴力求解的改进在于每当一趟匹配过程中出现的字符比较不相等时&am…

Django创建应用、ORM的进阶使用及模型类数据库迁移

1 Django项目创建第一个应用 Django 项目就是基于 Django 框架开发的 Web 应用&#xff0c;它包含了一组配置和多个应用&#xff0c;我们把应用称之为 App&#xff0c;在前文中对它也做了相应的介绍&#xff0c;比如 auth、admin&#xff0c;它们都属于 APP。 一个 App 就是一…

postman token 请求头添加

思路&#xff1a; 1、登录成功后将 得到的token设置为集合变量 2、在需要携带Authorization的请求头上使用该集合变量 关键代码 const responseData pm.response.json(); if(responseData.code 1) {// 获取tokenconst {data:{token}} responseData// 设置为集合变量pm.colle…

C# 实现电子签名

本项目基于Emgu.CV&#xff08;C#下OpenCv的封装&#xff09;开发的&#xff0c;编译器最新版Vs2022&#xff0c;编译环境x86 直接看效果图 1.主页面 2.我们先看手写的方式&#xff1a; 点击确认就到主界面&#xff0c;如下 &#xff1a; 点击自动适配-&#xff0c;再点击生成…

《C++设计模式》——创建型

前言 创建型为了创建东西才是有用的&#xff0c;创建型设计模式使用的场景&#xff1a; 1、创建一个东西&#xff1b; 2、可重复利用&#xff1b; 3、灵活性高&#xff0c;代码可因地制宜。 Factory Method(工厂模式) 工厂模式将目的将创建对象的具体过程屏蔽隔离起来&#…

go channel实践与源码探索(初始化、发送消息、接收消息、关闭)

文章目录 概要一、并发编程1.1、Actor模型1.2、CSP模型 二、Go Channel实践三、源码分析3.1、初始化3.2、发送消息3.3、接收消息3.4、关闭通道 总结 概要 通道&#xff08;Channel&#xff09;是Go语言提供的协程之间通信与同步的方式。我们知道在并发编程&#xff08;多进程、…

时序分解 | MATLAB实现北方苍鹰优化算法NGO优化VMD信号分量可视化

时序分解 | MATLAB实现北方苍鹰优化算法NGO优化VMD信号分量可视化 目录 时序分解 | MATLAB实现北方苍鹰优化算法NGO优化VMD信号分量可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 北方苍鹰优化算法NGO优化VMD&#xff0c;对其分解层数&#xff0c;惩罚因子数做优化…

基于Python开发的玛丽大冒险小游戏(源码+可执行程序exe文件+程序配置说明书+程序使用说明书)

一、项目简介 本项目是一套基于Python开发的玛丽冒险小游戏程序&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Python学习者。 包含&#xff1a;项目源码、项目文档等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xf…

Alibaba(商品详情)API接口

为了进行电商平台 的API开发&#xff0c;首先我们需要做下面几件事情。 1&#xff09;开发者注册一个账号 2&#xff09;然后为每个alibaba应用注册一个应用程序键&#xff08;App Key) 。 3&#xff09;下载alibaba API的SDK并掌握基本的API基础知识和调用 4&#xff09;利…

管理类联考——数学——汇总篇——知识点突破——应用题——交叉比例法/杠杆原理

读书笔记 甲有&#xff1a;x个a&#xff0c;乙有&#xff1a;y个b&#xff0c;甲乙的平均值为c&#xff0c;根据总数相等&#xff0c;得&#xff1a;axbyc(xy)&#xff0c;即ax-cxcy-by&#xff0c;则 x y c − b a − c \frac{x}{y}\frac{c-b}{a-c} yx​a−cc−b​ &#…

OpenCV(二十五):边缘检测(一)

目录 1.边缘检测原理 2.Sobel算子边缘检测 3.Scharr算子边缘检测 4.两种算子的生成getDerivKernels() 1.边缘检测原理 其原理是基于图像中灰度值的变化来捕捉图像中的边界和轮廓。梯度则表示了图像中像素强度变化的强弱和方向。 所以沿梯度方向找到有最大梯度值的像素&…

17-数据结构-查找-(顺序、折半、分块)

简介&#xff1a;查找&#xff0c;顾名思义&#xff0c;是我们处理数据时常用的操作之一。大概就是我们从表格中去搜索我们想要的东西&#xff0c;这个表格&#xff0c;就是所谓的查找表&#xff08;存储数据的表&#xff09;。而我们怎么设计查找&#xff0c;才可以让计算机更…

sqli-labs关卡之一(两种做法)

目录 一、布尔盲注(bool注入) 二、时间盲注(sleep注入) 一、布尔盲注(bool注入) 页面没有报错和回显信息&#xff0c;只会返回正常或者不正常的信息&#xff0c;这时候就可以用布尔盲注 布尔盲注原理是先将你查询结果的第一个字符转换为ascii码&#xff0c;再与后面的数字比较…

Redis带你深入学习数据类型set

目录 1、set 2、set相关命令 2.1、添加元素 sadd 2.2、获取元素 smembers 2.3、判断元素是否存在 sismember 2.4、获取set中元素数量 scard 2.5、删除元素spop、srem 2.6、移动元素smove 2.7、集合中相关命令&#xff1a;sinter、sinterstore、sunion、sunionstore、s…

mac电脑安装paste教程以及重新安装软件后不能使用解决方法

问题背景 mac电脑安装paste教程以及重新安装软件后不能使用解决方法。 mac电脑安装paste失败&#xff0c;安装好后还是无法使用&#xff0c;paste显示还是历史粘贴信息&#xff0c;导致无法使用。新 copy的内容也无法进入历史粘贴版里面。 笔者电脑配置信息&#xff1a;MacB…