微服务之SpringAMQP详解

目录

前言

1. 概述

2. Basic Queue简单队列模型

2.1 消息发送

2.2 消息接收

2.3 总结

3. WorkQueue模型

3.1 消息发送

3.2 消息接收

3.3 测试

3.4 消费预取限制

3.5 总结

4. 发布、订阅

5. Fanout

5.1 声明队列和交换机

 5.2 消息发送

5.3 消息接收

5.4 测试

5.5 总结

6. Direct

6.1 基于注解声明队列和交换机

6.2 发送消息

6.3 测试

6.4  总结

7. Topic

7.1 消息发送

7.2 消息接收

7.3 测试

7.4 总结

8. 交换机

8.1 配置JSON转换器

8.2 总结


前言

RabbitMQ帮助我们实现异步处理消息,大大减少了系统的压力,但是利用官方的API来实现RabbitMQ的功能实在是太麻烦了。为了简化发送和接收的API,就出现了SpringAMQP。

1. 概述

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用是来非常方便。

SpringAMQP的官方地址:Spring AMQP。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

2. Basic Queue简单队列模型

在父工程mq-demo中引入依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.1 消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:rabbitmq:host: MQ的IP地址 # RabbitMQ的IP地址port: 5672 # RabbitMQ的通信端口username: lyf # RabbitMQ的用户名password: 123456 # RabbitMQ的密码virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看

 然后在publisher服务中编写测试类SpringAMQPTest,并利用RabbitTemplate实现消息发送:

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello,spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

然后运行这个测试类成功之后,去RabbitMQ的管理平台可以看到:

点击进去可以看到,具体的MQ信息:

2.2 消息接收

 首先配置MQ地址,在consumer服务的application.yml文件中添加配置:

spring:rabbitmq:host: MQ的IP地址 # RabbitMQ的IP地址port: 5672 # RabbitMQ的通信端口username: lyf # RabbitMQ的用户名password: 123456 # RabbitMQ的密码virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看

然后在consumer服务的cn.itcast.mq.listener包中添加SpringRabbitListener类:

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void  listenerSimpleQueueMessage(String msg){System.out.println("接收到消息:【" + msg + "】");}}

找到consumer服务的启动类启动服务,然后在consumer服务的控制台可以看到:

这就是consumer接收到了刚刚publisher发送出去的消息。我们再去RabbitMQ的管理平台看看:

这里的队列显示未接收的消息为0了。

2.3 总结

什么是AMQP?

  • 应用消息通信的一种协议,与语言和平台无关。

SpringAMQP如何发送消息?

  • 引入amqp的starter依赖
  • 配置RabbitMQ地址
  • 利用RabbitTemplate的converAndSend方法

SpringAMQP如何接收消息:

  • 引入amqp的starter依赖
  • 配置RabbitMQ的地址
  • 定义类,添加@Component注解
  • 类中声明方法,添加@RabbitListener注解,方法参数就是消息

注意:消息一旦被消费就会从队列中删除,RabbitMQ没有消息回溯功能。

3. WorkQueue模型

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work模型,多个消费者共同处理消息处理,速度就能大大提高了。

现在我们模拟一下WorkQueue,实现一个队列绑定多个消费者。

基本实现思路:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

那么理论上来说,两个消费者加起来是不是就已经超过了50条消息,我们来实践一下。

3.1 消息发送

在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue:

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {@Autowiredprivate RabbitTemplate rabbitTemplate;//    @Test
//    public void testSendMessage2SimpleQueue() {
//        // 队列名称
//        String queueName = "simple.queue";
//        // 消息
//        String message = "hello,spring amqp!";
//        // 发送消息
//        rabbitTemplate.convertAndSend(queueName, message);
//    }/** 测试work模式* */@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello,message____";for (int i = 1; i <= 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20); // 一秒发送50条消息}}
}

