spring boot 集成rocketMq + 基本使用

1. RocketMq基本概念

1. NameServer
每个NameServer结点之间是相互独立,彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,
相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker
2. Broker
消息存储和中转角色,负责存储和转发消息
在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
3. topic : 一个消息的集合的名字
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。
4. 生产者
生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地,
并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,
轮询从队列列表中选择一个队列(默认轮询)
5. 消费者
消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上,
然后直接跟Broker建立连接通道,然后开始消费消息

2. maven 引入starter

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

3.yml配置

3.1 生产者yml 配置

rocketmq:name-server: 127.0.0.1:9876producer:group: my-group# 发送消息超时时间send-message-timeout: 5000# 发送消息失败重试次数retry-times-when-send-failed: 2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.2 消费者yml 配置

rocketmq:name-server: 127.0.0.1:9876consumer:topic: topic_testgroup: consumer_my-group

4.生产者发送消息

4.1 一般消息

@Resourceprivate RocketMQTemplate rocketMQTemplate;/***  一般消息* Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。* 使用 Tag 可以实现对 Topic 中的消息进行过滤。* **/@GetMapping("/send")public String send(){rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");return "rocketMq普通消息发送完成";}

4.2 顺序消息

/** 支持消费者按照发送消息的先后顺序获取消息 */@GetMapping("/send/orderly")public String sendOrder(){//发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");return "rocketMq顺序-消息发送成功";}

4.3 同步消息

@GetMapping("/send/sync")public String sendMsg() {String message = "我是同步消息:" + LocalDateTime.now();SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());log.info("同步-消息发送成功:" + LocalDateTime.now());return "rocketMq 同步-消息发送成功:" + result.getSendStatus();}

4.4 异步消息

/** 发送异步消息 */@GetMapping("/send/async")public String asyncSendMsg(){String message = "我是异步消息:" + LocalDateTime.now();rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.info("发送失败 (后执行)");}});return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();}

 4.5 单向消息:一般用来发送日志等不重要的消息

@GetMapping("/send/oneWay")public String sendOneWayMessage() {String message =  "我是单向消息:"+LocalDateTime.now();this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);log.info("单向发送消息完成:message = {}", message);return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();}

 

4.6 延时消息

/** 延时消息 */@GetMapping("/sendDelay")public String sendDelay(){String message = "我是延时消息:" + LocalDateTime.now();// 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2hrocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);return "rocketMq延时-消息发送成功";}

4.7 事务消息

4.7.1 事务消息发送代码

/** 事务消息 */@GetMapping("/send/transaction/{id}")public void sendTransactionMessage(@PathVariable("id") Integer id){//发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等//参数一:topic;参数二:消息// 事务idString[] tags = {"tagA", "tagB", "tagC"};int i = id%3;String transactionId = UUID.randomUUID().toString();String message = "我是事务消息:" + LocalDateTime.now();TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i], MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),// 给本地事务的参数2);//发送状态String sendStatus = result.getSendStatus().name();//本地事务执行状态String localState = result.getLocalTransactionState().name();log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);}

4.7.2 继承 RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
public class MyTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));try{//模拟网络波动Thread.sleep(3000);/**** 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。* 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。* 如果本地事务成功,消息会被提交并发送给消费者;* 如果失败,消息会被回滚,消费者不会接收到这个消息*/}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}// 执行本地事务String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));if (StringUtils.equals("tagA", tag)){//这里只讲TAGA消息提交,状态为可执行return RocketMQLocalTransactionState.COMMIT;}else if (StringUtils.equals("tagB", tag)) {return RocketMQLocalTransactionState.ROLLBACK;} else if (StringUtils.equals("tagC",tag)) {return RocketMQLocalTransactionState.UNKNOWN;}log.info("事务提交,消息正常处理: " + LocalDateTime.now());//执行成功,可以提交事务return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(transactionId + ",消息回查"+ LocalDateTime.now());return RocketMQLocalTransactionState.ROLLBACK;}
}

tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker 

 

 5. 消费端

/*** topic指定消费的主题,consumerGroup指定消费组,* 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费*  2.实现RocketMQListener接口*  如果想拿到消息的其他参数可以写成MessageExt*  selectorExpression = "tagA || tagB" 指定tag 的消费*/
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("topic_test: 所有的收到消息:"+s);}}

