【Kafka】Kafka再平衡机制及相关参数

背景

Kafka作为一款基于发布订阅模式的消息队列,生产者将消息发送到Kafka集群(Brokers)中,消费者(Consumer Group )拉取消息进行消费,实现了异步机制。Kafka中,消费者通常以消费者组的方式进行消费,消费特点为:

  • 每个分区只能被一个消费组中的一个消费者所消费。
  • 消费组中一个消费者可以消费多个分区。
  • 多个消费组,每个消费组都可以消费topic中的所有数据,且消费位移之间互不影响。

一、Kafka的再平衡机制

在Kafka中,如果消费者数量、分区数变更或者消费者订阅的topic发生变化,也就需要再进行消费者消费分区的重新分配,这也就是所谓的再平衡。

1.1 再平衡定义

再平衡是指的是Consumer Group 下的 Consumer 所订阅的Topic发生变化时 发生的一种分区重分配机制。

也就是说,再平衡也就是一种协议,它规定了如何让消费组下的所有消费者来分配 Topic 中的每一个分区。

举个栗子:一个 Topic 有 100 个分区,一个消费者组内有有 20 个消费者,在协调者的控制下让消费者组内的每一个消费者分配到 5 个分区,这个分区分配的过程就是再平衡。

1.2 再平衡触发条件

一般来说,触发Kafka再平衡的条件一般是以下三种:

  • 主题分区发生改变,Kafka 目前只支持分区增加,当出现分区数增加的时候就会触发再平衡。

  • Consumer Group 中Consumer 个数发生变化(新增或者减少),导致其所消费的分区需要分配到组内其他的Consumer 上。这里的减少有很大可能是被动的,就是某个消费者出现崩溃掉线了。

  • Consumer 所订阅的Topic发生了新增分区的行为(Kafka目前只支持新增分区),那么新增的分区就会分配给当前的Consumer ,此时就会触发再平衡。

  • Consumer 订阅的topic发生变化,比如订阅的Topic采用的是正则表达式的形式。如 test-* 此时如果有新建了一个topic test-user,那么这个Topic的所有分区也是会自动分配给当前的Consumer 的,此时就会发生再平衡。

简洁一点,触发再平衡的条件就是:

  • Consumer Group 成员数变更。
  • Consumer Group 订阅的主题的分区数发生变更。
  • Consumer Group 的订阅主题数发生变更。

再平衡有什么危害呢,首先我们要知道,再平衡的过程中,消费者是无法从 Kafka集群中消费消息的,这对 Kafka的 系统吞吐量(TPS)影响极大,而如果 Kafka 集群内节点较多,那么再平衡可能比较耗时。数分钟到数小时都有可能,而这段时间,Kafka 是处于不可用状态。所以在实际环境中,应该尽量避免。

1.3 再平衡通知机制

那么发生再平衡的时候Kafka集群是如何通知到消费者的呢,答案就是通过消费者与Kafka集群之间的心跳机制。Kafka 消费者需要定期地发送心跳请求(Heartbeat Request) 到 Broker 端的协调者(Coordinator ),以证明消费者还活着。
   在 Kafka 0.10.0.1版本之前,发送心跳请求是在消费者主线程中完成的。这样就有很多问题,最大的问题在于,消息处理逻辑也是在这个线程中完后的。因此,一旦消息处理消耗很长的时间,心跳请求将无法及时发送到协调者那里,使协调者误以为该消费者死掉。
   Kafka 0.10.0.1 版本之后,Kafka 就提供了一个专门的线程去发送心跳,避免了这个问题。

再平衡就是通过这个心跳线程去通知其他消费者触发再平衡机制。当协调者开启新一轮的再平衡之后,它会将"REBALANCE_IN_PROGREESS"封装到心跳线程的响应信息中,返回给消费者实例,当消费者收到响应信息中含有 “REBALANCE_IN_PROGREESS” 信息,就知道再平衡开始了,这就是再平衡的通知机制。

二、再平衡流程

从再平衡的定义和触发再平衡条件中我们可以看出,再平衡主要是由Kafka集群和Kafka消费端一起完成的,更精确的来说,是Kafka的Broker端的Coordinator 和Consumer端一起完成的。

2.1 消费端再平衡流程

在消费者端,再平衡主要分为两个步骤:

  • 重新加入消费者组中。
  • 等待领导消费者(Leader Consumer) 分配方案。

这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

当组内成员加入组时,消费者会向协调者发送 JoinGroup 请求,在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。

通常情况下,第一个发送 JoinGroup 请求的成员会自动成为领导者领导消费者的任务是收集所有成员的订阅信息,然后根据这些信息,指定具体的分区消费分配方案。

