003 Springboot操作RabbitMQ

Springboot整合RabbitMQ

文章目录

    • Springboot整合RabbitMQ
      • 1.pom依赖
      • 2.yml配置
      • 3.配置队列、交换机
        • 方式一:直接通过配置类配置bean
        • 方式二:消息监听通过注解配置
      • 4.编写消息监听发送测试
      • 5.其他类型交换机配置
        • 1.FanoutExchange
        • 2.TopicExchange
        • 3.HeadersExchange
      • 6.延迟消息处理(TTL)
        • 方式一:ttl配置
        • 方式二:消息发送设置
      • 7.死信队列

1.pom依赖

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>

2.yml配置

#配置使用的配置文件
spring:#配置rabbitmqrabbitmq:host: 47.122.26.28 #主机地址port: 5672  #端口号username: xxx #用户名password: xxx #密码virtual-host: my_vhost  #虚拟主机地址#开启消息送达提示publisher-returns: true# springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果publisher-confirm-type: correlatedlistener:  #消息监听配置type: simplesimple:acknowledge-mode: manual #manual手动确认消息  auto没有异常时 进行自动确认 (异常类型 消息重新入队)prefetch: 1 #限制每次发送一条数据。concurrency: 3 #同一个队列启动几个消费者max-concurrency: 3 #启动消费者最大数量#重试策略相关配置retry:# 开启消费者(程序出现异常)重试机制,默认开启并一直重试enabled: true# 最大重试次数max-attempts: 5# 重试间隔时间(毫秒)initial-interval: 3000server:port: 18082address: 127.0.0.1servlet:context-path: /

3.配置队列、交换机

方式一:直接通过配置类配置bean

推送消息时不存在创建队列和交换机

/*** direct模式声明配置*/
@Configuration
public class RabbitDirectConfig {public static final String EXCHANGE_NAME="direct-exchange";public static final String QUEUE_NAME="direct-queue";public static final String BINDING_KEY="change:direct";/*** 声明直连交换机* name:交换机的名称* durable 队列是否持久化* autoDelete:是否自动删除,(当该交换机上绑定的最后一个队列解除绑定后,该交换机自动删除)* argument:其他一些参数*/@Beanpublic DirectExchange directExchange() {return new DirectExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*  queue 队列的名称*  durable 队列是否持久化*  exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。*/@Beanpublic Queue directQueue() {return new Queue(QUEUE_NAME,false,false,false,null);}/*** 交换机队列绑定*/@Beanpublic Binding springExchangeBindSpringQueue() {return BindingBuilder.bind(directQueue()).to(directExchange()).with(BINDING_KEY);}}
方式二:消息监听通过注解配置

启动时创建队列和交换机

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct1-queue",durable = "true"),exchange = @Exchange(value = "direct1-exchange",type = ExchangeTypes.DIRECT,durable = "true"),key = "change1:direct"))

注意:rabbitmq同名的队列只能创建一个,创建多个会报错,推送消息时需确保队列和交换机已存在,

方式一队列和交换机在第一次推送消息时才会自动创建队列和交换机,方式二注解在启动时就会创建

4.编写消息监听发送测试

监听

@Slf4j
@Component
public class RabbitMQListener {@RabbitListener(queues = "direct-queue")@RabbitHandlerpublic void bootMsg(Channel channel, Message message){String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" direct 消费者:'" + message1 + "'");//手动确认该消息try {//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException e) {log.error("执行异常",e);// 拒绝消息并重新入队channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); }}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct1-queue",durable = "true"),exchange = @Exchange(value = "direct1-exchange",type = ExchangeTypes.DIRECT,durable = "true"),key = "change1:direct"))@RabbitHandlerpublic void bootMsg1(Channel channel, Message message){String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" direct 消费者:'" + message1 + "'");//手动确认该消息try {//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException e) {log.error("执行异常",e);// 拒绝消息并重新入队channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

测试

@Slf4j
@SpringBootTest(classes = RabbitProviderApplication.class)
public class RabbitTest {@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void directProvider(){String message = "direct模式消息推送。。。。。";/*** 参数分别为,交换机,路由key,消息体*/amqpTemplate.convertAndSend("direct-exchange","change:direct",message);System.out.println(" 消息发送 :'" +message + "'");}@Testpublic void directProvider1(){String message = "direct模式消息推送1。。。。。";/*** 参数分别为,交换机,路由key,消息体*/amqpTemplate.convertAndSend("direct1-exchange","change1:direct",message);System.out.println(" 消息发送1 :'" +message + "'");}}

在这里插入图片描述

5.其他类型交换机配置

1.FanoutExchange
/*** fanout模式声明配置*/
@Configuration
public class RabbitFanoutConfig {public static final String EXCHANGE_NAME="fanout-exchange";public static final String QUEUE_NAME1="fanout-queue1";public static final String QUEUE_NAME2="fanout-queue2";/*** 声明交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*/@Beanpublic Queue fanoutQueue1() {return new Queue(QUEUE_NAME1,false,false,false,null);}@Beanpublic Queue fanoutQueue2() {return new Queue(QUEUE_NAME2,false,false,false,null);}/*** 交换机队列绑定*/@Beanpublic Binding springExchangeBindQueue1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 交换机队列绑定*/@Beanpublic Binding springExchangeBindQueue2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}}

监听