6.广播消费模式

生产端是一样的,但是消费端需要增加一个参数

messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("consumer2---topic_test: 所有的收到消息:"+s);}}// 第2个消费者类,他们都是一样的代码,
//为了表示广播,就是一个消息,会被这两个消费者消费@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("consumer1--topic_test: 所有的收到消息:"+s);}}

7.其他

RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/309818.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Blender表面细分的操作

在使用Blender的过程中,刚开始创建的模型,都会比较少面,这样操作起来比较流畅,减少电脑的计算量,当设计快要完成时,就会增加表面细分,这样更加圆滑,看起来更加顺眼。 比如创建一个猴头,它会默认显示如下: 从上图可以看到,有一些表面会比较大,棱角很多。 这时候你…

微商商城源码小程序好用么?

商城APP作为电子商务行业的重要组成部分&#xff0c;已经成为了人们购物的主要方式之一。为了在竞争激烈的市场中脱颖而出&#xff0c;开发一款专业且思考深度的商城APP方案显得尤为关键。本文将从专业性和思考深度两个方面&#xff0c;探讨商城APP的开发方案。 一、专业性的重…

CloudCompare——win11配置CloudComPy

CloudComPy配置 1 基本环境介绍2 安装Anaconda2.1 下载anaconda2.2 安装anaconda2.3 配置镜像源2.4 更改虚拟环境的默认创建位置2.5 其他问题2.5.1 激活自己创建的环境提示&#xff1a;系统找不到指定的路径2.5.2 InvalidVersionSpecError: Invalid version spec: 2.72.5.3 卸载…

如何解决网站建设打开速度慢的问题?

如何解决网站建设打开速度慢的问题&#xff1f;在浏览网站的时候&#xff0c;网站打开速度的快慢也是能够直接影响到用户的体验感的。因为网站打开速度太慢&#xff0c;不仅浪费了大家的时间&#xff0c;同时还容易消耗浏览者的很大一部分耐心。 所以说不管是对于企业来说&…

hive了解系列一

“ 随着智能手机的普及&#xff0c;互联网时代红利的爆发&#xff0c;用户数量和产生的数据也越发庞大。为了解决这个问题&#xff0c;提高数据的使用价值。 Hadoop生态系统就被广泛得到应用。 在早期&#xff0c;Hadoop生态系统就是为处理如此大数据集而产生的一个合乎成本效益…

C++ 红黑树模拟实现

&#x1f493;博主CSDN主页:麻辣韭菜&#x1f493;   ⏩专栏分类&#xff1a;C知识分享⏪   &#x1f69a;代码仓库:C高阶&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习更多C知识   &#x1f51d;&#x1f51d; 前言 前面我们实现了AVL树&#xff0c;发明AVL树…

蓝桥杯备赛刷题——css

新鲜的蔬菜 这题需要使用grid 我不会 去学一下 一.什么是grid Grid 布局与 Flex 布局有一定的相似性&#xff0c;都可以指定容器内部多个项目的位置。但是&#xff0c;它们也存在重大区别。 Flex 布局是轴线布局&#xff0c;只能指定"项目"针对轴线的位置&#…

使用冒泡排序模拟实现qsort函数

目录 冒泡排序qsort函数的使用1.使用qsort函数排序整型数据2.使用qsort函数排序结构数据 冒泡排序模拟实现qsort函数今日题目1. 字符串旋转结果2.杨氏矩阵3.猜凶手4.杨辉三角 总结 冒泡排序 冒泡排序的核心思想是:两两相邻的元素进行比较 代码如下: //⽅法1 void bubble_so…

第四百五十四回

文章目录 1. 问题描述2. 优化方法2.1 缩小范围2.2 替代方法 3. 示例代码4. 内容总结 我们在上一章回中介绍了"如何获取AppBar的高度"相关的内容&#xff0c;本章回中将介绍关于MediaQuery的优化.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 问题描述 我们在…

头歌-机器学习 第13次实验 特征工程——共享单车之租赁需求预估

