目录
1、前言
2、什么是RabbitMQ
3、安装RabbitMQ
4、Springboot集成RabbitMQ
4.1、添加依赖
4.2、添加配置
4.3、添加controller,作为生产者
4.4、设置生产者消息确认CallBack
4.5、添加Consumer,作为消费者
4.6、启动程序,访问
1、前言
消息队列(Message Queue,简称 MQ)是一种异步的消息传递中间件,它解耦了应用程序之间的通信。应用程序可以将消息发送到队列,而无需知道谁会接收这些消息。接收应用程序可以从队列中检索消息,而无需知道谁发送了这些消息。消息队列是一种重要的中间件,它可以帮助应用程序之间进行异步、可靠、可扩展的通信。常见的消息队列中间件有ActiveMQ,RabbitMQ,Kafka......今天我们就来介绍RabbitMQ。
2、什么是RabbitMQ
RabbitMQ 是一个开源的消息队列服务器,它实现了 AMQP (高级消息队列协议) 标准。AMQP 是一种应用层协议,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 的主要特点包括:
- 高性能:RabbitMQ 能够处理大量的消息,并提供低延迟的性能。
- 可靠性:RabbitMQ 提供持久化消息存储,确保消息不会丢失。
- 可扩展性:RabbitMQ 可以轻松扩展以满足不断增长的需求。
- 灵活性:RabbitMQ 支持多种编程语言和客户端,并提供丰富的功能和配置选项。
RabbitMQ 的常见应用场景包括:
- 分布式系统:RabbitMQ 可以用于在分布式系统中进行异步通信。
- 异步处理:RabbitMQ 可以用于异步处理任务,提高系统的性能和效率。
- 消息队列:RabbitMQ 可以用于实现消息队列,例如任务队列、发布/订阅队列等。
- 消息通知:RabbitMQ 可以用于发送消息通知,例如电子邮件或短信。
3、安装RabbitMQ
由于RabbitMQ是一个由 Erlang 语言开发的 AMQP 的开源实现。所以在安装RabbitMQ前需要先安装Erlang环境。
Erlang下载地址:Downloads - Erlang/OTP
RabbitMQ下载地址:Installing RabbitMQ | RabbitMQ
先安装Erlang,在安装RabbitMQ。安装工程相对简单,无脑下一步即可。
安装完RabbitMQ后,打开cmd窗口,进入RabbitMQ的安装目录的sbin下,我的目录是:
D:\RabbitMQ Server\rabbitmq_server-3.13.0\sbin
然后输入以下命令安装一下插件:
rabbitmq-plugins enable rabbitmq_management
提示以下这个就是安装成功。
验证RabbitMQ是否安装成功,输入以下命令:
rabbitmqctl status
这时候,直接访问http://127.0.0.1:15672就可以看到RabbitMQ的管理页面了,RabbitMQ默认端口为15672,默认的管理页面账号密码均为guest。
登录后,就可以看到一个初始的管理界面:
4、Springboot集成RabbitMQ
4.1、添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>springboot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-rabbitmq</name><description>springboot-rabbitmq</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.24</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
4.2、添加配置
# rabbitmq连接配置信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest# 确保消息在未被队列接收时返回
spring.rabbitmq.publisher-returns=true
# 发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
4.3、添加controller,作为生产者
新建controller,用于发送消息。
package com.example.springbootrabbitmq.controller;import com.example.springbootrabbitmq.config.MqProducerCallBack;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("push/message")
public class PushMessageController {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MqProducerCallBack mqProducerCallBack;@GetMapping("test")public String sendMessage() {// correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());// 消息确认和返回回调rabbitTemplate.setConfirmCallback(mqProducerCallBack);rabbitTemplate.setReturnsCallback(mqProducerCallBack);// 消息发送rabbitTemplate.convertAndSend("my-queue", "hello world", message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, correlationData);return "publisher success...";}
}
4.4、设置生产者消息确认CallBack
package com.example.springbootrabbitmq.config;import cn.hutool.json.JSONUtil;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class MqProducerCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {/*** correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。* ack:消息投递到broker 的状态,true成功,false失败。* cause:投递失败的原因。*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {System.err.println("消息ID=" + correlationData.getId() + "投递失败,失败原因:" + cause);} else {System.out.println("消息投递收到确认,correlationData=" + correlationData.getId());}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("返回消息结果:" + JSONUtil.toJsonStr(returnedMessage));}}
4.5、添加Consumer,作为消费者
package com.example.springbootrabbitmq.consumer;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class PushMessageConsumer {/*** basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。* void basicAck(long deliveryTag, boolean multiple)* deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。* */@RabbitListener(queuesToDeclare = @Queue(value = "my-queue"))@RabbitHandlerpublic void consume(String msg, Channel channel, Message message) throws IOException {try {System.out.println("消费者收到消息:" + msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);System.out.println("deliveryTag:" + message.getMessageProperties().getDeliveryTag());System.out.println("redelivered:" + message.getMessageProperties().getRedelivered());} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {System.err.println("消息已重复处理失败,拒绝再次接收!");/*** 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列* basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。* deliveryTag:表示消息投递序号。* requeue:值为 true 消息将重新入队列。*/channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {System.out.println("消息即将再次返回队列处理!");/*** requeue为是否重新回到队列,true重新入队* deliveryTag:表示消息投递序号。* multiple:是否批量确认。* requeue:值为 true 消息将重新入队列。*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}}
4.6、启动程序,访问
浏览器访问:http://localhost:8080/push/message/test 模拟消息进行推送。
查看控制台,发现消费者正常打印出了消费信息。
打开RabbitMQ管理控制台,可以发现我们的消息队列my-queue信息。
既可以查看消息队列的装填,消息投递情况等。