选出领导者之后,协调者会把消费者的订阅信息封装在 JoinGroup 请求的响应中,然后发送给领导者,由领导者统一做出分区消费分配方案后,在进行下一步,发送 SyncGroup 请求。
在这里插入图片描述
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。当然,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式发给所有成员,这样组内的成员都知道自己该消费哪些分区的数据了。
在这里插入图片描述

因此,JoinGroup 请求的主要作用是将组成员的订阅信息发送给领导者消费者,待领导者制定好分配方案后,再平衡流程进入到 SyncGroup 请求阶段。而SyncGroup 请求的主要目的就是让协调者把领导者的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入 Stable 状态,即开始正常的消费工作。

2.2 Broker端的再平衡流程

Broker端的再平衡主要是Coordinator 处理再平衡的流程。从触发再平衡的条件来看,与Coordinator 相关的主要是新成员加入消费者组、消费者组成员主动离开、消费者组成员崩溃离组、组成员提交位移。

再平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前 Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个再平衡过程。严格来说,这套状态机属于非常底层的设计,Kafka 官网并没有提及过。目前,Kafka 为消费者组定义了 5 中状态,分别是:EmptyDeadPreparingRebalanceCompletingReblanceStable。每种状态对应的含义如下:

状态含义
Empty组内没有任何成员,但消费者可能存在已提交的位移数据,而且这些位移尚未过期。
Dead同样是在组内没有成员,但组内元数据信息已经在协调者端被移除。协调者组件保存着当前向它注册过的所有组信息,所谓的元数据信息类似于这个注册信息。
PreparingRebalance消费者组开启再平衡,此时所有成员都要重新加入消费者组。
CompletingRebalance消费者组小所有成员已经加入,各个成员已在等待分配方案。该状态在老一点版本中称为AwaitingSync,它和CompletingReblance是等价的。
Stable消费者组的稳定状态。该状态表名再平衡已经完成,组内各成员能够正常消费数据了。

一个消费者组最开始是 Empty状态,当再平衡开启后,它会被置为 PreparingRebalance 状态等待成员加入,之后变更为 CompletingReblance 状态等待分配方案,最后流转为 Stable,完成再平衡过程。
在这里插入图片描述

2.2.1 新成员加入消费者组

新成员加入消费者组导致触发再平衡主要指的当消费者组处于 Stable 状态后,有新成员加入。如果对全新启动一个消费者组,Kafka 是有一些自己的优化,流程会有些许的不同。我们这里要讨论的是,消费者组稳定之后有新成员加入的情形。

当协调者收到新的 JoinkGroup 请求后,它会通过心跳响应的方式通知组内现有的所有成员,强制它们开启新一轮的再平衡。具体的过程和之前的客户端再平衡流程是一样的。现在,用一张时序图说明协调者一端是如何处理新成员入组的。

在这里插入图片描述

2.2.2 消费者组成员主动离组

消费者实例所在线程或进程调用 Close() 方法主动通知协调者它要退出。这个场景就涉及到第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
在这里插入图片描述

2.2.3 消费者组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为主动发起的离组,协调者能马上感知并处理。但是崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。处理崩溃离组的流程如下:
在这里插入图片描述

2.2.4 协调者对组内成员移交位移处理

正常情况下,每个组成员都会定期汇报位移给协调者。当再平衡开启时,协调者会给予成员一端缓冲时间,要求每个成员必须在这段时间内快速上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup请求发送。
在这里插入图片描述

三、再平衡相关参数

  • session.timeout.ms:该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置较小,可以更早发现消费者崩溃的信息,从而更快的开启再平衡,避免消费滞后,但同时这也会频繁的再平衡,需要根据实际业务来衡量。
  • max.poll.interval.ms:该参数表示消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1 分钟,那么该参数就需要设置成大于 1 分钟的值,否则就会被 Coordinator 剔除消息组然后再平衡。
  • heartbeat.interval.ms:该参数是消费端与Coordinator的心跳时间,该参数跟 session.timeout.ms 紧密相关,前面也说过,只要在 session.timeout.ms时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔时间就是 session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms时间内有心跳。每个consumer 都会根据 heartbeat.interval.ms参数指定的时间周期性地向Coordinator 发送 hearbeat,Coordinator 会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS标识,这样各个consumer就知道已经发生了rebalance,同时Coordinator 也知道了各个consumer的存活情况。

3.1 heartbeat.interval.ms 与 session.timeout.ms 的对比

session.timeout.ms是指:Coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。

举个栗子:session.timeout.ms=10heartbeat.interval.ms=3

