springcloud RocketMQ ,一个mq消息发送后,客户端是怎么一步步拿到消息去消费的?我们要从代码层面探究这个问题。
找的流程图,有待考究。
以下我们开始debug:
拉取数据的线程:
PullMessageService.java 本质是一个线程类
public class PullMessageService extends ServiceThread {private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();// ..
}
执行逻辑,一直循环拿取阻塞队列的内容,阻塞队列的内容由负载均衡服务提供。(阻塞队列中保存了目前客户端占有的 brokder - queue 信息)
然后进入 DefaultMQPushConsumerImpl.java 的 pullMessage(关键)
这里面有个关键的方法,this.pullAPIWrapper.pullKernelImpl(…) 这里传入了成功回调 pullCallback。
一直执行到 pullMessageAsync 是异步拉取消息,成功后会执行回调。
成功后的回调逻辑:
ConsumeMessageConcurrentlyService.java 的 submitConsumeRequest 方法,将任务下发给消费者线程池 consumeExecutor (ThreadPoolExecutor 类型)去执行。(日志显示就是这里执行的消费业务)
~~
ok,我们看看开启的这个线程做了什么。
首先,单独一个线程是无法debug跨线程的,所以我们继续在 ConsumeMessageConcurrentlyService.ConsumeRequest 消费者请求线程中debug run方法,看看是怎么执行到我们的业务逻辑的。
发现是 监听器 listener 的消费逻辑
这个 listener 是一个接口,而且这个接口没有找到代码impl,也就是可能是匿名的视线
我们debug直接跳到了 RocketMQInboundChannelAdapter.java 的监听器,当时就是从这里把监听器注册进来的。
匿名方法执行了 RocketMQInboundChannelAdapter.this.consumeMessage
执行了一段 retry 逻辑(spring的重试框架),里面执行了发送消息逻辑。
发现底层用的是 spring 的 integration 消息通信框架!
debug进去send逻辑,会发送到一个 channel 中去
channel 里就有我们的处理方法的代理对象,是转发 dispatcher 的目标处理器 handlers 之一。
后面不出所料,就是通过反射去执行这个方法。
然后就跑到了我们的逻辑:
创作不易,希望点赞、收藏、关注支持~