207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

目录

  • ★ 发送消息
  • ★ 创建队列的两种方式
  • 代码演示
    • 需求1:发送消息
      • 1、ContentUtil 先定义常量
      • 2、RabbitMQConfig 创建队列的两种方式之一:
        • 配置式:
          • 问题:
      • 3、MessageService 编写逻辑
      • PublishController 控制器
      • application.properties 配置属性
      • 测试:消息发送
  • ★ 接收消息
    • 代码演示:
      • 测试: 消息接收
  • ★ 定制监听器容器工厂
  • 完整代码:
    • application.properties RabbitMQ的连接等属性配置
    • ContentUtil 常量工具类
    • RabbitMQConfig 配置式创建消息队列
    • MessageService 发送消息的业务代码
    • PublishController.java 发送消息的控制层
    • MyRabbitMQListener 监听器,监听消息队列
    • pom.xml


在这里插入图片描述

★ 发送消息

- Spring Boot可以将AmqpAdmin和AmqpTemplate注入任何其他组件,接下来该组件即可通过AmqpAdmin来管理Exchange、队列和绑定,还可通过AmqpTemplate来发送消息。 - Spring Boot还会自动配置一个RabbitMessagingTemplate Bean(RabbitAutoConfiguration负责配置),如果想使用它来发送、接收消息,可使用RabbitMessagingTemplate代替上面的AmqpTemplate,两个Template的注入方式完全相同。

在这里插入图片描述

★ 创建队列的两种方式

 方式一(编程式):在程序中通过AmqpAdmin创建队列。方式二(配置式):在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。

代码演示

需求1:发送消息

1、ContentUtil 先定义常量

在这里插入图片描述

2、RabbitMQConfig 创建队列的两种方式之一:

配置式:

在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。
就是在配置类中创建一个生成消息队列的@Bean。

问题:

用 @Configuration 注解声明为配置类,但是项目启动的时候没有自动生成这个队列。
据了解是因为RabbitMQ使用了懒加载,大概是没有消费者监听这个队列,就没有创建。
但是当我写后面的代码后,这个消息队列就生成了,但是并没有消费者去监听这个队列。
这有点想不通。
不知道后面是哪里的代码让这个配置类能成功声明这个消息队列出来。
在这里插入图片描述
水落石出:
经过测试:
在下面的MessageService 这个类中,依赖注入了 AmqpAdmin 和 AmqpTemplate 这两个对象,当我们通过这两个对象去声明队列、Exchange 和绑定的时候,配置类中的创建消息队列的bean就能成功创建队列。
这张图结合下面的 MessageService 中的代码就可得知:
这是依赖注入 AmqpAdmin 和 AmqpTemplate 这两个对象的有参构造器中声明的。
在这里插入图片描述

3、MessageService 编写逻辑

声明Exchange 、 消息队列 、 Exchange和消息队列的绑定、发送消息的方法等
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

PublishController 控制器

在这里插入图片描述

application.properties 配置属性

在这里插入图片描述
在这里插入图片描述

测试:消息发送

成功生成队列
在这里插入图片描述
发送消息测试
在这里插入图片描述
在这里插入图片描述




★ 接收消息

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

 【备注】:该注解可通过queues属性指定它要监听的、已有的消息队列,它也可使用queuesToDeclare来声明队列,并监听该队列。- 如果没有显式配置监听器容器工厂(RabbitListenerContainerFactory),Spring Boot会在容器中自动配置一个SimpleRabbitListenerContainerFactory Bean作为监听器容器工厂,如果希望使用DirectRabbitListenerContainerFactory,可在application.properties文件中添加如下配置:spring.rabbitmq.listener.type=direct▲ 如果在容器中配置了MessageRecoverer或MessageConverter,它们会被自动关联到默认的监听器容器工厂。



代码演示:

创建个消息队列的监听器就可以了。

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

该注解可通过queues属性指定它要监听的、已有的消息队列
它也可使用queuesToDeclare来声明队列,并监听该队列
还可以用 bindings 进行 Exchange和queue的绑定操作。
在这里插入图片描述

测试: 消息接收

