消费者的Rebalance机制
客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、
Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的正常消费的?
Rebalance服务的类图
RebalanceImpl的核心属性
- ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable:记录MessageQueue和ProcessQueue的关系,MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
- ConcurrentMap<String, Set< MessageQueue >> topicSubscribeInfoTable:Topic路由信息。保存Topic和MessageQueue的关系
- ConcurrentMap<String(Topic), SubscriptionData> subscriptionInner:真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
- AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQUeue消息分配策略的实现
- MQClientInstance mQClientFactory:client实例对象
RebalanceImpl的核心方法
-
boolean lock():为MessageQueue加锁
-
void doRebalance():执行Rebalance操作
-
void messageQueueChanged():通知Message发生变化,这个方法在Push和Pull两个类中被重写
-
boolean removeUnnecessaryMessageQueue():去掉不再需要的MessageQueue
-
void dispatchPullRequest():执行消息拉取请求
- boolean updateProcessQueueTableInRebalance():在Rebalance中更新processQueue
Rebalance过程
- 消费者实例在收到Broker通知后是怎么执行Reblance的?这个操作是通过调用
MQClientInstance.rebalanceImmediately()来实现的
- 这种设计是RocketMQ种典型的锁方式,执行wakeup命令后,this.waitForRunning()就会暂停,再执行this.mqClientFactory.doRebalance()
- doRebalance()方法主要有以下几个步骤
1.查找当前clientId对应的全部的消费者组,全部执行一次Rebalance.
虽然消费者实现分别为Pull消费和Push消费两种默认实现,调用的是不同实现类的Rebalance方法,但是实现逻辑都差不多
2.判断Rebalance开关,如果没有被暂停,则调用RebalancePushImpl.rebalance()方法
3.在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的Topic,
循环对每个Topic进行Rebalance.待全部的Rebalance都执行完之后,将不属于当前
消费者的队列删除
4.Topic队列重新分配,这里也就是客户端Rebalance的核心逻辑之处,根据是集群消费还是广播消费分别执行MessageQueue重新分配的逻辑,以集群消费为例分析
4.1.获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll),只有当两者都不为空时,才执行Rebalance
4.2.将全部的MessageQueue(代码中时mqAll)和消费者客户端(cidAll)进行排序。
由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,
保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者
客户端都是一样的试图,做Rebalance时才不会分配错
4.3.按照当前设置的队列分配策略执行Queue分配。队列分配策略接口AllocateMessageQueueStrategy,该接口中,有两个方法allocate()和getName()
allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者
getName():获取当前分配算法的名字
目前队列分配策略有五种实现:
AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)
AllocateMessageQueueAveragelyByCircle:环形分配策略
AllocateMessageQueueByConfig:手动配置
AllocateMessageQueueConsistentHash:一致性Hash分配
AllocateMessageQueueByMachineRoom:机房分配策略
4.4.动态更新ProcessQueue,在队列重新分配后,当前消费者消费的队列可能不会发生变化,也可能发生变化,不管时新增加了队列需要消费,还是减少了队列,都需要执行updateProcessQueueTableInRebalance()方法来更新ProcessQueue,如果有MessageQueue不再分配给当前的消费者消费,则设置ProcessQueue.setDropped(true),表示放其当前MessageQueue的Pull消息
如果在重新分配MessageQueue后,新增加了MessageQueue,
则添加一个对应的ProcessQueue,查询Queue拉取位点,包装一个新的PullRequest
来Pull消息;同理如果减少了MessageQueue,则将其对应的ProcessQueue删除,
不管MessageQueue时新增还是减少,都会设置changed为True,表示当前消费者
消费的MessageQueue有变化,源码中是分别两个集合遍历来判断是新增还是减少的。
PullRequest初始化的具体实现,新增的PullRequest对象将被分配出去拉取MessageQueue中的消息
4.5.执行messageQueueChanged()方法,如果有MessageQueue订阅关系发生变化,
则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,
通知所有Broker,当前订阅关系发生了改变