    @RabbitListener(queues = "fanout-queue1")public void fanoutMsg1(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" fanout-queue1 消费者:'" + message1 + "'");}@RabbitListener(queues = "fanout-queue2")public void fanoutMsg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" fanout-queue2 消费者:'" + message1 + "'");}

测试

    @Testpublic void fanoutProvider(){String message = "fanout模式消息推送。。。。。";amqpTemplate.convertAndSend("fanout-exchange", "",message);System.out.println(" 消息发送 :'" +message + "'");}

在这里插入图片描述

2.TopicExchange
/*** topic模式声明配置*/
@Configuration
public class RabbitTopicConfig {public static final String EXCHANGE_NAME="topic-exchange";public static final String QUEUE_NAME="topic-queue";public static final String BINDING_KEY="*.orange.#";/*** 声明交换机*/@Beanpublic TopicExchange topicExchange() {return new TopicExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*/@Beanpublic Queue topicQueue() {return new Queue(QUEUE_NAME,false,false,false,null);}/*** 交换机队列绑定*/@Beanpublic Binding topicExchangeBindQueue() {return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(BINDING_KEY);}}
    @RabbitListener(queues = "topic-queue")public void topicMsg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" topic-queue2 消费者:'" + message1 + "'");}

测试

    @Testpublic void topicProvider(){String message1 = "topic test模式消息推送。。。。。";String message2 = "topic test.aaa模式消息推送。。。。。";amqpTemplate.convertAndSend("topic-exchange", "com.orange.test",message1);amqpTemplate.convertAndSend("topic-exchange", "com.orange.test.aaa",message2);System.out.println(" 消息发送");}

在这里插入图片描述

3.HeadersExchange
/*** headers模式声明配置* 与路由key无关,只需要消息的头参数匹配即可* x-match参数代表是全部匹配还是部分匹配*/
@Configuration
public class RabbitHeadersConfig {public static final String EXCHANGE_NAME="headers-exchange";public static final String QUEUE_NAME="headers-queue";public static final String QUEUE_NAME1="headers-queue1";/*** 声明交换机*/@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange(EXCHANGE_NAME,false,false,null);}/*** 声明队列*/@Beanpublic Queue headersQueue() {return new Queue(QUEUE_NAME,false,false,false,null);}@Beanpublic Queue headersQueue2() {return new Queue(QUEUE_NAME1,false,false,false,null);}/*** 交换机队列绑定(任意匹配)* whereAny 等同于x-match = any*/@Beanpublic Binding headersExchangeBindSpringQueue() {HashMap<String, Object> header = new HashMap<>();header.put("test", "111");header.put("test1", "222");return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAny(header).match();}/*** 交换机队列绑定(全部匹配)* whereAny 等同于x-match = all*/@Beanpublic Binding headersExchangeBindSpringQueue1() {HashMap<String, Object> header = new HashMap<>();header.put("test", "111");header.put("test1", "222");return BindingBuilder.bind(headersQueue2()).to(headersExchange()).whereAll(header).match();}}

