The client's scheduled thread pool runs several tasks
The code is as follows
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
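Task 1: refresh nameSrvAddr
When the name servers are deployed dynamically in the cloud (on AWS, for example), nameSrvAddr has to be refreshed every two minutes, starting 10 seconds after startup. If the address is fixed in the configuration, this task is never scheduled. Note the if condition in the source: the task is registered only when getNamesrvAddr() returns null.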
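The condition on Task 1 can be illustrated with a small standalone sketch. The class NameServerRefreshSketch and the method scheduleIfDynamic are hypothetical names invented for this example and are not part of RocketMQ; only the delays (10 seconds, then every 2 minutes) are taken from the source above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not RocketMQ code.
final class NameServerRefreshSketch {

    /** Returns the scheduled task, or null when the address is statically configured. */
    static ScheduledFuture<?> scheduleIfDynamic(ScheduledExecutorService pool,
                                                String configuredNamesrvAddr,
                                                Runnable fetchAddress) {
        if (configuredNamesrvAddr != null) {
            return null; // fixed address: nothing to refresh
        }
        return pool.scheduleAtFixedRate(() -> {
            try {
                fetchAddress.run();
            } catch (Exception e) {
                // An exception escaping run() would suppress all later executions
                // of a fixed-rate task, so it is caught and only logged here.
                System.err.println("fetch failed: " + e);
            }
        }, 10_000, 120_000, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        System.out.println(scheduleIfDynamic(pool, "127.0.0.1:9876", () -> { }) == null);
        System.out.println(scheduleIfDynamic(pool, null, () -> { }) != null);
        pool.shutdownNow();
    }
}
```

The try/catch inside the task mirrors the pattern used by every task in startScheduledTask: with scheduleAtFixedRate, a run that throws stops the task from being scheduled again.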
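Task 2: refresh topicRoute every 30 seconds (the default of getPollNameServerInterval())
I described this task in an earlier article; see "rocketmq push mode, consumer-side rebalance, class flow diagram analysis".
Task 3: every 30 seconds (the default of getHeartbeatBrokerInterval()), clean up offline brokers and send a heartbeat to all brokers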
- Clean up offline brokers
For details, see org.apache.rocketmq.client.impl.factory.MQClientInstance#cleanOfflineBroker
Its purpose is to update org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable
The check it relies on is
org.apache.rocketmq.client.impl.factory.MQClientInstance#isBrokerAddrExistInTopicRouteTable
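Task 2 has already refreshed org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteTable
The method walks topicRouteTable to check whether each broker address still exists there, and removes the ones that do not.
The whole flow depends on Task 2, so the update only has to be done in memory.
- Send heartbeats to the brokers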
This step iterates over org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable
A closer look at this member variable:
ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable
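For each brokerName it stores the corresponding master/slave group. The master node has brokerId 0; slaves have positive ids.
The task iterates over brokerAddrTable, assembles a HeartbeatData object, and sends it to these brokers.
HeartbeatData consists of three member variables, which describe this client together with its producer groups and consumer groups.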
public class HeartbeatData extends RemotingSerializable {
    private String clientID;
    private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
    private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
    // remaining members omitted
}
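The cleanup step of Task 3 can be sketched with simplified types. This is an illustration under assumptions, not the RocketMQ implementation: the class CleanOfflineBrokerSketch is hypothetical, the route table is reduced to a map from topic to the set of broker addresses it currently lists, and the table is pruned in place for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch, not RocketMQ code.
final class CleanOfflineBrokerSketch {

    /**
     * brokerAddrTable: brokerName -> (brokerId -> address), brokerId 0 being the master.
     * topicRouteAddrs: topic -> addresses present in the latest route data (simplified).
     */
    static void cleanOfflineBroker(ConcurrentMap<String, Map<Long, String>> brokerAddrTable,
                                   Map<String, Set<String>> topicRouteAddrs) {
        // Collect every address that still appears in some topic's route.
        Set<String> live = new HashSet<>();
        for (Set<String> addrs : topicRouteAddrs.values()) {
            live.addAll(addrs);
        }
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map<Long, String> group = it.next().getValue();
            // Drop addresses no longer in any route.
            group.values().removeIf(addr -> !live.contains(addr));
            // Drop the broker name once none of its nodes remain.
            if (group.isEmpty()) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Map<Long, String>> table = new ConcurrentHashMap<>();
        Map<Long, String> a = new HashMap<>();
        a.put(0L, "10.0.0.1:10911");
        a.put(1L, "10.0.0.2:10911");
        table.put("broker-a", a);
        Map<Long, String> b = new HashMap<>();
        b.put(0L, "10.0.0.3:10911");
        table.put("broker-b", b);

        Map<String, Set<String>> routes = new HashMap<>();
        routes.put("TopicTest", new HashSet<>(Set.of("10.0.0.1:10911")));

        cleanOfflineBroker(table, routes);
        System.out.println(table);
    }
}
```

Because only in-memory maps are compared, the step needs no network call of its own; it is only as fresh as the last route refresh from Task 2.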
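Task 4: persist offsets every 5 seconds (the default of getPersistConsumerOffsetInterval()) and report them to the broker
The method called is org.apache.rocketmq.client.impl.factory.MQClientInstance#persistAllConsumerOffset
Here the implementation differs between pull and push, depending on the consumer implementation class
- Push mode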
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#persistConsumerOffset
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}
The MessageQueues are taken from rebalanceImpl
- Pull mode
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#persistConsumerOffset
public void persistConsumerOffset() {
    try {
        checkServiceState();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
        } else if (this.subscriptionType == SubscriptionType.ASSIGN) {
            Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues();
            mqs.addAll(assignedMessageQueue);
        }
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e);
    }
}
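The MessageQueues are obtained differently depending on the subscription type. The difference between the SUBSCRIBE and ASSIGN modes is something to study later.
In clustering mode, the implementation method used is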
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll
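This method iterates over the member variable org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#offsetTable and handles the MessageQueues obtained in the previous step.
It then sends each queue's offset to the broker's master node. How the offset is synchronized between master and slave is presumably something to look for on the broker side.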
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);
        requestHeader.setBname(mq.getBrokerName());

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
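The selection step of persistAll described above (walk offsetTable, keep only the queues passed in) can be sketched as follows. This is a simplified illustration, not the RocketMQ code: PersistAllSketch is a hypothetical class, queues are represented by plain strings instead of MessageQueue, and the method returns what would be reported rather than calling the broker.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not RocketMQ code.
final class PersistAllSketch {

    /**
     * offsetTable: locally tracked offset per queue (queue name as a plain string here).
     * mqs: the queues currently owned by this consumer.
     * Returns queue -> offset for every entry that would be sent to the broker.
     */
    static Map<String, Long> selectOffsetsToPersist(ConcurrentMap<String, AtomicLong> offsetTable,
                                                    Set<String> mqs) {
        Map<String, Long> toSend = new TreeMap<>();
        for (Map.Entry<String, AtomicLong> entry : offsetTable.entrySet()) {
            // Only queues still assigned to this consumer are reported.
            if (mqs.contains(entry.getKey())) {
                toSend.put(entry.getKey(), entry.getValue().get());
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();
        offsetTable.put("broker-a:0", new AtomicLong(10));
        offsetTable.put("broker-a:1", new AtomicLong(20));
        System.out.println(selectOffsetsToPersist(offsetTable, Set.of("broker-a:0")));
    }
}
```

In the real client each selected entry would then go through updateConsumeOffsetToBroker, shown above.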
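Task 5: adjust the ThreadPool once a minute
Tracing the calls further down, the methods that are eventually reached are all empty at present, so this code has no effect.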