1.为什么需要消息队列?
RabbitMQ体系结构
操作001:RabbitMQ安装
二、安装
# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
二、验证
访问后台管理界面:http://192.168.200.100:15672
使用上面创建Docker容器时指定的默认用户名、密码登录:
三、可能的问题
1、问题现象
在使用Docker拉取RabbitMQ镜像的时候,如果遇到提示:missing signature key,那就说明Docker版本太低了,需要升级
比如我目前的Docker版本如下图所示:
2、解决办法
基于CentOS7
①卸载当前Docker
更好的办法是安装Docker前曾经给服务器拍摄了快照,此时恢复快照;
如果不曾拍摄快照,那只能执行卸载操作了
yum erase -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine \docker-ce
②升级yum库
yum update -y
③安装Docker最新版
yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
如果这一步看到提示:没有可用软件包 docker-ce,那就添加Docker的yum源:
yum install -y yum-utils yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
④设置Docker服务
systemctl start docker systemctl enable docker
3、验证
上述操作执行完成后,再次查看Docker版本:
操作002:HelloWorld
一、目标
生产者发送消息,消费者接收消息,用最简单的方式实现
官网说明参见下面超链接:
RabbitMQ tutorial - "Hello World!" — RabbitMQ
二、具体操作
1、创建Java工程
①消息发送端(生产者)
②消息接收端(消费者)
③添加依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
</dependencies>
2、发送消息
①Java代码
不用客气,整个代码全部复制——当然,连接信息改成你自己的:
package com.atguigu.rabbitmq.simple; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置主机地址 connectionFactory.setHost("192.168.200.100"); // 设置连接端口号:默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称:默认为 /connectionFactory.setVirtualHost("/");// 设置连接用户名;默认为guest connectionFactory.setUsername("guest");// 设置连接密码;默认为guest connectionFactory.setPassword("123456");// 创建连接 Connection connection = connectionFactory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 // queue 参数1:队列名称 // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在 // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列 // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除 // arguments 参数5:队列其它参数 channel.queueDeclare("simple_queue", true, false, false, null); // 要发送的信息 String message = "你好;小兔子!"; // 参数1:交换机名称,如果没有指定则使用默认Default Exchange // 参数2:路由key,简单模式可以传递队列名称 // 参数3:配置信息 // 参数4:消息内容 channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); // 关闭资源 channel.close(); connection.close(); } }
②查看效果
3、接收消息
①Java代码
不用客气,整个代码全部复制——当然,连接信息改成你自己的:
package com.atguigu.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2. 设置参数 factory.setHost("192.168.200.100"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest");factory.setPassword("123456"); // 3. 创建连接 Connection Connection connection = factory.newConnection(); // 4. 创建Channel Channel channel = connection.createChannel(); // 5. 创建队列 // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建 // 参数1. queue:队列名称 // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在 // 参数3. exclusive:是否独占。 // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 // 参数5. arguments:其它参数。 channel.queueDeclare("simple_queue",true,false,false,null); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 回调方法,当收到消息后,会自动执行该方法 // 参数1. consumerTag:标识 // 参数2. envelope:获取一些信息,交换机,路由key... // 参数3. properties:配置信息 // 参数4. body:数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; // 参数1. queue:队列名称 // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息 // 参数3. callback:回调对象 // 消费者类似一个监听程序,主要是用来监听消息 channel.basicConsume("simple_queue",true,consumer); } }
②控制台打印
consumerTag:amq.ctag-8EB87GaZFP52LKSMcj98UA Exchange: RoutingKey:simple_queue properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) body:你好;小兔子!
③查看后台管理界面
因为消息被消费掉了,所以RabbitMQ服务器上没有了:
操作003:工作队列模式
Work Queues 本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:• 生产者只有一个 • 发送一个消息 • 消费者也只有一个,消息也只能被这个消费者消费所以HelloWorld也称为简单模式。 现在我们还原一下常规情况: • 生产者发送多个消息 • 由多个消费者来竞争 • 谁抢到算谁的
结论: • 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。• Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。
一、生产者代码
1、封装工具类
package com.atguigu.rabbitmq.util; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static final String HOST_ADDRESS = "192.168.200.100"; public static Connection getConnection() throws Exception { // 定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setHost(HOST_ADDRESS); // 端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection con = ConnectionUtil.getConnection(); // amqp://guest@192.168.200.100:5672/ System.out.println(con); con.close(); } }
2、编写代码
package com.atguigu.rabbitmq.work; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }
3、发送消息效果
二、消费者代码
1、编写代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
package com.atguigu.rabbitmq.work; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer1 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。 如果已经运行过生产者程序,则手动把work_queue队列删掉。
2、运行效果
最终两个消费端程序竞争结果如下:
操作004:发布订阅模式
生产者不是把消息直接发送到队列,而是发送到交换机
• 交换机接收消息,而如何处理消息取决于交换机的类型• 交换机有如下3种常见类型
• Fanout:广播,将消息发送给所有绑定到交换机的队列
• Direct:定向,把消息交给符合指定routing key的队列
• Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
• 注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
组件之间关系:
• 生产者把消息发送到交换机
• 队列直接和交换机绑定
• 工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列
• 理解概念:
• Publish:发布,这里就是把消息发送到交换机上
• Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系
一、生产者代码
package com.atguigu.rabbitmq.fanout; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection = ConnectionUtil.getConnection(); // 2、创建频道 Channel channel = connection.createChannel(); // 参数1. exchange:交换机名称 // 参数2. type:交换机类型 // DIRECT("direct"):定向 // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。 // TOPIC("topic"):通配符的方式 // HEADERS("headers"):参数匹配 // 参数3. durable:是否持久化 // 参数4. autoDelete:自动删除 // 参数5. internal:内部使用。一般false // 参数6. arguments:其它参数 String exchangeName = "test_fanout"; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout,routingKey设置为"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; // 6、发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); } }
二、消费者代码
1、消费者1号
package com.atguigu.rabbitmq.fanout; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
2、消费者2号
package com.atguigu.rabbitmq.fanout; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
三、运行效果
还是先启动消费者,然后再运行生产者程序发送消息:
四、小结
交换机和队列的绑定关系如下图所示:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
-
工作队列模式本质上是绑定默认交换机
-
发布订阅模式绑定指定交换机
-
监听同一个队列的消费端程序彼此之间是竞争关系
-
绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
操作006-路由模式
通过『路由绑定』的方式,把交换机和队列关联起来
• 交换机和队列通过路由键进行绑定
• 生产者发送消息时不仅要指定交换机,还要指定路由键
• 交换机接收到消息会发送到路由键绑定的队列
• 在编码上与 Publish/Subscribe发布与订阅模式的区别:
• 交换机的类型为:Direct
• 队列绑定交换机的时候需要指定routing key。
一、生产者代码
package com.atguigu.rabbitmq.routing; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_direct"; // 创建交换机 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null); // 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; // 声明(创建)队列 channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name,exchangeName,"error"); // 队列2绑定info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String message = "日志信息:张三调用了delete方法.错误了,日志级别warning"; // 发送消息 channel.basicPublish(exchangeName,"warning",null,message.getBytes()); System.out.println(message); // 释放资源 channel.close(); connection.close(); } }
二、消费者代码
1、消费者1号
package com.atguigu.rabbitmq.routing; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
2、消费者2号
package com.atguigu.rabbitmq.routing; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer2 将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
三、运行结果
1、绑定关系
2、消费消息
操作006:主题模式
• Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符
• Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert
• 通配符规则:
• #:匹配零个或多个词
• *:匹配一个词
一、生产者代码
package com.atguigu.rabbitmq.topic; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout ,routingKey设置为"" // routing key 常用格式:系统的名称.日志的级别。 // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); // 分别发送消息到队列:order.info、goods.info、goods.error String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]"; channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]"; channel.basicPublish(exchangeName,"goods.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]"; channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); channel.close(); connection.close(); } }
二、消费者代码
1、消费者1号
消费者1监听队列1:
package com.atguigu.rabbitmq.topic; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue1"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
2、消费者2号
消费者2监听队列2:
package com.atguigu.rabbitmq.topic; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue2"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
三、运行效果
队列1:
队列2:
操作007:整合SpringBoot
搭建环境
• 基础设定:交换机名称、队列名称、绑定关系
• 发送消息:使用RabbitTemplate
• 接收消息:使用@RabbitListener注解
1、消费者工程
①创建module
②配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
③YAML
增加日志打印的配置:
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
logging:level:com.atguigu.mq.listener.MyMessageListener: info
④主启动类
仿照生产者工程的主启动类,改一下类名即可
package com.atguigu.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQConsumerMainType {
public static void main(String[] args) {SpringApplication.run(RabbitMQConsumerMainType.class, args);}
}
⑤监听器
package com.atguigu.mq.listener;
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}))public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);}}
2、@RabbitListener注解属性对比
①bindings属性
-
表面作用:
-
指定交换机和队列之间的绑定关系
-
指定当前方法要监听的队列
-
-
隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们
②queues属性
@RabbitListener(queues = {QUEUE_ATGUIGU})
-
作用:指定当前方法要监听的队列
-
注意:此时框架不会创建相关交换机和队列,必须提前创建好
3、生产者工程
①创建module
②配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>
③YAML
spring: rabbitmq: host: 192.168.200.100port: 5672 username: guest password: 123456 virtual-host: /
④主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class RabbitMQProducerMainType {
public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args); }
}
⑤测试程序
package com.atguigu.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order";@Autowired private RabbitTemplate rabbitTemplate;@Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello atguigu"); } }
消息可靠性投递
下单操作的正常流程
故障情况1
消息没有发送到消息队列上
后果:消费者拿不到消息,业务功能缺失,数据错误
故障情况2
消息成功存入消息队列,但是消息队列服务器宕机了
原本保存在内存中的消息也丢失了
即使服务器重新启动,消息也找不回来了
后果:消费者拿不到消息,业务功能缺失,数据错误
故障情况3
消息成功存入消息队列,但是消费端出现问题,
例如:宕机、抛异常等等后果:业务功能缺失,数据错误
对症下药
• 故障情况1:消息没有发送到消息队列
• 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
• 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
• 故障情况2:消息队列服务器宕机导致内存中消息丢失
• 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
• 故障情况3:消费端宕机或抛异常导致消息没有成功被消费
• 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
• 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
操作008-01-A:生产者端消息确认机制
一、创建module
二、搭建环境
1、配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
2、主启动类
没有特殊设定:
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class RabbitMQProducerMainType {
public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args); }
}
3、YAML
注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info
三、创建配置类
1、目标
在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:
方法名 | 方法功能 | 所属接口 | 接口所属类 |
---|---|---|---|
confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。
原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:
设置组件调用的方法 | 所需对象类型 |
---|---|
setConfirmCallback() | ConfirmCallback接口类型 |
setReturnCallback() | ReturnCallback接口类型 |
2、API说明
①ConfirmCallback接口
这是RabbitTemplate内部的一个接口,源代码如下:
/*** A callback for publisher confirmations.**/@FunctionalInterfacepublic interface ConfirmCallback {
/*** Confirmation callback.* @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
生产者端发送消息之后,回调confirm()方法
-
ack参数值为true:表示消息成功发送到了交换机
-
ack参数值为false:表示消息没有发送到交换机
②ReturnCallback接口
同样也RabbitTemplate内部的一个接口,源代码如下:
/*** A callback for returned messages.** @since 2.3*/@FunctionalInterfacepublic interface ReturnsCallback {
/*** Returned message callback.* @param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);
}
注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用
ReturnedMessage类中主要属性含义如下:
属性名 | 类型 | 含义 |
---|---|---|
message | org.springframework.amqp.core.Message | 消息以及消息相关数据 |
replyCode | int | 应答码,类似于HTTP响应状态码 |
replyText | String | 应答码说明 |
exchange | String | 交换机名称 |
routingKey | String | 路由键名称 |
3、配置类代码
①要点1
加@Component注解,加入IOC容器
②要点2
配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。
操作封装到了一个专门的void init()方法中。
为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。
关于@PostConstruct注解大家可以参照以下说明:
@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。
使用@PostConstruct注解的方法必须满足以下条件:
方法不能有任何参数。
方法必须是非静态的。
方法不能返回任何值。
当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。
③代码
有了以上说明,下面我们就可以展示配置类的整体代码:
package com.atguigu.mq.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Autowiredprivate RabbitTemplate rabbitTemplate;
@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功!数据:" + correlationData);} else {log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);}}
@Overridepublic void returnedMessage(ReturnedMessage returned) {//发送到交换机失败调用的方法log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}
四、发送消息
package com.atguigu.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowired private RabbitTemplate rabbitTemplate;@Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello atguigu"); } }
通过调整代码,测试如下三种情况:
-
交换机正确、路由键正确
-
交换机正确、路由键不正确,无法发送到队列
-
交换机不正确,无法发送到交换机
操作008-01-B:备份交换机
一、创建备份交换机
1、创建备份交换机
注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键
2、创建备份交换机要绑定的队列
①创建队列
②绑定交换机
注意:这里是要和备份交换机绑定
3、针对备份队列创建消费端监听器
public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";public static final String QUEUE_NAME_BACKUP = "queue.order.backup";
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),key = {""}))public void processMessageBackup(String dateString,Message message,Channel channel) {log.info("BackUp: " + dateString);}
二、设定备份关系
1、原交换机删除
·
2、重新创建原交换机
3、原交换机重新绑定原队列
三、测试
-
启动消费者端
-
发送消息,但是路由键不对,于是转入备份交换机
操作008-03:消费端消息确认
一、ACK
ACK是acknowledge的缩写,表示已确认
二、默认情况
默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息
但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了
所以还是要修改成手动确认
三、创建消费端module
1、配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
2、YAML
增加针对监听器的设置:
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认
3、主启动类
没有特殊设定:
package com.atguigu.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQConsumerMainType {
public static void main(String[] args) {SpringApplication.run(RabbitMQConsumerMainType.class, args);}
}
四、消费端监听器
1、创建监听器类
package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
@Component
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";
public void processMessage(String dataString, Message message, Channel channel) {
}
}
2、在接收消息的方法上应用注解
// 修饰监听方法
@RabbitListener(// 设置绑定关系bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {
}
3、接收消息方法内部逻辑
-
业务处理成功:手动返回ACK信息,表示消息成功消费
-
业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:
-
把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍
-
不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止
-
4、相关API
先回到PPT理解“deliveryTag:交付标签机制”
下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口
①basicAck()方法
-
方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
-
参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
②basicNack()方法
-
方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
-
参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
③basicReject()方法
-
方法功能:根据指定的deliveryTag,对该消息表示拒绝
-
参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
-
basicNack()和basicReject()有啥区别?
-
basicNack()有批量操作
-
basicReject()没有批量操作
-
5、完整代码示例
package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME = "queue.order";
// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {
// 1、获取当前消息的 deliveryTag 值备用long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {// 2、正常业务操作log.info("消费端接收到消息内容:" + dataString);// System.out.println(10 / 0);
// 3、给 RabbitMQ 服务器返回 ACK 确认信息channel.basicAck(deliveryTag, false);} catch (Exception e) {
// 4、获取信息,看当前消息是否曾经被投递过Boolean redelivered = message.getMessageProperties().getRedelivered();
if (!redelivered) {// 5、requeu :如果没有被投递过,那就重新放回队列,重新投递,再试一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列channel.basicReject(deliveryTag, false);}
}}
}
五、要点总结
-
要点1:把消息确认模式改为手动确认
-
要点2:调用Channel对象的方法返回信息
-
ACK:Acknowledgement,表示消息处理成功
-
NACK:Negative Acknowledgement,表示消息处理失败
-
Reject:拒绝,同样表示消息处理失败
-
-
要点3:后续操作
-
requeue为true:重新放回队列,重新投递,再次尝试
-
requeue为false:不放回队列,不重新投递
-
-
要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据
六、流程梳理
七、多啰嗦一句
消费端如果设定消息重新放回队列,Broker重新投递消息,那么消费端就可以再次消费消息,这是一种“重试”机制,这需要消费端代码支持“幂等性”——这属于前置知识,不展开了。
操作009:Prefetch
一、思路
-
生产者发送100个消息
-
对照两种情况:
-
消费端没有设置prefetch参数:100个消息被全部取回
-
消费端设置prefetch参数为1:100个消息慢慢取回
-
二、生产者端代码
@Test
public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}
三、消费者端代码
// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);
// System.out.println(10 / 0);
TimeUnit.SECONDS.sleep(1);
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);
四、测试
1、未使用prefetch
-
不要启动消费端程序,如果正在运行就把它停了
-
运行生产者端程序发送100条消息
-
查看队列中消息的情况:
-
-
说明:
-
Ready表示已经发送到队列的消息数量
-
Unacked表示已经发送到消费端但是消费端尚未返回ACK信息的消息数量
-
Total未被删除的消息总数
-
-
接下来启动消费端程序,再查看队列情况:
-
-
能看到消息全部被消费端取走了,正在逐个处理、确认,说明有多少消息消费端就并发处理多少
2、设定prefetch
①YAML配置
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 # 设置每次最多从消息队列服务器取回多少消息
②测试流程
-
停止消费端程序
-
运行生产者端程序发送100条消息
-
查看队列中消息的情况:
-
-
接下来启动消费端程序,持续观察队列情况:
-
能看到消息不是一次性全部取回的,而是有个过程
操作010:消息超时
一、队列层面设置
1、设置
别忘了设置绑定关系:
2、测试
-
不启动消费端程序
-
向设置了过期时间的队列中发送100条消息
-
等10秒后,看是否全部被过期删除
二、消息层面设置
1、设置
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
@Test
public void testSendMessageTTL() { // 1、创建消息后置处理器对象 MessagePostProcessor messagePostProcessor = (Message message) -> { // 设定 TTL 时间,以毫秒为单位message.getMessageProperties().setExpiration("5000"); return message;};// 2、发送消息 rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello atguigu", messagePostProcessor);
}
2、查看效果
这次我们是发送到普通队列上:
操作011:死信
• 概念:当一个消息无法被消费,它就变成了死信。
• 死信产生的原因大致有下面三种:
• 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
• 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信
• 超时:消息到达超时时间未被消费
• 死信的处理方式大致有下面三种:
• 丢弃:对不重要的消息直接丢弃,不做处理
• 入库:把死信写入数据库,日后处理
• 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)
一、测试相关准备
1、创建死信交换机和死信队列
常规设定即可,没有特殊设置:
-
死信交换机:exchange.dead.letter.video
-
死信队列:queue.dead.letter.video
-
死信路由键:routing.key.dead.letter.video
2、创建正常交换机和正常队列
注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机
-
正常交换机:exchange.normal.video
-
正常队列:queue.normal.video
-
正常路由键:routing.key.normal.video
全部设置完成后参照如下细节:
3、Java代码中的相关常量声明
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video"; public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video"; public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
二、消费端拒收消息
1、发送消息的代码
@Test
public void testSendMessageButReject() { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况1:消息被拒绝");
}
2、接收消息的代码
①监听正常队列
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 监听正常队列,但是拒绝消息log.info("★[normal]消息接收到,但我拒绝。");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
②监听死信队列
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException { // 监听死信队列 log.info("★[dead letter]dataString = " + dataString);log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
3、执行结果
三、消息数量超过队列容纳极限
1、发送消息的代码
@Test
public void testSendMultiMessage() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况2:消息数量超过队列的最大容量" + i); }
}
2、接收消息的代码
消息接收代码不再拒绝消息:
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 监听正常队列log.info("★[normal]消息接收到。");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
重启微服务使代码修改生效。
3、执行效果
正常队列的参数如下图所示:
生产者发送20条消息之后,消费端死信队列接收到前10条消息:
四、消息超时未消费
1、发送消息的代码
正常发送一条消息即可,所以使用第一个例子的代码。
@Test
public void testSendMessageTimeout() {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"测试死信情况3:消息超时");
}
2、执行效果
队列参数生效:
因为没有消费端监听程序,所以消息未超时前滞留在队列中:
消息超时后,进入死信队列:
延迟队列
实现思路•
方案1:借助消息超时时间+死信队列(就是刚刚我们测试的例子)
方案2:给RabbitMQ安装插件
基于插件的延迟队列
操作012:延迟插件
一、插件简介
-
官网地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
-
延迟极限:最多两天
二、插件安装
1、确定卷映射目录
docker inspect rabbitmq
运行结果:
"Mounts": [{"Type": "volume","Name": "rabbitmq-plugin","Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data","Destination": "/plugins","Driver": "local","Mode": "z","RW": true,"Propagation": ""},{"Type": "volume","Name": "cca7bc3012f5b76bd6c47a49ca6911184f9076f5f6263b41f4b9434a7f269b11","Source": "/var/lib/docker/volumes/cca7bc3012f5b76bd6c47a49ca6911184f9076f5f6263b41f4b9434a7f269b11/_data","Destination": "/var/lib/rabbitmq","Driver": "local","Mode": "","RW": true,"Propagation": ""}]
和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data
进入/var/lib/docker/volumes/rabbitmq-plugin/_data
2、下载延迟插件
官方文档说明页地址:Community Plugins | RabbitMQ
下载插件安装文件:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
3、启用插件
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出Docker容器
exit
# 重启Docker容器
docker restart rabbitmq
4、确认
确认点1:查看当前节点已启用插件的列表:
确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了
三、创建交换机
rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以,创建方式如下:
关于x-delayed-type参数的理解:
原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?
这里就额外使用x-delayed-type来指定交换机本身的类型
四、代码测试
1、生产者端代码
@Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",messageProcessor -> {
// 设置延迟时间:以毫秒为单位messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
return messageProcessor;});
}
2、消费者端代码
①情况A:资源已创建
package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date; @Component
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException { log.info("[生产者]" + dataString);log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
②情况B:资源未创建
package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date; @Component
@Slf4j
public class MyDelayMessageListener { public static final String EXCHANGE_DELAY = "exchange.delay.video";public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"), exchange = @Exchange( value = EXCHANGE_DELAY, durable = "true", autoDelete = "false", type = "x-delayed-message", arguments = @Argument(name = "x-delayed-type", value = "direct")), key = {ROUTING_KEY_DELAY} )) public void process(String dataString, Message message, Channel channel) throws IOException { log.info("[生产者]" + dataString); log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
}
3、执行效果
①交换机类型
②生产者端效果
注意:使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行
③消费者端效果
操作013:事务消息之生产者端
一、测试代码
1、引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
2、yaml配置
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
3、主启动类
package com.atguigu.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQProducerMainType {
public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}
}
4、相关配置
package com.atguigu.mq.config;
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class RabbitConfig {
@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}
5、测试代码
package com.atguigu.mq.test;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class RabbitMQTest {
public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";
@Resourceprivate RabbitTemplate rabbitTemplate;
@Testpublic void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
// 2、抛出异常log.info("do bad:" + 10 / 0);
// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");}
}
二、执行测试
1、未使用事务
抛出异常前的消息发送了,抛异常后的消息没有发送:
为了不影响后续操作,我们直接在管理界面这里把这条消息消费掉:
2、使用事务
①说明
因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控
②测试提交事务的情况
@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}
③测试回滚事务的情况
@Test
@Transactional
@Rollback(value = true)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)");// 2、抛出异常log.info("do bad:" + 10 / 0);
// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)");
}
操作014:惰性队列
惰性队列:未设置惰性模式时队列的持久化机制
• 创建队列时,在Durability这里有两个选项可以选择
• Durable:持久化队列,消息会持久化到硬盘上
• Transient:临时队列,不做持久化操作,broker重启后消息会丢失
惰性队列:未设置惰性模式时队列的持久化机制
• 那么Durable队列在存入消息之后,是否是立即保存到硬盘呢?
答:队列满了,或者mq服务器关闭的时候就会将消息存储到硬盘当中去
一、创建惰性队列
1、官网说明
惰性队列:短时间提升缓存消息的能力
使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。由于各种原因,排队可能会变得很长:
• 消费者离线/崩溃/停机进行维护
• 突然出现消息进入高峰,生产者的速度超过了消费者
• 消费者比正常情况慢
• 比较下面两个说法是否是相同的意思:
• 立即移动到硬盘
• 尽早移动到硬盘
• 我认为不一样:
• 立即:消息刚进入队列时
• 尽早:服务器不繁忙时
队列可以创建为默认
或惰性
模式,模式指定方式是:
-
使用队列策略(建议)
-
设置
queue.declare
参数
如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
2、基于策略方式设定
# 登录Docker容器
docker exec -it rabbitmq /bin/bash
# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
-
rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
-
set_policy是子命令,表示设置策略
-
Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的
-
"^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
-
'{"queue-mode":"lazy"}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
-
–-apply-to参数指定该策略将应用于队列(queues)级别
-
命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列
如果需要修改队列模式可以执行如下命令(不必删除队列再重建):
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
3、在声明队列时使用参数设定
-
参数名称:x-queue-mode
-
可用参数值:
-
default
-
lazy
-
-
不设置就是取值为default
Java代码原生API设置方式:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
Java代码注解设置方式:
@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy") })
二、实操演练
1、生产者端代码
①配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent> <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
②配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
③主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQLazyProducer { public static void main(String[] args) {SpringApplication.run(RabbitMQLazyProducer.class, args);} }
④发送消息
package com.atguigu.mq.test; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_LAZY_NAME = "exchange.atguigu.lazy";public static final String ROUTING_LAZY_KEY = "routing.key.atguigu.lazy"; @Resourceprivate RabbitTemplate rabbitTemplate; @Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, "I am a message for test lazy queue.");} }
2、消费者端代码
①配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent> <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
②配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
③主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQLazyConsumerMainType { public static void main(String[] args) {SpringApplication.run(RabbitMQLazyConsumerMainType.class, args);}}
④监听器
package com.atguigu.mq.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component @Slf4j public class MyLazyMessageProcessor { public static final String EXCHANGE_LAZY_NAME = "exchange.atguigu.lazy";public static final String ROUTING_LAZY_KEY = "routing.key.atguigu.lazy";public static final String QUEUE_LAZY_NAME = "queue.atguigu.lazy"; @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")}),exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),key = {ROUTING_LAZY_KEY}))public void processMessageLazy(String data, Message message, Channel channel) {log.info("消费端接收到消息:" + data);} }
三、测试
-
先启动消费端
-
基于消费端@RabbitListener注解中的配置,自动创建了队列
-
发送消息
操作015:优先级队列
• 默认情况:基于队列先进先出的特性,通常来说,先入队的先投递
• 设置优先级之后:优先级高的消息更大几率先投递
• 关键参数:x-max-priority
RabbitMQ允许我们使用一个正整数给消息设定优先级
• 消息的优先级数值取值范围:1~255
• RabbitMQ官网建议在1~5之间设置消息的优先级(优先级越高,占用CPU、内存等资源越多)
一、创建相关资源
1、创建交换机
exchange.test.priority
2、创建队列
queue.test.priority
x-max-priority
3、队列绑定交换机
二、生产者发送消息
1、配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
2、配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
3、主启动类
package com.atguigu.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQPriorityProducer {
public static void main(String[] args) {SpringApplication.run(RabbitMQPriorityProducer.class, args);}
}
4、发送消息
-
不要启动消费者程序,让多条不同优先级的消息滞留在队列中
-
第一次发送优先级为1的消息
-
第二次发送优先级为2的消息
-
第三次发送优先级为3的消息
-
先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息
①第一次发送优先级为1的消息
package com.atguigu.mq.test;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_PRIORITY = "exchange.test.priority";public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
@Resourceprivate RabbitTemplate rabbitTemplate;
@Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{message.getMessageProperties().setPriority(1);return message;});}
}
②第二次发送优先级为2的消息
@Test
public void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{message.getMessageProperties().setPriority(2);return message;});
}
③第三次发送优先级为3的消息
@Test
public void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{message.getMessageProperties().setPriority(3);return message;});
}
三、消费端接收消息
1、配置POM
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
2、配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
3、主启动类
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitMQPriorityConsumer {public static void main(String[] args) {SpringApplication.run(RabbitMQPriorityConsumer.class, args);}}
4、监听器
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyMessageProcessor {public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
5、测试效果
对于已经滞留服务器的消息,只要消费端一启动,就能够收到消息队列的投递,打印效果如下: