学习RocketMQ(记录了个人艰难学习RocketMQ的笔记)

目录

一、部署单点RocketMQ

二、原理篇

三、实操篇

1、引入依赖

2、启动自动装配

3、配置application.yml

4、启动类

5、编写一个统一格式的消息对象

6、生产者

​编辑

7、定义一个constant

8、多/单个消费者订阅一个主题

1.实现消费者

2.编写接口发送消息

3.接口测试

9、测试多个消费者分别订阅不同主题

10、一个消费者订阅多个主题

11、多个消费者组订阅相同主题

1、实现消费者

2、编写接口发送消息

3、接口测试

四、文末大佬好文


一、部署单点RocketMQ

Docker 部署 RocketMQ (图文并茂超详细)_docker 部署rocketmq-CSDN博客

这个博主讲的很好,可食用,替大家实践了一遍

二、原理篇

为什么使用RocketMQ:

为什么选择RocketMQ | RocketMQ

关于一些原理,感觉官网讲的也非常透彻

领域模型概述 | RocketMQ

还有一些功能特性:

普通消息 | RocketMQ

本文的实操篇只是讲了发送普通消息

关于中间件对比,下面我之前有看过一些很好的文章:

Kafka、RabbitMQ、RocketMQ等消息中间件的对比_rabbimq rocket 差异-CSDN博客

rpc和zmq性能对比 rpc mq区别_mob6454cc70642f的技术博客_51CTO博客

RabbitMQ,RocketMQ,Kafka--区别/对比/选型_51CTO博客_rocketmq rabbitmq kafka选型

三、实操篇

先讲讲原理:

如果你需要不同业务,就需要不同消费者组,不要想着同一个消费者组可以通过订阅不同主题达到不同业务,因为同一个消费者组内的功能必须是一致的,可以换个角度想,既然你是一个业务,一个业务就是一个主题嘛,你用不同的业务实现,就多添加几个消费者组,分别订阅那个主题(业务),然后通过不同的Tag区分就行了,而且而且,不要想着说:一个消费者组一个主题通过不同Tag来区分,虽然我在刚刚学习的时候也这样子想,结果踩了一天的坑,看了好多博客好文来理解,在文末也有关于为什么不能这样子做。

1、引入依赖

RocketMQ的依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

demo案例的全部依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bluefoxyu</groupId><artifactId>RocketMQ-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.27</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.16</version></dependency></dependencies></project>

2、启动自动装配

2.2.3 版本的RocketMQ 没有适配 SpringBoot3,只适配SpringBoot2,所以需要自己去配置好自动装配。可以参考我下面这篇文章:

Springboot3+自动装配_springboot3自动装配-CSDN博客

在项目中的 resources 目录下创建 META-INF/spring 文件夹,并创建下面这个文件。

org.springframework.boot.autoconfigure.AutoConfiguration.imports

# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3、配置application.yml

server:port: 8080spring:profiles:active: devrocketmq:name-server: xxx:9876 # NameServer 地址producer:group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局发送者组定义send-message-timeout: 2000# 发送消息失败时的重试次数。设置为 1 表示如果发送失败,会再重试一次(总共尝试两次)。适用于同步发送消息失败时的重试次数。retry-times-when-send-failed: 1# 异步发送失败时的重试次数。设置为 1 表示在异步发送失败时会再尝试一次。适用于异步发送消息失败时的重试次数。retry-times-when-send-async-failed: 1logging:level:com:bluefoxyu:producer: infoconsumer: infocontroller: info

4、启动类

相比这个不必多说了。

RocketMQApplication:
package com.bluefoxyu;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@Slf4j
@SpringBootApplication
public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}
}

5、编写一个统一格式的消息对象

package com.bluefoxyu.message;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serial;
import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEvent implements Serializable {@Serialprivate static final long serialVersionUID = 1L;private String body;private String keys;}

上述实体类实现了Serializable接口,能够正常被序列化或者反序列化。

6、生产者

编写一个生产者,统一做好发送消息的一个模板,方便简化接口实现发送消息的代码编写,显得更加优雅一点,说到发送消息,就需要知道发送到哪个主题,然后哪些消费者组去消费,然后还有每条消息的唯一标识key,唯一标识可以用uuid生成,也可以用redis生成一个增长的不重复的id,这里使用uuid简化。

注意:如果你的项目里面只有一个消费者组,只有一个消费业务,这样子是不需要传Tag(过滤标签)的,但是正常情况都会有多个消息队列任务,下面提供两种重载的方法。

