场景
想创建一个新的consumer去消费一个已经再使用的topic时,默认情况下会从topic中的第一条消息开始消费,大多数情况是需要从最新的消息开始。然后再使用CONSUME_FROM_LAST_OFFSET设置时并不会对新的consumer生效,它只是在停用consumer重新启用时,如果之前订阅OFFSET消息已经不存在了(默认rocketmq中存放的消息是72小时)就会从最后一条开始。所以代码层面无法实现新的consumer订阅topic最新消息开始消费的操作。
如何实现
RocketMQ为我们提供了一个强大的消息控制台rocketmq-dashboard,其中就有消息相关的控制功能。操作步骤如下:
1.创建新的consumer,已存在可以跳过这一步
Consumer>add
- clusterName:选择需要订阅rocketmq集群名称
- brokerName:选择需要订阅的broker名称,可能是多组集群
- groupName:输入新建的Consumer名称
- consumeEnable:是否开启Consumer消费,这里先将消费关闭(灰色),启动后再开启消费。
2.启动消费组程序,已启动的跳过。
- 启动消费程序后就可以看到Consumer消费节点数量,延时的消息数量。
- 因为我们consumeEnable为false关闭状态,所有Consumer并没有消费消息。
3.设置Consumer消费offset
- Topic>SKIP_MESSAGE_ACCUMULATE
- 在Topic栏目中选择需要跳过的topic,点击SKIP_MESSAGE_ACCUMULATE操作
- 在SKIP_MESSAGE_ACCUMULATE弹出框SubscriptionGroup中选择需要操作的消费组
- commit后生效,弹出设置的消费信息框如下
- 如果新的Consumer订阅了多个topic,也需要将其他进行操作。
4.开启consumeEnable开关
Consumer>Config
- 在Consumer的模块中,找到新增的消费组SubscriptionGroup ,打开后面Config窗口
- 打开consumeEnable开关(红色),如果有过个broker集群需要都打开。
- 设置完成后消费程序会开始消费新的消息