概述
再引入mq
解耦部分业务操作后,一些场景还需要顺序处理;
这就需要mq
顺序消费了;
rocketmq
的顺序消费关键点在于对messagequeue
的有序消费;
一个topic
下有多个messagequeue
(默认是4个),而且这些messagequeue
可能分布在不同的broker
上;
要想有序消费,先要确保生产者要将有序消费的消息投递到同一个messagequeue
中;
然后这个messagequeue
只有1个消费者消费
,并且这个消费者内部要对消息进行顺序消费
提升顺序消息的能力;
可以将不同业务场景的顺序消息投递到不同的messagequeue
中(比如A业务投递到a队列
,B业务投递到b队列
),先拆一部分;
然后消费的时候可以批量获取消息,减少和broker
的交互次数
rocketmq
版本:4.X
java版本:1.8
生产者顺序投递
可以通过相同的shardkey
,把消息投递到同一个message queue
中
官方的代码demo
https://rocketmq.apache.org/docs/4.x/producer/03message2
public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//arg就是消息发送时的对象;这个demo中是orderIdInteger id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}
消费者顺序消费
核心在于让消费者顺序处理message queue
中的消息;
rocketmq
的1个message queue
只会分配给同一个分组
下的1个消费者
,所以这个时候消费者只要保证顺序消费就行;
push模式(DefaultMQPushConsumer实例化)
的消费者,一般有3
个因素影响;
batchPullSize
:一次从broker
拉取消息的数量,拉取后按照下面2个
参数分批次并行消费
consumeThreadMin和consumeThreadMax
:消费者启动时有多少个线程去处理消息
consumeMessageBatchMaxSize
:消费者每个线程每次处理的消息数量
可以使用org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
来替换org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
public class Consumer {/*** 消费者实体对象*/private DefaultMQPushConsumer consumer;/*** 消费者组*/public static final String CONSUMER_GROUP = "consumer_group";/*** 通过构造函数 实例化对象*/public Consumer() throws MQClientException {consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);consumer.setNamesrvAddr("IP:9876");//TODO 这里真的是个坑,我product设置VipChannelEnabled(false),但消费者并没有设置这个参数,之前发送普通消息的时候也没有问题。能正常消费。//TODO 但在顺序消息时,consumer一直不消费消息了,找了好久都没有找到原因,直到我这里也设置为VipChannelEnabled(false),竟然才可以消费消息。consumer.setVipChannelEnabled(false);//订阅主题和 标签( * 代表所有标签)下信息consumer.subscribe(JmsConfig.TOPIC, "*");//注册消费的监听 这里注意顺序消费为MessageListenerOrderly 之前并发为ConsumeConcurrentlyContextconsumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {//获取消息MessageExt msg = msgs.get(0);//消费者获取消息 这里只输出 不做后面逻辑处理log.info("Consumer-线程名称={},消息={}", Thread.currentThread().getName(), new String(msg.getBody()));return ConsumeOrderlyStatus.SUCCESS;});consumer.start();}
}
MessageListenerConcurrently
顺序消费解析
从org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
中可以看到,在启动时判断为顺序消费时;
会启动一个线程,对message
对应的processQueue
设置是否锁定;没有锁定时,在提交消息消费任务给消费线程池时会再次尝试加锁
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start
消息在消费时,不管顺序消费还是并行消费都是提交给线程池去消费;顺序消费在提交给线程池时会对message queue
使用synchronized
锁
提交消息消费任务给线程池
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
对message queue
进行加锁
org.apache.rocketmq.client.impl.consumer.MessageQueueLock#fetchLockObject