这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
背景
最近在维护RocketMQ
经常会出现这种问题
消息发送方和接收方出现扯皮,消息发送方说我的消息已经发送成功了,消费方说我没接收到消息。两边各持己见,谁也不会说服谁。这时候就非常希望RocketMQ
能有消息的一个消息发送和消费的一个业务log了,类似什么时候发送了消息,什么时候消费了消息,消费成功还是失败了
正常的消息查询页面一般只有消息是否消费,没有消息消费成功还是失败
不管消费成功还是失败,这里显式的都是CONSUMED
,非常不方便排查问题,那么RocketMQ
是不是有类似的log功能呢?
答案是有的,这里就引出了我们今天的主角,消息轨迹
RocketMQ版本
- 5.1.0
消息轨迹是什么
RocketMQ
消息轨迹主要是用来记录消息的发送消费记录,算是一种消息的log
如何使用
RocketMQ的消息轨迹开启主要是三个地方
- broker
- producer
- consumer
broker
broker
启动配置文件添加如下配置
traceTopicEnable=true
producer开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
和一般的消息发送不同,主要是添加一个新的构造函数的参数
之前的构造方式
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
consumer开启消息轨迹
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
和一般的消息消费也不同,我们也添加了enableMsgTrace
为true
测试
消息发送
public class LocalProducer {/*** The number of produced messages.*/public static final int MESSAGE_COUNT = 1;public static final String PRODUCER_GROUP = "xiao-zou-topic-producer";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9000";public static final String TOPIC = "xiao-zou-topic";public static final String TAG = "TagA";public static void main(String[] args) throws MQClientException, InterruptedException {/** Instantiate with a producer group name.*/DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.addRetryResponseCode(RemotingSysResponseCode.SYSTEM_BUSY);producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg = new Message(TOPIC /* Topic */,TAG /* Tag */,("Hello xiaozou " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
// msg.setDelayTimeLevel(2);SendResult sendResult = producer.send(msg, 5000);DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
消息消费
public class LocalConsumer {public static final String CONSUMER_GROUP = "gid-xiao-zou-topic";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9001";public static final String TOPIC = "xiao-zou-topic";public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.RECONSUME_LATER;});consumer.start();System.out.printf("Consumer Started.%n");}
}
这里消费故意返回失败,便于观察消费轨迹
然后我们发送消息后用msgId
去查看消息轨迹看看
消息轨迹查看
我们一般用的是消息的查询,现在我们直接去消息轨迹那里查看
我们查看消息轨迹可以看到非常详细的消费记录
包括消息的
- 发送时间
- 消费是否成功还是失败
- 重试测试等
- 消费者的ip
- broker的ip
消息轨迹的存储
消息轨迹默认存储的Topic
是RMQ_SYS_TRACE_TOPIC
,也可以自己设置。
存储方式有两种
普通模式
RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。
物理IO隔离模式
对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹(只在该broker创建轨迹Topic),使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
总结
总的来说消息轨迹开启后会非常方便我们定位问题,但是会增加额外的存储开支,如果消息量很大,推荐使用物理隔离的方式,单独使用一个broker
存储消息轨迹
参考
- 官方文档: https://rocketmq.apache.org/zh/docs/4.x/bestPractice/03messagetra/