深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念

文章目录

  • 文章导图
  • RabbitMQ架构及相关概念
    • 四大核心概念
    • 名词解读
  • 七大工作模式及四大交换机类型
    • 0、前置了解-默认交换机DirectExchange
    • 1、简单模式(Simple Queue)-默认DirectExchange
    • 2、 工作队列模式(Work Queues)-默认DirectExchange
    • 3、发布/订阅模式(Publish/Subscribe)-FanoutExchange
    • 4、路由模式(Routing)-自定义DirectExchange
    • 5、主题模式(Topics)-TopicExchange
    • 总结
  • 三种队列类型
    • 普通队列
    • 死信队列(Dead Letter Queue, DLQ)
      • 定义
      • 触发条件
      • 应用场景
      • 配置
    • 延迟队列(Delayed Queue)
      • 定义
      • 实现方式
      • 应用场景
    • 两者区别
    • 代码实战
      • 1. 延迟队列:TTL+DLX死信队列
        • 配置步骤
      • 2. 延迟队列:RabbitMQ延迟消息插件
        • 配置步骤
      • 3、死信队列: basic.reject或basic.nack
        • 1. 引入依赖
        • 2. 配置交换机、队列和死信队列
        • 3. 生产者发送消息
        • 4. 消费者监听并拒绝消息
        • 5. 消费者监听死信队列
        • 总结

RabbitMQ系列文章
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
TODO:RabbitMQ可靠性
TODO:RabbtiMQ顺序性
TODO:RabbitMQ常见问题整理

文章导图

在这里插入图片描述

RabbitMQ架构及相关概念

在这里插入图片描述

四大核心概念

生产者

产生数据发送消息的程序是生产者。

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定 。

队列

队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者

名词解读

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接 Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的
  • channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走 Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。
  • Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

七大工作模式及四大交换机类型

网上查了很多资料有的说是五种,有的说是四种,可以看到在RabbitMQ在官网提到的共有7种工作模式:https://www.rabbitmq.com/tutorials

第6种是RPC调用,这个我们正常肯定不用这个实现RPC,而第7种是confirm 确认模式,可以用于保证生产者消息发送的可靠性,这个我后面会再专门介绍。
现在我们主要讲前5种工作模式,实际上总结来说5种又可以总结为是3种,其实第1、2、4根据他们都是Direct交换机可以归结为一种,下文我会详细讲解一下。

在这里插入图片描述

0、前置了解-默认交换机DirectExchange

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。 默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

1、简单模式(Simple Queue)-默认DirectExchange

这个和别的几种模式对比看着没有X,这个其实用了默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:

在这里插入图片描述

//Config
@Bean
Queue queue1() {return new Queue("simpleQueue");
}// 生产者
@Autowired
private RabbitTemplate rabbitTemplate;public void sendSimpleMessage(String message) {rabbitTemplate.convertAndSend("simpleQueue", message);
}// 消费者
@RabbitListener(queues = "simpleQueue")
public void receiveSimpleMessage(String message) {System.out.println("Received: " + message);
}

这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “simpleQueue”,则 routingkey 为 “simpleQueue” 的消息会被该消息队列接收。
具体可以看下源码发送convertAndSend

/** Alias for amq.direct default exchange. */
private static final String DEFAULT_EXCHANGE = "";private static final String DEFAULT_ROUTING_KEY = "";private volatile String exchange = DEFAULT_EXCHANGE;
private volatile String routingKey = DEFAULT_ROUTING_KEY;@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {//可以发现这个this.exchange就是DEFAULT_EXCHANGE = ""convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}

2、 工作队列模式(Work Queues)-默认DirectExchange

这种情况是这样的:

在这里插入图片描述

一个生产者,也是一个默认的交换机(DirectExchange),一个队列,两个消费者。
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

和第一种对比主要体现在有多个消费者进行消费,主要优势在于可以通过增加消费者来提高处理能力。

//Config
@Bean
Queue queue1() {return new Queue("workQueue");
}// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendWorkMessage(String message) {rabbitTemplate.convertAndSend("workQueue", message);
}// Consumer
@RabbitListener(queues = "workQueue")
public void receiveWorkMessage(String message) {System.out.println("Received: " + message);// Simulate workThread.sleep(1000);
}

