手拉手整合Springboot3+RocketMQ2.3

RocketMQ 基本概念

消息模型Message Model

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址, Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息生产者Producer

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

消息消费者Consumer

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。提供了两种消费形式:拉取式消费、推动式消费。

主题Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。

代理服务器Broker Server

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务Name Server

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

Pom.xml加入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>

application.yml配置

rocketmq:
name-server: 192.168.68.133:9876
producer:
#生产者组名,一个应用里面必须唯一
group: test-producer
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 2
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 2

消费者监听器

报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队,根据application.yml的配置

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
/不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队
System.out.println("接收到消息:"+new String(messages.getBody()));}
}


 

发送同步消息

生产者

* 发送同步消息
* destination 目的地-主题
* payload 消息

@Test
void sendMsg() {
rocketMQTemplate.syncSend("TopicTest", "同步消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送异步消息

生产者


* 发送异步消息
* destination 目的地-主题
* payload 消息

@Test
void asyncTest() {rocketMQTemplate.asyncSend("TopicTest", "异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送延时消息

生产者

* 发送延时消息
* destination 目的地-主题
* payload 消息
* timestamp 连接超时
* delayLevel 延时级别

@Test
void delayTest() {Message<String> msg = MessageBuilder.withPayload("延时消息").build();
rocketMQTemplate.syncSend("TopicTest", msg, 3000, 3);
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送单向消息

生产者

* 发送单向消息
* destination 目的地-主题
* payload 消息

@Test
void OneWayTest() {rocketMQTemplate.sendOneWay("TopicTest", "单向消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送顺序消息

顺序消息 生产者需要将一组消息都发送到同一个队列 ,消费者需要单线程消费

生产者

生产者需要将一组消息都发送到同一个队列

List<MessageM> messageMs = Arrays.asList(
new MessageM("sn0001", 1, "下单"),
new MessageM("sn0001", 1, "付款"),
new MessageM("sn0001", 1, "配送"),
new MessageM("sn0002", 2, "下单"),
new MessageM("sn0002", 2, "付款"),
new MessageM("sn0002", 2, "配送")
);@Test
void orderlyTest() {
/*** destination 目的地-主题
* payload 消息
*/
for (MessageM messageM : messageMs) {
rocketMQTemplate.syncSendOrderly("orderlyTest", JSON.toJSON(messageM), messageM.getSn());
}
}

消费者

CONCURRENTLY 同时

ORDERLY有序

消费者需要单线程消费

@Component
@RocketMQMessageListener(topic = "orderlyTest",consumerGroup = "orderly",consumeMode = ConsumeMode.ORDERLY)
public class orderlyListener implements RocketMQListener<MessageExt> {@Override
public void onMessage(MessageExt messageExt) {
MessageM messageM = JSON.parseObject(new String(messageExt.getBody()), MessageM.class);
System.out.println(messageM);
}
}

发送带标签tag

生产者

@Test
void ProducerTagTest(){
rocketMQTemplate.syncSend("TagMQ:tagA","带tagA的消息");
rocketMQTemplate.syncSend("TagMQ:tagB","带tagB的消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TagMQ",
consumerGroup = "TagMQGroup",
selectorType = SelectorType.TAG, //tag过滤模式
selectorExpression = "tagA || tagB")
public class MsgListenerTag implements RocketMQListener<MessageExt> {@Override
public void onMessage(MessageExt messageExt) {
System.out.println(new String( messageExt.getBody()));
}
}

发送带Key消息

Key带在消息头中

生产者

@Test
void keyTest(){
String Key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder
.withPayload("带key消息").
setHeader(RocketMQHeaders.KEYS, Key)
.build();
/**
* 带Key消息
*/
rocketMQTemplate.syncSend("ketTopic",msg);
}

消费者

@Component
@RocketMQMessageListener(topic = "ketTopic",consumerGroup = "ketConsumerGroup-springboot")
public class keyMQListener implements RocketMQListener<MessageExt> {/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
System.out.println("key:"+messages.getKeys());
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/280268.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法沉淀——贪心算法四(leetcode真题剖析)

算法沉淀——贪心算法四 01.最长回文串02.增减字符串匹配03.分发饼干04.最优除法 01.最长回文串 题目链接&#xff1a;https://leetcode.cn/problems/longest-palindrome/ 给定一个包含大写字母和小写字母的字符串 s &#xff0c;返回 通过这些字母构造成的 最长的回文串 。 …

我的自建博客之旅06之Mrdoc

这个是我折腾笔记项目的最后一篇文章了,这个项目是类似于语雀的文档笔记项目,因为我当初想找一个既可以当做笔记,又可以作为团队文档分享的笔记,除了语雀,就发现了这个项目。 这个开源项目的界面或者文档组织方式其实是我最喜欢的,但是我后来放弃它的原因是它的后台编辑逻…

slab分配器

什么是slab分配器&#xff1f; 用户态程序可以使用malloc及其在C标准库中的相关函数申请内存&#xff1b;内核也需要经常分配内存&#xff0c;但无法使用标准库函数&#xff1b;linux内核中&#xff0c;伙伴分配器是一种页分配器&#xff0c;是以页为单位的&#xff0c;但这个…

RISC-V架构的三种特权模式如何切换

1、RISC-V的三种特权模式 特权模式功能描述机器模式&#xff08;M-mode&#xff09;具有最高特权等级&#xff0c;具有访问所有资源的权限&#xff0c;通常运行固件和内核用户模式&#xff08;U-mode&#xff09;权限要比M模式低&#xff0c;通常是用来运行操作系统内核管理员…

iOS常见崩溃简介

1. 崩溃 多指在移动设备&#xff08;如iOS、Android设备&#xff09;中或不可移动设备&#xff08;如:Windows、Linux等设备&#xff09;&#xff0c; 在打开或使用应用程序时出现的突然退出中断的情况&#xff08;类似于Windows的应用程序崩溃&#xff09;。 多表现为&#…

2024.3.21 如何将idea的注释设置为在首字母前开始而不是句首

2024.3.21 如何将idea的注释设置为在首字母前开始而不是句首 两种写法的差异 修改办法 将右下角的勾去掉即可。

[ROS 系列学习教程] rosbag Python API

ROS 系列学习教程(总目录) 本文目录 1. 构造函数与关闭文件2. 属性值3. 写bag文件内容4. 读bag文件内容5. 将bag文件缓存写入磁盘6. 重建 bag 文件索引7. 获取bag文件的压缩信息8. 获取bag文件的消息数量9. 获取bag文件记录的起止时间10. 获取话题信息与消息类型 rosbag 的 Pyt…

【Leetcode-73.矩阵置零】

题目&#xff1a; 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a;[[1,0,1],[0,0,0],[1,0,1]]示例 2&…

echart trigger 为 axis 的时候不显示 tooltip 解决办法

echart trigger 为 axis 的时候不显示 tooltip 解决办法 在项目 vitetsvue3 中使用 echart 显示了一个曲线图&#xff1a; 但当把图表的 trigger 设置成 axis 的时候&#xff0c;鼠标扫过并不显示具体的数值&#xff0c;如上图所示。 但 trigger item 的时候是正常的。 解决…

在DevEco Studio中第一次使用网络图片不显示问题

当我们新建项目 第一次使用网络图片 没有显示时 加这段代码就可以了 如果刷新图片还是没有显示 就重启编辑器。 "requestPermissions": [{"name": "ohos.permission.INTERNET"}],

如何构建Docker自定义镜像

说明&#xff1a;平常我们使用Docker运行各种容器&#xff0c;极大地方便了我们对开发应用的使用&#xff0c;如MySQL、Redis&#xff0c;以及各种中间件&#xff0c;使用时只要拉镜像&#xff0c;运行容器即可。本文介绍如何创建一个Demo&#xff0c;自定义构建一个镜像。 开…

31.HarmonyOS App(JAVA)鸿蒙系统app Service服务的用法

鸿蒙系统app Service服务的用法 后台任务调度和管控 HarmonyOS将应用的资源使用生命周期划分为前台、后台和挂起三个阶段。前台运行不受资源调度的约束&#xff0c;后台会根据应用业务的具体任务情况进行资源使用管理&#xff0c;在挂起状态时&#xff0c;会对应用的资源使用进…

【研发日记】,Matlab/Simulink开箱报告(十)——Requirements Toolbox

前言 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;五&#xff09;——S-Fuction模块(C MEX S-Function)》 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;六&#xff09;——S-Fuction模块&#xff08;TLC&#xff09;》 见《开…

Python学习:注释和运算符

python 注释 在Python中&#xff0c;注释用于在代码中添加解释、说明或者提醒&#xff0c;但并不会被解释器执行。Python中的注释以#开头&#xff0c;直到行末为止。下面是关于Python注释的详细解释和举例&#xff1a; 单行注释&#xff1a;使用#符号在行的开头添加注释&…

Vue3:标签的ref属性用法

一、情景说明 我们在写前端页面的时候&#xff0c;肯定会遇到获取DOM内容的情况。 以往&#xff0c;我们是用原生的js方法去获取&#xff0c;如document.getXxxx 但是&#xff0c;这中方法会有个问题&#xff0c;如果父组件和子组件的id相同&#xff0c;则会出错。 在Vue3中&…

酷开科技聚焦大屏端数据研究,构建酷开系统深度挖掘大屏商业价值

中国所有的彩色大屏中&#xff0c;智能电视规模已经过半&#xff0c;OTT平台的数据价值越发引起人们关注。作为OTT行业的头部代表&#xff0c;酷开科技一直聚焦大屏端数据研究&#xff0c;目前已经形成一套基于大屏指数的智慧营销体系&#xff0c;让OTT大屏的数字营销化水平实现…

安装MySQL5.7.19 + 解决数据库乱码

文章目录 1.删除mysql服务 sc delete mysql2.解压到D:\mysql5.7下3.配置管理员环境变量4.D:\mysql5.7\mysql-5.7.19-winx64下创建my.ini1.创建文件2.文件内容 5.管理员打开cmd&#xff0c;切换到 D:\mysql5.7\mysql-5.7.19-winx64\bin6.输入 mysqld -install 安装mysql服务7.初…

2 使用GPU理解并行计算

2.1 简介 本章旨在对并行程序设计的基本概念及其与GPU技术的联系做一个宽泛的介绍。本章主要面向具有串行程序设计经验&#xff0c;但对并行处理概念缺乏了解的读者。我们将用GPU的基本知识来讲解并行程序设计的基本概念。 2.2 传统的串行代码 绝大多数程序员是在串行程序占据…

数学建模-邢台学院

文章目录 1、随机抽取的号码在总体的排序2、两端间隔对称模型 1、随机抽取的号码在总体的排序 10个号码从小到大重新排列 [ x 0 , x ] [x_0, x] [x0​,x] 区间内全部整数值 ~ 总体 x 1 , x 2 , … , x 10 总体的一个样本 x_1, x_2, … , x_{10} ~ 总体的一个样本 x1​,x2​,……

《圣斗士星矢》AI制作CG大电影欣赏

《圣斗士星矢》AI制作CG大电影欣赏 In the darkest corners of the universe, legends are born. 宇宙最幽暗的角落&#xff0c;传奇应运而生。 The gods of Olympus descend, bringing chaos and terror. 奥林匹斯众神降临&#xff0c;带来混乱与恐怖。 The armor of the Sain…