RabbitMQ从入门到精通之安装、通讯方式详解

文章目录

  • RabbitMQ
    • 一、RabbitMQ介绍
      • 1.1 现存问题
    • 一、RabbitMQ介绍
    • 二、RabbitMQ安装
    • 三、RabbitMQ架构
    • 四、RabbitMQ通信方式
      • 4.1 RabbitMQ提供的通讯方式
      • 4.2 Helloworld 方式
      • 4.2Work queues
      • 4.3 Publish/Subscribe
      • 4.4 Routing
      • 4.5 Topics
      • 4.6 RPC (了解)
    • 五、Springboot 操作RabbitMQ
    • 六、RabbitMQ保证消息可靠性
      • 6.1、 保证消息一定送达到Exchange
      • 6.2、保证消息可以路由到Queue中
      • 6.3、保证队列持久化消息
      • 6.4、保证消息者可以正常消费消息
      • 6.5 SpringBoot实现上述操作
        • 6.5.1 Confirm
      • 6.5.2 Return
      • 6.5.3 消息持久化
    • 七、RabbitMQ死信队列 & 延迟交换机
      • 7.4、准备Exchange&Queue
      • 7.5、实现效果
      • 7.6、延迟交换机


RabbitMQ

一、RabbitMQ介绍

1.1 现存问题

    • 服务异步调用: 服务A如何保证异步请求一定能被服务B接收到并处理
      在这里插入图片描述
    • 削峰: 海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?

在这里插入图片描述

    • 服务解耦: 如何尽量的降低服务之间的耦合问题,如果在订单与积分和商家服务解构,需要一个队列,而这个队列依然需要实现上述两个情况功能。

在这里插入图片描述

一、RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

AMQP协议:
在这里插入图片描述

Erlang:

Erlang在1991年由爱立信公司向用户推出了第一个版本,经过不断的改进完善和发展,在1996年爱立信又为所有的Erlang用户提供了一个非常实用且稳定的OTP软件库并在1998年发布了第一个开源版本。Erlang同时支持的操作系统有linux,windows,unix等,可以说适用于主流的操作系统上,尤其是它支持多核的特性非常适合多核CPU,而分布式特性也可以很好融合各种分布式集群。

二、RabbitMQ安装

docker-compose.yml

version: “3.1”
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- ./data/:/var/lib/rabbitmq/
ports:
- 5672:5672
- 15672:15672

在这里插入图片描述
在这里插入图片描述

docker-compose.yml文件内容:
在这里插入图片描述

镜像拉取完成后,直接在linux 内部执行: curl localhost:5672

在这里插入图片描述
执行后能够显示AMQP 字样的内容就说明执行成功了

执行 docker exec -it rabbitmq bash 命令 进入容器内部
cd opt/rabbitmq/ 目录下
在这里插入图片描述
执行cd plugins/sbin 命令进入目录下, 执行 ./rabbitmq-plugins enable rabbitmq_managent命令 启动rabbitmq 图形化界面
在这里插入图片描述
访问15672 端口,默认的账号密码是guest/guest
在这里插入图片描述

三、RabbitMQ架构

在这里插入图片描述

四、RabbitMQ通信方式

4.1 RabbitMQ提供的通讯方式

  • Hello World :为了入门操作
  • Work queues : 一个队列被多个消费者消费
  • Publish/Subscribe:手动创建Exchange(FANOUT)
  • Routing: 手动创建Exchange(DIRECT)
  • Topics : 手动创建Exchange(TOPIC)
  • RPC: RPC方式
  • Publisher Confirms

4.2 Helloworld 方式

在这里插入图片描述

