Spring Boot | Spring Boot 整合 “RabbitMQ“ ( 消息中间件 ) 实现

目录:

  • Spring Boot 整合 "RabbitMQ" ( 消息中间件 )实现 :
    • 一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )
      • 1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 AmqpAdmin "定制消息发送组件"
        • (3) 查看 "RabbitMQ消息组件" 的定制效果
        • (4) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (5) "消息消费者" 接收消息
      • 1.2 基于 "配置类" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "配置类"的方式 "定制消息发送组件"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) "消息消费者" 接收消息
      • 1.3 基于 "注解" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式定制 "消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台查看 "消费者" 消费信息情况
    • 二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式
      • 2.1 基于 "注解" 的方式 ( 实现 Routing "路由模式"工作模式 )
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台 查看 "消费者" 消费信息情况
    • 三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式
      • 3.1 基于 "注解" 的方式 ( 实现 Topics"通配符模式"工作模式 )
        • (1) 创建项目,"全局配置文件" 中配置信息
        • (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
        • (3) "消息发送者" 发送消息 :
          • ① 创建 "实体类对象" ( 存储发送的"消息内容" )
          • ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
          • ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
        • (4) 控制台 查看 "消费者" 消费信息情况

Spring Boot 整合 “RabbitMQ” ( 消息中间件 )实现 :

在这里插入图片描述

作者简介 :一只大皮卡丘,计算机专业学生,正在努力学习、努力敲代码中! 让我们一起继续努力学习!

该文章参考学习教材为:
《Spring Boot企业级开发教程》 黑马程序员 / 编著
文章以课本知识点 + 代码为主线,结合自己看书学习过程中的理解和感悟 ,最终成就了该文章

文章用于本人学习使用 , 同时希望能帮助大家。
欢迎大家点赞👍 收藏⭐ 关注💖哦!!!

(侵权可联系我,进行删除,如果雷同,纯属巧合)


  • Spring Boot 可对 RabbitMQ工作模式 ( 6种工作模式 )进行了 整合,并支持 多种整合方式,包括 : 基于API 的方式 基于配置类的方式 基于注解的方式
  • 下面将 选取常用 Publish/Subscribe ( 发布订阅 )工作模式 Routing ( 路由 )工作模式Topics ( 通配符模式 ) 工作模式 3种工作模式完成在 Spring Boot 项目 中的 消息服务整合实现 ( 整合实现"消息中间件" )。

一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )

  • Spring Boot 整合 RabbitMQ 中间件实现消息服务,主要围绕3个部分的工作进行展开 :
    定制中间件消息发送者发送消息消息消费者接收消息。其中,定制中间件比较麻烦的工作,且必须预先定制
  • 下面我们以 用户注册成功同时 “发送邮件通知”发送短信通知 这一场景为例,分别使用 : 基于API 的方式 基于配置类的方式 基于注解的方式3种 方式实现 Publish/Subscribe ( 发布订阅 )工作模式 整合

1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用

  • 基于API 的方式主要讲的是使用 Spring 框架提供的 API管理类 : AmqpAdmin 定制 消息发送组件,并进行消息发送
  • 这种 定制消息发送组件方式与在 RabbitMQ 可视化界面上通过 对应面板 进行 组件操作实现基本一样,都是通过管理员的身份预先手动声明交换器队列路由键等,然后 “组装消息队列” 供应用程序调用,从而 实现消息服务。下面我们就对这种 基于 API的方式进行 讲解和演示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>--></project>
    
(2) 使用 AmqpAdmin “定制消息发送组件”
  • 在项目的测试类 : Chapter22ApplicationTests , 在该测试类先引入 AmqpAdmin 管理类 定制 Publish/Subscribe 工作模式 所需的 消息组件代码如下所示 :

    Chapter22ApplicationTests.Java ( 测试类 ) :

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}//自动注入 RabbitMQ消息中间件的API操作类: AmqpAdmin@Autowiredprivate AmqpAdmin amqpAdmin;@Testpublic void amqpAdmin() {//1.定义"fanout"类型的交换器amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));//2.定义两个"默认持久化队列" , 分别处理email 和 smsamqpAdmin.declareQueue(new Queue("fanout_queue_email")); //该队列处理"邮件信息"amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));//该队列处理"短信信息"//3.将两个队列分别与"交换器"进行绑定amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));}}
    

    上面的代码中,使用 Spring 框架提供的消息管理组件 AmqpAdmin定制了消息组件。其中定义了一个 fanout 类型交换器 : fanout_exchange ;
    还定义两个消息队列 : fanout_queue_emailfanout_queue_sms,分别用来处理邮件信息短信信息 ;最后 将定义的两个队列分别交换器绑定

