Windows和Linux环境中安装RocketMQ并集成SpringBoot进行本地测试(史上最全)

1.Windows环境中安装RocketMQ

1.1 安装前的环境准备

JDK1.8、Maven、Git

1.2 RocketMQ下载

RocketMQ · 官方网站 | RocketMQ (apache.org)
按照如下进行下载:
在这里插入图片描述

1.3 配置环境变量

在系统环境变量中新增如下配置:

变量名:ROCKETMQ_HOME
变量参数:D:\exploit\rocketmq-all-5.1.3   #自己本地的rocketmq地址

1.4 启动服务

  • 启动NAMESERVER
    ​ cmd中进入rocketMQ bin目录下执行
start mqnamesrv.cmd

在这里插入图片描述
出现如下页面表示启动成功:
在这里插入图片描述

  • 启动BROKER
    cmd中进入rocketMQ bin目录下执行
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

出现如下页面表示启动成功:
在这里插入图片描述

1.5 RocketMQ可视化插件

1.5.1 下载

https://gitee.com/liuhuanhuan963019/rocketmq-externals.git (下载打包即可运行)

这是我的个人仓库,github中新版的尝试了很久很久,一直无法进行打包,jar下载存在很大的问题(可能被墙了),但是尝试了其他方法之后也无法进行下载,在网上找到了其他博主发的旧版的地址,测试过后可以正常使用。如果需要正版地址,请去如下仓库进行下载。

https://github.com/apache/rocketmq-externals.git

https://github.com/apache/rocketmq-dashboard.git

1.5.2 服务地址配置

下载完成之后,进入 rocketmq-externals\rocketmq-console\src\main\resources\application.properties 进行配置,如下图所示:

server.contextPath=
# 随意定义
server.port=8089   ### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
# 改成本地的RocketMQ服务地址
rocketmq.config.namesrvAddr=127.0.0.1:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false

1.5.3 打包启动

进入rocketmq-externals\rocketmq-console 文件夹下,执行打包命令

mvn clean package -D maven.test.skip=true