发送测试

  @Testpublic void headerProvider(){String param = "headers 模式消息推送。。。。。";MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");messageProperties.setHeader("test","111");Message message = new Message(param.getBytes(), messageProperties);amqpTemplate.convertAndSend("headers-exchange", null,message);System.out.println(" 消息发送");}

在这里插入图片描述

队列queue任意匹配有数据,queue1全部匹配无数据

headers-queue

在这里插入图片描述

headers-queue1

在这里插入图片描述

消息监听

    @RabbitListener(queues = "headers-queue")public void headersMsg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" headers-queue 消费者:'" + message1 + "'");}@RabbitListener(queues = "headers-queue1")public void headers1Msg2(Channel channel, Message message) {String message1 = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(" headers-queue1 消费者:'" + message1 + "'");}

在这里插入图片描述

6.延迟消息处理(TTL)

  • 第一种是使用普通队列和死信队列来模拟实现延迟的效果。将消息放入一个没有被监听的队列上,设置TTL(一条消息的最大存活时间)为延迟的时间,时间到了没有被消费,直接成为死信,进入死信队列。后监听私信队列来消息消费

  • 第二种是使用rabbitmq官方提供的delayed插件来真正实现延迟队列。

方式一:ttl配置

超时自动删除

/*** rabbitmq的ttl延迟过期时间配置*/
@Configuration
public class RabbitMQTTLConfig {/*** 声明交换机* @return*/@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl-direct-exchange");}/*** 声明队列* @return*/@Beanpublic Queue ttlQueue(){//设置参数Map<String,Object> args = new HashMap<>();//设置ttl过期时间,需设置int值args.put("x-message-ttl",5000);return new Queue("ttl-direct-queue",true,false,false,args);}/*** 绑定队列* @return*/@Beanpublic Binding ttlBingQueue(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("direct:ttl:key");}}

测试