//工具类
public class RabbitMQConnectionUtil {public static final String RABBITMQ_HOST ="172.16.177.133";public static final int RABBITMQ_POST =5672;public static final String RABBITMQ_USERNAME ="guest";public static final String RABBITMQ_PASSWORD ="guest";public static final String RABBITMQ_VIRTUAL_HOST ="/";/*** 构建RabbitMQ的连接对象* @return*/public static Connection getConnection() throws Exception{//1.创建connection    工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(RABBITMQ_HOST);connectionFactory.setPort(RABBITMQ_POST);connectionFactory.setUsername(RABBITMQ_USERNAME);connectionFactory.setPassword(RABBITMQ_PASSWORD);connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);//2.设置RabbitMQ的连接信息Connection connection = connectionFactory.newConnection();//3. 返回连接对象return connection;}
//生产者@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息者获取消息"+new String(body,"UTF-8"));}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);System.out.println("开始监听");System.in.read();}//消费者@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息String message="hello word!";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功");System.in.read();}

4.2Work queues

在这里插入图片描述
一个队列中的消息,只会被一个消费者成功的消费,默认情况下,RabbitMQ的队列会将消息以轮询的方式交给不同的消费者消费,消费者拿到消息后,需要给RabbitMQ一个ack,RabbitMQ认为消费者已经拿到消息了

//消费者@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.1 设置消息的控制,一次拿几个消息
//#2        channel.basicQos(1);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//模拟业务执行时间Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息者获取消息"+new String(body,"UTF-8"));
//#1				channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听");System.in.read();//消费者2@Testpublic void consume2() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.1 设置消息的控制,一次拿几个消息
//#2        channel.basicQos(1);//4. 监听消息 DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//模拟业务执行时间Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息者获取消息"+new String(body,"UTF-8"));//basicAck(标识,是否批量操作)
//#1				channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听");System.in.read();}
//生产者public static final String QUEUE_NAME="work";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息for (int i=0;i<10;i++){String message="hello word!"+i;channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息发送成功");System.in.read();}

当两台消费者的消费能力不相同的时候,为了提高效率,就不能以轮询的方式进行分发,而是以消费者消费完成后手动传递ack 的方式进行下一个消息的分发,将== #1 #2 的代码 ==打开即可
操作步骤:

  • 操作#1 让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
  • 操作#2 设置每次拿几个消息

4.3 Publish/Subscribe

自行创建路由器,并绑定队列
在这里插入图片描述
如何构建一个自定义交换机,并指定类型是FANOUT,让交换机和多个Queue绑定到一起

//生产者 
public static final String EXCHANGE_NAME="pubsub";public static final String QUEUE_NAME1="pubsub-one";public static final String QUEUE_NAME2="pubsub-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe!".getBytes());System.out.println("消息成功发送");}

4.4 Routing

DIRECT 类型的交换机,在绑定Exchange和Queue时,需要指定好routingKey同时在发送消息的时候,也指定routingkey,只有在routingkey 一致时,才会把指定的消息路由到指定的队列
在这里插入图片描述

public static final String EXCHANGE_NAME="routing";public static final String QUEUE_NAME1="routing-one";public static final String QUEUE_NAME2="routing-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.DIRECTchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林!".getBytes());channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());System.out.println("消息成功发送");}

4.5 Topics

topics 模式支持模糊匹配RoutingKey,就像是sql中的 like子句模糊查询,而路由模式等同于sql中的where子句等值查询

在这里插入图片描述

通过模糊路由到队列。该方式的Routing key必须具有固定格式:以 . 间隔的一串单词,比如:quick.orange.rabbit,Routing key 最多不能超过255byte。

交换机和队列的Binding key用通配符来表示,有两种语法:

  • * 可以替代一个单词;
  • # 可以替代 0 或多个单词;

例如 #.com.#
#可以表示0级或多级。xx.com、com.xx、com、xx.com.xx.xx、xx.xx.com.xx都可以

例如 *.com. *
*表示一级,xx.com.xx 可以 ,com.xx 不可以,前面缺少一级,xx.com.xx.xx不可以,com后面只能有一级xx,最终格式必须是 xx.com.xx

 public static final String EXCHANGE_NAME="topics";public static final String QUEUE_NAME1="topics-one";public static final String QUEUE_NAME2="topics-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.TOPICchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,// Topic类型的交换机,在和队列绑定时,需要以aaa.bbb.ccc 方式编写routingKey// 其中有两个特殊字符: *(相当于占位符),# (相当通配符)channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙子兔子!".getBytes());//匹配1、2channel.basicPublish(EXCHANGE_NAME,"small.while.rabbit",null,"小兔子!".getBytes());//匹配1、2channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog",null,"懒狗狗狗狗!".getBytes());//匹配 3System.out.println("消息成功发送");}