3、发布/订阅模式(Publish/Subscribe)-FanoutExchange

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

在这里插入图片描述

在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。

//Config
@Bean
public FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");
}@Bean
public Queue fanoutQueue1() {return new Queue("fanoutQueue1");
}@Bean
public Queue fanoutQueue2() {return new Queue("fanoutQueue2");
}@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

接下来创建两个消费者,两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendFanoutMessage(String message) {rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}// Consumer
@RabbitListener(queues = "fanoutQueue1")
public void receiveFanoutMessage1(String message) {System.out.println("Queue1 Received: " + message);
}@RabbitListener(queues = "fanoutQueue2")
public void receiveFanoutMessage2(String message) {System.out.println("Queue2 Received: " + message);
}

注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null

4、路由模式(Routing)-自定义DirectExchange

DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “directQueue1”,则 routingkey 为 “directQueue1” 的消息会被该消息队列接收。

在这里插入图片描述

// Config
@Bean
public DirectExchange directExchange() {return new DirectExchange("directExchange");
}@Bean
public Queue directQueue1() {return new Queue("directQueue1");
}@Bean
public Queue directQueue2() {return new Queue("directQueue2");
}@Bean
public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {return BindingBuilder.bind(directQueue1).to(directExchange).with("info");
}@Bean
public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {return BindingBuilder.bind(directQueue2).to(directExchange).with("error");
}

可以发现我们可以根据routingKey控制发送到哪个队列上,这个本质上和我们前面2种模式都是一样的,采用的都是DirectExchange,只不过前面2种的交换机DirectExchange是""默认值,现在我们这里是指定了自己的DirectExchange

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendDirectMessage(String message, String routingKey) {rabbitTemplate.convertAndSend("directExchange", routingKey, message);
}// Consumer
@RabbitListener(queues = "directQueue1")
public void receiveDirectMessage1(String message) {System.out.println("Queue1 Received: " + message);
}@RabbitListener(queues = "directQueue2")
public void receiveDirectMessage2(String message) {System.out.println("Queue2 Received: " + message);
}

特别注意:如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

5、主题模式(Topics)-TopicExchange

在这里插入图片描述

在 RabbitMQ 的主题模式(Topics)中,消息通过带有路由键的主题交换机(TopicExchange)进行路由。消息的路由键是一个点分隔的字符串,消费者可以使用绑定键(带有通配符)来订阅感兴趣的消息。

  • 队列 topicQueue1 使用绑定键 *.orange.*,匹配任意第一个和第三个单词,以 orange 为第二个单词的消息。
  • 队列 topicQueue2 使用绑定键 *.*.rabbit,匹配任意前两个单词,以 rabbit 为第三个单词的消息。
// Config
@Bean
public TopicExchange topicExchange() {return new TopicExchange("topicExchange");
}@Bean
public Queue topicQueue1() {return new Queue("topicQueue1");
}@Bean
public Queue topicQueue2() {return new Queue("topicQueue2");
}@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
}@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
}
  • topicQueue1topicQueue2 接收匹配其绑定键的消息。
  • 灵活路由: 主题模式允许通过复杂的路由键实现灵活的消息路由。
  • 使用场景: 适用于需要按模式匹配路由消息的场景,比如日志分级、区域性数据分发等。
// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendTopicMessage(String message, String routingKey) {rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
}// Consumer
@RabbitListener(queues = "topicQueue1")
public void receiveTopicMessage1(String message) {System.out.println("Queue1 Received: " + message);
}@RabbitListener(queues = "topicQueue2")
public void receiveTopicMessage2(String message) {System.out.println("Queue2 Received: " + message);
}

总结

看了上面的5个例子,其实本质上我们可以根据Exchange交换机类型归结为3种工作模式Direct、Fanout、Topic

  • Direct:定向,把消息交给符合指定routing key 的队列 (第1、2、4其实都是这种交换机)
  • Fanout:广播,将消息交给所有绑定到交换机的队列 第**(第3种模式)**
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列**(第5种模式)**

这里提一下,交换机还有一种类型,Headers:头匹配,基于MQ的消息头匹配,不过这种用的非常少,可以忽略!

不难发现,这三种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,三种类别对应着三种判断角度

  • direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;
  • fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;
  • topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