3.2 消息接收

 在consumer服务中定义两个消息监听者,都监听simple.queue队列:

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component
public class SpringRabbitListener {//    @RabbitListener(queues = "simple.queue")
//    public void  listenerSimpleQueueMessage(String msg){
//        System.out.println("接收到消息:【" + msg + "】");
//    }@RabbitListener(queues = "simple.queue")public void  listenerWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalDateTime.now()); // 看看消费者接收消息的时间Thread.sleep(20); // 消费者1每秒处理50条消息}@RabbitListener(queues = "simple.queue")public void  listenerWorkQueueMessage2(String msg) throws InterruptedException {System.err.println("消费者2接收到消息:........【" + msg + "】" + LocalDateTime.now()); // 看看消费者接收消息的时间,打印方式跟消费者1区分开Thread.sleep(100); // 消费者2每秒处理10条消息}}

3.3 测试

启动ConsumerApplication后,将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。结果如下:

消费者1接收到消息:【hello,message____1】2024-08-04T15:21:37.238949
消费者1接收到消息:【hello,message____3】2024-08-04T15:21:37.337949300
消费者2接收到消息:........【hello,message____2】2024-08-04T15:21:37.339449800
消费者1接收到消息:【hello,message____5】2024-08-04T15:21:37.364449800
消费者1接收到消息:【hello,message____7】2024-08-04T15:21:37.416949800
消费者2接收到消息:........【hello,message____4】2024-08-04T15:21:37.443450200
消费者1接收到消息:【hello,message____9】2024-08-04T15:21:37.476948600
消费者1接收到消息:【hello,message____11】2024-08-04T15:21:37.537449800
消费者2接收到消息:........【hello,message____6】2024-08-04T15:21:37.549951
消费者1接收到消息:【hello,message____13】2024-08-04T15:21:37.600449200
消费者2接收到消息:........【hello,message____8】2024-08-04T15:21:37.659454600
消费者1接收到消息:【hello,message____15】2024-08-04T15:21:37.661949300
消费者1接收到消息:【hello,message____17】2024-08-04T15:21:37.726449800
消费者2接收到消息:........【hello,message____10】2024-08-04T15:21:37.767950
消费者1接收到消息:【hello,message____19】2024-08-04T15:21:37.785449500
消费者1接收到消息:【hello,message____21】2024-08-04T15:21:37.847451600
消费者2接收到消息:........【hello,message____12】2024-08-04T15:21:37.875949700
消费者1接收到消息:【hello,message____23】2024-08-04T15:21:37.909949400
消费者1接收到消息:【hello,message____25】2024-08-04T15:21:37.971451200
消费者2接收到消息:........【hello,message____14】2024-08-04T15:21:37.984450600
消费者1接收到消息:【hello,message____27】2024-08-04T15:21:38.033449900
消费者2接收到消息:........【hello,message____16】2024-08-04T15:21:38.093949600
消费者1接收到消息:【hello,message____29】2024-08-04T15:21:38.096950100
消费者1接收到消息:【hello,message____31】2024-08-04T15:21:38.159450600
消费者2接收到消息:........【hello,message____18】2024-08-04T15:21:38.201450
消费者1接收到消息:【hello,message____33】2024-08-04T15:21:38.225449200
消费者1接收到消息:【hello,message____35】2024-08-04T15:21:38.281950200
消费者2接收到消息:........【hello,message____20】2024-08-04T15:21:38.310451200
消费者1接收到消息:【hello,message____37】2024-08-04T15:21:38.344949700
消费者1接收到消息:【hello,message____39】2024-08-04T15:21:38.405949800
消费者2接收到消息:........【hello,message____22】2024-08-04T15:21:38.417951100
消费者1接收到消息:【hello,message____41】2024-08-04T15:21:38.473950300
消费者2接收到消息:........【hello,message____24】2024-08-04T15:21:38.530450600
消费者1接收到消息:【hello,message____43】2024-08-04T15:21:38.537450300
消费者1接收到消息:【hello,message____45】2024-08-04T15:21:38.592950300
消费者2接收到消息:........【hello,message____26】2024-08-04T15:21:38.634752300
消费者1接收到消息:【hello,message____47】2024-08-04T15:21:38.652981
消费者1接收到消息:【hello,message____49】2024-08-04T15:21:38.714585
消费者2接收到消息:........【hello,message____28】2024-08-04T15:21:38.745860
消费者2接收到消息:........【hello,message____30】2024-08-04T15:21:38.964359800
消费者2接收到消息:........【hello,message____32】2024-08-04T15:21:39.083360200
消费者2接收到消息:........【hello,message____34】2024-08-04T15:21:39.194360800
消费者2接收到消息:........【hello,message____36】2024-08-04T15:21:39.299858200
消费者2接收到消息:........【hello,message____38】2024-08-04T15:21:39.409246400
消费者2接收到消息:........【hello,message____40】2024-08-04T15:21:39.517245300
消费者2接收到消息:........【hello,message____42】2024-08-04T15:21:39.627854900
消费者2接收到消息:........【hello,message____44】2024-08-04T15:21:39.750355100
消费者2接收到消息:........【hello,message____46】2024-08-04T15:21:39.866354800
消费者2接收到消息:........【hello,message____48】2024-08-04T15:21:39.985855400
消费者2接收到消息:........【hello,message____50】2024-08-04T15:21:40.089854600

我们可以看到,消费者第一次接收消息时间是2024-08-04T15:21:37.238949,而最后一次处理消息的时间是2024-08-04T15:21:40.089854600,这之间相差了3秒左右,跟我们预想的结果相差的有点大。我们再来看一下输出的结果,发现消费者1处理的是奇数的消息,而消费者2处理的是偶数的消息。可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

3.4 消费预取限制

俗话说的好,没有那个金刚钻就别揽那个瓷器活。要解决上面的问题,也很简单。我们修改consumer服务中的aoolication.yml文件,添加一个prefetch的配置:

spring:rabbitmq:host: 主机IP # RabbitMQ的IP地址port: 5672 # RabbitMQ的通信端口username: lyf # RabbitMQ的用户名password: 123456 # RabbitMQ的密码virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

我们重启ConsumerApplication后,将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。结果如下:

消费者2接收到消息:........【hello,message____2】2024-08-04T15:23:57.373878700
消费者1接收到消息:【hello,message____1】2024-08-04T15:23:57.373878700
消费者1接收到消息:【hello,message____3】2024-08-04T15:23:57.416879100
消费者1接收到消息:【hello,message____4】2024-08-04T15:23:57.445379900
消费者1接收到消息:【hello,message____5】2024-08-04T15:23:57.475857900
消费者2接收到消息:........【hello,message____6】2024-08-04T15:23:57.490857100
消费者1接收到消息:【hello,message____7】2024-08-04T15:23:57.521357400
消费者1接收到消息:【hello,message____8】2024-08-04T15:23:57.552356900
消费者1接收到消息:【hello,message____9】2024-08-04T15:23:57.583357400
消费者1接收到消息:【hello,message____10】2024-08-04T15:23:57.614357400
消费者2接收到消息:........【hello,message____11】2024-08-04T15:23:57.645857400
消费者1接收到消息:【hello,message____12】2024-08-04T15:23:57.677357600
消费者1接收到消息:【hello,message____13】2024-08-04T15:23:57.707357500
消费者1接收到消息:【hello,message____14】2024-08-04T15:23:57.740858700
消费者1接收到消息:【hello,message____15】2024-08-04T15:23:57.770358100
消费者2接收到消息:........【hello,message____16】2024-08-04T15:23:57.802857
消费者1接收到消息:【hello,message____17】2024-08-04T15:23:57.831358
消费者1接收到消息:【hello,message____18】2024-08-04T15:23:57.862357600
消费者1接收到消息:【hello,message____19】2024-08-04T15:23:57.894357300
消费者1接收到消息:【hello,message____20】2024-08-04T15:23:57.925857300
消费者2接收到消息:........【hello,message____21】2024-08-04T15:23:57.955857800
消费者1接收到消息:【hello,message____22】2024-08-04T15:23:57.986357200
消费者1接收到消息:【hello,message____23】2024-08-04T15:23:58.017862
消费者1接收到消息:【hello,message____24】2024-08-04T15:23:58.048857600
消费者1接收到消息:【hello,message____25】2024-08-04T15:23:58.080357600
消费者2接收到消息:........【hello,message____26】2024-08-04T15:23:58.112358100
消费者1接收到消息:【hello,message____27】2024-08-04T15:23:58.141858100
消费者1接收到消息:【hello,message____28】2024-08-04T15:23:58.173358600
消费者1接收到消息:【hello,message____29】2024-08-04T15:23:58.203357100
消费者1接收到消息:【hello,message____30】2024-08-04T15:23:58.237363900
消费者2接收到消息:........【hello,message____31】2024-08-04T15:23:58.265856700
消费者1接收到消息:【hello,message____32】2024-08-04T15:23:58.298358200
消费者1接收到消息:【hello,message____33】2024-08-04T15:23:58.328857400
消费者1接收到消息:【hello,message____34】2024-08-04T15:23:58.387857500
消费者1接收到消息:【hello,message____35】2024-08-04T15:23:58.420358
消费者2接收到消息:........【hello,message____36】2024-08-04T15:23:58.451858400
消费者1接收到消息:【hello,message____37】2024-08-04T15:23:58.482857900
消费者1接收到消息:【hello,message____38】2024-08-04T15:23:58.514357600
消费者1接收到消息:【hello,message____39】2024-08-04T15:23:58.545358700
消费者1接收到消息:【hello,message____40】2024-08-04T15:23:58.576358500
消费者2接收到消息:........【hello,message____41】2024-08-04T15:23:58.606358400
消费者1接收到消息:【hello,message____42】2024-08-04T15:23:58.638357700
消费者1接收到消息:【hello,message____43】2024-08-04T15:23:58.668857600
消费者1接收到消息:【hello,message____44】2024-08-04T15:23:58.699357800
消费者1接收到消息:【hello,message____45】2024-08-04T15:23:58.730357300
消费者2接收到消息:........【hello,message____46】2024-08-04T15:23:58.763358600
消费者1接收到消息:【hello,message____47】2024-08-04T15:23:58.793357500
消费者1接收到消息:【hello,message____48】2024-08-04T15:23:58.823858100
消费者1接收到消息:【hello,message____49】2024-08-04T15:23:58.854859200
消费者1接收到消息:【hello,message____50】2024-08-04T15:23:58.886358300