4.6 RPC (了解)

因为两个服务在交互时,可以尽量做到Client和server的结偶,通过RabbitMQ进行结藕操作
需要让client 发送消息时,携带两个属性,

  • replyto告知server将相应信息放到哪个队列
  • correlationId告知server 发送相应消息时,需要携带位置标识来告知client响应的消息

在这里插入图片描述

public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 发布消息String message="hello rpc!";String uuid = UUID.randomUUID().toString();AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,prop,message.getBytes());channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String id = properties.getCorrelationId();if(id!=null && id.equals(uuid)){System.out.println("接收到服务端的响应:"+new String(body));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.out.println("消息发送成功");System.in.read();}
}public class Consumer {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息者获取消息"+new String(body,"UTF-8"));String resp = "获取到client发出的请求,这里是响应的信息";String respQueueName = properties.getReplyTo();String uuid = properties.getCorrelationId();AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().correlationId(uuid).build();channel.basicPublish("",respQueueName,prop,resp.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_PUBLISHER,false,callback);System.out.println("开始监听");System.in.read();}
}

五、Springboot 操作RabbitMQ


  • 创建项目
  • 导入依赖
	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 配置rabbitmq信息
spring:rabbitmq:host: 172.16.177.133password: guestusername: guestport: 5672virtual-host: /

配置类声明队列

@Configuration
public class RabbitMQConfig {public static final String EXCHANGE="boot-exchange";public static final String QUEUE="boot-queue";public static final String ROUTING_KEY="*.black.*";@Beanpublic Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Queue bootQueue(){return QueueBuilder.durable(QUEUE).build();}@Beanpublic Binding bootBinding(Exchange bootExchange,Queue bootQueue){return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();}
}

生产者配置

@SpringBootTest
class SpringbootRabbitmqApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid contextLoads() {}@Testpublic void publisher(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");}@Testpublic void publiWithProps(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setCorrelationId("123");return message;}});System.out.println("消息发送成功2");}
}

消费者配置

@Component
public class ConsumeListener {/**** @param msg* @param channel 前提是配置好spring.rabbitmq.listener.simple.acknowledge-mode: manual #开启手动ack* @param message* @throws IOException*/@RabbitListener(queues = RabbitMQConfig.QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("队列消息为:"+msg);String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("标识为:"+correlationId);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
  • 声明交换机&队列

六、RabbitMQ保证消息可靠性

confirm机制
可以通过confirm效果保证消息一定送达到Exchange,官方提供了三种,选择了对于效率影响最低的异步回调的效果

6.1、 保证消息一定送达到Exchange

使用confirm机制

 public static final String QUEUE_NAME="confirms ";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//3.1 开启confirms 的异步回调channel.confirmSelect();String message="hello word!";//3.2 设置confirms的异步回调channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息成功发送到Exchange");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作");}});//3.3 设置return回调,确认消息是否路由到了队列channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有到指定队列时,做其他的补偿措施!!");}});//3.4、设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.build();//4. 发布消息channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());System.out.println("消息发送成功");System.in.read();}

6.2、保证消息可以路由到Queue中

使用return 机制
为了保证Exchange上的消息一定可以送达到Queue

//6.2设置return 回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有到指定队列时,做其他的补偿措施!!");}});
//7.在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return	 回调

6.3、保证队列持久化消息

DeliveryMode设置消息持久化

//6.3、设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.build();//4. 发布消息channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());

6.4、保证消息者可以正常消费消息

详情看WorkQueue模式

6.5 SpringBoot实现上述操作

6.5.1 Confirm

  • 编写配置文件开启Confirm机制

spring:
rabbitmq:
publisher-confirm-type: correlated # 新版本 开启confirm机制
publisher-confirms: true # 老版本

  • 在发送消息时,配置RabbitTemplate
@Testpublic void publishWithConfirms() throws IOException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if(b){System.out.println("消息已经送达到交换机");}else{System.out.println("消息没有送到到Exchange,需要做一些补偿操作!");}}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}