出现下图即表示打包成功:(此过程打包时间教程,需要等待时间比较长久
在这里插入图片描述
打包成功之后,进入 target 文件夹下,执行如下命令启动应用程序

java -jar rocketmq-console-ng-1.0.1.jar

在这里插入图片描述
出现如下界面表示启动成功啦!!!
在这里插入图片描述
然后在浏览器中查看localhost:8089 如图所示,启动成功
在这里插入图片描述

1.6 在运行过程中可能出现的错误

问题:rocketmq启动 错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_181\lib;C:\Program

解决方法:如果本地安装了Java,并且环境变量已经配置,按照如下步骤进行操作,如果没有配置环境变量,那么先配置环境变量。

​ 打开rocketMQ主目录下的bin,在bin中找到runbroker.cmd文件,在文件中将CLASSPATH加上英文双引号,重启即可。具体如下图:
在这里插入图片描述
配置之后重新启动即可。

2.Linux环境中安装RocketMQ

2.1 创建namesrv服务

2.1.1 拉取镜像

docker pull rocketmqinc/rocketmq

在这里插入图片描述

2.1.2 创建namesrv数据存储

mkdir -p  /docker/rocketmq/data/namesrv/logs   /docker/rocketmq/data/namesrv/store

2.1.3 启动namesrv,构建namesrv容器

docker run -d --restart=always --name rmqnamesrv -p 9876:9876 -v /docker/rocketmq/data/namesrv/logs:/root/logs -v /docker/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv 

出现如下页面即表示启动成功:
在这里插入图片描述

  • -restart=always docker重启时候容器自动重启
  • -name rmqnamesrv 把容器的名字设置为rmqnamesrv
  • -p 9876:9876 把容器内的端口9876挂载到宿主机9876上面
  • -v /docker/rocketmq/data/namesrv/logs:/root/logs 把容器内的/root/logs日志目录挂载到宿主机的 /docker/rocketmq/data/namesrv/logs目录
  • -v /docker/rocketmq/data/namesrv/store:/root/store 把容器内的/root/store数据存储目录挂载到宿主机的 /docker/rocketmq/data/namesrv目录
  • rmqnamesrv 容器的名字
  • -e “MAX_POSSIBLE_HEAP=100000000” 设置容器的最大堆内存为100000000
  • rocketmqinc/rocketmq 使用的镜像名称
  • sh mqnamesrv 启动namesrv服务

具体的参数含义,请参考本文作者其他文章:Docker使用容器命令

2.2 创建broker节点

2.2.1 创建broker数据存储路径

mkdir -p  /docker/rocketmq/data/broker/logs   /docker/rocketmq/data/broker/store /docker/rocketmq/conf

2.2.2 创建配置文件

创建文件目录

vi /docker/rocketmq/conf/broker.conf

在配置文件中插入如下内容

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址  
brokerIP1 = 172.28.216.81  
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95

2.2.3 构建broker容器

docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv \
-p 10911:10911 -p 10909:10909 -v  /docker/rocketmq/data/broker/logs:/root/logs \
-v  /docker/rocketmq/data/broker/store:/root/store \
-v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 

在这里插入图片描述

  • -name rmqbroker 把容器的名字设置为rmqbroker
  • –link rmqnamesrv:namesrv 和rmqnamesrv容器通信
  • -p 10911:10911 把容器的非vip通道端口挂载到宿主机
  • -p 10909:10909 把容器的vip通道端口挂载到宿主机
  • -e “NAMESRV_ADDR=namesrv:9876” 指定namesrv的地址为本机namesrv的ip地址:9876
  • -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker | 指定broker服务的最大堆内存
  • rocketmqinc/rocketmq 使用的镜像名称
  • sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 指定配置文件启动broker节点

2.3 创建rockermq-console服务

2.3.1 拉取镜像

docker pull pangliang/rocketmq-console-ng

在这里插入图片描述

2.3.2 构建rockermq-console容器

docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.28.216.81:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 9999:8080 \
pangliang/rocketmq-console-ng
  • -e 填写的是namesrv的地址
  • -restart=always docker重启时候镜像自动重启
  • -name rmqadmin 把容器的名字设置为rmqadmin
  • -e “JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 设置namesrv服务的ip地址
  • -Dcom.rocketmq.sendMessageWithVIPChannel=false” 不使用vip通道发送消息
  • –p 9999:8080 把容器内的端口8080挂载到宿主机上的9999端口

2.3.3 可能出现的问题

问题:启动容器报错:iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 9999 -j DNAT --to-destination 172.17.0.6:8080 ! iptables: No chain/target/match by that name

解决方法:重启docker

systemctl restart docker

问题:docker run -d \ --restart=always \ --name rmqadmin \ -e "JAVA_OPTS=-Drocketm

解决方法:注意配置broker.confbrokerIP1 为本服务器内网地址即可

参考文章:使用docker安装RocketMQ_docker rocketmq_皓亮君的博客-CSDN博客

3.本地RocketMQ集成SpringBoot测试

文末有惊喜哦,嘿嘿。。。。。

先前准备:新建四个项目用于测试

mqproductservice – 消息生产服务(提供接口用于创建消息到MQ中)
mqconsumerservice – 处理类型1的消息服务
mqconsumer1service – 处理类型1的消息服务
mqconsumer2service – 处理类型2的消息服务

网页版生成SpringBoot项目地址:Spring Initializr

3.1 简单消息操作

3.1.1 mqproductservice生产者配置

3.1.1.1 pom.xml配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mqproductservice</name><description>mq product service</description><properties><java.version>1.8</java.version></properties><dependencies><!--springboot--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version><scope>provided</scope></dependency><!--rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.1.1.2 application.yml配置如下:
server:port: 8082rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-grouplogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
#    com.anran.projectmanage.mapper: DEBUG
3.1.1.3 新建接口生产者生产消息
package com.example.demo.controller;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/mqMessageController")
public class MqMessageController {private static final Logger log = LoggerFactory.getLogger(MqMessageController.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;@RequestMapping("/pushMessage.action")public String get(@RequestParam("id") int id) {rocketMQTemplate.convertAndSend("first-topic","你好,我的小可爱" + id);return "connected";}
}
3.1.1.4 使用接口测试工具测试结果

测试结果如下:
在这里插入图片描述

3.1.1.4 RockeMQ可视化页面中查看

结果如图所示:
在这里插入图片描述

3.1.2 mqconsumerservice 消费者设置

3.1.2.1 pom.xml配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mqproductservice</name><description>mq product service</description><properties><java.version>1.8</java.version></properties><dependencies><!--springboot--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version><scope>provided</scope></dependency><!--rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.1.2.2 application.yml配置如下
server:port: 8083rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-grouplogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
#    com.anran.projectmanage.mapper: DEBUG
3.1.2.3 创建消费者类
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "first-topic",consumerGroup = "my-consumer-group")
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println(message);}
}
3.1.2.4 测试结果