 消费者第一次接收消息是2024-08-04T15:23:57.373878700,最后一次接收消息是2024-08-04T15:23:58.886358300的确跟我们之前预想的耗时一秒一样了。

3.5 总结

WorkQueue模型的使用:

  • 多个消费者绑定到一个队列,同一个消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

4. 发布、订阅

发布订阅的模型如图:

可以看到,再订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再是发送到队列中,而是发给exchange(交换机)
  • Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3个类型:
  1. Fanout:广播,将消息交给所有绑定到交换机的队列
  2. Direct:定向,把消息交给符合指定routing key的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

5. Fanout

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  1. 可以有多个队列
  2. 每个队列都要绑定到Exchange(交换机)
  3. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  4. 交换机把消息发送给绑定过的所有队列
  5. 订阅队列的消费者都能拿到消息

现在我们来利用SpringAMQP演示FanoutExchange的使用。

实现思路如下:

  1. 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
  2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  3. 在publisher中编写测试方法,向itcast.fanout发送消息

5.1 声明队列和交换机

SpringAMQP提供了一个接口Exchange,来表示所有不同类型的交换机:

 在consumer服务中,创建一个类,声明队列、交换机,并将两者绑定:

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {// 声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout"); // 创建名为itcast.fanout的交换机}// 声明队列-fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1"); // 创建名为fanout.queue1的队列}// 将交换机与队列1绑定@Beanpublic Binding bindingQueue1(FanoutExchange fanoutExchange,Queue fanoutQueue1){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明队列-fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2"); // 创建名为fanout.queue2的队列}// 将交换机与队列2绑定@Beanpublic Binding bindingQueue2(FanoutExchange fanoutExchange,Queue fanoutQueue2){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

 5.2 消息发送

在publisher服务中的SpringAmqpTest类中添加测试方法:

    // 测试fanoutExchange模式@Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello,everyone!";// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息rabbitTemplate.convertAndSend(exchangeName,"",message);}

5.3 消息接收

 在consumer服务中的SpringRabbitListener中添加两个方法,作为消费者:

    @RabbitListener(queues = "fanout.queue1")public void  listenerFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void  listenerFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}

5.4 测试

启动ConsumerApplication后,查看RabbitMQ的管理平台可以看到:

然后点击进去查看,就可以看到我们定义的两个队列:

将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testFanoutExchange。结果如下:

两个队列都接收到了消息。

5.5 总结

Fanout交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Bingding

6. Direct

在Fanout模式中,一条消息,会被所有订阅的队列消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange:

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致才会收到消息

现在我们来利用SpringAMQP演示FanoutExchange的使用。

实现思路如下:

  1. 利用@RabbitListener注解声明Exchange、Queue、RoutingKey
  2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  3. 在publisher中编写测试方法,向itcast.direct发送消息

6.1 基于注解声明队列和交换机

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解的方式来声明。这个不用背,可以根据提示来写,没有提示的按ctrl+p就有了。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void  listenerDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void  listenerDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2消息:【" + msg + "】");}

6.2 发送消息

在publisher服务的SpringAmqpTest类中添加测试方法:

    @Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello,blue!";// 发送消息,参数分别是:交互机名称、RoutingKey、消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);}

6.3 测试

启动ConsumerApplication后,在RabbitMQ的管理平台可以看到:

点击进去可以看到创建的队列:

将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testDirectExchange。结果如下:

因为发送消息的RoutingKey写的是blue,所以只有消费者1能收到消息,接下来,把发送消息的RoutingKey改成yellow:

清空控制台日志,再来看看:

因为发送消息的RoutingKey写的是yellow,所以只有消费者2能收到消息,接下来,把发送消息的RoutingKey改成red: 

清空控制台日志,再来看看:

6.4  总结

描述一下Direct交换机与Fanout交换机的差异:

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解:

  • @Queue
  • @Exchange

7. Topic

Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型交换机可以让队列在绑定RoutingKey的时候使用通配符。

RoutingKey一般都是有一个或多个单词组成,多个单词之间以"."分割,例如:item.insert通配符规则:
#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert或者item.spu

item.*:只能匹配item.spu

图示:

解释:

Queue1:绑定的是china.#,因此凡是以china.开头的routingKey都会被匹配到。包括china.news和china.weather

Queue4:绑定的是#.news,因此凡是以.news开头的routingKey都会被匹配到。包括china.news和japan.news

现在我们用SpringAMQP来试试Topic模型:

实现思路如下:

  1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
  3. 在publisher中编写测试方法,向itcast.topic发送消息

7.1 消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

    // 测试TopicExchange模式@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "恭喜樊振东拿到巴黎奥运会乒乓球男子单打冠军!";// 发送消息,参数分别是:交互机名称、RoutingKey、消息rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

7.2 消息接收

 在consumer服务的SpringRabbitListener中添加方法:

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void  listenerTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void  listenerTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2消息:【" + msg + "】");}

7.3 测试

启动ConsumerApplication后,在RabbitMQ的管理平台可以看到:

然后点击进去查看,就可以看到我们定义的两个队列: 

 将ConsumerApplication的日志清空,在执行publisher服务中刚刚编写的发送测试方法testTopicExchange。结果如下:

因为发送消息的RoutingKey写的是china.news,所以消费者1和消费者2都能收到消息,接下来,把发送消息的RoutingKey改成china.weather:

然后清空控制台的日志,可以看到:

因为发送消息的RoutingKey写的是china.weather,所以只有消费者1能收到消息。

7.4 总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

8. 交换机

在SpringAMQP的发送方法中,接收消息的类型都是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们做序列化为字节后发送。

首先,我们是不是要先声明一下队列?声明队列有两种方式,一种是在SpringRabbitListener基于注解声明,另一种是在FanoutCofig里面注入Bean声明。第一种方式消息一出来就被消费了,所以我们用第二种方式:

    @Beanpublic Queue objectQueue(){return new Queue("object.queue");}

然后重启ConsumerApplication,查看一下RabbitMQ的管理平台:

接下来我们来试试往这个队列里发送消息,在publisher服务中的SpringAMQPTest类中添加一个发送Map类型消息的方法: 

    @Testpublic void testObjectMessage() {// 消息Map<String,Object> msg = new HashMap<>();msg.put("name","樊振东");msg.put("age",21);// 发送消息rabbitTemplate.convertAndSend("object.queue",msg);}

执行一下,然后去RabbitMQ的管理平台上看看:

发现这里有了一条消息,点进去看一下:

这里没有显示中文,用的还是java序列化,其实就是JDK序列化,这是因为Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

8.1 配置JSON转换器

如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

在父工程的pom文件引入依赖(因为publisher和consumer都用到,所以我们的依赖加在父工程里):

        <!--消息转换器的依赖--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>

然后在publisher服务中的启动类PublisherApplication中添加一个Bean:

    @Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}

然后在RabbitMQ的管理平台中把刚刚放到object.queue队列里的消息删除:

删除之后,我们再到SpringAMQPTest类中重新发消息。发完之后再到RabbitMQ的管理平台去看:

信息已经能正常显示了。然后在ConsumerApplication也添加转换器的Bean:

    @Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}