6.5.2 Return

  • 编写配置文件开启Return机制

spring:
rabbitmq:
publisher-returns: true # 开启return机制

  • 在发送消息时,配置RabbitTemplate
@Testpublic void publishWithReturn() throws IOException {rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {String msg = new String(message.getBody());System.out.println("消息失败:"+msg+"路由队列失败!!做补救操作");}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}

6.5.3 消息持久化

//3.4、设置消息持久化
AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化 #1.build();//4. 发布消息,  将参数mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());

七、RabbitMQ死信队列 & 延迟交换机

###7.1、 消息被消费者拒绝,requeue设置为false
###7.2.1、发送消息时设置消息的生存时间,如果生存时间到了,还没有被消费。
###7.2.2、 也可以指定某个队列中所有消息的生存时间,如果生存时间到了,还没有被消费
###7.3、队列已经达到消息的最长长度后,再路由过来的消息直接变成死信
在这里插入图片描述

7.4、准备Exchange&Queue

@Configuration
public class DeadLetterConfig {public static final String NORMAL_EXCHANGE="normal-exchange";public static final String NORMAL_QUEUE="normal-queue";public static final String NORMAL_ROUTING_KEY="normal.#";public static final String DEAD_EXCHANGE="dead-exchange";public static final String DEAD_QUEUE="dead-queue";public static final String DEAD_ROUTING_KEY="dead.#";/**普通交换机*/@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}/**普通队列*/@Beanpublic Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列.build();}/**普通队列绑定路由*/@Beanpublic Binding normalBingding(Queue normalQueue,Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with(DEAD_ROUTING_KEY).noargs();}/**死信交换机*/@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}/**死信队列*/@Beanpublic Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE ).build();}/**绑定死信队列和交换机*/@Beanpublic Binding deadBinding(Queue deadQueue,Exchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}
}

7.5、实现效果

  • 基于消费者进行reject 或者 nack 实现死信效果
  • 实现延迟消费的效果,比如:下订单时,有15分钟的付款时间
@Component
public class DeadListener {/**** @param msg* @param channel 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual* @param message 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual*/@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)public void comsume(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到normal队列的消息:"+msg);
//        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}
}
  • 消息的生存时间

    • 给消息设置生存时间
     @Test
    public void publishExpire(){String msg ="dead letter expire";rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");//五秒return message;}});}
    
    • 给队列设置生存时间
     /**普通队列*/
    @Bean
    public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列.deadLetterRoutingKey("dead.abc") //从新修改routingkey 信息,不设置的话,普通队列的消息会消失,但是死信队列中却没有出现.ttl(10000) //队列生存时间.build();
    }
    
    • 设置Queue中的消息最大长度
        /**普通队列*/@Beanpublic Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列.deadLetterRoutingKey("dead.abc") //从新修改routingkey 信息,不设置的话,普通队列的消息会消失,但是死信队列中却没有出现
    //                .ttl(10000) //队列生存时间.maxLength(1)   //队列最大长度.build();}
    

    只要队列中有一个消息,如果再次发送一个消息,这个消息就会变成死信

7.6、延迟交换机

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

  • 构建延迟交换机
