🎯 导读:本文介绍了RocketMQ消息队列系统中的几种消息发送模式及其应用场景,包括同步消息、异步消息以及事务消息。同步消息确保了消息的安全性,但牺牲了一定的性能;异步消息提高了响应速度,适用于对响应时间敏感的场景;事务消息则保证了消息与本地事务的一致性,适用于需要预执行业务逻辑以决定消息是否发送的场景。此外,文章还探讨了Topic与Tag的应用策略,以及如何利用自定义Key来方便消息的查询和去重,提供了丰富的代码示例帮助理解。
文章目录
- RocketMQ发送同步消息*
- RocketMQ发送异步消息*
- 异步消息生产者
- 异步消息消费者
- RocketMQ发送单向消息
- 单向消息生产者
- 单向消息消费者
- RocketMQ发送延迟消息*
- 延迟消息生产者
- 延迟消息消费者
- RocketMQ发送顺序消息
- 场景分析
- 定义消息实体
- 顺序消息生产者
- 顺序消息消费者
- RocketMQ发送批量消息
- 批量消息生产者
- 批量消息消费者
- RocketMQ发送事务消息(不够Seata方便)
- 事务消息的发送流程
- 事务消息生产者
- 事务消息消费者
- 测试结果
- RocketMQ发送带标签的消息*(消息过滤)
- 订阅关系一致
- 标签消息生产者
- 标签消息消费者
- Topic 和 Tag 的应用推荐(官方推荐)
- 发送消息携带自定义Key
- 携带Key好处
- 携带 key 消息生产者
- 携带 key 消息消费者
使用*标注的为常用的消息类型
RocketMQ发送同步消息*
方法有返回Result的是同步消息,可以参考快速入门案例的实现
- 同步消息发送过后会有一个返回值(MQ 服务器接收到消息后返回的一个确认),这种方式非常安全,但是性能没有那么高
- 在 MQ 集群中,要等到所有的从机都复制了消息以后才会返回(要等很久)
- 应用场景:重要的消息可以选择这种方式
RocketMQ发送异步消息*
- 异步消息用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker响应的场景
- 发送完以后会有一个异步消息通知告诉生产者消息是否发送成功
异步消息生产者
@Test
public void asyncProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());producer.send(message, new SendCallback() {// 异步回调方法@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable e) {System.err.println("发送失败:" + e.getMessage());}});System.out.println("我先执行");// 挂起jvmSystem.in.read();
}
异步消息消费者
@Test
public void testAsyncConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe("TopicTest", "*");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() + "----" + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
RocketMQ发送单向消息
- 这种方式主要用在不关心发送结果的场景(没有同步或者异步回调,不在乎消息是否发送成功),例如日志信息的发送
- 这种方式吞吐量很大,但是存在消息丢失的风险
单向消息生产者
@Test
public void testOnewayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();Message msg = new Message("TopicTest", ("单向消息").getBytes());// 发送单向消息producer.sendOneway(msg);// 关闭实例producer.shutdown();
}
单向消息消费者
消费者和上面一样
RocketMQ发送延迟消息*
- 消息放入 MQ 后,过一段时间,才会被消费者监听到
- 在下单业务中,提交一个订单后,发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单,释放库存。类似场景还有 7 天自动收货
延迟消息生产者
RocketMQ 4.x 版本不支持任意时间的延时,只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
@Test
public void testDelayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("ms-consumer-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();Message msg = new Message("TopicTest", ("延迟消息").getBytes());// 给这个消息设定一个延迟级别,每个级别对应一个时间// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);// 发送单向消息producer.send(msg);// 打印时间System.out.println(new Date());// 关闭实例producer.shutdown();
}
RocketMQ 5.x 版本支持任意时间的延时(使用时间轮算法)
Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());
// 直接设置延迟多少秒
message.setDelayTimeSec(500);
// 设置延迟多少毫秒
// message.setDelayTimeMs(100);
// 发延迟消息
producer.send(message);
延迟消息消费者
延时消息会有一点小误差,不完全准时
/*** 发送时间Fri Apr 21 16:19:54 CST 2023* 收到消息了Fri Apr 21 16:20:20 CST 2023* --------------* 发送时间Fri Apr 21 16:21:08 CST 2023* 收到消息Fri Apr 21 16:21:18 CST 2023** @throws Exception*/
@Test
public void msConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("orderMsTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("收到消息了" + new Date());System.out.println(new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
RocketMQ发送顺序消息
- 消息有序指的是可以按照消息的发送顺序来消费(FIFO)
- RocketMQ 可严格保证消息有序,例如下单之后,先发送短信、再发货
【有序类型】
- 分区有序(可以直接通过MessageListenerOrderly实现)
- 全局有序(MessageListenerOrderly + 设置Broker队列数量为1)
可能大家会有疑问,mq本身不就是FIFO吗?
答:一个broker中有四个queue,消费默认是并发的,线程之间有竞争关系,不能确保顺序
【顺序消费的原理解析】
- 默认消息发送时采取Round Robin轮询方式把消息发送到不同的queue(分区队列)
- 消费消息的时候,从多个queue上拉取消息,消费不能保证全局有序,只能保证分区有序(每个queue的消息都是有序的)
- 但如果控制消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,就保证了全局有序(发送和消费参与的队列只有一个)
场景分析
模拟一个订单的发送流程,创建两个订单,发送的消息分别是
- 订单号1000111 发消息流程 下订单->物流->签收
- 订单号1000222 发消息流程 下订单->物流->拒收
即实现分区有序
定义消息实体
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {private String orderSn;private Integer userId;private String desc; // 下单 短信 物流
}
顺序消息生产者
private List<MsgModel> msgModels = Arrays.asList(new MsgModel("1000111", 1, "下单"),new MsgModel("1000111", 1, "短信"),new MsgModel("1000111", 1, "物流"),new MsgModel("1000222", 2, "下单"),new MsgModel("1000222", 2, "短信"),new MsgModel("1000222", 2, "物流")
);@Test
public void orderlyProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();// 发送顺序消息 发送时要确保有序 并且要发到同一个队列下面去msgModels.forEach(msgModel -> {String msgModelString = msgModel.toString();Message message = new Message("orderlyTopic", msgModelString.getBytes());try {producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// arg就是接收到的订单号 orderSn// 在这里 选择队列int hashCode = arg.toString().hashCode();// 根据订单号hashCode,取模放到队列中,订单号一样,放到的队列一样return mqs.get(hashCode % mqs.size());}},// 传递订单号进去msgModel.getOrderSn());} catch (Exception e) {e.printStackTrace();}});producer.shutdown();System.out.println("发送完成");
}
顺序消息消费者
@Test
public void orderlyConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("orderlyTopic", "*");// MessageListenerConcurrently 并发模式 多线程的 重试16次// MessageListenerOrderly 顺序模式 单线程的 无限重试Integer.Max_Valueconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.println("线程id:" + Thread.currentThread().getId());System.out.println(new String(msgs.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();
}
下面的结果就是分区有序,同一个订单(同一队列)的消息会按照顺序消费,但是不同订单的消息没有顺序约束
RocketMQ发送批量消息
RocketMQ可以一次性发送一组消息,这一组消息被投递到同一个队列中,会被当做一条消息进行消费
批量消息生产者
@Test
public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();// 消息实际上还是分开发的,接收的时候还是一条一条接收就行List<Message> msgs = Arrays.asList(new Message("TopicTest", "我是一组消息的A消息".getBytes()),new Message("TopicTest", "我是一组消息的B消息".getBytes()),new Message("TopicTest", "我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown();
}
三个消息放在一个队列中
批量消息消费者
消费者还是一条一条来消费
@Test
public void testBatchConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 表达式,默认是*consumer.subscribe("TopicTest", "*");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
RocketMQ发送事务消息(不够Seata方便)
事务消息的发送流程
它允许发送方在发送消息之前执行某些业务逻辑,并根据这些业务逻辑的结果来决定消息是否应该被发送。
- 如果业务逻辑执行成功,则消息会被提交并最终被消费者消费;
- 如果业务逻辑执行失败或不确定,则消息的状态将保持未知,直到通过回查机制确认其状态为止。
下图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交
1、发送消息(half消息)
2、服务端响应消息写入结果(如果写入失败,此时half消息对业务不可见,本地事务不执行)
3、根据发送结果执行本地事务
4、根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)
事务补偿
补偿阶段用于解决消息UNKNOW或者Rollback发生超时或者失败的情况
1、对没有Commit/Rollback的事务消息(pending状态的消息),发起一次“回查”
2、Producer收到回查消息,检查回查消息对应的本地事务的状态
3、根据本地事务状态,重新 Commit 或者 Rollback
事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态
- TransactionStatus.CommitTransaction: 提交事务,允许消费者消费此消息
- TransactionStatus.RollbackTransaction: 回滚事务,该消息将被删除,不允许被消费
- TransactionStatus.Unknown: 中间状态,需要检查消息队列来确定状态
事务消息生产者
通过在broker.conf文件中设置如下参数
- transactionCheckInterval:Broker检查事务消息状态的默认间隔时间(单位为毫秒)。默认Broker每1分钟(60000毫秒)会对未确认的事务消息进行一次状态检查
- transactionTimeOut:事务消息的有效期,即事务消息在未得到确认前能存在的最长时间
- transactionCheckMax:事务消息的最大检测次数(默认回查15次)。如果在达到最大检测次数后事务消息的状态仍未得到确认,Broker会默认认为事务已失败,并对消息进行回滚
private Random random = new Random();
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 创建订单,扣减库存** @throws Exception*/
@Test
public void createOrderAndDeductStock() throws Exception {// 构建消息体TransactionMQProducer producer = new TransactionMQProducer("async-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);// 设置事务消息的监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println(sdf.format(new Date()) + "-->执行本地事务:" + new String(msg.getBody()) + ";事务参数:" + arg);// 获取订单信息String orderInfo = new String(msg.getBody());// 做一些业务操作...// 检查订单状态if (isOrderValid(orderInfo)) {System.out.println("订单状态没有问题,消息发送给消费者处理");return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.println("订单状态有问题,等会重新 检查本地事务");// 返回 UNKNOW ,后面会调用checkLocalTransaction(msg)方法return LocalTransactionState.UNKNOW;}}/*** 回查,确认上面的业务是否有结果* 触发条件:* 1、当上面执行本地事务返回结果 UNKNOW 时,或者回查方法也返回 UNKNOW 时,会触发* 2、上面操作超过 20s 没有做出一个结果,也就是超时或者卡住了,也会进行回查* @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println(sdf.format(new Date()) + "-->检查本地事务:" + new String(messageExt.getBody()));// 检查订单状态String orderInfo = new String(messageExt.getBody());if (isOrderValid(orderInfo)) {System.out.println("订单状态没有问题,消息发送给消费者处理");return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.println("订单状态还是有问题,这个消息不要了");return LocalTransactionState.ROLLBACK_MESSAGE;}}});producer.start();for (int i = 0; i < 3; i++) {String orderId = UUID.randomUUID().toString().replace("-", "");int amount = random.nextInt(10) + 1;// 构建订单信息String orderInfo = "订单号:" + orderId + " 数量:" + amount;// 尝试保存订单信息boolean saveOrderSuccess = saveOrder(orderInfo);// 发送事务消息Message message = new Message("Affair_Topic", orderInfo.getBytes());System.out.println(sdf.format(new Date()) + "-->发送事务消息:" + orderInfo);producer.sendMessageInTransaction(message, "hahaha");}System.in.read();
}private boolean saveOrder(String orderInfo) {// 模拟保存订单信息到数据库// 假设保存成功return true;
}private boolean isOrderValid(String orderInfo) {// 模拟检查订单状态// 假设订单有效int random = this.random.nextInt(2);return random == 1; // 应该替换为实际的订单状态检查逻辑
}
事务消息消费者
@Test
public void testTransactionConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 设置nameServer地址consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe("Affair_Topic", "*");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(sdf.format(new Date()) + "-->" + Thread.currentThread().getName() + " 执行消费:" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
测试结果
RocketMQ发送带标签的消息*(消息过滤)
- RocketMQ提供消息过滤功能,通过tag或者key进行区分。同一个主题对应多个tag。我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,通过过滤来区别对待
- 场景:订单主题中。有的消息是关于生鲜,需要及时配送;有的消息是关于服装,可以慢一点发货。不同标签的处理逻辑不同
订阅关系一致
- 订阅关系:一个消费者组订阅一个 Topic 的某一个 Tag(个人实测,一个标签最好对应一个标签,不然信息会被过滤掉,不被正常消费),这种记录被称为订阅关系。
- 订阅关系一致:同一个消费者组下所有消费者实例所订阅的 Topic、Tag 必须完全一致。如果订阅关系(消费者组名 - Topic - Tag)不一致,会导致消费消息紊乱,甚至消息丢失。
标签消息生产者
@Test
public void tagProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());producer.send(message);producer.send(message2);System.out.println("发送成功");producer.shutdown();
}
标签消息消费者
/*** vip1** @throws Exception*/
@Test
public void tagConsumer1() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);// 主题,标签(写 * 是所有标签)consumer.subscribe("tagTopic", "vip1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}/*** vip1 || vip2** @throws Exception*/
@Test
public void tagConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("tagTopic", "vip1 || vip2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
Topic 和 Tag 的应用推荐(官方推荐)
总结:不同的业务应该使用不同的Topic,如果是相同的业务里面有不同的表现形式,我们要使用tag进行区分,可以从以下几个方面进行判断:
1、消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分
2、业务是否相关联:淘宝交易消息、京东物流消息使用不同的 Topic 进行区分;同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分
3、消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分
4、消息量级是否相当:有些业务消息虽然量小但是实时性要求高(如生鲜订单),如果跟某些万亿量级的消息使用同一个 Topic,可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
发送消息携带自定义Key
在 RocketMQ 中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或key来进行查询
携带Key好处
- 方便查阅
- 方便去重
携带 key 消息生产者
@Test
public void keyProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();String key = UUID.randomUUID().toString();System.out.println(key);Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());producer.send(message);System.out.println("发送成功");producer.shutdown();
}
携带 key 消息消费者
@Test
public void keyConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("keyTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println("我是vip1的消费者,我正在消费消息" + new String(messageExt.getBody()));// 获取keySystem.out.println("我们业务的标识:" + messageExt.getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}