在这里插入图片描述
这不就出现了吗~~

3.2 不同类型的消息发送 (以下列出部分代码,4个项目的pom配置是一样的和3.1中一致,下面的文章中不再列出)

3.2.1 消息生产者(mqproductservice)

3.2.1.1 application.yml配置
server:port: 8082#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#生产者配置producer:#组名group: anran-producer-group#目的地(topic:tag)#topictopic: anran-topic#sync tag(同步消息tag)sync-tag: anran-sync-tags#async tag(异步消息tag)async-tag: anran-async-tags#oneway tag(单向消息tag)oneway-tag: anran-oneway-tagslogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFOcom.anran.projectmanage.mapper: DEBUG
3.2.1.2 提供不同类型的接口下发不同类型的消息
  • MqMessageController.java

    package com.example.demo.controller;import com.alibaba.fastjson.JSONObject;import com.example.demo.listener.SendCallbackListener;
    import com.example.demo.model.OrderStep;
    import com.example.demo.model.ResponseMsg;
    import com.example.demo.util.ListSplitter;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
    import java.util.List;@RestController
    @RequestMapping("/mqMessageController")
    public class MqMessageController {private static final Logger log = LoggerFactory.getLogger(MqMessageController.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.sync-tag}")private String syncTag;@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.async-tag}")private String asyncag;@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.oneway-tag}")private String onewayTag;/*** rocketmq 同步消息** @param id 消息* @return 结果*/@RequestMapping("/pushMessage.action")public ResponseMsg pushMessage(@RequestParam("id") int id) {log.info("pushMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置发送地和消息信息并发送同步消息SendResult sendResult = rocketMQTemplate.syncSend(syncTag, message);log.info("pushMessage finish : " + id + ", sendResult : " + JSONObject.toJSONString(sendResult));ResponseMsg msg = new ResponseMsg();// 解析发送结果if (sendResult.getSendStatus() == SendStatus.SEND_OK) {msg.setSuccessData(sendResult.getMsgId() + " : " + sendResult.getSendStatus());}return msg;}/*** 发送异步消息** @param id 消息* @return 结果*/@RequestMapping("/pushAsyncMessage.action")public ResponseMsg pushAsyncMessage(@RequestParam("id") int id) {log.info("pushAsyncMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置发送地和消息信息并发送异步消息rocketMQTemplate.asyncSend(asyncag, message, new SendCallbackListener(id));log.info("pushAsyncMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** 发送单向消息(不关注发送结果:记录日志)** @param id 消息* @return 结果*/@RequestMapping("/pushOneWayMessage.action")public ResponseMsg pushOneWayMessage(@RequestParam("id") int id) {log.info("pushOneWayMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置发送地和消息信息并发送单向消息rocketMQTemplate.sendOneWay(onewayTag, message);log.info("pushOneWayMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** 发送包含顺序的单向消息** @param id 消息* @return 结果*/@RequestMapping("/pushSequeueMessage.action")public ResponseMsg pushSequeueMessage(@RequestParam("id") int id) {log.info("pushSequeueMessage start : " + id);// 创建三个不同订单的不同步骤for (int i = 0; i < 3; i++) {// 处理当前订单唯一标识String myId = id + "" + i;// 获取当前订单的操作步骤列表List<OrderStep> myOrderSteps = OrderStep.buildOrderSteps(myId);// 依次操作步骤下发消息队列for (OrderStep orderStep : myOrderSteps) {// 构建消息String messageStr = String.format("order id : %s, desc : %s", orderStep.getId(), orderStep.getDesc());Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, orderStep.getId()).build();// 设置顺序下发rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {/*** 设置放入同一个队列的规则* @param list 消息列表* @param message 当前消息* @param o 比较的关键信息* @return 消息队列*/@Overridepublic MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {// 根据当前消息的id,使用固定算法获取需要下发的队列// (使用当前id和消息队列个数进行取模获取需要下发的队列,id和队列数量一样时,选择的队列坑肯定一样)int queueNum = Integer.valueOf(String.valueOf(o)) % list.size();log.info(String.format("queueNum : %s, message : %s", queueNum, new String(message.getBody())));return list.get(queueNum);}});// 设置发送地和消息信息并发送消息(Orderly)rocketMQTemplate.syncSendOrderly(syncTag, message, orderStep.getId());}}log.info("pushSequeueMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** rocketmq 延迟消息** @param id 消息* @return 结果*/@RequestMapping("/pushDelayMessage.action")public ResponseMsg pushDelayMessage(@RequestParam("id") int id) {log.info("pushDelayMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置超时和延时推送// 超时时针对请求broker然后结果返回给product的耗时// 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";SendResult sendResult = rocketMQTemplate.syncSend(syncTag, message, 1 * 1000l, 4);log.info("pushDelayMessage finish : " + id + ", sendResult : " + JSONObject.toJSONString(sendResult));ResponseMsg msg = new ResponseMsg();// 解析发送结果if (sendResult.getSendStatus() == SendStatus.SEND_OK) {msg.setSuccessData(sendResult.getMsgId() + " : " + sendResult.getSendStatus());}return msg;}/*** 同时发送10个单向消息(真正的批量)** @param id 消息* @return 结果*/@RequestMapping("/pushBatchMessage.action")public ResponseMsg pushBatchMessage(@RequestParam("id") int id) {log.info("pushBatchMessage start : " + id);// 创建消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 3; i++) {String myId = id + "" + i;// 处理当前订单唯一标识String messageStr = "order id : " + myId;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, myId).build();messages.add(message);}// 批量下发消息到broker,不支持消息顺序操作,并且对消息体有大小限制(不超过4M)ListSplitter splitter = new ListSplitter(messages, 1024 * 1024 * 4);while (splitter.hasNext()) {List<Message> listItem = splitter.next();rocketMQTemplate.syncSend(syncTag, listItem);}log.info("pushBatchMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** sql过滤消息** @param id 消息* @return 结果*/@RequestMapping("/pushSqlMessage.action")public ResponseMsg pushSqlMessage(@RequestParam("id") int id) {log.info("pushSqlMessage start : " + id);// 创建消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {String myId = id + "" + i;// 处理当前订单唯一标识String messageStr = "order id : " + myId;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, myId).setHeader("money", i).build();messages.add(message);}rocketMQTemplate.syncSend(syncTag, messages);log.info("pushSqlMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** 事务消息** @param id 消息* @return 结果*/@RequestMapping("/pushTransactionMessage.action")public ResponseMsg pushTransactionMessage(@RequestParam("id") int id) {log.info("pushTransactionMessage start : " + id);// 创建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).setHeader("money", 10).setHeader(RocketMQHeaders.TRANSACTION_ID, id).build();TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(syncTag, message, null);log.info("pushTransactionMessage result : " + JSONObject.toJSONString(transactionSendResult));log.info("pushTransactionMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}
    }
    
  • SendCallbackListener.java

