SpringAMQP
概述
使用RabbitMQ原生API在代码中设置连接MQ的参数比较繁琐,我们更希望把连接参数写在yml文件中来简化开发
SpringAMQP是基于AMQP协议定义的一套API规范,将RabbitMQ封装成一套模板用来发送和接收消息
- AMQP(Advanced Message Queuing Portocol)是用于在
应用程序之间传递业务消息
的开放标准, 该协议与语言和平台无关更符合微服务中独立性的要求 - SpringAMQP由两部分组成: spring-AMQP是基础抽象,spring-Rabbit是底层的默认实现,并且利用SpringBoot对其实现了自动装配
SpringAMQP提供的三个功能
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听模式(监听器容器),用于异步异步处理入站消息
- 封装了RabbitTemplate工具,用于发送消息(之前在Redis中我们也接触过RedisTemplate)
方法名 | 功能 |
---|---|
convertAndSend(队列名称, Object类型的消息对象) | 发送消息到队列 |
convertAndSend(交换机名称, 路由key, Object类型的消息对象) | 发送消息到交换机,交换机再根据路由规则发送到对应的队列 |
注解名 | 功能(声明交换机和队列时也可以基于配置类的方式注入到容器中) |
---|---|
@RabbitListener注解 | 声明要监听/绑定的队列 |
@Exchange | 声明交换机 |
@Queue | 声明队列 |
@QueueBinding | 声明交换机和队列的绑定关系 |
Work模型
Wrok模型的使用: 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理, 可以设置prefetch
来控制消费者预取的消息数量
Basic Queue
第一步: 在父工程mq-demo
中引入AMQP依赖,这样子模块也继承了该依赖就可以基于RabbitTemplate模板
实现消息的发送和接收
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:在publisher
服务的application.yml
中配置MQ地址,Spring已经帮我们跟MQ建立了连接,剩下只需要指定发送什么消息到哪个队列
spring:rabbitmq:host: 192.168.88.128 # 主机名port: 5672 #端口username: root # 用户名password: 123456 # 密码virtual-host: / # 虚拟主机
第三步: 在publisher服务
中编写测试类SpringAmqpTest利用RabbitTemplate发送消息到指定队列
(如果没有创建simple.queue
可以在RabbitMQ管理平台创建)
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {String queueName = "simple.queue";String message = "Hello, SpringAMQP! ";rabbitTemplate.convertAndSend(queueName, message);}
}
第四步:在consumer服务
的application.yml中同样配置MQ地址,这样Spring已经帮我们跟MQ建立了连接,剩下只需要关心要监听哪个队列以及监听到要干什么事儿
spring:rabbitmq:host: 192.168.88.128 # 主机名port: 5672 #端口username: root # 用户名password: 123456 # 密码virtual-host: / # 虚拟主机
第五步: 在consumer服务新建一个类用于编写消费逻辑,使用@Component
注解将该类声明为一个Bean,使用@RabbitListener
注解声明要监听/绑定的队列
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) {// msg参数就是接收到的消息,Spring已经帮我们二进制数据转成了字符串System.out.println("Spring 消费者接收到消息:【" + msg + "】");}
}
第六步: 启动Consumer服务查看控制台接收到的消息,如果多次使用publisher服务发送消息consumer服务也会接收多次消息
- 消息一旦被消费就会从队列删除,RabbitMQ没有
消息回溯
功能
Spring 消费者接收到消息:【Hello, SpringAMQP! 】
Wrok Queue(Task queues)
Work Queue
也被称为任务模型,简单来说就是让多个消费者绑定到一个队列让它们共同处理队列中的信息提高消息的处理速度(同一条消息只会被一个消费者处理)
- 如果消息处理比较耗时那么生产消息的速度会远远大于消息的消费速度,就会导致消息堆积的越来越多无法及时处理
第一步: 在publisher服务中的SpringAmqpTest类测试方法中循环发送消息模拟大量消息堆积的场景
@Test
public void testWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "Hello, SpringAMQP! __ ";for (int i = 1; i <= 50; i++) {// 循环发送50条消息,带上消息编号rabbitTemplate.convertAndSend(queueName, message + i);// 休眠20ms,模拟在1s内发送完Thread.sleep(20);}
}
第二步: 在consumer服务的SpringRabbitListener
类中添加两个方法模拟多个消费者绑定同一个队列(一个方法对应一个消费者)
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1 接收到消息:【" + msg + "】" + LocalDateTime.now());// 休眠20ms,1s大致能处理50条消息Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalDateTime.now());// 休眠200ms,1s大概能处理5条消息Thread.sleep(200);
}
第三步: 执行publisher服务中刚编写的测试方法发送50条消息,启动consumer服务查看控制台输出
- 消费者1很快就完成了自己的25条消息,消费者2却在缓慢的处理自己的25条消息,即当前的处理方式是
平均分配
给每个消费者
消费者2........接收到消息:【Hello, SpringAMQP! __ 1】2022-12-23T13:16:41.407
消费者1 接收到消息:【Hello, SpringAMQP! __ 2】2022-12-23T13:16:41.414
消费者1 接收到消息:【Hello, SpringAMQP! __ 4】2022-12-23T13:16:41.461
消费者1 接收到消息:【Hello, SpringAMQP! __ 6】2022-12-23T13:16:41.502
消费者1 接收到消息:【Hello, SpringAMQP! __ 8】2022-12-23T13:16:41.549
消费者1 接收到消息:【Hello, SpringAMQP! __ 10】2022-12-23T13:16:41.592
消费者2........接收到消息:【Hello, SpringAMQP! __ 3】2022-12-23T13:16:41.609
消费者1 接收到消息:【Hello, SpringAMQP! __ 12】2022-12-23T13:16:41.635
消费者1 接收到消息:【Hello, SpringAMQP! __ 14】2022-12-23T13:16:41.680
消费者1 接收到消息:【Hello, SpringAMQP! __ 16】2022-12-23T13:16:41.722
消费者1 接收到消息:【Hello, SpringAMQP! __ 18】2022-12-23T13:16:41.767
....
消费者2........接收到消息:【Hello, SpringAMQP! __ 35】2022-12-23T13:16:44.837
消费者2........接收到消息:【Hello, SpringAMQP! __ 37】2022-12-23T13:16:45.040
消费者2........接收到消息:【Hello, SpringAMQP! __ 39】2022-12-23T13:16:45.240
消费者2........接收到消息:【Hello, SpringAMQP! __ 41】2022-12-23T13:16:45.444
消费者2........接收到消息:【Hello, SpringAMQP! __ 43】2022-12-23T13:16:45.646
消费者2........接收到消息:【Hello, SpringAMQP! __ 45】2022-12-23T13:16:45.846
消费者2........接收到消息:【Hello, SpringAMQP! __ 47】2022-12-23T13:16:46.048
消费者2........接收到消息:【Hello, SpringAMQP! __ 49】2022-12-23T13:16:46.250
我们希望按照服务器的处理能力来处理消息,避免出现消息积压的风险,在consumer服务中的application.yml文件添加prefetch
配置控制预取消息的上限
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
发布订阅模型
在订阅模型中多了一个exchange
角色,发送消息的流程也发生了变化
Publisher(生产者)
: 消息不再发送到队列中而是发送给exchange(交换机)Exchange(交换机)
: 一方面负责接收生产者发送的消息,另一方面负责将消息按照规则路由
到与之绑定的队列(如递交给某个特定队列、递交给所有队列)Consumer(消费者)
: 订阅队列的消息Queue(消息队列)
: 接收消息并缓存消息
交换机如何处理接收到的消息取决于Exchange的类型
Fanout(广播)
: 将消息交给所有绑定到交换机的队列Direct(定向)
: 把消息交给符合指定routing key
的队列Topic(通配符)
: 把消息交给符合routing pattern(路由模式)
的队列- 交换机只负责转发消息不具备存储消息的能力,因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列,那么消息会丢失
Spring提供了一个接口Exchange
来表示所有不同类型的交换机(路由规则不同),Queue和Binding也是Springframework提供的类
Fanout(广播)
在广播模式下消息发送的流程
- 广播模式下可以声明多个队列,但每个队列都要绑定一个Exchange(交换机)
- 生产者发送的消息只能发送到交换机,由交换机决定要发给哪个队列
- FanoutExchange类型的交换机会把消息发送给每一个跟其绑定的队列,然后订阅队列的消费者都能拿到同一条消息
第一步: 在consumer服务创建一个配置类config/FanoutConfig
用来声明队列(Queue)和交换机(FanoutExchange)
以及队列和交换机的绑定关系(Binding)
@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("itcast.fanout");}/*** 第1个队列,方法的名称就是bean的名称*/@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}/*** 绑定第1个队列和交换机,根据方法形参的名称自动注入容器中对应的bean(队列和交换机)*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@BeanQueue fanoutQueue2() {return new Queue("fanout.queue2");}/*** 绑定第2个队列和交换机,根据方法形参的名称中自动注入容器中对应的bean(队列和交换机)*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
第二步: 在publisher服务的SpringAmqpTest类中添加测试方法向交换机发送一次消息
@Test
public void testFanoutExchange() {String exchangeName = "itcast.fanout";String message = "Hello Everyone~";// 交换机名称,路由key,消息rabbitTemplate.convertAndSend(exchangeName, "", message);
}
第三步: 在consumer服务的SpringRabbitListener中添加两个方法作为两个消费者分别绑定fanout.queue1
和fanout.queue2
接收消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1收到广播消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2收到广播消息:【" + msg + "】");
}
第四步: 重启consumer服务然后运行publisher中新编写的测试方法查看控制台输出
消费者1收到广播消息:【Hello Everyone~】
消费者2收到广播消息:【Hello Everyone~】
Direct(基于注解方式声明)
在Fanout广播模式中一条消息会发给所有与交换机绑定队列,但是在某些场景下我们更希望不同的消息发送给不同的队列,这时就要用到Direct类型的Exchange
Direct类型的Exchange不再把接收的消息交给每一个与其绑定的队列,只有当队列的BindingKey
与消息的RoutingKey
完全一致时该队列才会收到消息
- 每一个队列与虚拟机绑定时需要指定一个
BindingKey
(路由key) - 发布者向Exchange发送消息时也必须指定消息的
RoutingKey(路由规则)
- Exchange将消息路由到BindingKey和消息的RoutingKey一致的队列
Direct交换机与Fanout交换机的差异
- Fanout交换机会将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列,如果多个队列具有相同的RoutingKey,则与Fanout功能类似,也是把消息路由给每一个匹配的队列
第一步: 在consumer服务
的SpringRabbitListener
类中添加两个消费者监听direct.queue1和direct.queue2同时基于RabbitListener注解来声明队列和交换机
- 由于基于Bean的方式声明队列与交换机比较麻烦所以Spring还提供了基于注解的方式来声明Exchange(@Exchange),Queue(@Queue),Binding(@QueueBinding)
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),// 默认type = "direct"key = {"red", "blue"}// key就是队列和交换机的BindingKey
))
public void listenDirectQueue1(String msg) {System.out.println("消费者收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "directExchange", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {System.out.println("消费者收到direct.queue2的消息:【" + msg + "】");
}
第二步: 在publisher
中编写三个测试方法向directExchange
发送三条消息并指定消息的RoutingKey
@Test
public void testDirectExchange() {String exchangeName = "directExchange";String message = "hello,blue";rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}@Test
public void testDirectExchange() {String exchangeName = "directExchange";String message = "hello,yellow";rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
}@Test
public void testDirectExchange() {String exchangeName = "directExchange";String message = "hello,red";rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
第三步: 重启consumer服务运行以上三个测试方法查看控制台输出
消费者收到direct.queue1的消息:hello,blue
消费者收到direct.queue2的消息:hello,yellow
消费者收到direct.queue1的消息:hello,red
消费者收到direct.queue2的消息:hello,red
Topic(使用通配符)
Topic类型和Direct类型的Exchange都可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型的Exchange与队列绑定时指定的BindingKey可以使用通配符
#
: 匹配一个或多个词,如item.#
能够匹配item.kyle.violet
或者item.kyle
*
: 仅匹配一个词,如item.*
只能匹配item.kyle
或者item.violet
Direct交换机与Topic交换机的差异
- Topic交换机接收的消息Routing Key必须是多个单词,多个单词间以
.
分割 - Topic交换机与队列绑定时的Binding key可以指定通配符,其中
#
表示0个或多个单词,*
仅表示1个单词
第一步: 在publisher
服务的SpringAmqpTest
类中添加测试方法向交换机发送不同RoutingKey的消息
@Test
public void testTopicExchange() {String exchangeName = "topic";String message = "如何看待马化腾称「短视频会侵蚀游戏时间」,「腾讯游戏要聚焦精品」?";rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}@Test
public void testTopicExchange() {String exchangeName = "topic";String message = "今 天 也 是 个 emo 的 好 天 气";rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}@Test
public void testTopicExchange() {String exchangeName = "topic";String message = "自由美利坚,枪击每一天";rabbitTemplate.convertAndSend(exchangeName, "us.news", message);
}
第二步: 在consumer服务的SpringRabbitListener类中消费方法监听topic.queue1和topic.queue2, 利用@RabbitListener相关的注解声明Exchange,Queue,BindingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg) {System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg) {System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
消费者接收到topic.queue2的消息:【如何看待马化腾称「短视频会侵蚀游戏时间」,「腾讯游戏要聚焦精品」?】
消费者接收到topic.queue1的消息:【如何看待马化腾称「短视频会侵蚀游戏时间」,「腾讯游戏要聚焦精品」?】
消费者接收到topic.queue1的消息:【今 天 也 是 个 emo 的 好 天 气】
消费者接收到topic.queue2的消息:【自由美利坚,枪击每一天】
消息转换器
Spring的convertAndSend
方法接收消息的类型是Object也就是说它可以把任意对象类型的消息序列化为字节发送给MQ(默认采用JDK的序列化方式)
第一步: 在config.FanoutConfig
中声明一个队列
@Bean
public Queue objectQueue(){return new Queue("object.queue");
}
第二步: 在测试方法中发送一个Map类型的对象消息到MQ
@Test
public void testSimpleQueue() {String queueName = "simple.queue";HashMap<String, Object> hashMap = new HashMap<>();hashMap.put("名称", "艾尔登法环");hashMap.put("价格", 299);rabbitTemplate.convertAndSend(queueName, hashMap);
}
JDK序列化的方式存在数据体积过大,有安全漏洞,可读性差的问题,我们更希望消息的体积更小可读性更高,因此可以使用JSON方式
来做序列化和反序列化
第一步: 在publisher和consumer两个服务中引入依赖(或者直接在父工程mq-demo中引入依赖)
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
第二步: 在publisher和consumer的启动类中都添加jsonMessageConverter
并注册到容器中
@Bean
publis MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();
}
第三步: 再次执行测试方法发送一个Map类型的对象消息到MQ,修改consumer服务的SpringRabbitListener
添加方法并重启服务
consumer
和publisher
的序列化器需保持一致,同时consumer
中接收数据的类型也需要和发送数据的类型保持一致,如HashMap<String, Object>
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(HashMap<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到消息:【" + msg + "】");
}