session.timeout.ms指定了一个阈值—10秒,在这个阈值内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。而heartbeat.interval.ms主要是告诉Consumer要每3秒给Coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多,它是会影响发TCP包的数量的。

如果Coordinator在一个heartbeat.interval.ms周期内未收到Consumer的心跳,就把该Consumer移出group,这有点说不过去。就好像Consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能Consumer出现了一次长时间GC,影响了心跳包的到达,说不定下一个heartbeat就正常了。

因此,heartbeat.interval.ms肯定是要小于session.timeout.ms的,如果Consumer Group发生了rebalance,通过心跳包里面的REBALANCE_IN_PROGRESS,Consumer就能及时知道发生了rebalance,从而更新Consumer可消费的分区。而如果超过了session.timeout.ms,Coordinator都认为Consumer挂了,那也当然不用把 rebalance信息告诉该Consumer了。

3.2 session.timeout.ms 和 max.poll.interval.ms

在kafka0.10.1之后的版本中,将session.timeout.msmax.poll.interval.ms 解耦了。

也就是说:创建Kafka消费者实例后,消费者不停地执行consumer.poll拉取消息这个过程中,其实背后是有2个线程的,即一个Kafka Consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程

processing线程可理解为调用consumer.poll方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms有关,heartbeat线程 每隔heartbeat.interval.ms向Coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms时间内向 Coordinator发送过心跳包,那么Coordinator就认为当前的Kafka Consumer是活着的。

在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的。

如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 Kafka Consumer处理完消息,Coordinator早就将Consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向Coordinator发送心跳包,超过3000ms未发送心跳包,Coordinator就将该Consumer移出group了。

而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么,就算一条消息需要处理5min,只要heartbeat线程在session.timeout.ms向Coordinator发送了心跳包,那Consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果Consumer出了问题,那么在session.timeout.ms内就能检测出来,而不用等到max.poll.interval.ms 时长后才能检测出来。

TODO:后续根据sarama源码来看Kafka的再平衡过程。

参考

1、kafka 中参数:session.timeout.ms 和 heartbeat.interval.ms的区别
2、sarama 源码解析–Kafka的重平衡
3、kafka学习(五):消费者分区策略(再平衡机制)
4、Kafka【再平衡】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/127328.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Sentinel】ProcessorSlotChain处理器插槽链与Node

文章目录 1、Sentinel的基本概念2、ProcessorSlotChain3、Node 1、Sentinel的基本概念 Sentinel实现限流、隔离、降级、熔断等功能,本质要做的就是两件事情: 统计数据:统计某个资源的访问数据(QPS、RT等信息)规则判断…

Redis高并发分布式锁实战

高并发场景秒杀抢购超卖bug实战重现 秒杀抢购场景下实战JVM级别锁与分布式锁 大厂分布式锁Resisson框架实战 Lua脚本语言快速入门与使用注意事项 Redisson分布式锁源码剖析 Redis主从架构锁失效问题解析 从CAP角度剖析Redis与Zookeeper分布式锁区别 Redlock分布式锁原理与…

Qt QTreeWidge解决setItemWidget后,导致复选框失效

一、问题: QTreeWidget某一项加上itemWidget后,导致复选框失效问题 二、解决方法 将要加上的widget控件加到该项的后续的列,即控件跟复选框不同一列 三、具体代码 QTreeWidget* treeW new QTreeWidget; treeW->setColumnCount(2); /…

centos编译升级cmake,痛苦的Linux小白

环境 root 用户 下载 cmake官网下载地址:https://cmake.org/download/ 获取下载地址,右击cmake-3.27.4.tar.gz 命令行输入链接地址,下载 wget https://github.com/Kitware/CMake/releases/download/v3.27.4/cmake-3.27.4.tar.gz解压 tar -zx…

Git_回退到上一次commit与pull

git 回退到上个版本 rollback 回滚 git reset HEAD, git 回退到上一版本

MySQL 连接查询

文章目录 1.什么是连接查询2.连接类型内连接交叉连接左连接右连接自然连接 3.连接条件4.隐式连接使用逗号连接表逗号与 JOIN 的优先级 5.全外连接6.小结参考文献 1.什么是连接查询 在关系型数据库管理系统(RDBMS)中,连接查询是一项重要的数据…

桉木板材的优缺点

桉木(Eucalyptus)是一种常见的木材品种,具有一些独特的特点和用途。以下是桉木板材的一些优点和缺点: 优点:强度高:桉木具有较高的密度和强度,使其在承重和结构应用中表现出色。它的强度比一些其…

C# 命令行参数分割