package com.example.demo.listener;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;/*** rocketmq异步回调监听*/
@Slf4j
public class SendCallbackListener implements SendCallback {private int id;public SendCallbackListener(int id) {this.id = id;}@Overridepublic void onSuccess(SendResult sendResult) {log.info("CallBackListener on success : " + JSONObject.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {log.error("CallBackListener on exception : ", throwable);}
}
  • TransactionListener.java
package com.example.demo.listener;import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;import java.util.HashMap;
import java.util.Map;/*** rocketmq异步回调监听*/
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TransactionListener implements RocketMQLocalTransactionListener {private static final Map<String, RocketMQLocalTransactionState> TRANSACTION_STATE_MAP = new HashMap<>();/*** 处理本地事务* @param message* @param o* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {log.info("执行本地事务");MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);TRANSACTION_STATE_MAP.put(transactionId, RocketMQLocalTransactionState.UNKNOWN);log.info("transactionId is {}",transactionId);try {Thread.sleep(10 * 1000l);} catch (InterruptedException e) {e.printStackTrace();}RocketMQLocalTransactionState state = RocketMQLocalTransactionState.ROLLBACK;if (Integer.parseInt(transactionId) % 2 == 0) {//执行成功,可以提交事务state = RocketMQLocalTransactionState.COMMIT;}log.info("transactionId is {}, state {}",transactionId, state.toString());TRANSACTION_STATE_MAP.remove(transactionId);return state;}/*** 校验事务状态* @param message* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("检查本地事务,事务ID:{}",transactionId);RocketMQLocalTransactionState state = TRANSACTION_STATE_MAP.get(transactionId);if(null != state){return state;}return RocketMQLocalTransactionState.ROLLBACK;}
}
  • OrderStep.java
package com.example.demo.model;import lombok.Data;import java.util.ArrayList;
import java.util.List;/*** 订单消息模型*/
@Data
public class OrderStep {/*** 订单id*/private String id;/*** 操作步骤*/private String desc;/*** 步骤构建方法* @param id 订单唯一标识* @param desc 步骤描述*/public OrderStep(String id, String desc) {this.id = id;this.desc = desc;}/*** 构建订单的所有操作步骤:创建、支付、回调* @param id 订单唯一标识* @return 订单操作步骤列表*/public static List<OrderStep> buildOrderSteps(String id) {List<OrderStep> orderSteps = new ArrayList<>();OrderStep createStep = new OrderStep(id, "create order");orderSteps.add(createStep);OrderStep payStep = new OrderStep(id, "pay order");orderSteps.add(payStep);OrderStep callbackStep = new OrderStep(id, "call back order");orderSteps.add(callbackStep);return orderSteps;}
}
  • ResponseMsg.java
package com.example.demo.model;import lombok.Data;@Data
public class ResponseMsg {public static final int CODE_FAIL = 500;public static final int CODE_SUCCESS = 200;public static final String MSG_SUCCESS = "success";public static final String MSG_FAIL = "fail";private int code;private String msg;private Object data;public ResponseMsg() {this.code = CODE_FAIL;this.msg = MSG_FAIL;}public void setSuccessData(Object data) {this.code = CODE_SUCCESS;this.msg = MSG_SUCCESS;this.data = data;}
}
  • ListSplitter.java
package com.example.demo.util;import java.util.Iterator;
import java.util.List;/*** LIST对象进行分割*/
public class ListSplitter<T> implements Iterator<List<T>> {/*** 所有内容*/private final List<T> ts;/*** 多少数据进行分割*/private int size;/*** 已经分割的索引*/private int currIndex;/*** 构造方法* @param ts 所有内容* @param size 分割数量*/public ListSplitter(List<T> ts, int size) {this.ts = ts;this.size = size;}/*** 实现是否还厚后续数据* @return true:有 false:无*/@Overridepublic boolean hasNext() {return currIndex < ts.size();}@Overridepublic List<T> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < ts.size(); nextIndex++) {T t = ts.get(nextIndex);totalSize = totalSize + t.toString().length();if (totalSize > size) {break;}}List<T> subList = ts.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

3.2.2 消息消费者(mqconsumerservice服务)

3.2.2.1 application.yml配置
server:port: 8083#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#消费者配置consumer:#组名group: anran-consumer-group#监听主题topic: anran-topic#tags(监听多个tag时使用 || 进行分割,如果监听所有使用*或者不填)tags: anran-sync-tags||anran-async-tags||anran-oneway-tagslogging:file:path: /usr/log/mqconsumerservice/mqconsumerservice.loglevel:root: INFO
3.2.2.2 监听接口配置

Consumer.java

package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 设置消息监听* 监听组:监听topic:监听tag(默认监听topic下所有)* 监听消费模式:默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)**/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}",selectorExpression = "${rocketmq.consumer.tags}", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println(message);}
}

