消息中间件以前常用RabbitMQ和ActiveMQ,由于业务需要,后期业务偏向大数据,现着重学习一下RocketMQ(RocketqMQ原理同ctg-mq),后续更新Kafka
一、RocketMQ特性
-
Kafka特性 (高性能分布式)
吞吐量大,支持消息大量推挤,支持topic离线,支持分布式,使用ZooKeeper实现负载均衡,支持Hadoop数据并行加载 -
RocketMQ特性
(1)Broker 服务器:Broker 服务器是RocketMQ的核心,主要功能:消息的处理(接收生产者Producer发送过来的消息(持久化),推送消息给消费者Consumer),消息的存储。NameServer 服务器:记录Producert信息、Broker信息、Consumer信息、Topic主题信息,NameServer服务器在这里作为控制中心、注册中心、路由,
服务启动顺序,先启动NameServer再启动Broker,将Broker服务器的ip注册到NameServer服务中
业务流程:如果生产者Producer需要推送消息至Broker服务器中,需要先去NameServer服务器中查找到对应的Broker服务器,然后生产者端Producer与Broker服务器建立连接
(2)能够保证严格的消息顺序(顺序消费、顺序拉取)
丰富的消息拉取模式:push模式(等待Broker推送消息,推荐使用,),pull模式(主动向Broker主动拉取消息),pull与push模式同时可以满足使用需求的情况下,建议优先使用push模式
(3)可以多节点生产和多节点消费
(4)消息事务机制,目前只有RocketMQ支持,Kafka和RabbitMQ不支持
(5)亿级消息堆积
(6)吞吐量高,但比Kafka低
(7)消息重推、死信队列 -
RabbitMQ特性
吞吐量比Kafka、RocketMQ低…
二、RocketMQ消费模式
1.Push推模式-DefaultMQPushConsumer原理
Consumer消费者向Broker服务器发送请求,Consumer通过请求与Broker服务器保持一种长连接的形式,Broker服务器每5s检查一次是否存在消息,如果有就推送给Consumer消费者
2.Pull拉模式-DefaultMQPullConsumer原理
Consumer消费者主动去Broker服务器拉取数据,一般使用本地定时任务去拉取,由于需要保证消息的及时性,一般推荐使用Push推模式订阅消息
3. 轮询监控机制
RocketMQ默认将Producer生产者消息发送至4(不一定4个)个队列中进行存储,Consumer消费方通过轮询的方式去监控这个4个队列(轮询监控机制)
4.ack机制
LocalTransactionState标识消息的状态,通过判断返回的枚举值enum做出相应处理
- COMMIT_MESSAGE
消息可见,目前事务消息分为提交不可见消息和可见消息 - ROLLBACK_MESSAGE
消息需要回滚 - UNKNOW
消息异常或超时时返回该枚举值,重复回查信息
三、RocketMQ实战
(1)消息发送实现流程
- 引入pom依赖,目前最新版本为5.3.0,推荐使用4.4.0
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>版本</version>
</dependency>
或
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
- 创建DefualtMQProducer实例对象
- 设置NaemServer地址
- 开启DefaultMQProducer
- 创建消息Message
- 发送消息
- 关闭DefaultMQProducer
(2)消息消费流程实现
目前业务需要实现消费业务,着重学习消费端逻辑
- 创建DefaultMQPushConsumer
- 设置NameServer地址
- 设置subscribe,这里是要读取的主题信息
- 创建监听器MessageListener
- 获取消息信息
- 返回消息读取状态
消费者流程代码,这里使用Push推模式实现,并设置了消息拉取最大上限setConsumeMessageBatchMaxSize(2)为2条消息
监听器这么选择普通监听器MessageListenerConcurrently,如果需要实现顺序消费可以选用MessageListenerOrderly
消息消费时如果有异常出现,注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发RocketMQ消息重推机制
如果消息消费成功,只需返回枚举类enum ConsumeConcurrentlyStatus.CONSUME_SUCCESS即可表示消息推送成功
public class Consumer {public static void main(String[] args) throws MQClientException {//1. 创建DefaultMQPushConsumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");//2. 设置NameServer地址consumer.setNamesrvAddr("192.168.211.141:9876");//3. 设置subscribe,这里是要读取的主题信息consumer.subscribe("Topic_Name",//执行要消费的主题"Tags || TagsA || TagsB");//过滤规则 "*"则表示全部订阅//4. 创建监听器MessageListener//4.1 设置消息拉取最大数(上限)-最大拉取两条consumer.setConsumeMessageBatchMaxSize(2);consumer.setMessageListener(new MessageListenerConcurrently() {/*MessageListenerConcurrently 普通消息的接收MessageListenerOrderly 顺序消息的接收*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {/*** List<MessageExt> msgs 可以从Broker获取多条数据,默认是32条,可以设置上限*///5. 获取消息信息//迭代消息信息for (MessageExt msg : msgs) {//获取主题String topic = msg.getTopic();//获取标签String tags = msg.getTags();//获取信息byte[] body = msg.getBody();try {String result = new String(body, RemotingHelper.DEFAULT_CHARSET);//todo 实现业务...System.out.println("Consumer消费信息--topic: " + topic + ", tags: " + tags + ", result: "+ result);} catch (UnsupportedEncodingException e) {//throw new RuntimeException(e);//注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发重推机制e.printStackTrace();//消息消费失败,重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}//6. 返回消息读取状态//消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return null;}});//开启RockerMQ消费端consumer.start();}}