/*** @ClassName:DelayedConfig* @Description:延迟队列, 注意: 消息是在交换机中延迟,时间到了后才会放到队列中,*                              此时如果消息在交换机中延迟过程中,rabbitmq重启则会丢失消息* @Author:* @Date:9/7/23 10:05 上午* @Versiion:1.0*/
@Configuration
public class DelayedConfig {public static final String DELAYED_EXCHANGE="delayed-exchange";public static final String DELAYED_QUEUE="delayed-queue";public static final String DELAYED_ROUTING_KEY="delayed.#";@Beanpublic Exchange delayedExchange(){Map<String,Object> arguments =new HashMap<String,Object>();arguments.put("x-delayed-type","topic");Exchange exchange =new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);return exchange;}@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE).build();}@Beanpublic Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
  • 发送消息
    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void publish(){rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.adb", "delayedxxx",new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(3000);return message;}});System.out.println("发送成功");}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128180.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【结合AOP与ReflectUtil对返回数据进行个性化填充展示】

结合AOP与ReflectUtil对返回数据进行个性化填充展示 背景 对于接口列表返回的数据&#xff0c;我们通常有时候会对某些特殊的字段进行转化&#xff0c;或者根据某逻辑进行重新赋值&#xff0c;举个例子&#xff0c; 比如返回的列表数据中有性别sex&#xff0c;我们通常会同时…

微信小程序实现连续签到七天

断签之后会从第一天重新开始 <template><view class"content" style"height: 100vh;background: white;"><view class"back"><view style"position: absolute;bottom: 200rpx;left: 40rpx;width: 90%;"><i…

无人机航线规划

无人机航线规划&#xff0c;对于无人机的任务执行有着至关重要的作用&#xff0c;无人机在从起点飞向目的点的过程中&#xff0c;如何规划出一条安全路径&#xff0c;并且保证该路径代价最优&#xff0c;是无人机航线规划的主要目的。其中路径最优的含义是&#xff0c;在无人机…

透视俄乌网络战之二:Conti勒索软件集团(上)

透视俄乌网络战之一&#xff1a;数据擦除软件 Conti勒索软件集团&#xff08;上&#xff09; 1. Conti简介2. 组织架构3. 核心成员4. 招募途径5. 工作薪酬6. 未来计划参考 1. Conti简介 Conti于2019年首次被发现&#xff0c;现已成为网络世界中最危险的勒索软件之一&#xff0…

goLang笔记+beego框架

goLang笔记+ 初始安装之后 GOPATH: Go开发相关的环境变量如下: GOROOT:GOROOT就是Go的安装目录,(类似于java的JDK) GOPATH:GOPATH是我们的工作空间,保存go项目代码和第三方依赖包 GOPATH可以设置多个,其中,第一个将会是默认的包目录,使用 go get 下载的包都会在第一…

Qt下SVG格式图片应用

SVG格式图片介绍 svg格式图片又称矢量图&#xff0c;该种格式的图片不同于png等格式的图片&#xff0c;采用的并不是位图的形式来组织图片&#xff0c;而是采用线条等组织图片&#xff0c;svg格式是图片的文件格式是xml&#xff0c;可以通过文件编译器打开查看svg格式内容。 …

【rust/egui】(七)看看template的app.rs:Slider

说在前面 rust新手&#xff0c;egui没啥找到啥教程&#xff0c;这里自己记录下学习过程环境&#xff1a;windows11 22H2rust版本&#xff1a;rustc 1.71.1egui版本&#xff1a;0.22.0eframe版本&#xff1a;0.22.0上一篇&#xff1a;这里 Slider 滑块&#xff0c;如下图 定义…

glibc2.35-通过tls_dtor_list劫持exit执行流程

前言 glibc2.35删除了malloc_hook、free_hook以及realloc_hook&#xff0c;通过劫持这三个hook函数执行system已经不可行了。 传统堆漏洞利用是利用任意地址写改上上述几个hook从而执行system&#xff0c;在移除之后则需要找到同样只需要修改某个地址值并且能够造成程序流劫持…

动态路由的主流算法

路由器就是一台网络设备&#xff0c;它有多张网卡。当一个入口的网络包送到路由器时&#xff0c;它会根据一个本地的转发信息库&#xff0c;来决定如何正确地转发流量。这个转发信息库通常被称为路由表。 一张路由表中会有多条路由规则。每一条规则至少包含这三项信息。 目的…

stable diffusion webui中的sampler

Stable Diffusion-采样器篇 - 知乎采样器&#xff1a;Stable Diffusion的webUI中&#xff0c;提供了大量的采样器供我们选择&#xff0c;例如Eular a&#xff0c; Heum&#xff0c;DDIM等&#xff0c;不同的采样器之间究竟有什么区别&#xff0c;在操作时又该如何进行选择&…

Vue2项目练手——通用后台管理项目第六节

Vue2项目练手——通用后台管理项目 用户管理页table表格获取表格数据目录列表user.jsmock.jsindex.jsUsers.vue 新增和编辑功能Users.vue 删除功能使用的组件Users.vue 用户管理页 table表格 使用的组件和前面的表格使用的一致。 获取表格数据 目录列表 user.js import Mo…

静态路由——实现两个不相连的网段通信实验

路漫漫其修远兮&#xff0c;吾将上下而求索 今天做一个简单的实现两个不相连的网段通信实验&#xff0c;本实验使用静态路由配置&#xff0c;主要 加强初学者对静态路由的理解。 实际中不可能只使用静态路由&#xff0c;还要使用诸多的其他网络协议&#xff0c;达到安全可靠的…

C语言柔性数组详解:让你的程序更灵活

柔性数组 一、前言二、柔性数组的用法三、柔性数组的内存分布四、柔性数组的优势五、总结 一、前言 仔细观察下面的代码&#xff0c;有没有看出哪里不对劲&#xff1f; struct S {int i;double d;char c;int arr[]; };还有另外一种写法&#xff1a; struct S {int i;double …

【LeetCode-中等题】27. 移除元素

文章目录 题目方法一&#xff1a;快慢指针 题目 方法一&#xff1a;快慢指针 int fast 0;// 快指针 用于扫描需要的元素int slow 0;//慢指针 用于记录需要存放元素的位置class Solution { // 快慢指针public int removeElement(int[] nums, int val) {int fast 0;// 快指针…

BIOS < UEFI

Basic Input Output System &#xff08;BIOS&#xff09; Unified Extensible Firmware Interface &#xff08;UEFI&#xff09;

【100天精通Python】Day55:Python 数据分析_Pandas数据选取和常用操作

目录 Pandas数据选择和操作 1 选择列和行 2 过滤数据 3 添加、删除和修改数据 4 数据排序 Pandas数据选择和操作 Pandas是一个Python库&#xff0c;用于数据分析和操作&#xff0c;提供了丰富的功能来选择、过滤、添加、删除和修改数据。 1 选择列和行 Pandas 提供了多种…

STM32--蓝牙

本文主要介绍基于STM32F103C8T6和蓝牙模块实现的交互控制 简介 蓝牙&#xff08;Bluetooth&#xff09;是一种用于无线通信的技术标准&#xff0c;允许设备在短距离内进行数据交换和通信。它是由爱立信&#xff08;Ericsson&#xff09;公司在1994年推出的&#xff0c;以取代…

华为云云耀云服务器L实例评测 | 分分钟完成打地鼠小游戏部署

前言 在上篇文章【华为云云耀云服务器L实例评测 | 快速部署MySQL使用指南】中&#xff0c;我们已经用【华为云云耀云服务器L实例】在命令行窗口内完成了MySQL的部署并简单使用。但是后台有小伙伴跟我留言说&#xff0c;能不能用【华为云云耀云服务器L实例】来实现个简单的小游…

2023-9-10 Nim游戏

题目链接&#xff1a;Nim游戏 #include <iostream> #include <algorithm>using namespace std;int main() {int n;cin >> n;int res 0;while(n--){int x;cin >> x;res ^ x;}if(res) cout << "Yes" << endl;else cout << …

【Linux】Ubuntu20.04版本安装谷歌中文输入法【教程】

【Linux】Ubuntu20.04版本安装谷歌中文输入法【教程】 文章目录 【Linux】Ubuntu20.04版本安装谷歌中文输入法【教程】一、下载fcitx-googlepinyin二、配置Language SupportReference 一、下载fcitx-googlepinyin 使用下面的命令行下载fcitx-googlepinyin sudo apt-get insta…