3.2.3 消息消费者(mqconsumer1service服务)

3.2.3.1 application.yml配置
server:port: 8085#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#消费者配置consumer:#组名group: anran-consumer-group#监听主题topic: anran-topic#tags(监听多个tag时使用 || 进行分割,如果监听所有使用*或者不填),支持sql过滤#    tags: anran-sync-tags||anran-async-tags||anran-oneway-tagstags: money > 5logging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
3.2.3.2 监听接口配置

Consumer.java

package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 设置消息监听* 1.监听组(consumerGroup):监听topic(topic):监听tag(selectorExpression)(默认监听topic下所有)* 2.监听消费模式(messageModel):默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)* 3.设置顺序消息处理模式(consumeMode)(默认是所有线程可以处理同一个消息队列(ConsumeMode.CONCURRENTLY),当前消息没有线程在执行时其他线程才能够执行(ConsumeMode.ORDERLY)。*   ps:一个线程顺序执行一个队列表时消息监听必须使用负载均衡messageModel = MessageModel.BROADCASTING)* 4.设置过滤tags类型:默认时tag(SelectorType.TAG),可以修改为sql语法(SelectorType.SQL92):*/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}",selectorExpression = "${rocketmq.consumer.tags}", messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY, selectorType = SelectorType.SQL92)
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info(message);}
}

