1、观察consumer线程
使用arthas分析
-
MQClientFactoryScheduledThread 定时任务线程
定时任务线程,包含如下任务:
每2分钟更新nameServer列表
每30秒更新topic的路由信息
每30秒检查broker的存活,发送心跳请求
每5秒持久化消费队列的offset。如果是广播模式,持久化在本地;如果是集群模式,反馈给broker
每分钟调整线程池大小(实际上并没有作用。因为最终执行是空方法) -
PullMessageService 从broker拉取msg的线程。
-
RebalanceService 重平衡线程。每20秒执行一次
具体可查看org.apache.rocketmq.client.impl.factory.MQClientInstance#start()
2、重平衡,任务是如何创建的
重平衡,就是在消费者组动态伸缩的时候,自动把队列重新分配。具体工作的线程,就是RebalanceService。如下是整个重平衡的类图流程
如图,启动时,会触发重平衡任务org.apache.rocketmq.client.impl.consumer.RebalanceService#run()
重平衡的关键点在于如何动态伸缩,重点内容在org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
- 第一步,获取topic对应的队列集合Set<> mqSet
- 第二步,随机从一个可选broker上,获取所有消费者的集合List cidAll。cid就是消费端的唯一标识。格式如下:“ip@pid#时间戳”,比如127.01.01.01@1723#2926328724786400
- 第三步,按字段排序mqSet和cidAll。Collections.sort()
if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}
}
- 第四步、选择重平衡策略
总共有6种。常用的是AllocateMessageQueueAveragely 平均hash队列算法;AllocateMessageQueueAveragelyByCircle 圆周平均hash队列算法 - 第五步,判断当前consumer节点是否有伸缩变更。有则创建PullRequest请求体
- 第六步,将PullRequest请求体put到PullMessageService拉取任务的队列pullRequestQueue
topic的路由信息,是如何更新的
回看上一节的第一步。队列集合Set<> mqSet。在重平衡线程中,是直接获取这个集合的。这个集合,其实是启动时和定时任务线程MQClientFactoryScheduledThread 更新的
每30秒更新topic的路由信息
详细如图:
总结
首先启动时,会从nameserver获取topic的所有queue。这些queue分布在多个broker上。
构建了mqSet。再随机从一个broker上,获取当前消费者组,包含的所有消费者List<> cidAll。
将两者排序,根据分配策略,分配当前消费者负责的队列。(比如总共12个队列,4个消费者。当前消费者,负责 4,5,6队列)。如此看,是客户端重平衡。通过排序然后策略分配的方式,实现消费者互不通信的条件下协同合作。
启动时,内存都是空,所以会触发构建PullRequest请求体。将请求体,put进拉取线程PullMessageService的队列。
每过20秒,会做一次重平衡;
每过30秒,会更新一次路由信息;
后续分析:
- 拉取线程PullMessageService的工作;
- 运行过程中,没有重平衡的情况,RebalanceService是不会再创建PullRequest请求体的。如何重复构建PullRequest请求体,循环拉取?(这块代码在PullMessageService中实现)