Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

image-20231227120405997

源码地址:Gitee

整合RocketMQ

依赖版本

  • JDK 17
  • Spring Boot 3.2.0
  • RocketMQ-Client 5.0.4
  • RocketMQ-Starter 2.2.0

可以参考这篇进行RocketMQ安装

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:
image-20231227062105302

参考配置文件

# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false

参考Issue

  • 方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入

  • 方法二:在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports
    文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题

import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}

RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}

RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic  消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch   是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}

定义RocketMQ消费者

import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}

定义测试类发送消息

import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}

测试

springboot3-RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/227917.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言——扫雷

扫雷是一款经典的小游戏&#xff0c;那如何使用C语言实现一个扫雷游戏呢&#xff1f; 一、全部源码 直接把全部源码放在开头&#xff0c;如有需要&#xff0c;直接拿走。 源码分为三个文件&#xff1a; test.cpp/c 主函数的位置 #include "game.h"int main() {…

x-cmd pkg | gum - 很好看的终端 UI 命令行工具

目录 简介首次用户功能特点Bubbles 与 Lip Gloss进一步探索 简介 gum 由 Charm 组织于 2022 年使用 Go 语言开发。旨在帮助用户编写 Shell 脚本与 dotfiles 时提供一系列快捷使用&#xff0c;可配置&#xff0c;可交互&#xff0c;美观的 Terminal UI 组件。 首次用户 使用 x…

常用的 MySQL 可视化客户端

数据库可视化客户端&#xff08;GUI&#xff09;让用户在和数据库进行交互时&#xff0c;能直观地查看、创建和修改对象&#xff0c;如&#xff1a;表、行和列。让数据库操作变得更方便了。 今天&#xff0c;我们来了解下目前市场上最常用的 MySQL 可视化客户端。 官方&#x…

详解数组的轮转

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…

c# listbox 添加图标和文字

