掌握RabbitMQ:全面知识点汇总与实践指南

前言

RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。

特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。

作用:服务间异步通信;顺序消费;定时任务;请求削峰;

1、AMQP协议定义

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输

特性AMQPMQTT
适用场景大型企业级应用、金融交易、云服务物联网、移动应用、智能家居
通信模式生产者-消费者发布-订阅
消息大小较大,适合复杂的消息结构小型,适合简单的消息
QoS 级别支持,但不如 MQTT 精细详细的 QoS 级别,特别是针对 IoT 场景
性能要求对性能有一定要求,但更注重可靠性和安全性极低的带宽消耗和资源占用
安全性强调端到端的安全性支持基本的安全特性,适用于资源受限环境

2、AMQP机制

1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。

  • 生产者发送消息到Broker消息代理服务
  • 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
  • 队列存储消息,直到消费者取走消息
  • 消费者,读取队列中的消息

AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。

类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。

2>AMQP消息传递方式

特性点对点模式 (P2P)发布/订阅模式 (Pub/Sub)
消息传递方式每条消息仅被一个消费者处理一条消息可以被多个消费者同时接收
队列数量单个队列每个消费者有自己的队列
生产者行为直接发送到队列发送到交换器,由交换器负责路由
消费者行为从同一队列中竞争消费各自独立消费自己的队列中的消息
适用场景任务分配、工作流管理广播通知、日志记录、事件驱动架构
扩展性受限于单个队列的吞吐量可以通过增加更多的消费者来提高整体吞吐量
复杂度较低,易于理解和实现需要考虑交换器类型、路由规则等因素,稍微复杂

在这里插入图片描述

  • 1、点对点
    生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
    每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。

竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。

// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 执行任务...channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});

  • 2、发布订阅
    生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本

(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息

// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

3、AMQP消息只被消费一次

  • 1、合理配置消息队列ACK机制
    大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
  • 2、合理配置消息队列预取数量
    防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
  • 3、消费者幂等性设计
    针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
    确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
    增加补偿机制,比如退款,退积分等概念的操作
  • 4、分布式锁
    借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。
  • 5、监控告警机制
    监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。
  • 6、事务性消息
    指的是消息和业务操作,一起成功或一起失败的机制。
    (1)本地事务+补偿机制
    (2)二阶段提交
    引入协调者和参与者的概念。
    客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
    (3)三阶段提交
    针对二阶段提交完善事务性消息机制。
    首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。

4、AMQP 消息顺序消费

  • 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
  • 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:channel.basicQos(1);
  • 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
  • 另外使用幂等性设计来避免重复消费。
  • 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
  • 增加监控告警到服务负责人。
  • 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。

5、AMQP消息可靠性

  • 事务支持
    允许一组操作作为一个整体提交或回滚。
  • ACK确认机制
    当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。
  • 持久化选项
    可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。

6、RabbitMQ配置ACK

1>rabbitmq.conf或rabbitmq.ini开启配置

# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on

2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumer {private final static String QUEUE_NAME = "task_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,确保它存在channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置预取计数为 1,确保每次只处理一条消息channel.basicQos(1);// 开启手动确认模式DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 模拟任务处理时间Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();} finally {System.out.println(" [x] Done");// 手动发送 ACK 确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};// 开始消费消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}}
}

3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认

// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}// 开启事务模式
channel.txSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());channel.txCommit();
} catch (Exception e) {channel.txRollback();
}

7、RabbitMQ配置协议

1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。

# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile   = /path/to/server_certificate.pem
ssl_options.keyfile    = /path/to/private_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671# 设置心跳间隔时间为 60 秒
heartbeat = 60

8、RabbitMQ消息持久化

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PersistentExample {private final static String QUEUE_NAME = "persistent_queue";private final static String EXCHANGE_NAME = "persistent_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 创建持久化的交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);// 创建持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换器channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");// 发送持久化的消息String message = "Persistent message!";AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 表示持久化.build();channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
  • 1、持久化队列
    channel.queueDeclare("durable_queue", true, false, false, null);
  • 2、交换器持久化
    确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
    channel.exchangeDeclare("durable_exchange", "direct", true);
  • 3、消息持久化
    delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
    channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());

9、RabbitMQ自动重连

网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态

ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);

10、RabbitMQ组件

组件名称说明
Producer生产者负责生成并发送消息的应用程序。
Consumer消费者负责接收并处理消息的应用程序。
Message消息承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。
Exchange交换机用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。
Queue队列存储待处理消息的地方,消费者从中拉取消息进行处理。
Binding绑定定义了交换机和队列之间的关系,包括路由键等参数。
Virtual Host虚拟主机类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。

11、RabbitMQ核心组件交换器和路由键

交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。

消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。

消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。

若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。

交换器说明应用场景
Direct精确匹配路由键只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。
Topic基于通配符模式匹配路由键适用于灵活的消息过滤和多条件匹配。
Fanout广播所有消息给所有绑定的队列适用于需要将相同消息发送给多个消费者的场景。
Headers根据消息头属性进行路由适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。