第1关&#xff1a;数据探索与可视化 任务描述 本关任务&#xff1a;编写python代码&#xff0c;完成一天中不同时间段的平均租赁数量的可视化功能。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a; 读取数据数据探索与可视化 读取数据 数据保存在./step1/…

Linux C应用编程:MQTT物联网

1 MQTT通信协议 MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传 输&#xff09;是一种基于客户端-服务端架构的消息传输协议&#xff0c;如今&#xff0c;MQTT 成为了最受欢迎的物联网协议&#xff0c;已广泛应用于车联网、智能家居、即时聊…

不想升级到win11要怎么取消,怎么拒绝升级win11

微软公布了一个会导致win11数据损坏的罪魁祸首,受到影响的win11系统,是搭载了支持最新VAES指令集的处理器。这次的bug是坑了intel用户呀,Intel从10代酷睿(Ice Lake )和第三代至强可扩展处理器(IceLake-SP)开始才添加了对VAES的支持,AMD这边则是Zen 3锐龙5000,它也是AVX-51…

太好玩了,我用 Python 做了一个 ChatGPT 机器人

毫无疑问&#xff0c;ChatGPT 已经是当下编程圈最火的话题之一&#xff0c;它不仅能够回答各类问题&#xff0c;甚至还能执行代码&#xff01; 或者是变成一只猫 因为它实在是太好玩&#xff0c;我使用Python将ChatGPT改造&#xff0c;可以实现在命令行或者Python代码中调用。…

手动实现简易版RPC(上)

手动实现简易版RPC(上) 前言 什么是RPC&#xff1f;它的原理是什么&#xff1f;它有什么特点&#xff1f;如果让你实现一个RPC框架&#xff0c;你会如何是实现&#xff1f;带着这些问题&#xff0c;开始今天的学习。 本文主要介绍RPC概述以及一些关于RPC的知识&#xff0c;为…

【电子通识】吸锡带/线的作用和替代方法

吸锡带简介 吸锡带(或称吸锡线、脱焊织物)是手工焊接的好助手,手焊或维修时吸锡带能够去除电路板上多余焊锡,减少了电子产品的返工和修理的时间,降低了烙铁对电路板造成过热损伤的危险,因此是一个既廉价又有效的物品。 市面上卖的最多的的吸锡带类型如下所示: 吸锡带的选型…

普乐蛙VR神州飞船设备VR太空舱体验馆VR博物馆

中国航天式浪漫知多少&#xff1f;千百年来古人对浩瀚宇宙有着无尽的浪漫想象&#xff0c;而在一代又一代中国航天事业奋斗者的努力中&#xff0c;远古神话不再是幻想&#xff0c;它终被照进现实——中国载人飞船“神舟”、中国载人空间站“天宫”、中国绕月人造卫星“嫦娥一号…

二叉树例题分享

文章目录 二叉树例题分享[235. 二叉搜索树的最近公共祖先](https://leetcode.cn/problems/lowest-common-ancestor-of-a-binary-search-tree/)[701. 二叉搜索树中的插入操作](https://leetcode.cn/problems/insert-into-a-binary-search-tree/)[108. 将有序数组转换为二叉搜索树…

python怎么输出小数

先将整型转换成float型&#xff0c;再进行计算&#xff0c;结果就有小数了。 >>> a 10 >>> b 4 >>> c a/b >>> a,b,c (10, 4, 2) >>> a float(a) >>> d a/b >>> a,b,d (10.0, 4, 2.5) >>> 注意&…

LabVIEW闭环步进电机运动系统设计及精度分析

LabVIEW闭环步进电机运动系统设计及精度分析 在自动化设备不断发展的当代&#xff0c;闭环步进电机以其高精度和可靠性成为了自动化设备的重要组成部分。以LabVIEW软件为核心&#xff0c;结合运动控制卡及驱动器模块&#xff0c;设计并实现了一个闭环步进电机的多轴运动控制系…

speccpu2017安装与使用

国产化桌面下Speccpu2017安装与使用 1、 安装依赖库 安装speccpu2017前需要安装依赖包&#xff0c;通过终端命令对依赖包进行安装 sudo apt-get install gcc g gfortran &#xff08;以上是已经安装好的&#xff09; 注&#xff1a;若安装不上&#xff0c;需替换/etc/apt下的s…