背景
Kafka作为一款基于发布订阅模式的消息队列,生产者将消息发送到Kafka集群(Brokers)中,消费者(Consumer Group )拉取消息进行消费,实现了异步机制。Kafka中,消费者通常以消费者组的方式进行消费,消费特点为:
- 每个分区只能被一个消费组中的一个消费者所消费。
- 消费组中一个消费者可以消费多个分区。
- 多个消费组,每个消费组都可以消费topic中的所有数据,且消费位移之间互不影响。
一、Kafka的再平衡机制
在Kafka中,如果消费者数量、分区数变更或者消费者订阅的topic发生变化,也就需要再进行消费者消费分区的重新分配,这也就是所谓的再平衡。
1.1 再平衡定义
再平衡是指的是Consumer Group 下的 Consumer 所订阅的Topic发生变化时 发生的一种分区重分配机制。 |
也就是说,再平衡也就是一种协议,它规定了如何让消费组下的所有消费者来分配 Topic 中的每一个分区。
举个栗子:一个 Topic 有 100 个分区,一个消费者组内有有 20 个消费者,在协调者的控制下让消费者组内的每一个消费者分配到 5 个分区,这个分区分配的过程就是再平衡。
1.2 再平衡触发条件
一般来说,触发Kafka再平衡的条件一般是以下三种:
-
主题分区发生改变,Kafka 目前只支持分区增加,当出现分区数增加的时候就会触发再平衡。
-
Consumer Group 中Consumer 个数发生变化(新增或者减少),导致其所消费的分区需要分配到组内其他的Consumer 上。这里的减少有很大可能是被动的,就是某个消费者出现崩溃掉线了。
-
Consumer 所订阅的Topic发生了新增分区的行为(Kafka目前只支持新增分区),那么新增的分区就会分配给当前的Consumer ,此时就会触发再平衡。
-
Consumer 订阅的topic发生变化,比如订阅的Topic采用的是正则表达式的形式。如
test-*
此时如果有新建了一个topic test-user
,那么这个Topic的所有分区也是会自动分配给当前的Consumer 的,此时就会发生再平衡。
简洁一点,触发再平衡的条件就是:
- Consumer Group 成员数变更。
- Consumer Group 订阅的主题的分区数发生变更。
- Consumer Group 的订阅主题数发生变更。
再平衡有什么危害呢,首先我们要知道,再平衡的过程中,消费者是无法从 Kafka集群中消费消息的,这对 Kafka的 系统吞吐量(TPS)影响极大,而如果 Kafka 集群内节点较多,那么再平衡可能比较耗时。数分钟到数小时都有可能,而这段时间,Kafka 是处于不可用状态。所以在实际环境中,应该尽量避免。
1.3 再平衡通知机制
那么发生再平衡的时候Kafka集群是如何通知到消费者的呢,答案就是通过消费者与Kafka集群之间的心跳机制。Kafka 消费者需要定期地发送心跳请求(Heartbeat Request) 到 Broker 端的协调者(Coordinator ),以证明消费者还活着。
在 Kafka 0.10.0.1版本之前,发送心跳请求是在消费者主线程中完成的。这样就有很多问题,最大的问题在于,消息处理逻辑也是在这个线程中完后的。因此,一旦消息处理消耗很长的时间,心跳请求将无法及时发送到协调者那里,使协调者误以为该消费者死掉。
Kafka 0.10.0.1 版本之后,Kafka 就提供了一个专门的线程去发送心跳,避免了这个问题。
再平衡就是通过这个心跳线程去通知其他消费者触发再平衡机制。当协调者开启新一轮的再平衡之后,它会将"REBALANCE_IN_PROGREESS"
封装到心跳线程的响应信息中,返回给消费者实例,当消费者收到响应信息中含有 “REBALANCE_IN_PROGREESS”
信息,就知道再平衡开始了,这就是再平衡的通知机制。
二、再平衡流程
从再平衡的定义和触发再平衡条件中我们可以看出,再平衡主要是由Kafka集群和Kafka消费端一起完成的,更精确的来说,是Kafka的Broker端的Coordinator 和Consumer端一起完成的。
2.1 消费端再平衡流程
在消费者端,再平衡主要分为两个步骤:
- 重新加入消费者组中。
- 等待领导消费者(Leader Consumer) 分配方案。
这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
当组内成员加入组时,消费者会向协调者发送 JoinGroup 请求,在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
通常情况下,第一个发送 JoinGroup 请求的成员会自动成为领导者。领导消费者的任务是收集所有成员的订阅信息,然后根据这些信息,指定具体的分区消费分配方案。
选出领导者之后,协调者会把消费者的订阅信息封装在 JoinGroup 请求的响应中,然后发送给领导者,由领导者统一做出分区消费分配方案后,在进行下一步,发送 SyncGroup 请求。
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。当然,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式发给所有成员,这样组内的成员都知道自己该消费哪些分区的数据了。
因此,JoinGroup 请求的主要作用是将组成员的订阅信息发送给领导者消费者,待领导者制定好分配方案后,再平衡流程进入到 SyncGroup 请求阶段。而SyncGroup 请求的主要目的就是让协调者把领导者的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入 Stable 状态,即开始正常的消费工作。
2.2 Broker端的再平衡流程
Broker端的再平衡主要是Coordinator 处理再平衡的流程。从触发再平衡的条件来看,与Coordinator 相关的主要是新成员加入消费者组、消费者组成员主动离开、消费者组成员崩溃离组、组成员提交位移。
再平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前 Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个再平衡过程。严格来说,这套状态机属于非常底层的设计,Kafka 官网并没有提及过。目前,Kafka 为消费者组定义了 5 中状态,分别是:Empty
、Dead
、PreparingRebalance
、CompletingReblance
、Stable
。每种状态对应的含义如下:
状态 | 含义 |
---|---|
Empty | 组内没有任何成员,但消费者可能存在已提交的位移数据,而且这些位移尚未过期。 |
Dead | 同样是在组内没有成员,但组内元数据信息已经在协调者端被移除。协调者组件保存着当前向它注册过的所有组信息,所谓的元数据信息类似于这个注册信息。 |
PreparingRebalance | 消费者组开启再平衡,此时所有成员都要重新加入消费者组。 |
CompletingRebalance | 消费者组小所有成员已经加入,各个成员已在等待分配方案。该状态在老一点版本中称为AwaitingSync,它和CompletingReblance是等价的。 |
Stable | 消费者组的稳定状态。该状态表名再平衡已经完成,组内各成员能够正常消费数据了。 |
一个消费者组最开始是 Empty
状态,当再平衡开启后,它会被置为 PreparingRebalance
状态等待成员加入,之后变更为 CompletingReblance
状态等待分配方案,最后流转为 Stable
,完成再平衡过程。
2.2.1 新成员加入消费者组
新成员加入消费者组导致触发再平衡主要指的当消费者组处于 Stable 状态后,有新成员加入。如果对全新启动一个消费者组,Kafka 是有一些自己的优化,流程会有些许的不同。我们这里要讨论的是,消费者组稳定之后有新成员加入的情形。
当协调者收到新的 JoinkGroup 请求后,它会通过心跳响应的方式通知组内现有的所有成员,强制它们开启新一轮的再平衡。具体的过程和之前的客户端再平衡流程是一样的。现在,用一张时序图说明协调者一端是如何处理新成员入组的。
2.2.2 消费者组成员主动离组
消费者实例所在线程或进程调用 Close() 方法主动通知协调者它要退出。这个场景就涉及到第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
2.2.3 消费者组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为主动发起的离组,协调者能马上感知并处理。但是崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms
控制的。也就是说,Kafka 一般不会超过 session.timeout.ms
就能感知到这个崩溃。处理崩溃离组的流程如下:
2.2.4 协调者对组内成员移交位移处理
正常情况下,每个组成员都会定期汇报位移给协调者。当再平衡开启时,协调者会给予成员一端缓冲时间,要求每个成员必须在这段时间内快速上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup请求发送。
三、再平衡相关参数
session.timeout.ms
:该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置较小,可以更早发现消费者崩溃的信息,从而更快的开启再平衡,避免消费滞后,但同时这也会频繁的再平衡,需要根据实际业务来衡量。max.poll.interval.ms
:该参数表示消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1 分钟,那么该参数就需要设置成大于 1 分钟的值,否则就会被 Coordinator 剔除消息组然后再平衡。heartbeat.interval.ms
:该参数是消费端与Coordinator的心跳时间,该参数跟session.timeout.ms
紧密相关,前面也说过,只要在session.timeout.ms
时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔时间就是session.timeout.ms
,因此,该参数值必须小于session.timeout.ms
,以保持session.timeout.ms
时间内有心跳。每个consumer 都会根据heartbeat.interval.ms
参数指定的时间周期性地向Coordinator 发送 hearbeat,Coordinator 会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含REBALANCE_IN_PROGRESS
标识,这样各个consumer就知道已经发生了rebalance,同时Coordinator 也知道了各个consumer的存活情况。
3.1 heartbeat.interval.ms 与 session.timeout.ms 的对比
session.timeout.ms
是指:Coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。
举个栗子:session.timeout.ms=10
,heartbeat.interval.ms=3
。
session.timeout.ms
指定了一个阈值—10秒,在这个阈值内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。而heartbeat.interval.ms
主要是告诉Consumer要每3秒给Coordinator发一个心跳包,heartbeat.interval.ms
越小,发的心跳包越多,它是会影响发TCP包的数量的。
如果Coordinator在一个heartbeat.interval.ms
周期内未收到Consumer的心跳,就把该Consumer移出group,这有点说不过去。就好像Consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能Consumer出现了一次长时间GC,影响了心跳包的到达,说不定下一个heartbeat就正常了。
因此,heartbeat.interval.ms
肯定是要小于session.timeout.ms
的,如果Consumer Group发生了rebalance,通过心跳包里面的REBALANCE_IN_PROGRESS
,Consumer就能及时知道发生了rebalance,从而更新Consumer可消费的分区。而如果超过了session.timeout.ms
,Coordinator都认为Consumer挂了,那也当然不用把 rebalance信息告诉该Consumer了。
3.2 session.timeout.ms 和 max.poll.interval.ms
在kafka0.10.1之后的版本中,将session.timeout.ms
和 max.poll.interval.ms
解耦了。
也就是说:创建Kafka消费者实例后,消费者不停地执行consumer.poll
拉取消息这个过程中,其实背后是有2个线程的,即一个Kafka Consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程。
processing线程可理解为调用consumer.poll
方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms
可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms
有关,heartbeat线程 每隔heartbeat.interval.ms
向Coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms
时间内向 Coordinator发送过心跳包,那么Coordinator就认为当前的Kafka Consumer是活着的。
在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的。
如果一条消息处理时长要5min,而session.timeout.ms=3000ms
,那么等 Kafka Consumer处理完消息,Coordinator早就将Consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向Coordinator发送心跳包,超过3000ms未发送心跳包,Coordinator就将该Consumer移出group了。
而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么,就算一条消息需要处理5min,只要heartbeat线程在session.timeout.ms
向Coordinator发送了心跳包,那Consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果Consumer出了问题,那么在session.timeout.ms
内就能检测出来,而不用等到max.poll.interval.ms
时长后才能检测出来。
TODO:后续根据sarama源码来看Kafka的再平衡过程。
参考
1、kafka 中参数:session.timeout.ms 和 heartbeat.interval.ms的区别
2、sarama 源码解析–Kafka的重平衡
3、kafka学习(五):消费者分区策略(再平衡机制)
4、Kafka【再平衡】