(3) 查看 “RabbitMQ消息组件” 的定制效果
  • 执行上述单元测试方法 : amgpAdmin()验证 RabbitMQ 消息组件的定制效果单元测试方法执行成功,可通过 RabbitMQ 可视化管理页面具体操作为: 打开RabbitMQ安装目录中的 sbin目录,双击 rabbitmq-server.bat ,后在浏览器访问访问 : http://localhost:15672 ,输入密码和密码 : guest , 此时 可在可视化界面查询 “RabbitMQ消息组件定制效果 ,如下图所示

    在这里插入图片描述


    在这里插入图片描述

    通过上面图片看出,在 Queues 队列面板页面中,展示有定制消息队列信息与程序中 "定制"消息队列一致。可以单击消息队列名称查看每个队列详情

    通过上述操作可以发现,在管理页面中提供了 消息组件交换器队列的定制功能。在程序中使用 Spring 框架提供的管理员 API组件 AmqpAdmin 定制消息组件和在管理页面上手动定制消息组件本质是一样 的。

(4) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;/*** "消息发送者" 发送信息*/@Testpublic void psubPublisher() {User user = new User(); //消息发送者 要发送的"消息内容"user.setId(1);user.setUsername("石头");/*使用 RabbitTemplate中的convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为String exchange : 表示发送信息的"交换器"String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"Object object : 表示要"发送的信息内容"*///执行哪个"交换器" 和 指定给该"交换器"的"消息内容"rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"}}
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
  • 再次执行 psubPublisher( ) 方法,该方法 执行成功后,查看 RabbitMQ可视化管理页面 中的 Queues面板信息,具体如下图所示

    在这里插入图片描述

  • 进入 队列详情页面查看消息 , 如下图所示

    在这里插入图片描述

    上图看出,在 消息队列存储有指定发送消息详情其他参数信息,这与程序指定发送信息完全一致

(5) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类/*** Publish/Subscribe 工作模式接收、处理 "邮件业务"** @RabbitListener(queues = "fanout_queue_email") :*  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。*  */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerEmail(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("邮件业务接收到的信息为: "+str);}/*** Publish/Subscribe 工作模式接收、处理 "短信业务"*/
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerSms(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("短信业务接收到的信息为: "+str);}}
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听 的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法立即接收并消费队列消息 ( 即 注解对应方法会被调用 )。另外,在接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果使用与消息类型对应参数接收消息的话,只能够得到 具体的消息体信息如果使用 Object或者 Message 类型参数接收消息的话,还可以获得除了消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.2 基于 “配置类” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用

  • 基于 配置类方式 主要讲的是使用 Spring Boot 框架提供的 @Configuration 注解 配置类 定制消息发送组件并进行消息发送例子代码如下所示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>--></project>
    
