1.Windows环境中安装RocketMQ
1.1 安装前的环境准备
JDK1.8、Maven、Git
1.2 RocketMQ下载
RocketMQ · 官方网站 | RocketMQ (apache.org)
按照如下进行下载:
1.3 配置环境变量
在系统环境变量中新增如下配置:
变量名:ROCKETMQ_HOME
变量参数:D:\exploit\rocketmq-all-5.1.3 #自己本地的rocketmq地址
1.4 启动服务
- 启动NAMESERVER
cmd中进入rocketMQbin
目录下执行
start mqnamesrv.cmd
出现如下页面表示启动成功:
- 启动BROKER
cmd中进入rocketMQbin
目录下执行
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
出现如下页面表示启动成功:
1.5 RocketMQ可视化插件
1.5.1 下载
https://gitee.com/liuhuanhuan963019/rocketmq-externals.git (下载打包即可运行)
这是我的个人仓库,github中新版的尝试了很久很久,一直无法进行打包,jar下载存在很大的问题(可能被墙了),但是尝试了其他方法之后也无法进行下载,在网上找到了其他博主发的旧版的地址,测试过后可以正常使用。如果需要正版地址,请去如下仓库进行下载。
https://github.com/apache/rocketmq-externals.git
https://github.com/apache/rocketmq-dashboard.git
1.5.2 服务地址配置
下载完成之后,进入 rocketmq-externals\rocketmq-console\src\main\resources\application.properties 进行配置,如下图所示:
server.contextPath=
# 随意定义
server.port=8089 ### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
# 改成本地的RocketMQ服务地址
rocketmq.config.namesrvAddr=127.0.0.1:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false
1.5.3 打包启动
进入rocketmq-externals\rocketmq-console
文件夹下,执行打包命令
mvn clean package -D maven.test.skip=true
出现下图即表示打包成功:(此过程打包时间教程,需要等待时间比较长久)
打包成功之后,进入 target
文件夹下,执行如下命令启动应用程序
java -jar rocketmq-console-ng-1.0.1.jar
出现如下界面表示启动成功啦!!!
然后在浏览器中查看localhost:8089 如图所示,启动成功
1.6 在运行过程中可能出现的错误
问题:rocketmq启动 错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_181\lib;C:\Program
解决方法:如果本地安装了Java,并且环境变量已经配置,按照如下步骤进行操作,如果没有配置环境变量,那么先配置环境变量。
打开rocketMQ主目录下的bin,在bin中找到runbroker.cmd
文件,在文件中将CLASSPATH
加上英文双引号,重启即可。具体如下图:
配置之后重新启动即可。
2.Linux环境中安装RocketMQ
2.1 创建namesrv服务
2.1.1 拉取镜像
docker pull rocketmqinc/rocketmq
2.1.2 创建namesrv数据存储
mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
2.1.3 启动namesrv,构建namesrv容器
docker run -d --restart=always --name rmqnamesrv -p 9876:9876 -v /docker/rocketmq/data/namesrv/logs:/root/logs -v /docker/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
出现如下页面即表示启动成功:
- -restart=always docker重启时候容器自动重启
- -name rmqnamesrv 把容器的名字设置为rmqnamesrv
- -p 9876:9876 把容器内的端口9876挂载到宿主机9876上面
- -v /docker/rocketmq/data/namesrv/logs:/root/logs 把容器内的/root/logs日志目录挂载到宿主机的 /docker/rocketmq/data/namesrv/logs目录
- -v /docker/rocketmq/data/namesrv/store:/root/store 把容器内的/root/store数据存储目录挂载到宿主机的 /docker/rocketmq/data/namesrv目录
- rmqnamesrv 容器的名字
- -e “MAX_POSSIBLE_HEAP=100000000” 设置容器的最大堆内存为100000000
- rocketmqinc/rocketmq 使用的镜像名称
- sh mqnamesrv 启动namesrv服务
具体的参数含义,请参考本文作者其他文章:Docker使用容器命令
2.2 创建broker节点
2.2.1 创建broker数据存储路径
mkdir -p /docker/rocketmq/data/broker/logs /docker/rocketmq/data/broker/store /docker/rocketmq/conf
2.2.2 创建配置文件
创建文件目录
vi /docker/rocketmq/conf/broker.conf
在配置文件中插入如下内容
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 172.28.216.81
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95
2.2.3 构建broker容器
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv \
-p 10911:10911 -p 10909:10909 -v /docker/rocketmq/data/broker/logs:/root/logs \
-v /docker/rocketmq/data/broker/store:/root/store \
-v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
- -name rmqbroker 把容器的名字设置为rmqbroker
- –link rmqnamesrv:namesrv 和rmqnamesrv容器通信
- -p 10911:10911 把容器的非vip通道端口挂载到宿主机
- -p 10909:10909 把容器的vip通道端口挂载到宿主机
- -e “NAMESRV_ADDR=namesrv:9876” 指定namesrv的地址为本机namesrv的ip地址:9876
- -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker | 指定broker服务的最大堆内存
- rocketmqinc/rocketmq 使用的镜像名称
- sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 指定配置文件启动broker节点
2.3 创建rockermq-console服务
2.3.1 拉取镜像
docker pull pangliang/rocketmq-console-ng
2.3.2 构建rockermq-console容器
docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.28.216.81:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 9999:8080 \
pangliang/rocketmq-console-ng
- -e 填写的是namesrv的地址
- -restart=always docker重启时候镜像自动重启
- -name rmqadmin 把容器的名字设置为rmqadmin
- -e “JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 设置namesrv服务的ip地址
- -Dcom.rocketmq.sendMessageWithVIPChannel=false” 不使用vip通道发送消息
- –p 9999:8080 把容器内的端口8080挂载到宿主机上的9999端口
2.3.3 可能出现的问题
问题:启动容器报错:iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 9999 -j DNAT --to-destination 172.17.0.6:8080 ! iptables: No chain/target/match by that name
解决方法:重启docker
systemctl restart docker
问题:docker run -d \ --restart=always \ --name rmqadmin \ -e "JAVA_OPTS=-Drocketm
解决方法:注意配置broker.conf
中brokerIP1
为本服务器内网地址即可
参考文章:使用docker安装RocketMQ_docker rocketmq_皓亮君的博客-CSDN博客
3.本地RocketMQ集成SpringBoot测试
文末有惊喜哦,嘿嘿。。。。。
先前准备:新建四个项目用于测试
mqproductservice
– 消息生产服务(提供接口用于创建消息到MQ中)
mqconsumerservice
– 处理类型1的消息服务
mqconsumer1service
– 处理类型1的消息服务
mqconsumer2service
– 处理类型2的消息服务
网页版生成SpringBoot项目地址:Spring Initializr
3.1 简单消息操作
3.1.1 mqproductservice生产者配置
3.1.1.1 pom.xml配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mqproductservice</name><description>mq product service</description><properties><java.version>1.8</java.version></properties><dependencies><!--springboot--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version><scope>provided</scope></dependency><!--rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.1.1.2 application.yml配置如下:
server:port: 8082rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-grouplogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
# com.anran.projectmanage.mapper: DEBUG
3.1.1.3 新建接口生产者生产消息
package com.example.demo.controller;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/mqMessageController")
public class MqMessageController {private static final Logger log = LoggerFactory.getLogger(MqMessageController.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;@RequestMapping("/pushMessage.action")public String get(@RequestParam("id") int id) {rocketMQTemplate.convertAndSend("first-topic","你好,我的小可爱" + id);return "connected";}
}
3.1.1.4 使用接口测试工具测试结果
测试结果如下:
3.1.1.4 RockeMQ可视化页面中查看
结果如图所示:
3.1.2 mqconsumerservice 消费者设置
3.1.2.1 pom.xml配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mqproductservice</name><description>mq product service</description><properties><java.version>1.8</java.version></properties><dependencies><!--springboot--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version><scope>provided</scope></dependency><!--rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.1.2.2 application.yml配置如下
server:port: 8083rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-grouplogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
# com.anran.projectmanage.mapper: DEBUG
3.1.2.3 创建消费者类
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "first-topic",consumerGroup = "my-consumer-group")
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println(message);}
}
3.1.2.4 测试结果
这不就出现了吗~~
3.2 不同类型的消息发送 (以下列出部分代码,4个项目的pom配置是一样的和3.1中一致,下面的文章中不再列出)
3.2.1 消息生产者(mqproductservice)
3.2.1.1 application.yml配置
server:port: 8082#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#生产者配置producer:#组名group: anran-producer-group#目的地(topic:tag)#topictopic: anran-topic#sync tag(同步消息tag)sync-tag: anran-sync-tags#async tag(异步消息tag)async-tag: anran-async-tags#oneway tag(单向消息tag)oneway-tag: anran-oneway-tagslogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFOcom.anran.projectmanage.mapper: DEBUG
3.2.1.2 提供不同类型的接口下发不同类型的消息
-
MqMessageController.java
package com.example.demo.controller;import com.alibaba.fastjson.JSONObject;import com.example.demo.listener.SendCallbackListener; import com.example.demo.model.OrderStep; import com.example.demo.model.ResponseMsg; import com.example.demo.util.ListSplitter; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList; import java.util.List;@RestController @RequestMapping("/mqMessageController") public class MqMessageController {private static final Logger log = LoggerFactory.getLogger(MqMessageController.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.sync-tag}")private String syncTag;@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.async-tag}")private String asyncag;@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.oneway-tag}")private String onewayTag;/*** rocketmq 同步消息** @param id 消息* @return 结果*/@RequestMapping("/pushMessage.action")public ResponseMsg pushMessage(@RequestParam("id") int id) {log.info("pushMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置发送地和消息信息并发送同步消息SendResult sendResult = rocketMQTemplate.syncSend(syncTag, message);log.info("pushMessage finish : " + id + ", sendResult : " + JSONObject.toJSONString(sendResult));ResponseMsg msg = new ResponseMsg();// 解析发送结果if (sendResult.getSendStatus() == SendStatus.SEND_OK) {msg.setSuccessData(sendResult.getMsgId() + " : " + sendResult.getSendStatus());}return msg;}/*** 发送异步消息** @param id 消息* @return 结果*/@RequestMapping("/pushAsyncMessage.action")public ResponseMsg pushAsyncMessage(@RequestParam("id") int id) {log.info("pushAsyncMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置发送地和消息信息并发送异步消息rocketMQTemplate.asyncSend(asyncag, message, new SendCallbackListener(id));log.info("pushAsyncMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** 发送单向消息(不关注发送结果:记录日志)** @param id 消息* @return 结果*/@RequestMapping("/pushOneWayMessage.action")public ResponseMsg pushOneWayMessage(@RequestParam("id") int id) {log.info("pushOneWayMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置发送地和消息信息并发送单向消息rocketMQTemplate.sendOneWay(onewayTag, message);log.info("pushOneWayMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** 发送包含顺序的单向消息** @param id 消息* @return 结果*/@RequestMapping("/pushSequeueMessage.action")public ResponseMsg pushSequeueMessage(@RequestParam("id") int id) {log.info("pushSequeueMessage start : " + id);// 创建三个不同订单的不同步骤for (int i = 0; i < 3; i++) {// 处理当前订单唯一标识String myId = id + "" + i;// 获取当前订单的操作步骤列表List<OrderStep> myOrderSteps = OrderStep.buildOrderSteps(myId);// 依次操作步骤下发消息队列for (OrderStep orderStep : myOrderSteps) {// 构建消息String messageStr = String.format("order id : %s, desc : %s", orderStep.getId(), orderStep.getDesc());Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, orderStep.getId()).build();// 设置顺序下发rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {/*** 设置放入同一个队列的规则* @param list 消息列表* @param message 当前消息* @param o 比较的关键信息* @return 消息队列*/@Overridepublic MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {// 根据当前消息的id,使用固定算法获取需要下发的队列// (使用当前id和消息队列个数进行取模获取需要下发的队列,id和队列数量一样时,选择的队列坑肯定一样)int queueNum = Integer.valueOf(String.valueOf(o)) % list.size();log.info(String.format("queueNum : %s, message : %s", queueNum, new String(message.getBody())));return list.get(queueNum);}});// 设置发送地和消息信息并发送消息(Orderly)rocketMQTemplate.syncSendOrderly(syncTag, message, orderStep.getId());}}log.info("pushSequeueMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** rocketmq 延迟消息** @param id 消息* @return 结果*/@RequestMapping("/pushDelayMessage.action")public ResponseMsg pushDelayMessage(@RequestParam("id") int id) {log.info("pushDelayMessage start : " + id);// 构建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).build();// 设置超时和延时推送// 超时时针对请求broker然后结果返回给product的耗时// 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";SendResult sendResult = rocketMQTemplate.syncSend(syncTag, message, 1 * 1000l, 4);log.info("pushDelayMessage finish : " + id + ", sendResult : " + JSONObject.toJSONString(sendResult));ResponseMsg msg = new ResponseMsg();// 解析发送结果if (sendResult.getSendStatus() == SendStatus.SEND_OK) {msg.setSuccessData(sendResult.getMsgId() + " : " + sendResult.getSendStatus());}return msg;}/*** 同时发送10个单向消息(真正的批量)** @param id 消息* @return 结果*/@RequestMapping("/pushBatchMessage.action")public ResponseMsg pushBatchMessage(@RequestParam("id") int id) {log.info("pushBatchMessage start : " + id);// 创建消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 3; i++) {String myId = id + "" + i;// 处理当前订单唯一标识String messageStr = "order id : " + myId;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, myId).build();messages.add(message);}// 批量下发消息到broker,不支持消息顺序操作,并且对消息体有大小限制(不超过4M)ListSplitter splitter = new ListSplitter(messages, 1024 * 1024 * 4);while (splitter.hasNext()) {List<Message> listItem = splitter.next();rocketMQTemplate.syncSend(syncTag, listItem);}log.info("pushBatchMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** sql过滤消息** @param id 消息* @return 结果*/@RequestMapping("/pushSqlMessage.action")public ResponseMsg pushSqlMessage(@RequestParam("id") int id) {log.info("pushSqlMessage start : " + id);// 创建消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {String myId = id + "" + i;// 处理当前订单唯一标识String messageStr = "order id : " + myId;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, myId).setHeader("money", i).build();messages.add(message);}rocketMQTemplate.syncSend(syncTag, messages);log.info("pushSqlMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;}/*** 事务消息** @param id 消息* @return 结果*/@RequestMapping("/pushTransactionMessage.action")public ResponseMsg pushTransactionMessage(@RequestParam("id") int id) {log.info("pushTransactionMessage start : " + id);// 创建消息String messageStr = "order id : " + id;Message<String> message = MessageBuilder.withPayload(messageStr).setHeader(RocketMQHeaders.KEYS, id).setHeader("money", 10).setHeader(RocketMQHeaders.TRANSACTION_ID, id).build();TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(syncTag, message, null);log.info("pushTransactionMessage result : " + JSONObject.toJSONString(transactionSendResult));log.info("pushTransactionMessage finish : " + id);ResponseMsg msg = new ResponseMsg();msg.setSuccessData(null);return msg;} }
-
SendCallbackListener.java
package com.example.demo.listener;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;/*** rocketmq异步回调监听*/
@Slf4j
public class SendCallbackListener implements SendCallback {private int id;public SendCallbackListener(int id) {this.id = id;}@Overridepublic void onSuccess(SendResult sendResult) {log.info("CallBackListener on success : " + JSONObject.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {log.error("CallBackListener on exception : ", throwable);}
}
- TransactionListener.java
package com.example.demo.listener;import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;import java.util.HashMap;
import java.util.Map;/*** rocketmq异步回调监听*/
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TransactionListener implements RocketMQLocalTransactionListener {private static final Map<String, RocketMQLocalTransactionState> TRANSACTION_STATE_MAP = new HashMap<>();/*** 处理本地事务* @param message* @param o* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {log.info("执行本地事务");MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);TRANSACTION_STATE_MAP.put(transactionId, RocketMQLocalTransactionState.UNKNOWN);log.info("transactionId is {}",transactionId);try {Thread.sleep(10 * 1000l);} catch (InterruptedException e) {e.printStackTrace();}RocketMQLocalTransactionState state = RocketMQLocalTransactionState.ROLLBACK;if (Integer.parseInt(transactionId) % 2 == 0) {//执行成功,可以提交事务state = RocketMQLocalTransactionState.COMMIT;}log.info("transactionId is {}, state {}",transactionId, state.toString());TRANSACTION_STATE_MAP.remove(transactionId);return state;}/*** 校验事务状态* @param message* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("检查本地事务,事务ID:{}",transactionId);RocketMQLocalTransactionState state = TRANSACTION_STATE_MAP.get(transactionId);if(null != state){return state;}return RocketMQLocalTransactionState.ROLLBACK;}
}
- OrderStep.java
package com.example.demo.model;import lombok.Data;import java.util.ArrayList;
import java.util.List;/*** 订单消息模型*/
@Data
public class OrderStep {/*** 订单id*/private String id;/*** 操作步骤*/private String desc;/*** 步骤构建方法* @param id 订单唯一标识* @param desc 步骤描述*/public OrderStep(String id, String desc) {this.id = id;this.desc = desc;}/*** 构建订单的所有操作步骤:创建、支付、回调* @param id 订单唯一标识* @return 订单操作步骤列表*/public static List<OrderStep> buildOrderSteps(String id) {List<OrderStep> orderSteps = new ArrayList<>();OrderStep createStep = new OrderStep(id, "create order");orderSteps.add(createStep);OrderStep payStep = new OrderStep(id, "pay order");orderSteps.add(payStep);OrderStep callbackStep = new OrderStep(id, "call back order");orderSteps.add(callbackStep);return orderSteps;}
}
- ResponseMsg.java
package com.example.demo.model;import lombok.Data;@Data
public class ResponseMsg {public static final int CODE_FAIL = 500;public static final int CODE_SUCCESS = 200;public static final String MSG_SUCCESS = "success";public static final String MSG_FAIL = "fail";private int code;private String msg;private Object data;public ResponseMsg() {this.code = CODE_FAIL;this.msg = MSG_FAIL;}public void setSuccessData(Object data) {this.code = CODE_SUCCESS;this.msg = MSG_SUCCESS;this.data = data;}
}
- ListSplitter.java
package com.example.demo.util;import java.util.Iterator;
import java.util.List;/*** LIST对象进行分割*/
public class ListSplitter<T> implements Iterator<List<T>> {/*** 所有内容*/private final List<T> ts;/*** 多少数据进行分割*/private int size;/*** 已经分割的索引*/private int currIndex;/*** 构造方法* @param ts 所有内容* @param size 分割数量*/public ListSplitter(List<T> ts, int size) {this.ts = ts;this.size = size;}/*** 实现是否还厚后续数据* @return true:有 false:无*/@Overridepublic boolean hasNext() {return currIndex < ts.size();}@Overridepublic List<T> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < ts.size(); nextIndex++) {T t = ts.get(nextIndex);totalSize = totalSize + t.toString().length();if (totalSize > size) {break;}}List<T> subList = ts.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
3.2.2 消息消费者(mqconsumerservice服务)
3.2.2.1 application.yml配置
server:port: 8083#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#消费者配置consumer:#组名group: anran-consumer-group#监听主题topic: anran-topic#tags(监听多个tag时使用 || 进行分割,如果监听所有使用*或者不填)tags: anran-sync-tags||anran-async-tags||anran-oneway-tagslogging:file:path: /usr/log/mqconsumerservice/mqconsumerservice.loglevel:root: INFO
3.2.2.2 监听接口配置
Consumer.java
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 设置消息监听* 监听组:监听topic:监听tag(默认监听topic下所有)* 监听消费模式:默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)**/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}",selectorExpression = "${rocketmq.consumer.tags}", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println(message);}
}
3.2.3 消息消费者(mqconsumer1service服务)
3.2.3.1 application.yml配置
server:port: 8085#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#消费者配置consumer:#组名group: anran-consumer-group#监听主题topic: anran-topic#tags(监听多个tag时使用 || 进行分割,如果监听所有使用*或者不填),支持sql过滤# tags: anran-sync-tags||anran-async-tags||anran-oneway-tagstags: money > 5logging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
3.2.3.2 监听接口配置
Consumer.java
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 设置消息监听* 1.监听组(consumerGroup):监听topic(topic):监听tag(selectorExpression)(默认监听topic下所有)* 2.监听消费模式(messageModel):默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)* 3.设置顺序消息处理模式(consumeMode)(默认是所有线程可以处理同一个消息队列(ConsumeMode.CONCURRENTLY),当前消息没有线程在执行时其他线程才能够执行(ConsumeMode.ORDERLY)。* ps:一个线程顺序执行一个队列表时消息监听必须使用负载均衡messageModel = MessageModel.BROADCASTING)* 4.设置过滤tags类型:默认时tag(SelectorType.TAG),可以修改为sql语法(SelectorType.SQL92):*/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}",selectorExpression = "${rocketmq.consumer.tags}", messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY, selectorType = SelectorType.SQL92)
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info(message);}
}
3.2.4 消息消费者(mqconsumer2service服务)
3.2.4.1 application.yml 配置
server:port: 8084#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 127.0.0.1:9876#消费者配置consumer:#组名group: anran-consumer-group#监听主题topic: anran-topic#tags(监听多个tag时使用 || 进行分割,如果监听所有使用*或者不填)tags: anran-sync-tags||anran-async-tags||anran-oneway-tagslogging:file:path: /usr/log/mqproductservice/mqproductservice.loglevel:root: INFO
3.2.4.2 监听接口配置
Consumer.java
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 设置消息监听* 1.监听组(consumerGroup):监听topic(topic):监听tag(selectorExpression)(默认监听topic下所有)* 2.监听消费模式(messageModel):默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)* 3.设置顺序消息处理模式(consumeMode)(默认是所有线程可以处理同一个消息队列(ConsumeMode.CONCURRENTLY),当前消息没有线程在执行时其他线程才能够执行(ConsumeMode.ORDERLY)。* ps:一个线程顺序执行一个队列表时消息监听必须使用负载均衡messageModel = MessageModel.BROADCASTING)*/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}",selectorExpression = "${rocketmq.consumer.tags}", messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info(message);try {Thread.sleep(10 * 1000l);} catch (InterruptedException e) {e.printStackTrace();}}
}
3.2.5 测试
api接口测试下
成功返回数据,注意上图中的返回结果。
此时在mqconsumer1service消费者服务中,成功接收消息。
终于打工告成,完结撒花~~
本文参考文章地址:
https://blog.csdn.net/qq_34940987/article/details/120530597
下面给出上面4个项目的仓库地址:
https://gitee.com/liuhuanhuan963019/mqproductservice.git
https://gitee.com/liuhuanhuan963019/mqconsumerservice.git
https://gitee.com/liuhuanhuan963019/mqconsumer1service.git
https://gitee.com/liuhuanhuan963019/mqconsumer2service.git