三种队列类型

普通队列

我们平常发送的正常都是普通队列,比如上面5种工作模式说的都是普通队列,就不多说了

死信队列(Dead Letter Queue, DLQ)

特别注意:

  • 队列和消息都有个TTL生存时间,队列的TTL到达后队列会自动删除,消息不会进入死信队列;
  • 消息的生存时间到达后会进入死信队列。消息的生存时间可以在队列设置所有消息的TTL,也可以对某个消息单独设置TTL。

定义

死信队列是用于处理无法被消费者正确处理的消息的队列。当消息在原始队列中无法被消费时,会被转移到死信队列中。

触发条件

消息会变成死信并进入死信队列的几种情况:

  1. 消息被消费者拒绝(通过basic.rejectbasic.nack),并且requeue=false
  2. 消息在队列中超过了TTL(Time To Live)时间。
  3. 队列达到最大长度限制,无法再接收新消息。

在这里插入图片描述

应用场景

  • 处理无法被消费的消息,避免消息堆积影响其他消息的消费。
  • 记录和监控消息处理错误,方便进行后续处理

配置

  • 通过设置 x-dead-letter-exchangex-dead-letter-routing-key 将消息路由到死信队列。
  • 在原始队列中设置死信交换机和死信队列的相关参数

延迟队列(Delayed Queue)

定义

延迟队列是一种特殊的队列,消息在发送到队列后,需要等待一段时间后才能被消费。

实现方式

  1. 通过死信队列实现延迟任务

    把死信队列就当成延迟队列,具体来说是这样:

    假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

    • 将消息发送到一个没有消费者的队列,设置消息的TTL。
    • 消息过期后进入死信队列,再由死信队列的消费者处理。
  2. 通过RabbitMQ延迟插件

    • 使用RabbitMQ的延迟插件(rabbitmq_delayed_message_exchange 插件),消息在延迟一段时间后再投递到目标队列中进行消费。

应用场景

  • 订单超时未支付自动取消。
  • 用户注册后未登录的提醒。
  • 预定会议前的通知

两者区别

使用TTL和死信队列实现延迟插件其实是会有一些问题的:

  • 问题一:当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, 必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;
  • 问题二:就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;

延迟交换机插件可以在一定程度上解决上述两种问题。

特性死信队列延迟队列
定义处理无法被消费的消息消息在指定时间后才被消费
触发条件消息被拒绝、消息过期、队列满消息设置了TTL或使用延迟插件
应用场景处理消费失败的消息,避免队列堵塞订单超时取消、提醒通知等延迟处理场景
实现方式配置死信交换机和死信队列使用TTL和死信队列或延迟插件
消息处理进入死信队列后进行特殊处理延迟一段时间后再投递到目标队列

代码实战

1. 延迟队列:TTL+DLX死信队列

配置步骤

1、引入依赖

pom.xml中引入Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置交换机和队列

在Spring Boot的配置类中,配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal_exchange");}// 死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead_letter_exchange");}// 普通队列并绑定到普通交换机@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal_queue").withArgument("x-dead-letter-exchange", "dead_letter_exchange").withArgument("x-dead-letter-routing-key", "dead_letter_routing_key").build();}@Beanpublic Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");}// 死信队列并绑定到死信交换机@Beanpublic Queue deadLetterQueue() {return new Queue("dead_letter_queue");}@Beanpublic Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");}
}

3、生产者发送消息

在生产者发送消息时,可以指定消息的TTL(Time-To-Live)。TTL到期后,消息会被转发到死信队列:

  • 创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间
java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send(@RequestParam String message, @RequestParam int delay) {rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message, msg -> {//创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间msg.getMessageProperties().setExpiration(String.valueOf(delay));return msg;});return "Message sent with delay: " + delay;}
}

4、消费者监听死信队列

消费者监听死信队列,接收到消息后处理:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "dead_letter_queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

2. 延迟队列:RabbitMQ延迟消息插件

RabbitMQ有一个插件 rabbitmq-delayed-message-exchange 可以直接支持延迟消息队列。

配置步骤

1、安装RabbitMQ延迟消息插件

首先,确保RabbitMQ服务器上已安装rabbitmq-delayed-message-exchange插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange/21、**配置交换机和队列**

2、在Spring Boot中配置使用延迟消息交换机:

  • 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类,用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。
  • 交换机的名称为DELAYED_EXCHANGE,类型为"x-delayed-message",持久化为true,自动删除为false,属性为之前创建的HashMap对象。
java复制代码import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitDelayedConfig {@Beanpublic CustomExchange delayedExchange() {return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, Map.of("x-delayed-type", "direct"));}@Beanpublic Queue delayedQueue() {return new Queue("delayed_queue");}@Beanpublic Binding delayedBinding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("delayed_routing_key").noargs();}
}