在这里插入图片描述
发送消息和监听消息
在这里插入图片描述




★ 定制监听器容器工厂

▲ 如果要定义更多的监听器容器工厂或覆盖默认的监听器容器工厂,可通过Spring Boot提供的SimpleRabbitListenerContainerFactoryConfigurer
或DirectRabbitListenerContainerFactoryConfigurer来实现,它们可对SimpleRabbitListenerContainerFactory
或DirectRabbitListenerContainerFactory进行与自动配置相同的设置。 ▲ 有了自定义的监听器容器工厂之后,可通过@RabbitListener注解的containerFactory属性来指定使用自定义的监听器容器工厂,
例如如下注解代码:@RabbitListener(queues = "myQueue1", containerFactory="myFactory")

完整代码:

application.properties RabbitMQ的连接等属性配置

# 配置连接 RabbitMQ 的基本信息------------------------------------------------------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 下面属性可配置多个以逗号隔开的连接地址,一旦配置了该属性,host 和 port 属性就会被忽略
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# 连接虚拟主机
spring.rabbitmq.virtual-host=my-vhost01# 配置RabbitMQ的缓存相关信息--------------------------------------------------------
# 指定缓存 connection ,还是缓存 channel
spring.rabbitmq.cache.connection.mode=channel
# 指定可以缓存多少个 Channel
spring.rabbitmq.cache.channel.size=50
# 如果选择的缓存模式是 connection , 那么就可以配置如下属性
# spring.rabbitmq.cache.connection.size=15# 配置 和 RabbitTemplate 相关的属性--------------------------------------------------
# 指定 RabbitTemplate 发送消息失败时会重新尝试
spring.rabbitmq.template.retry.enabled=true
# RabbitTemplate 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.template.retry.initial-interval=1s
# RabbitTemplate 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.template.retry.max-attempts=5
# 设置每次尝试重新发送消息的时间间隔是一个等比数列:1s, 2s, 4s, 8s, 16s
# 第一次等1s后尝试,第二次等2s后尝试,第三次等4s后尝试重新发送消息......
spring.rabbitmq.template.retry.multiplier=2
# 指定发送消息时默认的Exchange名
spring.rabbitmq.template.exchange=""
# 指定发送消息时默认的路由key
spring.rabbitmq.template.routing-key="test"# 配置和消息监听器的容器工厂相关的属性--------------------------------------------------
# 指定监听器容器工厂的类型
spring.rabbitmq.listener.type=simple
# 指定消息的确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 指定获取消息失败时,是否重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.listener.simple.retry.max-attempts=2
# 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.listener.simple.retry.initial-interval=1s

ContentUtil 常量工具类

