一、概述
所谓顺序消费指的是可以按照消息的发送顺序来进行消费。例如一笔订单产生了3条消息,即下订单==》减库存==》增加订单,消费时要按照顺序消费才有意义,要不然就乱套了(PS:你总不能订单还没下,就开始减库存吧),与此同时多笔订单之间又是可以并行消费的。
二、RocketMQ实现顺序消费的方式
(1)保证同一个订单的消息,一定要发送到同一个队列;
(2)并且该队列只有一个消费者,也就是说同一个队列不能出现多个消费者并行消费的情况。到这里可能会有人产生疑问,一个队列只有一个消费者,那性能岂不是很低?关于这种情况,RocketMQ的解决方法是,虽然同一个队列不能并行消费,但是可以并行消费不同的队列。通俗点讲队列和消费者是一对一的关系,即一个队列只属于某一个消费者,消费者和队列是1对多的关系,即一个消费者可以对应多个队列。进而提升RocketMQ的效率。
三、队列消费的两种模式
3.1、并发消费模式
当同一类消息被送入不同队列,且这些消息在处理上并不需要按照时序消费时,可以考虑使用并发消费模式。并发消费模式生产者会将消息轮询发送到不同的队列中,这些队列会和消费者建立多个连接(线程)将消息并发的送给不同的消费者。因为消费者处理速度有快有慢,所以并不能保证物流数据会按照1~9的顺序依次消费。并发消费模式处理效率很高,但是无法保证有序性。
3.2、有序消费模式
有序消费是指生产者在生产数据的时候,根据hash规则指定让消息放入哪个队列,在消费者消费时会保证不同消费者针对每一个队列只有唯一一个连接(线程)用于消费指定队列。有序消费模式可以保证消息按队列FIFO顺序依次被消费,但因此失去并发性能,有序消费模式只有在业务要求必须按照顺序消费的场景下才允许使用。
3.2.1、RocketMQ如何实现有序消息
要想实现RocketMQ实现有序消息,只需要做如下两点调整:
(1)生产者端要求按照ID等唯一标识分配消息队列;
(2)消费者端采用专用的监听器保证对队列的单线程应用;
3.2.2、发送顺序消息
/*** 发送顺序消息* @throws Exception*/
@Test
public void sendOrderlyMessage() {// 1、创建生产者,指定组名DefaultMQProducer producer = new DefaultMQProducer("GROUP_ORDERLY");// 2、指定Namesrv地址producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);try {// 3、启动生产者producer.start();// 模拟10笔订单for(Integer orderId = 1; orderId <= 10; orderId++) {// 每笔订单要发送3条消息,即:创建订单==》扣减库存==》增加积分for (int i = 0; i < 3; i++) {String data = "";switch (i % 3) {case 0:data = orderId + "号创建订单";break;case 1:data = orderId + "号扣减库存";break;case 2:data = orderId + "号增加积分";break;}// 创建消息对象 topic:TOPIC_ORDER、tags:TAG_ORDER、key:orderIdMessage message = new Message("TOPIC_ORDERLY","TAG_ORDERLY",orderId.toString(),data.getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息,实现[MessageQueueSelector]接口SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> messageQueues, Message message, Object o) {int orderId = Integer.parseInt(message.getKeys());int size = messageQueues.size(); // 队列的数量int index = orderId % size; // 队列的索引MessageQueue messageQueue = messageQueues.get(index); // 选择的队列log.info("订单id:{},队列数量:{},队列索引:{},发送内容:{},队列id:{}",orderId,size,index,new String(message.getBody()),messageQueue.getQueueId());return messageQueue;}}, null);}}} catch (Exception e) {log.error("error:{}",e.getMessage());}
}
// 控制台打印结果
"C:\Program Files\Java\jdk1.8.0_202\bin\java.exe" -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent:D:\Programs\ideaIU-2018.3.6\lib\idea_rt.jar=51556:D:\Programs\ideaIU-2018.3.6\bin -Dfile.encoding=UTF-8 -classpath "D:\Programs\ideaIU-2018.3.6\lib\idea_rt.jar;D:\Programs\ideaIU-2018.3.6\plugins\junit\lib\junit-rt.jar;D:\Programs\ideaIU-2018.3.6\plugins\junit\lib\junit5-rt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\rt.jar;D:\augenstemn@gitee\rocketmq\rocketmq-example\target\test-classes;D:\augenstemn@gitee\rocketmq\rocketmq-example\target\classes;D:\mavenRepository\com\alibaba\fastjson\1.2.83\fastjson-1.2.83.jar;D:\mavenRepository\org\apache\rocketmq\rocketmq-client\4.9.2\rocketmq-client-4.9.2.jar;D:\mavenRepository\org\apache\rocketmq\rocketmq-common\4.9.2\rocketmq-common-4.9.2.jar;D:\mavenRepository\org\apache\rocketmq\rocketmq-remoting\4.9.2\rocketmq-remoting-4.9.2.jar;D:\mavenRepository\io\netty\netty-all\4.1.65.Final\netty-all-4.1.65.Final.jar;D:\mavenRepository\org\apache\rocketmq\rocketmq-logging\4.9.2\rocketmq-logging-4.9.2.jar;D:\mavenRepository\commons-validator\commons-validator\1.7\commons-validator-1.7.jar;D:\mavenRepository\commons-beanutils\commons-beanutils\1.9.4\commons-beanutils-1.9.4.jar;D:\mavenRepository\commons-digester\commons-digester\2.1\commons-digester-2.1.jar;D:\mavenRepository\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\mavenRepository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\mavenRepository\org\projectlombok\lombok\1.18.22\lombok-1.18.22.jar;D:\mavenRepository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\mavenRepository\ch\qos\logback\logback-classic\1.2.10\logback-classic-1.2.10.jar;D:\mavenRepository\ch\qos\logback\logback-core\1.2.10\logback-core-1.2.10.jar;D:\mavenRepository\junit\junit\4.13.2\junit-4.13.2.jar;D:\mavenRepository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\mavenRepository\org\apache\commons\commons-lang3\3.11\commons-lang3-3.11.jar;D:\mavenRepository\org\apache\commons\commons-collections4\4.4\commons-collections4-4.4.jar" com.intellij.rt.execution.junit.JUnitStarter -ideVersion5 -junit4 org.star.OrderMessageProducer,sendOrderlyMessage
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
11:04:01.604 [main] INFO org.star.OrderMessageProducer - 订单id:1,队列数量:4,队列索引:1,发送内容:1号创建订单,队列id:1
11:04:01.803 [main] INFO org.star.OrderMessageProducer - 订单id:1,队列数量:4,队列索引:1,发送内容:1号扣减库存,队列id:1
11:04:01.804 [main] INFO org.star.OrderMessageProducer - 订单id:1,队列数量:4,队列索引:1,发送内容:1号增加积分,队列id:1
11:04:01.807 [main] INFO org.star.OrderMessageProducer - 订单id:2,队列数量:4,队列索引:2,发送内容:2号创建订单,队列id:2
11:04:01.808 [main] INFO org.star.OrderMessageProducer - 订单id:2,队列数量:4,队列索引:2,发送内容:2号扣减库存,队列id:2
11:04:01.810 [main] INFO org.star.OrderMessageProducer - 订单id:2,队列数量:4,队列索引:2,发送内容:2号增加积分,队列id:2
11:04:01.811 [main] INFO org.star.OrderMessageProducer - 订单id:3,队列数量:4,队列索引:3,发送内容:3号创建订单,队列id:3
11:04:01.813 [main] INFO org.star.OrderMessageProducer - 订单id:3,队列数量:4,队列索引:3,发送内容:3号扣减库存,队列id:3
11:04:01.816 [main] INFO org.star.OrderMessageProducer - 订单id:3,队列数量:4,队列索引:3,发送内容:3号增加积分,队列id:3
11:04:01.818 [main] INFO org.star.OrderMessageProducer - 订单id:4,队列数量:4,队列索引:0,发送内容:4号创建订单,队列id:0
11:04:01.820 [main] INFO org.star.OrderMessageProducer - 订单id:4,队列数量:4,队列索引:0,发送内容:4号扣减库存,队列id:0
11:04:01.822 [main] INFO org.star.OrderMessageProducer - 订单id:4,队列数量:4,队列索引:0,发送内容:4号增加积分,队列id:0
11:04:01.826 [main] INFO org.star.OrderMessageProducer - 订单id:5,队列数量:4,队列索引:1,发送内容:5号创建订单,队列id:1
11:04:01.831 [main] INFO org.star.OrderMessageProducer - 订单id:5,队列数量:4,队列索引:1,发送内容:5号扣减库存,队列id:1
11:04:01.834 [main] INFO org.star.OrderMessageProducer - 订单id:5,队列数量:4,队列索引:1,发送内容:5号增加积分,队列id:1
11:04:01.836 [main] INFO org.star.OrderMessageProducer - 订单id:6,队列数量:4,队列索引:2,发送内容:6号创建订单,队列id:2
11:04:01.838 [main] INFO org.star.OrderMessageProducer - 订单id:6,队列数量:4,队列索引:2,发送内容:6号扣减库存,队列id:2
11:04:01.840 [main] INFO org.star.OrderMessageProducer - 订单id:6,队列数量:4,队列索引:2,发送内容:6号增加积分,队列id:2
11:04:01.841 [main] INFO org.star.OrderMessageProducer - 订单id:7,队列数量:4,队列索引:3,发送内容:7号创建订单,队列id:3
11:04:01.844 [main] INFO org.star.OrderMessageProducer - 订单id:7,队列数量:4,队列索引:3,发送内容:7号扣减库存,队列id:3
11:04:01.846 [main] INFO org.star.OrderMessageProducer - 订单id:7,队列数量:4,队列索引:3,发送内容:7号增加积分,队列id:3
11:04:01.850 [main] INFO org.star.OrderMessageProducer - 订单id:8,队列数量:4,队列索引:0,发送内容:8号创建订单,队列id:0
11:04:01.852 [main] INFO org.star.OrderMessageProducer - 订单id:8,队列数量:4,队列索引:0,发送内容:8号扣减库存,队列id:0
11:04:01.853 [main] INFO org.star.OrderMessageProducer - 订单id:8,队列数量:4,队列索引:0,发送内容:8号增加积分,队列id:0
11:04:01.856 [main] INFO org.star.OrderMessageProducer - 订单id:9,队列数量:4,队列索引:1,发送内容:9号创建订单,队列id:1
11:04:01.858 [main] INFO org.star.OrderMessageProducer - 订单id:9,队列数量:4,队列索引:1,发送内容:9号扣减库存,队列id:1
11:04:01.860 [main] INFO org.star.OrderMessageProducer - 订单id:9,队列数量:4,队列索引:1,发送内容:9号增加积分,队列id:1
11:04:01.860 [main] INFO org.star.OrderMessageProducer - 订单id:10,队列数量:4,队列索引:2,发送内容:10号创建订单,队列id:2
11:04:01.864 [main] INFO org.star.OrderMessageProducer - 订单id:10,队列数量:4,队列索引:2,发送内容:10号扣减库存,队列id:2
11:04:01.866 [main] INFO org.star.OrderMessageProducer - 订单id:10,队列数量:4,队列索引:2,发送内容:10号增加积分,队列id:2Process finished with exit code 0
3.2.2、接收顺序消息(先启动监听)
/*** 接收顺序消息:消费者1* @throws Exception*/
@Test
public void receiveOrderlyMessageConsumer1() throws Exception {// 1、创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_ORDERLY");// 2、连接Namesrvconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);/*** 3、订阅主题* 参数1:主题名称* 参数2:主题中消息类型,*表示订阅所有*/consumer.subscribe("TOPIC_ORDERLY","*");// 4、注册监听器,要想实现顺序消费,消费者端要增加[MessageListenerOrderly]监听器,用于实现有序队列consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {// 遍历输出messageExts.forEach(e -> {log.info("我是消费者1,我正在消费消息,消息内容:{},队列id:{}",new String(e.getBody()),context.getMessageQueue().getQueueId());});return ConsumeOrderlyStatus.SUCCESS;}});// 5、启动消费者consumer.start();// 6、挂起当前JVMSystem.in.read();
}/*** 接收顺序消息:消费者2* @throws Exception*/
@Test
public void receiveOrderlyMessageConsumer2() throws Exception {// 1、创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_ORDERLY");// 2、连接Namesrvconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);/*** 3、订阅主题* 参数1:主题名称* 参数2:主题中消息类型,*表示订阅所有*/consumer.subscribe("TOPIC_ORDERLY","*");// 4、注册监听器,要想实现顺序消费,消费者端要增加[MessageListenerOrderly]监听器,用于实现有序队列consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {// 遍历输出messageExts.forEach(e -> {log.info("我是消费者2,我正在消费消息,消息内容:{},消息队列:{}",new String(e.getBody()),context.getMessageQueue().getQueueId());});return ConsumeOrderlyStatus.SUCCESS;}});// 5、启动消费者consumer.start();// 6、挂起当前JVMSystem.in.read();
}
消费者1控制台打印结果
11:04:01.802 [ConsumeMessageThread_5] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:1号创建订单,队列id:1
11:04:01.804 [ConsumeMessageThread_6] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:1号扣减库存,队列id:1
11:04:01.808 [ConsumeMessageThread_7] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:1号增加积分,队列id:1
11:04:01.829 [ConsumeMessageThread_8] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:4号创建订单,队列id:0
11:04:01.831 [ConsumeMessageThread_9] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:5号创建订单,队列id:1
11:04:01.831 [ConsumeMessageThread_10] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:4号扣减库存,队列id:0
11:04:01.833 [ConsumeMessageThread_10] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:4号增加积分,队列id:0
11:04:01.836 [ConsumeMessageThread_11] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:5号扣减库存,队列id:1
11:04:01.838 [ConsumeMessageThread_12] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:5号增加积分,队列id:1
11:04:01.853 [ConsumeMessageThread_13] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:8号创建订单,队列id:0
11:04:01.856 [ConsumeMessageThread_14] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:8号扣减库存,队列id:0
11:04:01.858 [ConsumeMessageThread_15] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:8号增加积分,队列id:0
11:04:01.858 [ConsumeMessageThread_16] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:9号创建订单,队列id:1
11:04:01.860 [ConsumeMessageThread_17] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:9号扣减库存,队列id:1
11:04:01.865 [ConsumeMessageThread_18] INFO org.star.OrderMessageConsumer - 我是消费者1,我正在消费消息,消息内容:9号增加积分,队列id:1
消费者2控制台打印结果
11:04:01.815 [ConsumeMessageThread_1] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:2号创建订单,消息队列:2
11:04:01.817 [ConsumeMessageThread_2] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:3号创建订单,消息队列:3
11:04:01.820 [ConsumeMessageThread_2] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:3号扣减库存,消息队列:3
11:04:01.821 [ConsumeMessageThread_2] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:3号增加积分,消息队列:3
11:04:01.820 [ConsumeMessageThread_1] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:2号扣减库存,消息队列:2
11:04:01.821 [ConsumeMessageThread_1] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:2号增加积分,消息队列:2
11:04:01.840 [ConsumeMessageThread_3] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:6号创建订单,消息队列:2
11:04:01.843 [ConsumeMessageThread_4] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:6号扣减库存,消息队列:2
11:04:01.844 [ConsumeMessageThread_5] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:6号增加积分,消息队列:2
11:04:01.845 [ConsumeMessageThread_6] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:7号创建订单,消息队列:3
11:04:01.848 [ConsumeMessageThread_7] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:7号扣减库存,消息队列:3
11:04:01.859 [ConsumeMessageThread_8] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:7号增加积分,消息队列:3
11:04:01.866 [ConsumeMessageThread_9] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:10号创建订单,消息队列:2
11:04:01.868 [ConsumeMessageThread_10] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:10号扣减库存,消息队列:2
11:04:01.868 [ConsumeMessageThread_10] INFO org.star.OrderMessageConsumer - 我是消费者2,我正在消费消息,消息内容:10号增加积分,消息队列:2
3.2.3、结果分析
通过对上述发送消息和接收消息控制台日志的分析,发现订单id相同的消息,都发送到了同一个队列中,也即保证了同一个订单的消息,发送到了同一个队列中。与此同时观察消费者的消费日志,发现队列id相同的消息队列中被分配给同一个消费者了,也即保证了一个队列唯一分配给了一个消费者,保证了顺序消费。另外观察日志发现,对于同一个消费者,同一个组的消息能够保证顺序,而不同组的消息无法保证顺序,也即局部有序。
3.2.4、如何实现全局顺序消费
只需要在生产者端固定将所有消息发送到0号队列即可保证全局有序,这也意味着全局采用单线程消费,这种方式执行效率极差。
@Override
public MessageQueue select(List<MessageQueue> messageQueues, Message message, Object o) {return messageQueues.get(0);
}
3.2.5、顺序消费的使用限制
有序消费模式只支持集群模式(messageModel = MessageModel.CLUSTERING),不支持广播模式(messageModel = MessageModel.BROADCASTING),采用广播模式将无法接收到数据。