3、生产者发送消息

生产者在发送消息时,可以设置延迟时间:

java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;@RestController
public class DelayedProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDelayed")public String sendDelayed(@RequestParam String message, @RequestParam int delay) {Map<String, Object> headers = new HashMap<>();headers.put("x-delay", delay);rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", message, msg -> {msg.getMessageProperties().getHeaders().putAll(headers);return msg;});return "Delayed message sent with delay: " + delay;}
}

4、消费者监听延迟队列

与TTL+DLX方法相同,消费者直接监听队列接收消息:

java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DelayedConsumer {@RabbitListener(queues = "delayed_queue")public void receiveDelayedMessage(String message) {System.out.println("Received delayed message: " + message);}
}

3、死信队列: basic.reject或basic.nack

死信队列有3种情况: 这里就举常见的手动ack的情况拒绝消息实现死信队列

要在Spring Boot中使用RabbitMQ实现死信队列(Dead Letter Queue,DLQ),并处理消息被消费者拒绝的情况(通过basic.rejectbasic.nack并且requeue=false),可以按照以下步骤来实现。

1. 引入依赖

首先,在pom.xml中引入Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置交换机、队列和死信队列

接下来,在Spring Boot的配置类中配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal_exchange");}// 死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead_letter_exchange");}// 普通队列并绑定到普通交换机@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal_queue").withArgument("x-dead-letter-exchange", "dead_letter_exchange") // 设置死信交换机.withArgument("x-dead-letter-routing-key", "dead_letter_routing_key") // 设置死信RoutingKey.build();}@Beanpublic Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");}// 死信队列并绑定到死信交换机@Beanpublic Queue deadLetterQueue() {return new Queue("dead_letter_queue");}@Beanpublic Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");}
}
3. 生产者发送消息

在生产者中发送消息到普通队列:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send(@RequestParam String message) {rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message);return "Message sent: " + message;}
}
4. 消费者监听并拒绝消息

注意这里的前提是要开启手动ack:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual  # 手动ack

消费者监听普通队列并有条件地拒绝消息,将消息转发到死信队列:

  • 当发送的消息内容为"reject"时,该消息会被拒绝并转发到死信队列。
  • 当发送其他内容的消息时,消息会被正常消费。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;@Component