1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。

  • 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;public class DirectConsumer {private final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Binding Keyif (argv.length < 1) {System.err.println("Usage: DirectConsumer [info] [warning] [error]");System.exit(1);}for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}

2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列

  • 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;public class FanoutConsumer {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}

3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词

  • 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
  • 消费者
import com.rabbitmq.client.*;public class TopicConsumer {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Binding Key 模式if (argv.length < 1) {System.err.println("Usage: TopicConsumer [binding_key_pattern]");System.exit(1);}for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}

4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。

  • 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
  • 消费者
import com.rabbitmq.client.*;public class HeadersConsumer {private final static String EXCHANGE_NAME = "headers_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "headers");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Headers 匹配规则Map<String, Object> headers = new HashMap<>();headers.put("user_id", "12345");headers.put("order_status", "pending");AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}

12、RabbitMQ核心方法及参数说明

1>newConnection 创建连接工程并开启连接

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();

2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据

信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。

Channel channel = connection.createChannel();

3>exchangeDeclare 交换器声明

channel.exchangeDeclare("my_exchange", "direct", true, false, null);

exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。

4>queueDeclare 队列声明

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。

5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则

channel.queueBind(queueName, "my_exchange", "routing_key");

queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则

6>basicPublish 发布消息
向指定的交换器发布一条消息

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2) // 2 表示持久化.build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());

exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。

7>basicConsume 消费消息
费来自指定队列的消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用

8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费

channel.basicAck(envelope.getDeliveryTag(), false);

9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃

// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);

13、RabbitMQ镜像集群模式

搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。

参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/501871.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

渗透测试-非寻常漏洞案例

声明 本文章所分享内容仅用于网络安全技术讨论&#xff0c;切勿用于违法途径&#xff0c;所有渗透都需获取授权&#xff0c;违者后果自行承担&#xff0c;与本号及作者无关&#xff0c;请谨记守法. 此文章不允许未经授权转发至除先知社区以外的其它平台&#xff01;&#xff0…

前端-计算机网络篇

一.网络分类 1.按照网络的作用范围进行分类 &#xff08;1&#xff09;广域网WAN(Wide Area Network) 广域网的作用范围通常为几十到几千公里,因而有时也称为远程网&#xff08;long haul network&#xff09;。广域网是互联网的核心部分&#xff0c;其任务是长距离运送主机…

开发培训-慧集通(iPaaS)集成平台脚本开发Groovy基础培训视频

‌Groovy‌是一种基于Java虚拟机&#xff08;JVM&#xff09;的敏捷开发语言&#xff0c;结合了Python、Ruby和Smalltalk的许多强大特性。它旨在提高开发者的生产力&#xff0c;通过简洁、熟悉且易于学习的语法&#xff0c;Groovy能够与Java代码无缝集成&#xff0c;并提供强大…

腾讯云智能结构化 OCR:驱动多行业数字化转型的核心引擎

在当今数字化时代的汹涌浪潮中&#xff0c;数据已跃升为企业发展的关键要素&#xff0c;其高效、精准的处理成为企业在激烈市场竞争中脱颖而出的核心竞争力。腾讯云智能结构化 OCR 技术凭借其前沿的科技架构与卓越的功能特性&#xff0c;宛如一颗璀璨的明星&#xff0c;在交通、…

vulnhub Earth靶机

搭建靶机直接拖进来就行 1.扫描靶机IP arp-scan -l 2.信息收集 nmap -sS -A -T4 192.168.47.132 得到两个DNS; 在443端口处会让我们加https dirb https://earth.local/ dirb https://terratest.earth.local/ #页面下有三行数值 37090b59030f11060b0a1b4e0000000000004312170a…

消息中间件类型都有哪些

在消息中间件的专业术语中&#xff0c;我们可以根据其特性和使用场景将其划分为几种主要的类型。这些类型不仅反映了它们各自的技术特点&#xff0c;还决定了它们在不同应用场景下的适用性。 1. 点对点&#xff08;Point-to-Point&#xff09;消息中间件&#xff1a; • 这类中…

服务器迁移中心——“工作组迁移”使用指南

简介 服务器迁移中心&#xff08;Server Migration Center&#xff0c;简称SMC&#xff09;是阿里云提供给您的迁移平台。专注于提供能力普惠、体验一致、效率至上的迁移服务&#xff0c;满足您在阿里云的迁移需求。 工作组迁移是SMC的一项功能&#xff0c;提供标准化迁移流程…

C#调用Lua

目录 xLua导入 打包工具导入 单例基类导入与AB包管理器导入 Lua解析器 文件加载与重定向 Lua解析器管理器 全局变量获取 全局函数获取 对于无参数无返回值 对于有参数有返回值 对于多返回值 对于变长参数 完整代码 List与Dictionary映射Table 类映射Table 接口映射…