code:

package com.bluefoxyu.producer;import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 封装全体的消息生产者*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GeneralMessageProducer {private final RocketMQTemplate rocketMQTemplate;/*** 发送普通消息** @param topic            消息发送主题,用于标识同一类业务逻辑的消息* @param keys             消息索引键,可根据关键字精确查找某条消息* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/public SendResult sendMessage(String topic, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).build();// 2000L 表示发送消息的超时时间为 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(topic,message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}/*** 发送普通消息** @param topic            消息发送主题,用于标识同一类业务逻辑的消息* @param tag              消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys             消息索引键,可根据关键字精确查找某条消息* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {// 构建消息的 destination (主题和标签)StringBuilder destinationBuilder = StrUtil.builder().append(topic);if (StrUtil.isNotBlank(tag)) {destinationBuilder.append(":").append(tag);  // 设置tag}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置消息的标签.build();// 2000L 表示发送消息的超时时间为 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}}

7、定义一个constant

package com.bluefoxyu.constant;/*** RocketMQ 常量类* @author bluefoxyu*/
public class RocketMQConstant {/*** Group 消费者组定义*/public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";/*** Topic 主题定义*/public static final String MESSAGE_TOPIC_1 = "message_topic_1";public static final String MESSAGE_TOPIC_2 = "message_topic_2";/*** Tag 标签*/public static final String MESSAGE_TAG_A = "message_tag_A";public static final String MESSAGE_TAG_B = "message_tag_B";public static final String MESSAGE_TAG_C = "message_tag_C";}

8、多/单个消费者订阅一个主题

1.实现消费者

这里需要实现监听的消息的实体类类型是什么,GeneralMessageEvent 是我们之前封装的统一消息对象

implements RocketMQListener<GeneralMessageEvent>

在onMessage方法中,通过

JSON.toJSONString(message)

就可以拿到解析好的消息内容,也就是我们真正需要发送的消息,下面我编写三个消费者来进行消费,不过绑定的都是同一个主题,类似负载均衡的功能,这里只用一个消费者也是一样的,因为后续还需要测其他功能,所以这里我写了三个消费者。

GeneralMessageConsumer1:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer1 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者GeneralMessageConsumer1] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer2:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer2 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者GeneralMessageConsumer2] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer3:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer3 implements RocketMQListener<GeneralMessageEvent> {@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者GeneralMessageConsumer3] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}

2.编写接口发送消息

发送消息需要发送这四要素:

  1.  topic 主题 
  2.  key 唯一标识
  3. message 需要发送的消息
package com.bluefoxyu.controller;import com.bluefoxyu.producer.GeneralMessageProducer;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;@RestController
@RequiredArgsConstructor
public class controller {private final GeneralMessageProducer generalMessageDemoProduce;@PostMapping("/send/topic1/general-messageA")public String sendTopic1GeneralMessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);// 返回发送成功的状态名return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageB")public String sendTopic1GeneralMessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageC")public String sendTopic1GeneralMessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}}

3.接口测试

准备这三个测试接口:

开始分别测试三个接口这里就不一一展示了。

看控制台:

如结果消费了三次

9、测试多个消费者分别订阅不同主题

如果相同消费组的三个消费者组分别订阅不同主题,会怎么样呢。修改的代码如下,

当然,哈哈哈哈哈哈,就是消费不到消息(对于小白的我也被困扰了好久),由于是有问题的,代码就不粘贴了【狗头】。如下:

这里参考了一篇大佬的文章:

rocketmq问题汇总-一个consumerGroup只对应一个topic_org.apache.rocketmq.client.exception.mqbrokerexcep-CSDN博客

看完后悟了很多,大概意思就是一个消费者组中的职责应该是一致的,应该都去订阅相同主题的,如果一个消费者订阅了两个主题,那么其他同组的消费者也应该订阅那两个主题,参考评论区这几大佬的评论:

这个大佬就说的很透彻了:

10、一个消费者订阅多个主题

在上面说了既然一个消费者可以订阅多个主题,但是前提条件是同一个消费组中必须订阅相同主题,那应该怎么实现呢。

直接给代码:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 对应主题* consumerGroup 指定消费的分组* RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class GeneralMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消费者GeneralMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));System.out.println("General message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消费者GeneralMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消费失败,异常消息为:{}",e.getMessage());}}}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 对应主题* consumerGroup 指定消费的分组* RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagAMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消费者TagAMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));System.out.println("tagA message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消费者TagAMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消费失败,异常消息为:{}",e.getMessage());}}
}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 对应主题* consumerGroup 指定消费的分组* RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagBMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消费者TagBMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));System.out.println("tagB message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消费者TagBMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消费失败,异常消息为:{}",e.getMessage());}}
}