public class Consumer {@RabbitListener(queues = "normal_queue")public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {try {String msg = new String(message.getBody());System.out.println("Received message: " + msg);// 根据某些条件判断是否拒绝消息if ("reject".equals(msg)) {// 拒绝消息,并且不重新入队(requeue=false)channel.basicReject(tag, false);System.out.println("Message rejected: " + msg);} else {// 消费成功,确认消息channel.basicAck(tag, false);}} catch (Exception e) {// 异常情况也可以使用basicNack将消息拒绝,并且不重新入队channel.basicNack(tag, false, false);}}
}
5. 消费者监听死信队列

最后,消费者监听死信队列,处理被拒绝的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DeadLetterConsumer {@RabbitListener(queues = "dead_letter_queue")public void receiveDeadLetterMessage(String message) {System.out.println("Received dead letter message: " + message);}
}
总结
  • 配置普通队列和死信队列,并通过设置x-dead-letter-exchangex-dead-letter-routing-key来实现消息被拒绝后的处理。
  • 消费者可以根据业务逻辑通过basic.rejectbasic.nack拒绝消息,并指定不重新入队(requeue=false),从而将消息转发到死信队列。
  • 死信队列中的消息可以被另一个消费者监听和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/420789.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【IEEE独立出版 | 往届快至会后2个月检索,刊后1个月检索】2024年第四届电子信息工程与计算机科学国际会议(EIECS 2024)

在线投稿&#xff1a;学术会议-学术交流征稿-学术会议在线-艾思科蓝 电子信息的出现与计算机技术、通信技术和高密度存储技术的迅速发展并在各个领域里得到广泛应用有着密切关系。作为高技术领域中重要的前沿技术之一&#xff0c;电子信息工程具有前瞻性、先导性的特点&#x…

MySQL 查询优化秘籍:让你的数据库查询飞起来

《MySQL 查询优化秘籍&#xff1a;让你的数据库查询飞起来》 在数据库应用中&#xff0c;高效的查询性能至关重要。MySQL 作为广泛使用的关系型数据库&#xff0c;掌握一些常用的查询优化方法可以极大地提升系统的响应速度和性能。今天&#xff0c;我们就来一起探讨常用的优化…

NSSCTF reserve wp--非常简单的逻辑题

也可参考这篇文章&#xff1a; 本题并不是拖入ida中&#xff0c;进行静态分析&#xff0c;下载文件后文件名是py&#xff0c;我们将其重命名(即修改后缀为.py) (如图) 打开后分析以下代码 逆向一下有点麻烦&#xff0c;看了大佬的题解&#xff0c;说是直接正向爆破一下&#x…

萱仔环境记录——git的使用流程:以上传一个项目进入GitHub仓库为例子

前段时间我已经不使用学校的电脑了&#xff0c;在自己的笔记本上安装了git&#xff0c;准备好好把我这几年做的项目整理一下进行开源&#xff0c;由于前几次的面试&#xff0c;一些公司考核到了git的用法&#xff0c;虽然平时我也在使用git对自己的项目进行管理&#xff0c;但还…

华为 HCIP-Datacom H12-821 题库 (9)

有需要题库的可以看主页置顶 V群进行学习交流 1.以下关于 RSTP 保护功能的描述&#xff0c;错误的是哪一选项&#xff1f; A、环路保护可以部署在根端口上&#xff0c;以防网络中形成环路 B、环路保护可以部署在Alternate 端口上&#xff0c;以防网络中形成环路 C、BPDU 保…

导入torch时,报错 Error loading “C:\Users\Thinkpad\AppData\Roaming\Python\Python311\site-packages\torch\li

1.报错内容&#xff1a; Error loading "C:\Users\Thinkpad\AppData\Roaming\Python\Python311\site-packages\torch\lib\fbgemm.dll" or one of its dependencies. 2.报错原因&#xff1a;是因为torch和python版本不对应 3.解决方案&#xff1a; 重新安装torch&a…

vue基础语法

指令修饰符 如果想了解keycode&#xff0c;可以查看keycode对照表&#xff0c;如下图&#xff08;部分&#xff09;: 阻止冒泡事件名.stop 父div包裹子div&#xff0c;如果点击子div&#xff0c;会触发父div。 如果想只显示子div的事件&#xff0c;那么可以改子div的内容 cli…

【论文分享精炼版】 sNPU: Trusted Execution Environments on Integrated NPUs

今天在COMPASS分享了之前写的一个博客&#xff0c;做了进一步的提炼总结&#xff0c;大家可以看看原文~ 今天分享的论文《sNPU: Trusted Execution Environments on Integrated NPUs》来自2024年ISCA&#xff0c;共同一作为Erhu Feng以及Dahu Feng。并且&#xff0c; 这两位作…

Windows Home版本实现远程桌面——RDP Wrapper,及由于更新导致不可用的解决方法:以win11 22631.3593为例

一、RDP Wrapper工作机制 根据rdpwap.ini文件调用相应windows版本的termsrv.dll. 实用的命令&#xff1a; > net stat -au | findstr 3389 ; 查看端口是否启动 > net stop termservice ; 停止远程桌面 > net start termservice; > mstsc > 二、问题解决 注意查…

93. UE5 GAS RPG 应用负面效果表现

在上一篇文章里&#xff0c;我们实现了添加负面效果GE&#xff0c;并且在添加GE时&#xff0c;也会给角色应用一个负面效果标签作为标识。在这一篇里&#xff0c;我们将通过负面效果标签标识&#xff0c;应用角色身上展现对应的负面效果的表现。 我们将在这篇文章里添加一个自定…

leetcode:2710. 移除字符串中的尾随零(python3解法)

难度&#xff1a;简单 给你一个用字符串表示的正整数 num &#xff0c;请你以字符串形式返回不含尾随零的整数 num 。 示例 1&#xff1a; 输入&#xff1a;num "51230100" 输出&#xff1a;"512301" 解释&#xff1a;整数 "51230100" 有 2 个尾…

vmware 17.6 pro for personal USE初体验

新学期开学了&#xff0c;暑假期间把台式机放在办公室远程&#xff0c;无赖期间经常断电&#xff0c;把我的老台给烧坏了&#xff0c;检测了下固态硬盘和机械硬盘&#xff0c;好歹能用。但是win11的系统奔溃了。就花了半天时间重装。*v* 悲剧的是&#xff0c;一些软件环境必须…

javaWeb【day04】--(MavenSpringBootWeb入门)

01. Maven课程介绍 1.1 课程安排 学习完前端Web开发技术后&#xff0c;我们即将开始学习后端Web开发技术。做为一名Java开发工程师&#xff0c;后端Web开发技术是我们学习的重点。 1.2 初识Maven 1.2.1 什么是Maven Maven是Apache旗下的一个开源项目&#xff0c;是一款用于…

python进阶篇-day09-数据结构与算法(非线性结构与排序算法)

非线性结构(树状结构) 特点: 每个节点都可以有n个子节点(后继节点) 和 n个父节点(前驱节点) 代表: 树, 图...... 概述 属于数据结构之 非线性结构的一种, 父节点可以有多个子节点(后续节点) 特点 有且只有1个根节点 每个节点都可以有1个父节点及任意个子节点, 前提: 根节点除…

C++竞赛初阶L1-15-第六单元-多维数组(34~35课)551: T456501 计算矩阵边缘元素之和

题目内容 输入一个整数矩阵&#xff0c;计算位于矩阵边缘的元素之和。 所谓矩阵边缘的元素&#xff0c;就是第一行和最后一行的元素以及第一列和最后一列的元素。 输入格式 第 1 行包含两个整数&#xff0c;分别为行数 m 和列数 n&#xff0c;两个整数之间空格隔开。 第 2 …

文本字符分割算法尝试

一、基于opencv的分割算法 import cv2 import numpy as np from matplotlib import pyplot as pltimg cv2.imread(scratch.png, 0) # global thresholding ret1, th1 cv2.threshold(img, 127, 255, cv2.THRESH_BINARY) # Otsus thresholding th2 cv2.adaptiveThreshold(img…

Windows I/O系统

硬件存储体系 寄存器 处理器内部定义的存储体&#xff0c;它们除了存储功能&#xff0c;往往还兼有其他的能力&#xff0c;比如参与运算&#xff0c;地址解析&#xff0c;指示处理器的状态&#xff0c;等等。寄存器是由处理器内部专门的触发器电路实现的&#xff0c;处理器往…

Java代码审计篇 | ofcms系统审计思路讲解 - 篇3 | 文件上传漏洞审计

文章目录 0. 前言1. 文件上传代码审计【有1处】1.1 可疑点1【无漏洞】1.1.1 直接搜索upload关键字1.1.2 选择第一个&#xff0c;点进去分析一下1.1.3 分析this.getFile()方法1.1.4 分析new MultipartRequest(request, uploadPath)1.1.5 分析isSafeFile()方法1.1.6 分析request.…

关于支付宝小程序多规格选项的时候点击不起反应的原因分析及修改方法

解决方案&#xff1a; watch的时候&#xff0c;对于对象的赋值&#xff0c;最好用深拷贝&#xff0c;即如下图&#xff1a; watch:{ row: function (nv, ov) {var that this;that.indata.row JSON.parse(JSON.stringify(nv));//如果是对象&#xff0c;请用深入的for (va…

《OpenCV计算机视觉》—— 图像边缘检测

文章目录 一、图像边缘检测概述二、常见的图像边缘检测算法&#xff08;简单介绍&#xff09;1.sobel算子2.Scharr算子3.Laplacian算子4.Canny算子 三、代码实现 一、图像边缘检测概述 图像边缘检测是一种重要的图像处理技术&#xff0c;用于定位二维或三维图像中对象的边缘。…