大家好,我是苍何。
顺序消息是业务中常用的功能之一,而 RocketMQ 默认发送的事普通无序的消息,那该如何发送顺序消息呢?
要保证消息的顺序,要从生产端到 broker 消息存储,再到消费消息都要保证链路的顺序,才可以做到真正的顺序消息。
何为顺序
那苍何首先抛出一个「玄学」问题,何为顺序?
你肯定会用你聪明的大脑瓜子说"顺序不就是有序吗?有啥好说的"。不,那你就浅了,我从顺序的严格划分程度来说,可分为:
- 普通顺序
- 严格顺序
普通顺序是指的相同队列收到的消息是有序的(有时也叫局部有序)。这有个前提条件,必须消息在同一个队列,我们知道队列本身就是先进先出嘛,故而自带顺序。
那严格顺序就好理解啦,我可不管你是哪个队列,哪个 topic,不管你在天涯海角的哪一台服务器,都要满足顺序(有时也叫全局有序)。
这一看就很难对不对,在地球上天涯海角的 2 台服务器,硬是要来保证顺序。
那如果是业务来分呢,顺序还可以分为:因果顺序和时间顺序。
凡事有因必有果,比如交易系统中订单创建、支付、退款等流程,先要创建订单才能支付,支付完成才能退款。这些步骤间有因果顺序。
对于时间顺序步骤之间没有因果联系,只要满足先进先出的顺序,比如股票交易中,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。
所以一个简单的玄学问题硬是可以整出好几个概念,哈哈,这就是计算机的魅力。总结下何为顺序吧:
顺序消息的使用场景
其实顺序消息的使用场景比较多,只要你的业务需要用消息来保证顺序就都用到顺序消息,你肯定会说"苍何,你这说了和没说一样😂"
别急,除了刚刚的股票交易场景,在 MySQL 数据库利用 binlog 消息来进行数据实时增量同步也是需要顺序消息的。
增量同步是指的两个不同数据库,当有一方增删改时,另一方也要同步进行增删改。
增量同步的目的是为了保证数据数据一致性,如果是普通消息,数据可能就乱了。
就这两场景啊,也不多啊,别急,以下这些业务场景也都可用到顺序消息:
- 库存管理:保证商品入库、出库、盘点等操作的顺序性,避免超卖或库存错误。
- 事件溯源:在事件驱动架构中,保证事件按照发生的顺序被处理和记录。
- 消息重试机制:在处理失败需要重试的场景中,保证重试消息的顺序。
顺序发送技术原理
关关难过,关关过。要保证顺序消息,第一关,当然是,先让生产者发送的消息是顺序的吧。
在之前文章 [[图解RocketMQ之生产者如何进行消息重试]]中,说到了在 RocketMQ 中消息发送的 3 种方式:同步、异步、单向。
要保证发送消息的顺序,只能保证同步发送,且必须是单个生产者。这 2 个条件缺一不可。
究其原因还是因为顺序发送的技术原理,其技术原理也比较简单,就是要将同一类消息发送到相同队列即可。
RocketMQ 顺序消息的顺序关系是通过消息组(MessageGroup)判定和识别。发送消息的时候需要为每条消息设置 MessageGroup。那如何保证同一 MessageGroup 下同一业务 ID 的消息发送到指定相同的队列呢?
我们不妨看看 RocketMQ 的源码,在 SelectMessageQueueByHash 类中:
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode();if (value < 0) {value = Math.abs(value);}value = value % mqs.size();return mqs.get(value);,}
}
在发送消息的时候,生产者可以使用这个选择器,来选择指定队列的消息;
SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderId);
- orderId 指的就是业务 ID,取 orderId 的 hash 值(想通的业务 ID 具有想通的 hash 值)
- 用哈希值和队列数 mqs.size() 取摸,得到一个索引值,结果会小于队列数
- 根据索引值从队列列表中取出一个队列,那 hash 值相同,取出的队列也就相同。
通过发送消息的时候指定队列方式就保证了同一个业务 ID 发往相同的队列,也即保证消息发送的顺序性。
队列列表的获取,Producer 会从 Nameserver 中获取对应 Topic 的 Broker 列表,并将结果缓存到本地,下次将直接从缓存中拿到结果。
顺序存储技术原理
我们在之前的[[图解RocketMQ之消息如何存储]]中有介绍通过 commitlog 存储全量的消息,且会按照 topic 和队列分配到 consumerQueue 中。
由于我们发送消息的时候指定了队列,那对于相同业务 ID 的消息,也会被存储到相同的 consumerQueue 中,且通常在实际项目中,同一个业务位于同一个消息组。
这样相同一笔订单,无论是创建、支付还是退款消息,都按顺序会被发送到相同的队列,不同的订单会被分配到不同的队列中。且消息存储也是按照顺序的。
这个时候问题很多的小明又问了,如果不同消息组的消息都发往 Topic 中的同一个队列,那这个时候存储的 consumerQueue 中也会有多个消息组的消息,如何保证顺序呢?
这其实就扯到一个概念了,我们这里所说的顺序其实还只是分区顺序,也就是同一个消息组的消息在队列中是能保证顺序的,不同消息组的消息在同一个队列中无法保证顺序。
上图中的例子就是对于消息组 1 中的订单消息在 order-队列 1 中的消息是按照顺序存储的,同理消息组 3 中的消息 msg-1 和 msg-2 在 order-队列 1 中消息也是顺序的。
但消息组 1 和消息组 3 虽然都放在了同一个队列,但并不涉及到顺序。
顺序消费技术原理
RocketMQ 支持 2 种消费模式,集群消费和广播消费。
集群消费模式下每一条消息只会被 ConsumerGroup 分组下的一个 Consumer 消费,而广播消费模式下,每个 Consumer 都会消费这条消息。
多数场景下用的都是集群消费,也就是一次消费代表一次业务处理,每一条消息都将由集群中的一个实例来对应处理。
而顺序消费也叫有序消费,如果消息是顺序发送,且顺序存储,那理应消费也是一条条消费,这个用屁股想也知道,但实际却没这么简单。
在 Consumer 中不止一个线程在那消费,因为同一个消费者可能会处理不同的队列消息。如果只有一个线程。那不得慢死,实际上会有多个线程同时消费,对应的是 Consumer 中的消费线程池。
多个线程消费同一条消息,如何防止消息被重复消费又是一大问题。
如果是你,会怎么做呢?
没错,就是加锁,在 RocketMQ 中用了 3 把锁来保证,分别是分布式锁、Synchronized、ReentrantLock。
我们先来看看第一把锁:分布式锁。
顺序消费用的是 MessageListenerOrderly 来保证顺序消费,RocketMQ 默认已经提供了一个实现类 ConsumeMessageOrderlyService 。
这个 service 在启动的时候就会向 Broker 申请当前消费者负责的队列锁,会将自己的消费组、自己的客户端ID、以及负责的队列发往 Broker,Broker 就把对应的队列与这个消费者绑定,将这个关系存储在了本地。
加了这分布式锁,就可以保证在同一个消费组内,一个队列只能被一个 Consumer 消费。
这个分布式锁在 broker 中会过期,默认消费者每 20 s 去续签这把锁。
在 Consumer 中消费线程池会并发消费,分布式锁可管不到这,那就需要另外一把本地锁,那就是 Synchronized。
Synchronized 其实是为了保证同一个队列的消息只会被 Consumer 线程池中的一个线程所消费。
最后终于费了九牛二虎之力获得了消费的资格,还不够,在消费内部逻辑中又加了一把更细粒度的 ReentrantLock 锁来标记队列还有消费者在消费。
特别注意在顺序消费时,如果有线程消费发生异常,会阻塞该队列中的其他消息,因为他拿着锁不妨,别的消费者依然也无法获取,之前说过有重试机制可以重试,直到超出最大重试次数,在这段期间内,该队列的消息都将会被阻塞。
实际顺序消息中最大重试次数要谨慎设置,防止消息大量堆积。
实战——如何发送和消费顺序消息
以 PmHub 项目中的顺序发和消费消息为例,我们来实战一波。
顺序发消息
public class OrderMessageProducer {private DefaultMQProducer producer;public OrderMessageProducer() throws Exception {producer = new DefaultMQProducer("order_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();}public void sendOrderMessage(String orderId, String status) throws Exception {Message msg = new Message("OrderTopic", status, orderId.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {String orderId = (String) arg;int index = Math.abs(orderId.hashCode()) % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("Send order message: %s, status: %s, result: %s\n", orderId, status, sendResult);}public void shutdown() {producer.shutdown();}
}
在发消息时候,OrderMessageProducer 使用 MessageQueueSelector 来确保同一订单的消息被发送到同一个队列。且需要注意设置发送消息为同步发送(默认)。
顺序消费
public class OrderMessageConsumer {private DefaultMQPushConsumer consumer;public OrderMessageConsumer() throws Exception {consumer = new DefaultMQPushConsumer("order_consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("OrderTopic", "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String orderId = msg.getKeys();String status = new String(msg.getBody());System.out.printf("Consume order message: %s, status: %s\n", orderId, status);// 在这里处理订单状态更新的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Order message consumer started.");}public void shutdown() {consumer.shutdown();}
}
OrderMessageConsumer 使用 MessageListenerOrderly 来保证消息的顺序消费。
这样我们就实现了顺序生产和消费消息。
实际上企业级项目中,实现顺序消息需要考虑更为复杂,稍微一不注意就无法保证顺序性,且顺序消息的性能和队列数有很大关系,一般实际项目中都只会分区顺序即可。
好啦,今天的分享结束。
我是苍何,这是图解 RocketMQ 教程的第 9 篇,我们下篇见~