背景
RocketMQ与SpingBoot相结合可以大大降低我们开发的复杂度,但是最近在一个新项目中使用RocketMQMessageListener 监听消息,导致消费者启动失败,提示该消费组已经被创建了,请重新申请一个消费者组。
Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[pomp_pstConsumerGroup] has been created before, specify another name please.
于是更换了一个消费者组,依然提示上面的错误。
问题分析
代码中没有看出有什么异常之处,在Nacos中预制了一个开关,然后控制消费逻辑。
@RocketMQMessageListener(consumerGroup = "pomp_pstConsumerGroup",topic = "pts-topic")
@Component
@RefreshScope
public class PtsAuditConsumer implements RocketMQListener<PtsAuditEvent> {@Value("${mq.swtich:false}")private boolean isStop;@Overridepublic void onMessage(PtsAuditEvent event) {if(isStop){return;}// 省略业务代码}
}
我们在RocketMQ 消费者启动代码中发现,是在DefaultMQPushConsumerImpl类中525行报错,原因是在
第521行注册消费者的时候注册失败了。
于是在registerConsumer中我们看到在执行putIfAbsent的时候出错了,说明消费者组与消费者是一一对应的,导致注册失败了。
但是我在项目中检查,该消费者组与消费者并没有重复出现,那么Spring为什么会提示重复呢?既然提示消费者重复,那么我们在启动消费者的时候指定instanceName名,继承RocketMQPushConsumerLifecycleListener,随机生产一个uuid作为instanceName,这样确保Consumer不会重复。
@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {defaultMQPushConsumer.setInstanceName(UUIDUtil.getUUID32());}
此时消费者已经启动了,但是我们在RocketMQ的控制台可以看到,该topic下有四个queue,有两个consumerClient均分4个queue。此时又开始疑惑了,命名只启动了一个消费者,为什么会有两个consumerClinet呢?这也就是为什么不指定InstanceName时消费者组重复的根本原因。
于是开始推测,是什么原因导致Spring在启动的时候,该Bean在被创建了两次?最后我们从RocketMQMessageListener加载的代码开始分析问题。
果然在ListenerContainerConfiguration中获取RocketMQMessageListener注解的所有实例,获取到两个,这两个消费者是指向同一个类,其中一个的bean是以scopedTarget开头,那么我们就要寻找在什么地方将beanName代理成了scopedTarget.beanName。
在最终debug下我们发现在ScopedProxyUtils中会将加了@RefreshScope注解的类重新代理了,这就导致我们在注入RocketMQ在启动消费者的时候加载了两次。所有我们最终得出结论,RocketMQMessageListener与RefreshScope注解不能同时使用,可以将RefreshScope作为一个Configuration注入到消费者即可。
思考
1、使用了@RefreshScope + 指定instanceName的方式启动消费者有什么问题?
由于指定了instanceName导致服务中存在两个消费者,如果在Nacos中刷新了配置,则有一个消费者能监听到配置更新,另一个消费者无法监听到配置更新;另一方面频繁的更新配置会触发消费者reblance机制,使得消费性能下降。
2、如果使用了Apollo作为配置中心,是否存在这个问题呢?
Apollo不依赖于RefreshScope 的作用域,所有不加RefreshScope 注解,Apollo也可正常启动,不受影响。