3.2.4 消息消费者(mqconsumer2service服务)

3.2.4.1 application.yml 配置
server:port: 8084#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#消费者配置consumer:#组名group: anran-consumer-group#监听主题topic: anran-topic#tags(监听多个tag时使用 || 进行分割,如果监听所有使用*或者不填)tags: anran-sync-tags||anran-async-tags||anran-oneway-tagslogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
3.2.4.2 监听接口配置

Consumer.java

package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 设置消息监听* 1.监听组(consumerGroup):监听topic(topic):监听tag(selectorExpression)(默认监听topic下所有)* 2.监听消费模式(messageModel):默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)* 3.设置顺序消息处理模式(consumeMode)(默认是所有线程可以处理同一个消息队列(ConsumeMode.CONCURRENTLY),当前消息没有线程在执行时其他线程才能够执行(ConsumeMode.ORDERLY)。*   ps:一个线程顺序执行一个队列表时消息监听必须使用负载均衡messageModel = MessageModel.BROADCASTING)*/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}",selectorExpression = "${rocketmq.consumer.tags}", messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info(message);try {Thread.sleep(10 * 1000l);} catch (InterruptedException e) {e.printStackTrace();}}
}

3.2.5 测试

api接口测试下
在这里插入图片描述

成功返回数据,注意上图中的返回结果。

此时在mqconsumer1service消费者服务中,成功接收消息。
在这里插入图片描述
终于打工告成,完结撒花~~

本文参考文章地址:
https://blog.csdn.net/qq_34940987/article/details/120530597

下面给出上面4个项目的仓库地址:

https://gitee.com/liuhuanhuan963019/mqproductservice.git

https://gitee.com/liuhuanhuan963019/mqconsumerservice.git

https://gitee.com/liuhuanhuan963019/mqconsumer1service.git

https://gitee.com/liuhuanhuan963019/mqconsumer2service.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/113432.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新增!视频智能分析/AI算法智能分析网关V5告警功能添加教程来咯!

智能分析网关系列是基于边缘AI计算技术&#xff0c;可对前端摄像头采集的视频流进行实时检测分析&#xff0c;能对监控画面中的人、车、物进行识别&#xff0c;可实现的检测包括&#xff1a;人脸检测与识别、车辆检测与识别、烟火识别、安全帽/反光衣识别、区域入侵识别等&…

IIS搭建本地电脑服务器:通过内网穿透技术实现公网访问的步骤指南

1.前言 在网上各种教程和介绍中&#xff0c;搭建网页都会借助各种软件的帮助&#xff0c;比如网页运行的Apache和Nginx、数据库软件MySQL和MSSQL之类&#xff0c;为方便用户使用&#xff0c;还出现了XAMPP、PHPStudy、宝塔面板等等一系列集成服务&#xff0c;都是为了方便我们…

AMEYA360:兆易创新获得ISO 26262 ASIL D流程认证, 汽车功能安全管理体系再上新台阶

中国北京(2023年8月29日) —— 业界半导体器件供应商兆易创新GigaDevice(股票代码 603986)今日宣布&#xff0c;获得由国际公认的测试、检验和认证机构通标标准技术服务有限公司(以下简称SGS)授予的ISO 26262:2018汽车功能安全最高等级ASIL D流程认证证书&#xff0c;这标志着兆…

PPPoE连接无法建立的排查和修复

嗨&#xff0c;亲爱的读者朋友们&#xff01;你是否曾经遇到过PPPoE连接无法建立的问题&#xff1f;今天我将为你详细解析排查和修复这个问题的步骤。 检查物理连接 首先&#xff0c;我们需要确保物理连接没有问题。请按照以下步骤进行检查&#xff1a; - 检查网线是否插好&…

微服务(rpc)

微服务&#xff08;rpc&#xff09; 微服务必备的模块生产者消费者管理平台流量控制集群情况下如何做到流量监控 负载均衡服务发现和治理序列化传输序列化和反序列化 微服务是一种架构风格&#xff0c;将一个应用程序拆分为一组小型、独立的服务&#xff0c;每个服务都可以独立…

C++二叉搜索树

C二叉搜索树 二叉搜索树概念二叉搜索树操作结点类的实现中序遍历实现二叉搜索树的插入非递归实现递归实现 二叉搜索树的查找非递归实现递归实现 二叉搜索树的删除非递归实现递归实现 构造函数拷贝构造函数析构函数赋值运算符重载 二叉搜索树的应用二叉搜索树的性能分析 二叉搜索…

一键快速还原修复人脸,CodeFormer 助力人脸图像修复

今天在查资料的时候无意间看到了一个很有意思的工具&#xff0c;就是CodeFormer &#xff0c;作者给出来的说明是用于人脸修复任务的&#xff0c;觉得很有意思就拿来实践了一下&#xff0c;这里记录分享一下。 首先对人脸修复任务进行简单的回顾总结&#xff1a; 人脸修复是指…

【Docker】01-Centos安装、简单使用

参考教程&#xff1a; https://www.bilibili.com/video/BV1Qa4y1t7YH/?p5&spm_id_frompageDriver&vd_source4964ba5015a16eb57d0ac13401b0fe77 什么是Docker&#xff1f; Docker是一种开源的容器化平台&#xff0c;用于构建、打包、部署和运行应用程序。它通过使用容…

冠达管理 :主升浪前最后一次洗盘?