再在consumer的消息监听SpringRabbitListener类上添加接收消息的方法:

    @RabbitListener(queues = "object.queue")public void  listenerObjectQueue(Map<String,Object> msg){System.out.println("消费者接收到object.queue消息:【" + msg + "】");}

然后重启ConsumerApplication,可以看到:

8.2 总结

 SpringAMQP中消息的序列化和反序列化是怎么实现的?

  • 利用MessageConverter实现的,默认是JDK的序列化
  • 注意发送方和接收方必须使用相同的MessageConverter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/392734.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux常用命令学习

常用apt命令. apt&#xff08;Advanced Packaging Tool&#xff09;是一个在 Debian 和 Ubuntu 中的 Shell 前端软件包管理器。 apt 命令提供了查找、安装、升级、删除某一个、一组甚至全部软件包的命令&#xff0c;而且命令简洁而又好记。 apt 命令执行需要超级管理员权限(ro…

【Java】Java泛型、集合、UML统一建模语言、final关键字

昨天在昆仑巢&#xff0c;下午练习Spring Boot的过滤器Filter。 昨天傍晚开始阅读《疯狂Java讲义(第2版)》&#xff0c;熟悉了UML建模语言、Final修饰符、List集合和泛型。 1.UML建模语言: 13种图&#xff0c;常用的包括用例图、类图、组件图、部署图、顺序图、活动图和状态机…

JVM结构、架构与生命周期总结

【1】JVM结构 不同厂商的JVM产品 &#xff1a; 厂商JVMOracle-SUNHotspotOracleJRocketIBMJ9 JVM阿里Taobao JVM HotSpot VM是目前市面上高性能虚拟机的代表作之一。它采用解释器与即时编译器并存的架构。 在今天&#xff0c;Java程序的运行性能早已脱胎换骨&#xff0c;已…

文章管理接口——里面有动态SQL编写,在分页查询里

1.实体类和表结构 2. 新增文章分类 接口文档 实现 完整代码放在校验部分 结果&#xff1a; 参数校验&#xff08;Validation自定义&#xff09; 对state的校验&#xff08;已发布|草稿&#xff09;&#xff0c;已有的注解不能满足校验需求&#xff0c;这时就需要自定义校验注解…

[Bugku] web-CTF靶场系列系列详解④!!!

平台为“山东安信安全技术有限公司”自研CTF/AWD一体化平台&#xff0c;部分赛题采用动态FLAG形式&#xff0c;避免直接抄袭答案。 平台有题库、赛事预告、工具库、Writeup库等模块。 --------------------------------- eval 开启环境&#xff1a; 进入页面发现是一道php题&…

如何用 ChatGPT 提升学术写作:15 个高效提示

在本文&#xff0c;我们详细探讨了如何利用 ChatGPT 提升学术写作的各个方面。我们帮助学术作者通过生成创意点子、构建论证结构、克服写作障碍以及格式化引用&#xff0c;从而显著提升其学术论文的质量。这 15 条提示不仅可以单独使用&#xff0c;还可作为学习的良好范例。 本…

集合基础知识及练习