分别测试三个接口:

参考这位大佬的博客:

rocketmq (消费者消费同一个消费组,不同的topic)_rocketmq一个消费组消费多个topic-CSDN博客

11、多个消费者组订阅相同主题

这个业务经常是有的,希望订阅同一种业务,但是有不同的实现,这时候就需要使用Tag过滤标签来区分了。

1、实现消费者
MessageConsumerA:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_A,selectorExpression = MESSAGE_TAG_A
)
public class MessageConsumerA implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者MessageConsumerA] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerB:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_B,selectorExpression = MESSAGE_TAG_B
)
public class MessageConsumerB implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者MessageConsumerB] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerC:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 对应主题* consumerGroup 指定消费的分组*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_C,selectorExpression = MESSAGE_TAG_C
)
public class MessageConsumerC implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消费者MessageConsumerC] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}

2、编写接口发送消息

再controller添加那三个接口

    @PostMapping("/send/topic2/messageA")public String sendTopic2MessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_A,keys,generalMessageEvent);// 返回发送成功的状态名return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageB")public String sendTopic3MessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_B,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageC")public String sendTopic2MessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_C,keys,generalMessageEvent);return sendResult.getSendStatus().name();}
3、接口测试

消费成功!

也可以参考一个佬的博客:RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?_rocketmq一个topic多个tag-CSDN博客

当然,如果订阅不同主题也是没问题的,这里就不作演示了。

四、文末大佬好文

最后加上两个大佬好文,感觉讲的都很好:

面试官:RocketMQ同一个消费组内的消费者订阅了不同tag,会有问题吗?_rocketmq 订阅多个tag-CSDN博客

面试官:RocketMQ一个消费组内订阅同一个主题不同的TAG为什么会丢消息_为什么rocketmq相同消费组不同tag会有问题-CSDN博客

对于一个消费者组订阅同一个主题不同tag会丢消息,在前几天从0到1学习的时候,以为是可以的,但是踩了大坑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/465182.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全关键型嵌入式系统设计模式整理及应用实例

本文提供了对安全关键型嵌入式系统设计模式的全面概述&#xff0c;这些模式旨在提高系统在面临潜在故障时的安全性和可靠性。文中详细介绍了15种设计模式&#xff0c;包括同质冗余&#xff08;HmD&#xff09;、异质冗余&#xff08;HtD&#xff09;、三模冗余&#xff08;TMR&…

京东零售推荐系统可解释能力详解

作者&#xff1a;智能平台 张颖 本文导读 本文将介绍可解释能力在京东零售推荐系统中的应用实践。主要内容包括以下几大部分&#xff1a;推荐系统可解释定义、系统架构、排序可解释、模型可解释、流量可解释。 推荐系统可解释定义 推荐系统可解释的核心包括三部分&#xff0…

java项目之校园周边美食探索及分享平台(springboot)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的校园周边美食探索及分享平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 校园周边美食…

stack和queue --->容器适配器

不支持迭代器&#xff0c;迭代器无法满足他们的性质 边出边判断 实现 #define _CRT_SECURE_NO_WARNINGS 1 #include<iostream> #include<stack> #include<queue> using namespace std; int main() {stack<int> st;st.push(1);st.push(2);st.push(3);…

vue3动态监听div高度案例

案例场景 场景描述&#xff1a;现在左边的线条长度需要根据右边盒子的高度进行动态变化 实践代码案例 HTML部分 <div v-for"(device, index) in devices" :key"index"><!-- 动态设置 .left-bar 的高度 --><div class"left-bar"…

华为OD机试真题(Python/JS/C/C++)- 考点 - 细节

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题 点这里。 本专栏收录于《华为OD机试真题&#xff08;Python/JS/C/C&#xff09;》。

Spring源码学习(五):Spring AOP

免责声明 本人还处于学习阶段&#xff0c;如果内容有错误麻烦指出&#xff0c;敬请见谅&#xff01;&#xff01;&#xff01;Demo <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId><version>1.8.8<…

vue 使用docx-preview 预览替换文档内的特定变量