随着科技的不断发展&#xff0c;人们关于金融商场的了解也越来越深入。在股市中&#xff0c;洗盘是一个非常重要的概念。洗盘是指许多的股票被清洗出某个价位上的持有者&#xff0c;从而拉低该价位上的股票价格&#xff0c;为后续上涨做出铺垫。而在股市中&#xff0c;主升浪前…

算法工程题(非递减顺序 排列)

* 题意说明&#xff1a; * 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c; * 分别表示 nums1 和 nums2 中的元素数目。 * 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。…

Flutter(九)Flutter动画和自定义组件

目录 1.动画简介2.动画实现和监听3. 自定义路由切换动画4. Hero动画5.交织动画6.动画切换7.Flutter预置的动画过渡组件自定义组件1.简介2.组合组件3.CustomPaint 和 RenderObject 1.动画简介 Animation、Curve、Controller、Tween这四个角色&#xff0c;它们一起配合来完成一个…

2023年高教社杯 国赛数学建模思路 - 复盘:人力资源安排的最优化模型

文章目录 0 赛题思路1 描述2 问题概括3 建模过程3.1 边界说明3.2 符号约定3.3 分析3.4 模型建立3.5 模型求解 4 模型评价与推广5 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 描述 …

maven推包The environment variable JAVA_HOME is not correctly set

解决办法&#xff1a; 打开idea查看jdk安装位置 1.在/etc下面创建&#xff08;如果存在就是更新&#xff09;launchd.conf。里面添加一行&#xff1a; setenv JAVA_HOME /Library/Java/JavaVirtualMachines/jdk1.8.0_351.jdk/Contents/Home #JAVA_HOME后面是我的java安装路径…

Python正则表达式简单教程

当涉及到处理文本数据时&#xff0c;正则表达式是一个非常有用的工具。它可以用于在字符串中进行模式匹配、搜索、替换等操作。以下是一个简单的Python正则表达式教程&#xff0c;从基础开始介绍如何使用正则表达式。 什么是正则表达式&#xff1f; 正则表达式&#xff08;Re…

【跟小嘉学 Rust 编程】二十、进阶扩展

系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…

windows安装新openssl后依然显示旧版本

1、Windows环境下安装升级新版本openssl后&#xff0c;通过指令openssl version -a查看版本号&#xff1a;如下 这个版本号还是是以前的老版本&#xff0c;看来得把原先的老版本删除掉才可以生效&#xff0c;但是不知道在哪里。 2、网上找了老半天也没找到答案&#xff0c;最后…

JS三座大山 —— 原型和原型链

系列文章目录 内容链接2023前端面试笔记HTML52023前端面试笔记CSS3 文章目录 系列文章目录前言一、原型是什么&#xff1f;二、原型链是什么&#xff1f;2.1 原型链全方面解析2.2 为什么构造函数也有原型&#xff1f; 总结 前言 理解原型和原型链可以帮助我们更好地理解 Java…

后台管理系统:项目路由搭建与品牌管理

路由的搭建 先删除一些不需要的界面 然后发现跑不起来&#xff0c;我们需要去配置 删减成这样&#xff0c;然后自己新建需要的路由组件 改成这样&#xff0c;这里要注意。我们是在layout这个大的组件下面的&#xff0c;meta 中的title就是我们侧边栏的标题&#xff0c;icon可…

积分游戏小程序模板源码

积分游戏小程序模板源码是一款可以帮助用户快速开发小程序的工具&#xff0c;此模板源码包含五个静态页面&#xff0c;分别是首页、任务列表、大转盘、猜拳等五个页面&#xff0c;非常适合进行积分游戏等相关开发。 此模板源码的前端部分非常简单易用&#xff0c;用户可以根据…

mongodb 分片集群部署

文章目录 mongodb 分片部署二进制安装三台config 配置shard 分片安装shard1 安装shard2 安装shard3 安装mongos 安装数据库、集合启用分片创建集群认证文件创建集群用户部署常见问题 mongodb 分片部署 二进制安装 mkdir -p /data/mongodb tar xvf mongodb-linux-x86_64-rhel7…