import java.util.ArrayList;public class Solution {//将字符串转化为整数public static void main(String[] args) {ArrayList<String> listnew ArrayList();list.add("aaa");list.add("aaa");list.add("bbb");list.add("ccc"…

Occlusion in Augmented Reality

1.Occlusion in Augmented Reality 笔记来源&#xff1a; 1.Occlusion handling in Augmented Reality context 2.Occlusion in Augmented Reality 3.Real-Time Occlusion Handling in Augmented Reality Based on an Object Tracking Approach 4.Occlusion Matting: Realisti…

JavaWeb——CSS的使用

CSS 层叠样式表(英文全称:(cascading stle sheets)能够对网页中元素位置的排版进行像素级精确控制&#xff0c;支持几乎所有的字体字号样式&#xff0c;拥有对网页对象和模型样式编辑的能力,简单来说,CSS用来美化页面 一、CSS的引入方式: 1.行内式&#xff1a;通过元素开始标…

未授权访问漏洞系列

环境 1.此漏洞需要靶场vulhub&#xff0c;可自行前往gethub下载 2.需要虚拟机或云服务器等linux系统&#xff0c;并在此系统安装docker和docker-compose提供环境支持 3.运行docker-compose指令为docker-compose up -d即可运行当前目录下的文件 Redis未授权访问漏洞 一、进…

用于相位解包的卷积和空间四向 LSTM 联合网络

原文&#xff1a;A Joint Convolutional and Spatial Quad-Directional LSTM Network for Phase Unwrapping 作者&#xff1a;Malsha V. Perera 和 Ashwin De Silva 摘要&#xff1a; 相位展开是一个经典的病态问题&#xff0c;其目标是从包裹相位中恢复真实的相位。本文&…

RAG前沿技术/解决方案梳理

RAG前沿技术/解决方案梳理 BenchmarkRetrievalAdaptive-RAGDR-RAGRichRAGGenRT Critique/ReasoningSelf-RAGCorrective RAGSpeculative RAGPlanRAGSelf-ReasoningReSP MemorySelfmemHippoRAG Query RewriteRaFe SummaryRefiner 个人理解 对当前RAG的学术研究&#xff08;或者好…

SAP 接口PO(PI,XI)在ECC端日志记录及显示

在接口的处理中通常会需要记录日志&#xff0c;而如果是与PO(PI,XI)做的接口的话&#xff0c;可以使用事务码SXI_MONITOR – XI&#xff1a;消息监控&#xff0c;来查询日志&#xff0c;但对于一些有加密&#xff0c;或者在业务接口功能上想直接查询报文日志时&#xff0c;会在…

使用 MinIO、Langchain 和 Ray Data 构建分布式嵌入式子系统

嵌入子系统是实现检索增强生成所需的四个子系统之一。它将您的自定义语料库转换为可以搜索语义含义的向量数据库。其他子系统是用于创建自定义语料库的数据管道&#xff0c;用于查询向量数据库以向用户查询添加更多上下文的检索器&#xff0c;最后是托管大型语言模型 &#xff…

ES6中的Promise、async、await,超详细讲解!

Promise是es6引入的异步编程新解决方案&#xff0c;Promise实例和原型上有reject、resolve、all、then、catch、finally等多个方法&#xff0c;语法上promise就是一个构造函数&#xff0c;用来封装异步操作并可以获取其成功或失败的结果&#xff0c;本篇文章主要介绍了ES6中的P…

(免费领源码)java#SSM#MYSQL私家车位共享APP 51842-计算机毕业设计项目选题推荐

目 录 摘要 1 绪论 1.1 课题的研究背景 1.2研究内容与研究目标 1.3ssm框架 1.4论文结构与章节安排 2 2 私家车位共享APP系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据增加流程 2.2.2 数据修改流程 2.2.3数据删除流程 2.3 系统功能分析 2.3.1功能性分析 2…

原型图绘制技巧

针对于 Axure RP绘图软件。 1、拉辅助线 目的&#xff0c;确定画布大小尺寸从上面和左面的刻度尺上&#xff0c;点击鼠标&#xff0c;拖动&#xff0c;就可以拉出一条线。 2、画布底模设为组件 右键转换为母版&#xff0c;方便后续其他页面使用 3、按钮 按钮字体不要太大&am…

【嵌入式】STM3212864点阵屏使用SimpleGUI单色屏接口库——(2)精简字库

一 开源库简介与移植 最近一个项目需要用12864屏幕呈现一组较为复杂的菜单界面&#xff0c;本着不重复造轮子的原则找到了SimpleGUI开源库。 开源地址&#xff1a;SimpleGUI: 一个面向单色显示屏的开源GUI接口库。 SimpleGUI是一款针对单色显示屏设计的接口库。相比于传统的GUI…

SpringBoot集成阿里百炼大模型(初始demo) 原子的学习日记Day01

文章目录 概要下一章SpringBoot集成阿里百炼大模型&#xff08;多轮对话&#xff09; 原子的学习日记Day02 整体架构流程技术名词解释集成步骤1&#xff0c;选择大模型以及获取自己的api-key&#xff08;前面还有一步开通服务就没有展示啦&#xff01;&#xff09;2&#xff0c…

CSS学习 02 利用鼠标悬停制造按钮边框的渐变方向变化

效果 页面背景为深灰色&#xff0c;使用Karla字体。容器内的按钮居中显示&#xff0c;按钮有一个彩色渐变的边框。按钮的背景为黑色&#xff0c;文字为浅灰色。当鼠标悬停在按钮边框上时&#xff0c;边框的渐变方向变化&#xff0c;按钮文字变为白色&#xff0c;并且按钮内边距…