(2) 使用 "配置类"的方式 “定制消息发送组件”
  • 使用 "配置类"的方式 “定制消息发送组件(Publish/Subscribe 工作模式) 代码如下所示 :

    RabbitMQConfig.Java ( 配置类 ) :

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"/*** 使用基于"配置类"的方式定义"消息发送组件"*///1.定义定义fanout类型的交换器@Beanpublic Exchange fanout_exchange() {//创建一个名为"fanout_exchange" 的 "交换器"return ExchangeBuilder.fanoutExchange("fanout_exchange").build();}//2.定义两个不同名称的消息队列@Beanpublic Queue fanout_queue_email() { //创建消息队列//创建一个名为 "fanout_queue_email" 的 "消息队列"return new Queue("fanout_queue_email");}@Beanpublic Queue fanout_queue_sms() { //创建消息队列//创建一个名为 "fanout_queue_sms" 的 "消息队列"return new Queue("fanout_queue_sms");}//3.将两个不同名称的"消息队列"@Beanpublic Binding bindingEmail() { //将"fanout_queue_email"消息队列 和 "fanout_exchange"交换器 进行"绑定"return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();}@Beanpublic Binding bindingSms() { //将"fanout_queue_sms"消息队列 和 "fanout_exchange"交换器 进行"绑定"return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();}}
    
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;/*** "消息发送者" 发送信息*/@Testpublic void psubPublisher() {User user = new User(); //消息发送者 要发送的"消息内容"user.setId(1);user.setUsername("石头");/*使用 RabbitTemplate中的convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为String exchange : 表示发送信息的"交换器"String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"Object object : 表示要"发送的信息内容"*///执行哪个"交换器" 和 指定给该"交换器"的"消息内容"rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"}}
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类/*** Publish/Subscribe 工作模式接收、处理 "邮件业务"** @RabbitListener(queues = "fanout_queue_email") :*  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。*  */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerEmail(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("邮件业务接收到的信息为: "+str);}/*** Publish/Subscribe 工作模式接收、处理 "短信业务"*/
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作public void psubConsumerSms(Message message) {byte[] body = message.getBody();//转化为字符串String str = new String(body);System.out.println("短信业务接收到的信息为: "+str);}}
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法会**立即接收并消费队列消息 ( 即注解对应方法会被调用 )。另外,在 接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果 使用与消息类型对应参数**接收消息的话,只能够得到 具体的消息体信息如果 使用 Object 或者 Message 类型参数 接收消息 的话,还可以获得 除了 消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.3 基于 “注解” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用

  • 基于注解的方式指的是使用 Spring框架@RabbitListener注解 定制消息发送组件 "消息消费者" 接收信息 , 当然还要结合之前的代码,才能完整实现个功能需求
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>--></project>
(2) 使用 “注解” 的方式定制 “消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import com.myh.chapter_24.domain.User;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类/*** 使用基于注解的方式实现消息服务 ( 使用"注解"的方式 ①定制消息组件 ②接收信息)* 1.1 Publish/Subscribe工作模式接收、处理"邮件业务"*/@RabbitListener(bindings = @QueueBinding(value =@Queue("fanout_queue_email"),exchange =@Exchange(value = "fanout_exchange",type = "fanout")))public void psubConsumerEmailAno(User user) {System.out.println("邮件业务接收到消息: "+user);}/**** 1.2 Publish/Subscribe工作模式接收、处理"短信业务"*/@RabbitListener(bindings = @QueueBinding(value =@Queue("fanout_queue_sms"),exchange =@Exchange(value = "fanout_exchange",type = "fanout")))public void psubConsumerSmsAno(User user) {System.out.println("短信业务接收到消息: "+user);}}
    

    上面代码中,使用 @RabbitListener 注解及 其相关属性定制了两个消息组件消费者,这两个消费者都接收实体类 User 并消费。在 @RabbitListener 注解中,bindings 属性用于创建并绑定交换器消息队列组件,需要注意的是,为了能使两个消息组件的消费者接受实体类User,需要我们在 定制交换器将交换器类型 type 设置为 fanout。另外,bindings 属性@QueueBinding 注解除了有 valuetype 属性外,还有key 属性用于定制路由键 : routingKey (当前发布订阅模式不需要)。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;/*** "消息发送者" 发送信息*/@Testpublic void psubPublisher() {User user = new User(); //消息发送者 要发送的"消息内容"user.setId(1);user.setUsername("石头");/*使用 RabbitTemplate中的convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为String exchange : 表示发送信息的"交换器"String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"Object object : 表示要"发送的信息内容"*///执行哪个"交换器" 和 指定给该"交换器"的"消息内容"rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"}}
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) 控制台查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行psubPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述

  • 至此,在 Spring Boot 中完成了 使用基于 API基于配置类基于注解3 种方式来实现 Publish/Subscribe 工作模式整合讲解 在这 3 种实现消息服务的方式中,

  • 上面讲述过三种配置方式区别
    基于 API 的方式
    相对简单、直观
    ,但容易与业务代码产生 耦合

    基于 配置类方式相对隔离 容易统一管理、符合Spring Boot 框架思想;

    基于 注解的方式 清晰明了方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用 基于配置类的方式 和 基于注解的方式定制组件 实现消息服务较为常见。使用 基于 API的方式偶尔使用,具体还需要根据实际情况进行选择

二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式

2.1 基于 “注解” 的方式 ( 实现 Routing "路由模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>--></project>
    
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类//使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"/*** 2.1路由模式消息接收、处理error级别日志信息* (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")*/@RabbitListener(bindings = @QueueBinding(value =@Queue("routing_queue_error"), exchange =@Exchange(value = "routing_exchange", type = "direct"),key = "error_routing_key"))public void routingConsumerError(String message) {System.out.println("接收到error级别日志信息: "+message);}/*** 2.2路由模式消息接收、处理info、error、warning级别日志信息* (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")*/@RabbitListener(bindings = @QueueBinding(value =@Queue("routing_queue_all"), exchange =@Exchange(value = "routing_exchange", type = "direct"),key = {"error_routing_key","info_routing_key","warning_routing_key"}))public void routingConsumerAll(String message) {System.out.println("接收到info、error、warning级别日志信息: "+message);}}

    上述代码中,在消息业务处理类 : RabbitMQService 中编写了两个用来 处理 Routing 路由模式消息消费者方法,在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制路由模式消息服务组件

    Routing路由模式下的交换器类型 : type属性direct,而且还必须 指定 key 属性
    ( 每个消息队列可以
    映射多个路由键
    但在 Spring Boot 1.X版本中@QueueBinding 中的 key 属性只接收 Spring 类型不接收 Spring [ ] 类型 )。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;//Routing工作模式消息发送端 ( "消息发布者" "发送信息" )@Testpublic void routingPublisher() {//发送信息//给指定的"交换器" "发送信息"rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send 错误信息...");}}
    

    上述代码中,通过调用 RabbitTemplateconverAndSend ( String exchange , String routingKey , Object object )方法来 "发送消息"。在 Routing 工作模式下发送消息时,必须指定路由键参数该参数要与消息队列映射路由键保持一致,否则发送的消息将会丢失

    本次示例中使用的是error_routing_key 路由键,根据定制规则,编写的两个消息消费者方法应该都可以正常接收并消费发送端发送消息

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行routingPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述


    此时,修改 routingPublisher( )方法中的消息发送参数调整发送info 级别日志信息( 注意同修改 info_routing_key 路由键 ),再次启动 routingPublisher( )方法控制台效果如下图所示

    在这里插入图片描述

    上图所示,控制台打印出使用 info_routing_key 路由键发送 info 级别日志信息,说明只有配置映射 info_routing_key 路由键消息消费者 的方法 消费了消息

三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式

3.1 基于 “注解” 的方式 ( 实现 Topics"通配符模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.myh</groupId><artifactId>chapter_22</artifactId><version>0.0.1-SNAPSHOT</version><name>chapter_22</name><description>chapter_22</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>--></project>
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;@Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类//使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"/*** 3.1使用"通配符模式"消息接收、进行"邮件业务"订阅处理*   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )*/@RabbitListener(bindings = @QueueBinding(value =@Queue("topic_queue_email"),exchange =@Exchange(value = "topic_exchange",type = "topic"),key = "info.#.email.#"))public void topicConsumerEmail(String message) {System.out.println("接收到邮件订阅需求处理信息: "+message);}/*** 3.2使用"通配符模式"消息接收、进行"短信业务"订阅处理*   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )*/@RabbitListener(bindings = @QueueBinding(value =@Queue("topic_queue_sms"),exchange =@Exchange(value = "topic_exchange",type = "topic"),key = "info.#.sms.#"))public void topicConsumerSms(String message) {System.out.println("接收到短信订阅需求处理信息: "+message);}}
    

    上述代码中,在消息业务处理类RabbitMQService 中编写了两个处理 Topics 通配符模式消息消费者方法 ( 这两个方法即创建了"消息组件",又创建了"消息消费者接收且消费信息 "),在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制了通配符模式下的 消息组件

    从上述示例可以看出Topics 通配符模式下注解使用方式与 Routing 路由模式使用基本一样,主要是将交换器类型 type 修改为了 topic,还分别使用通配符的样式指定路由键 routingKey

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"private Integer id;private String username;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +'}';}
    }
    

    针对不同的用户订阅需求,使用 RabbitTemplate 模板工具类的 convertAndSend ( Stringexchange,String routingKey,Object object )方法发送不同的消息发送消息时,必须 根据具体需求已经定制路由键通配符 设置 具体的路由键

② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {@Testvoid contextLoads() {}@Autowired//引入进行"消息中间件"管理的 RabbitTemplate组件对象private RabbitTemplate rabbitTemplate;//3.Topcis工作模式下的"消息发布者" "发送消息"@Testpublic void topicPublisher() {//(1)只发送"邮件"订阅用户信息 ---(给指定的"交换器" "发送信息")rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message...");//(2)只发送"短信"订阅用户信息rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send  sms message...");//(3)"邮件"订阅用户 和 "短信"订阅用户 都发送消息rabbitTemplate.convertAndSend("topic_exchange","info.email.sms","topics send email and sms  message...");}}
    
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"@Bean //将该类的返回值对象加入到IOC容器中public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 执行topicPublisher( ) 方法 中的 步骤(1)邮件订阅用户的消息发送控制台效果下图所示 :

    在这里插入图片描述


    在这里插入图片描述

  • topicPublisher( )方法步骤(1) 进行 注释打开步骤(2) 中只进行 短信订阅用户消息发送方法,并 再次启动该测试方法控制台效果如下图所示 : 在这里插入图片描述


在这里插入图片描述

  • 为了查看 topicPublisher( )方法步骤(3)的效果,我们需要把 步骤(2) 的代码注释步骤(3) 的代码主要进行 邮件短信订阅用户消息发送方法项目重新启动 后的效果如下图所示 :

    在这里插入图片描述


在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/325215.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis抖动问题导致延迟或者断开的处理方案

目录&#xff1a; 1、使用背景2、redis重试机制3、redis重连机制4、其他一些解决redis抖动问题方案 1、使用背景 客户反馈文件偶现打不开&#xff0c;报错现象是session not exist&#xff0c;最终定位是redis抖动导致的延迟/断开的现象&#xff0c;最终研发团方案是加入redis…

排序算法及实现(上)

稳定性的判断&#xff1a;如果两个相同大小的元素也进行了交换就是不稳定&#xff0c;否则稳定 1.直接插入排序&#xff1a; 当插入第 i 位置元素时&#xff0c;前面 0 到 i-1 位置的元素已经各自有序。 此时将i 再次从i-1到0位置依次进行比较。找到合适位置将其插入&#x…

【Python】PYQT5详细介绍

本专栏内容为&#xff1a;Python学习专栏 通过本专栏的深入学习&#xff0c;你可以了解并掌握Python。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;Python &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&#x1f69a; &#x1f3…

GitHub操作

远程库-GitHub GitHub网址 GitHub是全球最大的远程库 1. 创建远程库 2. 远程仓库操作 2.1 创建远程仓库别名 git remote -v 查看当前所有远程库地址别名 git remote add 别名 远程地址 设置远程库地址别名 案例操作 起一个别名会出现两个别名&#xff0c;是因为既可以拉取…

macOS DOSBox 汇编环境搭建

正文 一、安装DOSBox 首先前往DOSBox的官网下载并安装最新版本的DOSBox。 二、下载必备的工具包 在用户目录下新建一个文件夹&#xff0c;比如 dosbox: mkdir dosbox然后下载一些常用的工具。下载好了后&#xff0c;将这些工具解压&#xff0c;重新放在 dosbox 这个文件夹…

数据链路层(详细版)【02】

接 数据链路层&#xff08;详细版&#xff09;【01】 文章目录 四、以太网MAC层&#xff08;一&#xff09;MAC地址组成&#xff08;1&#xff09;48位MAC地址格式&#xff08;2&#xff09;单播地址 & 多播地址 & 广播地址&#xff08;3&#xff09;全球管理 & 本…

笔记2:cifar10数据集获取及pytorch批量处理

&#xff08;1&#xff09;cifar10数据集预处理 CIFAR-10是一个广泛使用的图像数据集&#xff0c;它由10个类别的共60000张32x32彩色图像组成&#xff0c;每个类别有6000张图像。 CIFAR-10官网 以下为CIFAR-10数据集data_batch_*表示训练集数据&#xff0c;test_batch表示测试…

Colibri for Mac v2.2.0 原生无损音频播放器 激活版

Colibri支持所有流行的无损和有损音频格式的完美清晰的比特完美播放&#xff0c;仅使用微小的计算能力&#xff0c;并提供干净和直观的用户体验。 Colibri在播放音乐时使用极少的计算能力。该应用程序使用最先进的Swift 3编程语言构建&#xff0c;BASS音频引擎作为机器代码捆绑…

46 udp网络程序

查询网络服务的命令 netstat -nlup n: 显示数字 a&#xff1a;显示所有 u&#xff1a;udp服务 p&#xff1a;显示pid Recv-Q收到的数量&#xff0c;本地ip和远端ip&#xff0c;00表示可以收到任何地址 网络聊天 服务端 定义一个server类&#xff0c;成员保存ip地址&#xff…

JVM 类加载机制

JVM 类加载机制分为五个部分&#xff1a;加载&#xff0c;验证&#xff0c;准备&#xff0c;解析&#xff0c;初始化&#xff0c;下面我们就分别来看一下这五个过程。 加载 加载是类加载过程中的一个阶段&#xff0c;这个阶段会在内存中生成一个代表这个类的 java.lang.class 对…

基于yolov5+streamlit目标检测演示系统设计

YOLOv5与Streamlit&#xff1a;智能目标检测可视化展示介绍 随着人工智能技术的飞速发展&#xff0c;目标检测技术已成为推动智能化社会进步的关键技术之一。在众多目标检测算法中&#xff0c;YOLOv5以其卓越的性能和实时性&#xff0c;成为了业界的佼佼者。与此同时&#xff…

UDP多播

1 、多播的概念 多播&#xff0c;也被称为组播&#xff0c;是一种网络通信模式&#xff0c;其中数据的传输和接收仅在同一组内进行。多播具有以下特点&#xff1a; 多播地址标识一组接口&#xff1a;多播使用特定的多播地址&#xff0c;该地址标识一组接收数据的接口。发送到多…

实现红黑树

目录 红黑树的概念 红黑树的节点结构定义 红黑树的插入 红黑树的验证 实现红黑树完整代码 红黑树的概念 红黑树 &#xff0c;是一种 二叉搜索树 &#xff0c;但 在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是 Red 或 Black 。 通过对 任何一条从根到叶子的…

Leetcode39.组合总和

文章目录 题目描述解题思路重复子集剪枝 代码 题目 参考题解 题目描述 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返…

【JavaSE】/*初识Java*/

目录 一、了解 Java 语言 二、Java 语言的重要性 2.1 使用程度 2.2 工作领域 三、Java 语言的特性 四、Java 的基础语法 五、可能遇到的错误 六、第一个 java 程序代码解析 七、Java 注释 八、Java 标识符 九、Java 关键字 一、了解 Java 语言 Java 是由 Sun Micr…

图论(洛谷刷题)

目录 前言&#xff1a; 题单&#xff1a; P3386 【模板】二分图最大匹配 P1525 [NOIP2010 提高组] 关押罪犯 P3385 【模板】负环 P3371 【模板】单源最短路径&#xff08;弱化版&#xff09; SPFA写法 Dij写法&#xff1a; P3385 【模板】负环 P5960 【模板】差分约束…

【iOS开发】—— 初识锁

【iOS开发】—— 初识锁 线程安全锁的种类自旋锁定义原理自旋锁缺点OSSpinLock&#xff08;自旋锁&#xff09; 互斥锁os_unfair_lockpthread_mutexNSLockNSRecusiveLockSemaphore信号量synchronized 总结两种之间的区别和联系&#xff1a; 线程安全 当一个线程访问数据的时候…

双向冒泡法,可以只求最大最小值

int BiBubbleSort(int Arr[],int n,int maxnum){int left0,rightn-1;int i;bool notDone true;int temp;if(n<2)return -1;while(left<right&&notDone){ notDone false; //设置未发生交换标志 for(ileft;i<right;i){if(Arr[i]>Arr[i1]){//swap(Arr[…

Python-VBA函数之旅-staticmethod函数

目录 一、staticmethod函数的常见应用场景 二、staticmethod函数使用注意事项 三、如何用好staticmethod函数&#xff1f; 1、staticmethod函数&#xff1a; 1-1、Python&#xff1a; 1-2、VBA&#xff1a; 2、推荐阅读&#xff1a; 个人主页&#xff1a; https://blog…