在开发合同管理中&#xff0c;需要使用到此功能&#xff0c;就是替换合同模板内的一些字符串&#xff0c;如&#xff1a;甲乙方名称&#xff0c;金额日期等&#xff0c;合同内容不变。效果如下&#xff1a; 使用docx-preview 好处是只预览不可编辑内容。 前端vue import { re…

若依项目搭建

若依的大版本 基本环境搭建 搭建注意点

(11)(2.1.6) Hobbywing DroneCAN ESC(二)

文章目录 前言 2 配置ESC 3 测试 4 设置视频 5 参数说明 前言 具有 CAN 接口&#xff08;including these&#xff09;的业余 ESC 支持 DroneCAN&#xff0c;它允许自动驾驶仪通过 CAN 控制 ESC /电机&#xff0c;并检索单个转速、电压、电流和温度。 2 配置ESC 默认情…

JVM结构图

JVM&#xff08;Java虚拟机&#xff09;是Java编程语言的核心组件之一&#xff0c;负责将Java字节码翻译成机器码并执行。JVM由多个子系统组成&#xff0c;包括类加载子系统、运行时数据区、执行引擎、Java本地接口和本地方法库。 类加载子系统&#xff08;Class Loading Subsy…

【智能算法改进】混沌映射策略--一网打尽

摘要 本文研究了多种混沌映射策略在智能算法中的改进效果&#xff0c;提出了一种综合不同混沌映射策略的多元混合方法&#xff0c;以提高算法的全局优化能力和收敛速度。通过引入不同的混沌映射&#xff08;如 Logistic、Tent、Sine 等&#xff09;生成初始种群分布&#xff0…

QML —— QML调用C++两种方法(附完整测试源码)

代码效果 说明 QML 其实是对JavaScript 的扩展,融合了Qt Object 系统,它是一种新的解释型的语言, QML 引擎虽然由Qt C++ 实现,但QML 对象的运行环境,说到底和C++ 对象的上下文环境是不同的,是平行的两个世界。如果你想在QML 中访问C++ 对象,那么必然要找到一种途径来在两…

剧本杀小程序,市场发展下的新机遇

剧本杀作为休闲娱乐的一种游戏方式&#xff0c;在短时间内进入了大众视野中&#xff0c;受到了广泛关注。近几年&#xff0c;剧本杀行业面临着创新挑战&#xff0c;商家需求寻求新的发展机遇&#xff0c;在市场饱和度下降的趋势下&#xff0c;获得市场份额。 随着科技的不断进…

mysql error:1449权限问题 及 用户授权

一、权限问题 Got error: 1449: The user specified as a definer (skip-grants userskip-grants host) does not exist when using LOCK TABLES 在迁移数据库时&#xff0c;定义的definer&#xff0c;在两个数据库之间不同步时&#xff0c;要将不存在的definer改成数据库中已…

Spark 的Standalone集群环境安装与测试

目录 一、Standalone 集群环境安装 &#xff08;一&#xff09;理解 Standalone 集群架构 &#xff08;二&#xff09;Standalone 集群部署 二、打开监控界面 &#xff08;一&#xff09;master监控界面 &#xff08;二&#xff09;日志服务监控界面 三、集群的测试 &a…

基于SpringBoot+Vue实现新零售商城系统

作者主页&#xff1a;编程千纸鹤 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验&#xff0c;被多个学校常年聘为校外企业导师&#xff0c;指导学生毕业设计并参…

SAP RFC 用户安全授权

一、SAP 通讯用户 对于RFC接口的用户&#xff0c;使用五种用户类型之一的“通讯”类型&#xff0c;这种类型的用户没有登陆SAPGUI的权限。 二、对调用的RFC授权 在通讯用户内部&#xff0c;权限对象&#xff1a;S_RFC中&#xff0c;限制进一步可以调用的RFC函数授权&#xff…

【Java Web】搭建Web环境以及初识JSP Tomcat

文章目录 程序架构Web服务器TomcatJSP概述主要特点基本语法综合示例程序调试和排错 代码示例 程序架构 C/S&#xff08;Client/Server&#xff0c;客户端/服务器&#xff09;和 B/S&#xff08;Browser/Server&#xff0c;浏览器/服务器&#xff09;是两种常见的软件系统架构模…

如何建购物网站提升用户体验

在构建一个购物网站时&#xff0c;用户体验是至关重要的&#xff0c;它直接影响到顾客的满意度和转化率。为了提升用户体验&#xff0c;可以从以下几个方面入手。 首先&#xff0c;网站设计应简洁明了。确保导航栏清晰易懂&#xff0c;让用户在寻找商品时不会迷失。此外&#x…