    @Testpublic void ttlSendMessageTest(){String exchange = "ttl-direct-exchange";String routingKey = "direct:ttl:key";String msg = UUID.randomUUID().toString();//发送并设置amqpTemplate.convertAndSend(exchange,routingKey,msg);System.out.println("消息发送成功====="+msg);}

在这里插入图片描述

方式二:消息发送设置

注释掉x-message-ttl参数,使用普通队列,发送消息时设置过期时间

    @Testpublic void ttlSendMessageTest(){String exchange = "ttl-direct-exchange";String routingKey = "direct:ttl:key";String msg = UUID.randomUUID().toString();//设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}};//发送并设置amqpTemplate.convertAndSend(exchange,routingKey,msg,messagePostProcessor);System.out.println("消息发送成功====="+msg);}

在这里插入图片描述

注意:如果项目中即使用了ttl配置过期时间,有设置了消息过期时间,则执行时以最小的时间为准,ttl过期队列的消息过期会写到死信,而设置方式的普通队列则不会自动写到死信队列

7.死信队列

死信的情况:消息被拒绝,消息过期,队列达到最大长度

死信队列声明

@Configuration
public class RabbitMQDLXConfig {/*** 声明死信交换机* @return*/@Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange("dlx-direct-exchange");}/*** 声明死信队列* @return*/@Beanpublic Queue dlxQueue(){ ;return new Queue("dlx-direct-queue",true);}/*** 绑定队列* @return*/@Beanpublic Binding dlxBingQueue(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with("direct:dlx:key");}}

过期推送到死信设置

   /*** 声明ttl队列* @return*/@Beanpublic Queue ttlQueue(){//设置参数Map<String,Object> args = new HashMap<>();//设置ttl过期时间,需设置int值args.put("x-message-ttl",5000);args.put("x-max-length",5);//最大长度//消息过期死信队列入队配置args.put("x-dead-letter-exchange","dlx-direct-exchange");//设置死信交换机args.put("x-dead-letter-routing-key","direct:dlx:key");//死信路由key,fanout模式不需要设置路由keyreturn new Queue("ttl-direct-queue",true,false,false,args);}

注意:队列参数修改后,不会重新创建覆盖而是会报错,需要手动删除重新创建,生产环境中则可以通过重新创建一个队列,进行转移

测试

在这里插入图片描述

消息过期进死信队列
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/444270.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

继承--C++

文章目录 一、继承的概念及定义1、继承的概念 二、继承定义1、定义格式2、继承基类成员访问方式的变化3、继承类模板 三、基类和派生类间的转换1、继承中的作用域2、隐藏规则&#xff1a; 四、派生类的默认成员函数1、4个常见默认成员函数2、实现⼀个不能被继承的类 五、继承与…

Android15之解决:Dex checksum does not match for dex:services.jar问题(二百三十五)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

【拥抱AIGC】应该如何衡量AI辅助编程带来的收益

本文主要介绍了如何度量研发效能&#xff0c;以及AI辅助编程是如何影响效能的&#xff0c;进而阐述如何衡量AI辅助编程带来的收益。 理解度量&#xff1a;有效区分度量指标 为了帮助研发团队更好地理解和度量研发效能&#xff0c;可以将指标分为三类&#xff1a;能力和行为指…

【含文档】基于Springboot+Vue的母婴全程服务管理系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定…

vue3中 a-table设置某一个单元格的背景颜色

需求&#xff1a;根据某一个单元格中的某个条件不同&#xff0c;设置动态的颜色&#xff1b; 思路&#xff1a;通过官方文档提供的customCell进行判断设置不同的颜色背景&#xff0c;案例中进行了简单的行列判断&#xff0c;同学们可以根据自己的需求修改判断条件&#xff0c;动…

SSH 公钥认证:从gitlab clone项目repo到本地

这篇文章的分割线以下文字内容由 ChatGPT 生成&#xff08;我稍微做了一些文字上的调整和截图的补充&#xff09;&#xff0c;我review并实践后觉得内容没有什么问题&#xff0c;由此和大家分享。 假如你想通过 git clone git10.12.5.19:your_project.git 命令将 git 服务器上…

建筑工程系列中级职称申报有什么要求?

一、学历资历条件 1.理工类或建筑工程相关专业博士研究生毕业后&#xff0c;从事本专业技术工作&#xff0c;当年内经考核评审确认&#xff1b; 2.理工类或建筑工程相关专业硕士研究生毕业或取得双学士学位后&#xff0c;从事本专业技术工作 3 年以上&#xff0c;取得并被聘任…

【大模型理论篇】精简循环序列模型(minGRU/minLSTM)性能堪比Transformer以及对循环神经网络的回顾

1. 语言模型之精简RNN结构 近期关注到&#xff0c;Yoshua Bengio发布了一篇论文《Were RNNs All We Needed?》&#xff0c;提出简化版RNN&#xff08;minLSTM和minGRU&#xff09;。该工作的初始缘由&#xff1a;Transformer 在序列长度方面的扩展性限制重新引发了对可在训练期…

Vue包的安装使用

文章目录 vue介绍一、灵活易用1.渐进式框架2.简洁的语法 二、高效的响应式系统1.数据驱动2.响应式原理 三、强大的组件化开发1.组件化思想2.组件通信 四、丰富的生态系统1.插件和库2.社区支持 安装依赖删除新增文件夹components设置(1)home.vue(2)data.vue(3)zero.vue router配…

简单的maven nexus私服学习

简单的maven nexus私服学习 1.需求 我们现在使用的maven私服是之前同事搭建的&#xff0c;是在公司的一台windows电脑上面&#xff0c;如果出问题会比较难搞&#xff0c;所以现在想将私服迁移到我们公司的测试服务器上&#xff0c;此处简单了解一下私服的一些配置记录一下&am…

Visual Studio 2022安装(含重生版)

前言&#xff1a; 昨天调试代码的时候发现程序怎么都运行不了&#xff0c;错误显示无法找到文件啊啊啊&#xff0c;能力有限&#xff0c;找不出错误源&#xff0c;然后就狠心删掉所有相关文件来“重新开始”&#xff01; 正文&#xff1a; 1.官网下载&#xff08;内定中文版…

Java | Leetcode Java题解之第470题用Rand7()实现Rand10()

题目&#xff1a; 题解&#xff1a; class Solution extends SolBase {public int rand10() {int a, b, idx;while (true) {a rand7();b rand7();idx b (a - 1) * 7;if (idx < 40) {return 1 (idx - 1) % 10;}a idx - 40;b rand7();// get uniform dist from 1 - 63…

中标麒麟操作系统:如何查看系统激活状态

中标麒麟操作系统&#xff1a;如何查看系统激活状态 1、图形界面查看方法方法一&#xff1a;任务栏查看方法二&#xff1a;通过“我的电脑”属性查看 2、命令行查看方法 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 本文将介绍两种查看系…

java 的三种IO模型(BIO、NIO、AIO)

java 的三种IO模型&#xff08;BIO、NIO、AIO&#xff09; 一、BIO 阻塞式 IO&#xff08;Blocking IO&#xff09;1.1、BIO 工作机制1.2、BIO 实现单发单收1.3、BIO 实现多发多收1.4、BIO 实现客户端服务端多对一1.5、BIO 模式下的端口转发思想 二、NIO 同步非阻塞式 IO&#…

新款平行进口奔驰GLS450升级原厂AR实景导航人机交互行车记录仪等功能

平行进口的24款奔驰GLS450升级原厂中规导航主机通常具备以下功能&#xff1a; 人机交互系统&#xff1a;该导航主机配备了人机交互系统&#xff0c;可以通过触摸屏、旋钮或语音控制等方式与导航系统进行交互&#xff0c;方便驾驶者进行导航设置和操作。 实景AR导航&#xff1…

如何利用wsl-Ubuntu里conda用来给Windows的PyCharm开发

前提&#xff1a;咱们在wsl-Ubuntu上&#xff0c;有conda的虚拟环境 咱们直接打开PyCharm,打开Settings 更换Python Interpreter即可 当然一开始可能没有下面的选项&#xff0c;需要我们点击右边的Add Interpreter 这里选择wsl 点击next 将这两步进行修改 可以看出来&#xff0…

算法: 前缀和题目练习

文章目录 前缀和题目练习前缀和二维前缀和寻找数组的中心下标除自身以外数组的乘积和为 K 的子数组和可被 K 整除的子数组连续数组矩阵区域和 前缀和题目练习 前缀和 自己写出来了~ 坑: 数据太大,要用long. import java.util.Scanner;public class Main {public static voi…

【AIGC】寻找ChatGPT最佳推理步骤:CoT思维链技术的探索与应用

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;CoT思维链概述&#x1f4af;CoT思维链在大型语言模型中的应用&#x1f4af;CoT思维链改变对模型推理能力的理解和改进方式多样化应用场景挑战与未来发展总结 &#x1f4a…

网关在不同行业自动化生产线的应用

网关在不同行业自动化生产线的应用&#xff0c;展示了其作为信息与物理世界交汇点的广泛影响力&#xff0c;尤其在推动行业智能化、自动化方面发挥了不可估量的作用。以下是网关技术在污水处理、智慧农业、智慧工厂、电力改造及自动化控制等领域的深入应用剖析。 1. 污水处理 …

初级网络工程师之从入门到入狱(五)

本文是我在学习过程中记录学习的点点滴滴&#xff0c;目的是为了学完之后巩固一下顺便也和大家分享一下&#xff0c;日后忘记了也可以方便快速的复习。 网络工程师从入门到入狱 前言一、链路聚合1.1、手动进行链路聚合1.1.1、 拓扑图&#xff1a;1.1.2、 LSW11.1.3、 LSW2 1.2、…