Wend看源码-Java-fork/Join并行执行任务框架学习

摘要 本文主要介绍了Java的Fork/Join并行任务执行框架&#xff0c;详细阐述了其工作原理和核心构件&#xff0c;全面解读了Fork/Join框架的运作机制&#xff0c;旨在为学习这一框架的开发者提供一份详实且实用的参考资料。 Java的Fork/Join框架是Java 7引入的一个用于并行执行任…

C++ 中 Unicode 字符串的宽度

首先&#xff0c;什么是 Unicode&#xff1f; Unicode 实际上是一个统一的文字编码标准&#xff0c;它出现目的是为了解决不同计算机之间字符编码不同而导致的灾难性不兼容问题。 Unicode 字符集与 Unicode 编码是两种不同的概念。Unicode 字符集实际是对进入标准的所有文字用…

Python爬虫 - 豆瓣图书数据爬取、处理与存储

文章目录 前言一、使用版本二、需求分析1. 分析要爬取的内容1.1 分析要爬取的单个图书信息1.2 爬取步骤1.2.1 爬取豆瓣图书标签分类页面1.2.2 爬取分类页面1.2.3 爬取单个图书页面 1.3 内容所在的标签定位 2. 数据用途2.1 基础分析2.2 高级分析 3. 应对反爬机制的策略3.1 使用 …

MIPI_DPU 综合(DPU+MIPI+Demosaic+VDMA 通路)

目录 1. 简介 2. 创建 Platform 2.1 Block Design 2.1.1 DPU PFM Lite 2.1.2 DPU prj 2.1.3 DPU MIPI Platform 2.2 pin 约束 2.2.1 GPIO 约束 2.2.2 IIC 约束 2.1.3 DPHY 约束 3. 报错总结 3.1 AXI_M 必须顺序引用 3.2 DPU 地址分配错误 4. Design Example 4.…

Spring系列一:spring的安装与使用

文章目录 ?? 官方资料 ??Spring5下载??文档介绍 ??Spring5 ??内容介绍??重要概念 ??快速入门 ??Spring操作演示??类加载路径??Spring容器结构剖析??Debug配置 ??实现简单基于XML配置程序 ??Spring原生容器结构梳理??作业布置??Spring课堂练习 …

AutoSar架构学习笔记

1.AUTOSAR&#xff08;Automotive Open System Architecture&#xff0c;汽车开放系统架构&#xff09;是一个针对汽车行业的软件架构标准&#xff0c;旨在提升汽车电子系统的模块化、可扩展性、可重用性和互操作性。AUTOSAR的目标是为汽车电子控制单元&#xff08;ECU&#xf…

Kernel Stack栈溢出攻击及保护绕过

前言 本文介绍Linux内核的栈溢出攻击&#xff0c;和内核一些保护的绕过手法&#xff0c;通过一道内核题及其变体从浅入深一步步走进kernel世界。 QWB_2018_core 题目分析 start.sh qemu-system-x86_64 \-m 128M \-kernel ./bzImage \-initrd ./core.cpio \-append "…

【顶刊TPAMI 2025】多头编码(MHE)之Part 6:极限分类无需预处理

目录 1 标签分解方法的消融研究2 标签分解对泛化的影响3 讨论4 结论 论文&#xff1a;Multi-Head Encoding for Extreme Label Classification 作者&#xff1a;Daojun Liang, Haixia Zhang, Dongfeng Yuan and Minggao Zhang 单位&#xff1a;山东大学 代码&#xff1a;https:…

友元和运算符重载

1. 友元 可以把某些选定的函数看作类的“荣誉函数”&#xff0c;允许它们访问类对象中非公共的成员&#xff0c;就好像它们是类的成员一样&#xff0c;这种函数称为类的友元。友元可以访问类对象的任意成员。 1.1 友元函数 友元函数是一种定义在类外部的普通函数&#xff0…

Git 树形图表不显示问题

注册表修改 ## 注册表 计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\ShellIconOverlayIdentifiers 生效 右键重启 windows资源管理器

【MySQL基础篇】三、表结构的操作

文章目录 Ⅰ. 创建表1、语法2、创建表样例3、创建和其它表一样结构的表 Ⅱ. 查看表结构1、查看数据库中的表2、查看指定表的属性3、获取表的创建语句 Ⅲ. 删除表Ⅳ. 修改表结构1、向表中插入新的字段2、删除表中的字段3、修改表名4、修改字段属性 Ⅰ. 创建表 1、语法 create …

aws(学习笔记第二十二课) 复杂的lambda应用程序(python zip打包)

aws(学习笔记第二十二课) 开发复杂的lambda应用程序(python的zip包) 学习内容&#xff1a; 练习使用CloudShell开发复杂lambda应用程序(python) 1. 练习使用CloudShell CloudShell使用背景 复杂的python的lambda程序会有许多依赖的包&#xff0c;如果不提前准备好这些python的…