给listbox 添加 DrawItem 事件 private void listBox1_DrawItem(object sender, DrawItemEventArgs e){int index e.Index;//获取当前要进行绘制的行的序号&#xff0c;从0开始。Graphics g e.Graphics;//获取Graphics对象。Rectangle bound e.Bounds;//获取当前要绘制的行的…

【设计模式】状态模式

文章目录 引例状态模式理论状态模式代码优化结合享元模式并发问题解决 策略模式 VS 状态模式 引例 交通信号灯系统的设计与实现 方案一 传统设计方案 定义交通灯颜色的枚举 public enum LightColor { Green,Red,Yellow }交通灯类TrafficLight&#xff0c;处理颜色转换等业务…

【AMD Xilinx】ZUBoard(3):通过AXI GPIO接收PL端的按键输入

【AMD Xilinx】ZUBoard&#xff08;3&#xff09;&#xff1a;通过AXI GPIO接收PL端的按键输入 一、本项目实现的功能二、Vivado工程1. 添加AXI GPIO2. 配置AXI GPIO3. 根据原理图查找对应管脚4. I/O Planning5.XDC 三、ARM代码1. 地址空间2. 函数说明3. 实际的C代码实现4. 运行…

VitulBox中Ubuntu虚拟机安装JAVA环境——备赛笔记——2024全国职业院校技能大赛“大数据应用开发”赛项

前言 在进行之后操作是请下载好JDK&#xff0c;之后的内容是以Ubuntu虚拟机中安装java环境续写。 提示&#xff1a;以下操作是在虚拟机hadoop用户下操作的&#xff0c;并为安装java环境作准备 一、更新APT 为了确保Hadoop安装过程顺利进行&#xff0c;建议用hadoop用户登录…

基于SpringBoot实现的前后端分离电影评分项目,功能:注册登录、浏览影片、热门影片、搜索、评分、片单、聊天、动态

一、项目介绍 本项目主要基于SpringBoot、Mybatis-plus、MySQL、Redis实现的影片评分项目。 本系统是前后端分离的&#xff0c;分别由三个子项目构成&#xff1a;java服务端、用户前端、管理员管理前端 关键词&#xff1a;springboot java vue mysql reids websocket 毕业设计…

2023年“中银杯”四川省职业院校技能大赛“云计算应用”赛项样题卷①

2023年“中银杯”四川省职业院校技能大赛“云计算应用”赛项&#xff08;高职组&#xff09; 样题&#xff08;第1套&#xff09; 目录 2023年“中银杯”四川省职业院校技能大赛“云计算应用”赛项&#xff08;高职组&#xff09; 样题&#xff08;第1套&#xff09; 模块一…

Java Log 学习笔记

参考文章&#xff1a; 1.Java 日志从入门到实战 2.Java日志框架的发展历史&#xff0c;你不想了解一下吗 背景 想自定义 logback 配置文件进行日志分级别记录到不同文件&#xff0c;遇到了几个问题&#xff08;使用的是 spring-boot 构建的项目&#xff0c;spring-boot 版本为…

多模态大模型-CogVLm 论文阅读笔记

多模态大模型-CogVLm 论文阅读笔记 COGVLM: VISUAL EXPERT FOR LARGE LANGUAGEMODELS 论文地址 :https://arxiv.org/pdf/2311.03079.pdfcode地址 : https://github.com/THUDM/CogVLM时间 : 2023-11机构 : zhipuai,tsinghua关键词: visual language model效果:&#xff08;2023…

MySQL基础入门(二)

多表内容 一对多 这个内容是黑马的入门问题&#xff0c;可以带大家思考一下这个怎么设计 我们要知道一个岗位可以对应很多用户&#xff0c;而一个用户只能对应一个岗位&#xff0c;这就属于一对多的类型 那么我们需要怎么将他们进行关联呢&#xff1f; 现在我们可以通过一个…

蓝桥杯C/C++程序设计——单词分析

题目描述 小蓝正在学习一门神奇的语言&#xff0c;这门语言中的单词都是由小写英文字母组 成&#xff0c;有些单词很长&#xff0c;远远超过正常英文单词的长度。小蓝学了很长时间也记不住一些单词&#xff0c;他准备不再完全记忆这些单词&#xff0c;而是根据单词中哪个字母出…

Zookeeper 实战 | Zookeeper 和Spring Cloud相结合解决分布式锁、服务注册与发现、配置管理

专栏集锦&#xff0c;大佬们可以收藏以备不时之需&#xff1a; Spring Cloud 专栏&#xff1a;http://t.csdnimg.cn/WDmJ9 Python 专栏&#xff1a;http://t.csdnimg.cn/hMwPR Redis 专栏&#xff1a;http://t.csdnimg.cn/Qq0Xc TensorFlow 专栏&#xff1a;http://t.csdni…

Media Encoder各版本安装指南

Media Encoder 链接地址如下&#xff1a; https://pan.baidu.com/s/1qOowHfm9rhrOOMljYgQCsA?pwd0531 1.鼠标右击【ME2023(64bit)】压缩包选择&#xff08;win11系统需先点击“显示更多选项”&#xff09;【解压到 ME2023(64bit)】。 2.打开解压后的文件夹&#xff0c;鼠标右…

16.综合项目实战

一、基础演练&#xff1a; 1、建库、建表 # 创建数据库 create database mysql_exampleTest; use mysql_exampleTest; # 学生表 CREATE TABLE Student( s_id VARCHAR(20), s_name VARCHAR(20) NOT NULL DEFAULT , s_birth VARCHAR(20) NOT NULL DEFAULT , s_sex VARC…

分布式存储考点梳理 + 高频面试题

欢迎来到分布式存储模环节&#xff0c;本文我将和你一起梳理面试中分布式系统的数据库的高频考点&#xff0c;做到温故知新。 面试中如何考察分布式存储 广义的分布式存储根据不同的应用领域&#xff0c;划分为以下的类别&#xff1a; 分布式协同系统 分布式文件系统 分布式…

运行时错误‘53’文件未找到:MathPage.WLL,安装MathType后Word不能复制粘贴问题的解决

两步解决&#xff1a; 1. 打开Word-->文件-->选项-->信任中心-->信任中心设置-->受信任位置&#xff0c;解决宏问题 添加如下受信任位置&#xff0c; 我的路径&#xff1a;C:\Program Files\Microsoft Office\root\Office16\STARTUP\ 2. 找到MathType下的MathT…

GitHub 一周热点汇总 第3期 (2023/12/24-12/30)

GitHub一周热点汇总第三期 (2023/12/24-12/30)&#xff0c;梳理每周热门的GitHub项目&#xff0c;了解热点技术趋势&#xff0c;掌握前沿科技方向&#xff0c;发掘更多商机。元旦就要到了&#xff0c;提前祝大家新年快乐。 #1 StreamDiffusion 项目名称&#xff1a;StreamDiff…