前言
Spring AMQP是Spring框架中用于与消息中间件(如RabbitMQ)进行交互的一个项目,它简化了消息发送、接收以及消息处理的过程。通过Spring AMQP,开发者可以快速实现基于RabbitMQ的消息传递系统。本文将介绍Spring AMQP的快速入门,重点讲解Work Queues和Fanout交换机的使用方式,帮助开发者快速掌握消息队列的基本操作和工作原理。
SpringAMQP
快速入门
AMQP
Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP
Spring AMQP是基于AMQP协议定义的一套API规范,提供了板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
SpringAMQP官方文档:https://spring.io/projects/spring-amqp/
需求如下:
- 利用控制台创建队列simple.queue
- 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
创建队列simple.queue
(1)引入spring-amqp依赖
在父工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用:
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-bot-starter-amgp</artifactId>
</dependency>
(2)配置RabbitMQ服务端信息
在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ
spring:rabbitmq:host: 192.168.244.135#主机名port: 5672#端口virtual-host: /hmall # 虚拟主机username: hmall#用户名password: 1234#密码
(3)发送消息
SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:
@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testsimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName,message);
测试代码:
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void TestSimpleQueue(){//1.队列名String queueName = "simple.queue";//2.消息String message = "Hello Spring Amqp!";//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
控制台中查看信息:
(4)接收消息
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMOP就会把消息传递给当前方法:
@slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues="simple.queue")public void listenSimpleQueueMessage(String msg)throws InterruptedException {log.info("spring 消费者接收到消息:["+msg+"]");
}
测试代码如下:
package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {log.info("监听到simple.queue的消息,["+message+"]");}
}
运行项目启动类后,运行结果如下:
SpringAmpq收发消息步骤:
- 引入spring-boot-starter-amqp依赖
- 配置rabbitmq服务端信息
- 利用RabbitTemplate发送消息
- 利用@RabbitListener注解声明要监听的队列,监听消息
Work Queues
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
1.在RabbitMQ的控制台创建一个队列,名为wprk.queue
2.在publisher服务中定义测试方法,发送50条消息到work.queue
3.在consumer服务中定义两个消息监听者,都监听work.queue队列
创建新的队列work.queue
测试代码
@Testpublic void TestWorkQueue(){//1.队列名String queueName = "work.queue";for (int i = 1; i <= 50; i++) {//2.消息String message = "Hello Spring Amqp_"+i;//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}}
定义两个消息监听者,都监听work.queue队列
@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String message) {System.out.println("消费者1接收到消息:"+message+","+ LocalTime.now());}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String message) {System.out.println("消费者2.......:"+message+","+ LocalTime.now());}
运行结果如下:
通过观察发现,两个消费者最终接收到的消息各占一半。
增加一个需求:消费者1每秒处理40个消息,消费者2每秒处理5条消息
消费者代码修改如下:
@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String message) throws Exception {System.out.println("消费者1接收到消息:"+message+","+ LocalTime.now());Thread.sleep(25);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String message)throws Exception {System.out.println("消费者2.......:"+message+","+ LocalTime.now());Thread.sleep(200);}
运行结果如下:
消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:
spring:rabbitmg:listener:simple:prefetch: 1#每次只能获取一条消息,处理完成才能获取下一个消息
由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终的执行耗时也在1秒左右,速度大大提升。
Work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度际
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
Fanout交换机
交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。
常见交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式.
利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
1.在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2在publisher中编写测试方法,向hmall.fanout发送消息A.
创建队列fanout.queue1和fanout.queue2
创建交换机hmall.fanout(需指定类型Type)
将交换机hmall.fanout绑定队列fanout.queue1和fanout.queue2
消费者代码分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String message) {log.info("消费者1监听到fanout.queue1的消息,["+message+"]");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String message)throws Exception {log.info("消费者2监听到fanout.queue2的消息,["+message+"]");}
测试方法代码如下:
@Testpublic void TestFanoutQueue(){//1.交换机名String exchangeName = "hmall.fanout";//2.消息String message = "Hello everyOne!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName,null, message);}
运行结果如下:
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列
发送消息到交换机的API:
总结
通过本文的学习,我们深入了解了Spring AMQP的快速入门,并探讨了如何使用Work Queues和Fanout交换机来实现高效的消息传递系统。Spring AMQP提供的简化功能让开发者能够专注于业务逻辑的实现,而不必关心底层的消息传递细节。掌握这些基本概念和技术,能大大提升我们在分布式系统中使用消息队列的能力。