package cn.ljh.app.rabbitmq.util;//常量
public class ContentUtil
{//定义Exchange的常量-----fanout:扇形,就是广播类型public static final String EXCHANGE_NAME = "boot.fanout";//消息队列数组public static final String[] QUEUE_NAMES =new String[] {"queue_boot_01","queue_boot_02","queue_boot_03"};}

RabbitMQConfig 配置式创建消息队列

package cn.ljh.app.rabbitmq.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//配置式:在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列
//声明这个类为配置类
@Configuration
public class RabbitMQConfig
{//用配置式的方式在RabbitMQ中定义队列@Beanpublic Queue myQueue(){//在容器中配置一个 Queue Bean,Spring 就会为它在 RabbitMQ 中自动创建对应的 Queuereturn new Queue("queue_boot",   /* Queue 消息队列名 */true,         /* 是否是持久的消息队列 */false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */false,     /* 是否在没有消息的时候自动删除消息队列 */null       /* 额外的一些消息队列的参数 */);}
}

MessageService 发送消息的业务代码

声明Exchange 、Queue ,Exchange 绑定Queue,发送消息代码

package cn.ljh.app.rabbitmq.service;import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;//业务逻辑:声明Exchange 和 Queue 消息队列,发送消息的方法
@Service
public class MessageService
{//AmqpAdmin来管理Exchange、队列和绑定private final AmqpAdmin amqpAdmin;//AmqpTemplate来发送消息private final AmqpTemplate amqpTemplate;//通过有参构造器进行依赖注入public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate){this.amqpAdmin = amqpAdmin;this.amqpTemplate = amqpTemplate;//由于声明 Exchange 、 队列 、 绑定(Exchange绑定队列),都只需要做一次即可,因此放在此处构造器中完成即可//创建 fanout 类型的 Exchange ,使用FanoutExchange实现类FanoutExchange exchange = new FanoutExchange(ContentUtil.EXCHANGE_NAME,true,    /* Exchange是否持久化 */false, /* 是否自动删除 */null   /* 额外的参数属性 */);//声明 Exchangethis.amqpAdmin.declareExchange(exchange);//此处循环声明 Queue ,也相当于代码式创建 Queuefor (String queueName : ContentUtil.QUEUE_NAMES){Queue queue = new Queue(queueName,   /* Queue 消息队列名 */true,         /* 是否是持久的消息队列 */false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */false,     /* 是否在没有消息的时候自动删除消息队列 */null       /* 额外的一些消息队列的参数 */);//此处声明 Queue ,也相当于【代码式】创建 Queuethis.amqpAdmin.declareQueue(queue);//声明 Queue 的绑定Binding binding = new Binding(queueName,  /* 指定要分发消息目的地的名称--这里是要发送到这个消息队列里面去 */Binding.DestinationType.QUEUE, /* 分发消息目的的类型,指定要绑定 queue 还是 Exchange */ContentUtil.EXCHANGE_NAME, /* 要绑定的Exchange */"x", /* 因为绑定的Exchange类型是 fanout 扇形(广播)模式,所以路由key随便写,没啥作用 */null);//声明 Queue 的绑定amqpAdmin.declareBinding(binding);}}//发送消息的方法public void publish(String content){//发送消息amqpTemplate.convertAndSend(ContentUtil.EXCHANGE_NAME, /* 指定将消息发送到这个Exchange */"",  /* 因为Exchange是fanout 类型的(广播类型),所以写什么路由key都行,都没意义 */content /* 发送的消息体 */);}
}

PublishController.java 发送消息的控制层

package cn.ljh.app.rabbitmq.controller;import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;//发送消息
@RestController
public class PublishController
{private final MessageService messageService;//有参构造器进行依赖注入public PublishController(MessageService messageService){this.messageService = messageService;}@GetMapping("/publish/{message}")//因为{message}是一个路径参数,所以方法接收的时候需要加上注解 @PathVariablepublic String publish(@PathVariable String message){//发布消息messageService.publish(message);return "消息发布成功";}}

MyRabbitMQListener 监听器,监听消息队列

package cn.ljh.app.rabbitmq.listener;import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//监听器:监听消息队列并进行消费
@Component
public class MyRabbitMQListener
{//queues 指定监听已有的哪个消费队列@RabbitListener(queues = "queue_boot_01")public void onQ1Message(String message){System.err.println("从 queue_boot_01 消息队列接收到的消息:" + message);}//queues 指定监听已有的哪个消费队列@RabbitListener(queues = "queue_boot_02")public void onQ2Message(String message){System.err.println("从 queue_boot_02 消息队列接收到的消息:" + message);}//queues 指定监听已有的哪个消费队列//还可以用 queuesToDeclare 直接声明并监听该队列,还可以用 bindings 进行Exchange和queue的绑定@RabbitListener(queuesToDeclare = @Queue(name = "queue_boot_03",durable = "true",exclusive = "false",autoDelete = "false"),admin = "amqpAdmin" /*指定声明Queue,绑定Queue所用的 amqpAdmin,不指定的话就用容器中默认的那一个 */)public void onQ3Message(String message){System.err.println("从 queue_boot_03 消息队列接收到的消息:" + message);}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.5</version></parent><groupId>cn.ljh</groupId><artifactId>rabbitmq_boot</artifactId><version>1.0.0</version><name>rabbitmq_boot</name><properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- RabbitMQ 的依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- web 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 开发者工具的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- lombok 依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/159813.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

同为科技(TOWE)关于风力发电雷电防护的解决方案

风能作为一种可再生清洁能源&#xff0c;是国家新能源发展战略的重要组成部分。我国风能开发潜力高达2.510GW以上&#xff0c;近年来风力发电机组逐年增加&#xff0c;截止到2022年&#xff0c;全国风电装机容量约3.5亿千瓦&#xff0c;同比增长16.6%。然而&#xff0c;由于风力…

FPGA project : flash_continue_write

本实验学习了通过spi通信协议&#xff0c;驱动flash&#xff1b;完成连续写操作。 连续写&#xff1a; 本质上还是页编程指令&#xff0c;两种连续写的方式&#xff1a; 1&#xff0c;每次只写1byte的数据。 2&#xff0c;每次写满1页数据&#xff0c;计算剩余数据够不够写…

RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 页面分析

前言 RabbitMQ作为一款常用的消息中间件&#xff0c;在微服务项目中得到大量应用&#xff0c;其本身是微服务中的重点和难点&#xff0c;有不少概念我自己的也是一知半解&#xff0c;本系列博客尝试结合实际应用场景阐述RabbitMQ的应用&#xff0c;分析其为什么使用&#xff0…

手机市场或迎复苏,芯片测试与封测供应链积极应对 | 百能云芯

低迷不振的手机供应链&#xff0c;终于迎来曙光&#xff1f;半导体封测供应链传出从10月开始&#xff0c;手机系统大厂终于开始有明显的库存回补动作&#xff0c;锁定如联发科等一线手机SoC从业者的「旧款芯片」备货。 测试厂如京元电、测试界面的雍智等接获备战指示&#xff0…

深圳寄包裹到德国

深圳&#xff0c;作为全球最发达的城市之一&#xff0c;以其高效的物流服务在全球范围内享有盛名。如果你正在寻找一种方式将包裹从深圳寄送到德国&#xff0c;那么本文将为你提供详细的步骤和建议。 第一步&#xff1a;了解国际邮寄的基本信息 首先&#xff0c;你需要了解包裹…

C++基本语法【恩培学习笔记(一)】

文章目录 1、C程序结构1.1 C程序的基本组成部分1.2 预处理指令1.3 注释1.4 main() 主函数1.5 命名空间 namespace 2、 C的变量和常量2.1 变量2.2 变量的声明2.3 变量的类型 3、C 数组和容器3.1 数组&#xff08;array&#xff09;3.2 容器&#xff08;vector&#xff09; 4、C …

多模态大模型升级:LLaVA→LLaVA-1.5,MiniGPT4→MiniGPT5

Overview LLaVA-1.5总览摘要1.引言2.背景3.LLaVA的改进4.讨论附录 LLaVA-1.5 总览 题目: Improved Baselines with Visual Instruction Tuning 机构&#xff1a;威斯康星大学麦迪逊分校&#xff0c;微软 论文: https://arxiv.org/pdf/2310.03744.pdf 代码: https://llava-vl.…

10.2手动推导linux中file, cdev, inode之间的关系

是时候可以手动推导一下linux里面基类父类和子类的关系了 代码放最后把 简单说明版 详细流程 第一步注册驱动 cdev结构体能看做是一个基类,那么链表里面都是字符设备驱动的cdev连载一起,啥串口,lcd的,通过cdev->list_head连接 那cdev结构体里有主次设备号 第一步 使用r…

探索未来:硬件架构之路

文章目录 &#x1f31f; 硬件架构&#x1f34a; 基本概念&#x1f34a; 设计原则&#x1f34a; 应用场景&#x1f34a; 结论 &#x1f4d5;我是廖志伟&#xff0c;一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO专家博主、阿里云专家博主、清华大学出版社签约作…

矿区井下智慧用电安全监测解决方案

一、背景 矿区井下作业具有复杂的环境和较高的危险性&#xff0c;对于用电安全的要求尤为严格。传统的管理模式和监测方法往往无法实时、准确地掌握井下用电情况&#xff0c;对安全隐患的排查与预防存在一定局限性。因此&#xff0c;引入智慧用电安全监测解决方案&#xff…

【LeetCode刷题(数组and排序)】:存在重复元素

给你一个整数数组 nums 。如果任一值在数组中出现 至少两次 &#xff0c;返回 true &#xff1b;如果数组中每个元素互不相同&#xff0c;返回 false 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3,1] 输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;nums [1,2…

【ARM Coresight 系列文章 9.1 -- ITM 仪器化跟踪宏单元详细介绍】

文章目录 1.1 ITM 介绍1.1.1 ITM 功能介绍1.1.2 Cortex-M ITM 的地址范围1.2 ITM 使用1.2.1 ITM 寄存器介绍1.2.2 Cortex-M7 ITM 代码示例1.2.3 Cortex-M33 ITM 代码示例1.1 ITM 介绍 在debug 调试阶段通常都是使用 printf(printk) 来进行进行 log 输出,然后定位问题。那么如…

TikTok国际版 使用特网科技Bluestacks模拟器安装方法

特网科技Bluestacks模拟器主机 桌面自带Bluestacks模拟器 TikTok国际版Bluestacks模拟器搜索tiktot 登录google应用商店-安装TikTok 安装过程可能需要3-5分钟不等-配置过低可能会导致安装失败&#xff0c;建议升级更高内存。 安装完成-打开 安装成功APP-我的游戏查看 打开国际版…

普通螺纹基本牙型尺寸及拧紧力矩.exe

一、概要 本软件功能主要是通过输入螺纹原始三角形高度P,螺栓规格(公称直径)d,材料的屈服应力σs,计算出公称应力截面积As、外螺纹小径d1、外螺纹小径d2、拧紧力矩T等参数。 开发本软件的原因主要有以下几点: 提高设计效率:通过这款软件,工程师可以快速计算螺纹的基本牙…

【Java学习之道】网络编程的基本概念

引言 这一章我们将一同进入网络编程的世界。在开始学习网络编程之前&#xff0c;我们需要先了解一些基本概念。那么&#xff0c;我们就从“什么是网络编程”这个问题开始吧。 一、网络编程的基本概念 1.1 什么是网络编程 网络编程&#xff0c;顾名思义&#xff0c;就是利用…

05_51单片机led流水线的实现

1:step创建一个新的项目并将程序烧录进入51单片机 以下是51单片机流水线代码的具体实现 #include <REGX52.H>void Delay500ms() //11.0592MHz {unsigned char i, j, k;i 4;j 129;k 119;do{do{while (--k);} while (--j);} while (--i); }void main(){while(1){P1 0…

智慧水利:山海鲸数字孪生的革新之路

一、概念 什么是港口&#xff1f; "港口"通常指的是一个水域或岸边的设施&#xff0c;用于装载、卸载、储存和处理货物、以及提供与海上、河流或湖泊交通相关的服务。港口可以包括各种类型的码头、码头设备、仓库、货物运输设施、以及各种管理和物流设施。 什么是数…

LinkedHashMap与LRU缓存

序、慢慢来才是最快的方法。 背景 LinkedHashMap 是继承于 HashMap 实现的哈希链表&#xff0c;它同时具备双向链表和散列表的特点。事实上&#xff0c;LinkedHashMap 继承了 HashMap 的主要功能&#xff0c;并通过 HashMap 预留的 Hook 点维护双向链表的逻辑。 1.缓存淘汰算法…

车辆车型识别系统python+TensorFlow+Django网页界面+算法模型

一、介绍 车辆车型识别系统。本系统使用Python作为主要开发编程语言&#xff0c;通过TensorFlow搭建算法模型网络对收集到的多种车辆车型图片数据集进行训练&#xff0c;最后得到一个识别精度较高的模型文件。并基于该模型搭建Django框架的WEB网页端可视化操作界面。实现用户上…

【Unity基础】6.动画状态机

【Unity基础】6.动画状态机 大家好&#xff0c;我是Lampard~~ 欢迎来到Unity基础系列博客&#xff0c;所学知识来自B站阿发老师~感谢 &#xff08;一&#xff09;Animator Controller组件 &#xff08;1&#xff09;创建组件 Animator Controller组件是unity用于控制管…