CommandLineToArgvW 函数 [DllImport("shell32.dll", SetLastError true)] private static extern IntPtr CommandLineToArgvW([MarshalAs(UnmanagedType.LPWStr)] string lpCmdLine, out int pNumArgs); 参数: [in] lpCmdLine 类型:…

C++学习笔记(堆栈、指针、命名空间、编译步骤)

C 1、堆和栈2、指针2.1、指针的本质2.2、指针的意义2.3、清空指针2.4、C类中的this 3、malloc and new4、命名空间4.1、创建命名空间4.2、使用命名空间 5、编译程序的四个步骤5.1、预处理5.2、编译5.3、汇编5.4、链接 1、堆和栈 堆(heap)和栈&#xff0…

Jenkins Jenkinsfile管理 Pipeline script from SCM

一、Jenkinsfile理解 Jenkins Pipeline 提供了一套可扩展的工具,用于将“简单到复杂”的交付流程实现为“持续交付即代码”。Jenkins Pipeline 的定义通常被写入到一个文本文件(称为 Jenkinsfile )中,该文件可以被放入项目的源代码…

RK开发板的USB连接(Ubuntu)

一、安装连接工具 sudo apt-get install putty 二、启动putty工具 sudo putty 三、连接usb,并查看相关的信息 # 查看接入的是否有usb ls /dev/tty* 显示如下:(含有usb接口: /dev/ttyUSB0) /dev/tty /dev/tty23 /d…

设计模式-01简单工厂模式详解 详细代码对比

目录 ChatGpt问答原生代码简单工厂模式代码 简单工厂模式(Simple Factory Pattern)新增boat 对比两种方法原生代码为什么使用强制转换?简单工厂模式 简单工厂方法总结与原生代码的区别:优点:缺点: 参考 本文将介绍什么…

直接接入电商API接口实现调用封装好的商品详情SKU数据参数及返回

什么是API? API全称为Application Programming Interface,中文是应用程序编程接口。它其实是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工…

Python判断多个文件夹的文件夹名是否包含“分公司”或“营销中心”怎么处理?(方法一)...

点击上方“Python爬虫与数据挖掘”,进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 晓畅军事,试用于昔日。 大家好,我是皮皮。 一、前言 前几天在Python最强王者群【哎呦喂 是豆子~】问了一个Python自…

【日常笔记】使用Server过程中可能遇到的一些问题

使用Server过程中可能遇到的一些问题 1. 如何查找GPU型号与驱动版本之间的关系?2. 如何查看当前Server的内核版本?3. 使用Nvidia过程中可能用到的命令4. 对Jupyter Notebook的一些配置5. TensorFlow的一般操作6. 使用PyTorch的一些操作7. 修改安装源为国…

Yolov8-pose关键点检测:模型轻量化创新 | ​BiLevelRoutingAttention 动态稀疏注意力 | CVPR2023 BiFormer

💡💡💡本文解决什么问题:BiLevelRoutingAttention ,通过双层路由(bi-level routing)提出了一种新颖的动态稀疏注意力(dynamic sparse attention ) ​BiLevelRoutingAttention | GFLOPs从9.6降低至8.5,参数量从6482kb降低至6134kb, mAP50从0.921提升至0.926 Yolov8…

二蛋赠书一期:《快捷学习Spring》

文章目录 前言活动规则参与方式本期赠书《快捷学习Spring》关于本书作者介绍内容简介读者对象 结语 前言 大家好!我是二蛋,一个热爱技术、乐于分享的工程师。在过去的几年里,我一直通过各种渠道与大家分享技术知识和经验。我深知&#xff0c…

自然语言处理 微调ChatGLM-6B大模型

自然语言处理 微调ChatGLM-6B大模型 1、GLM设计原理2、大模型微调原理1、P-tuning v2方案2、LORA方案 1、GLM设计原理 bert的主要任务是随机的去除掉某个单词,使用上下文将其预测出来(相当于完形填空任务); GPT的主要任务是根据前…

二、环境配置,项目运行 —— TinyWebServer

环境配置,项目运行 —— TinyWebServer 一、前言 上一期已经介绍过这个项目的基本结构,不懂得可以点开主页查找。 写代码前。一般的步骤就是,先把别人的代码下载下来运行。一、一方面看看最终效果是否是自己想要的,二、掌握项目…

工程可以编译通过,但是Vscode依然有波浪线提示

前言 (1)我们在使用Vscode进行开发的时候,命名文件成功编译通过了,但是Vscode还是有波浪线的提示。 (2)其实成功编译通过就行,但是肯定还会存在一些强迫症患者,硬要消除这个报错。接…