【RabbitMQ】⾼级特性

RabbitMQ ⾼级特性

  • 1. 消息确认
    • 1.1 消息确认机制
    • 1.2 代码示例
  • 2. 持久化
    • 2.1 交换机持久化
    • 2.2 队列持久化
    • 2.3 消息持久化
  • 3. 发送⽅确认
    • 3.1 confirm确认模式
    • 3.2 return退回模式
    • 3.3 问题: 如何保证RabbitMQ消息的可靠传输?
  • 4. 重试机制
  • 5. TTL
    • 5.1 设置消息的TTL
    • 5.2 设置队列的TTL
    • 5.3 两者区别
  • 6. 死信队列
    • 6.1 死信的概念
    • 6.2 代码示例
    • 6.3 常⻅问题
  • 7. 延迟队列
    • 7.1 概念
    • 7.2 应⽤场景
    • 7.3 TTL+死信队列实现
      • 存在问题
    • 7.4 延迟队列插件
    • 7.5 常⻅问题
  • 8. 事务
  • 9. 消息分发
    • 9.1 概念
    • 9.2 应⽤场景
      • 9.2.1 限流
      • 9.2.2 负载均衡
  • 代码演示获取

1. 消息确认

1.1 消息确认机制

⽣产者发送消息之后, 到达消费端之后, 可能会有以下情况:

a. 消息处理成功
b. 消息处理异常

在这里插入图片描述

RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第两种情况, 就会造成消息丢失

那么如何确保消费端已经成功接收了, 并正确处理了呢?

为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:

  • ⾃动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, ⽽不管消费者是否真正地消费到了这些消息. ⾃动确认模式适合对于消息可靠性要求不⾼的场景.
  • ⼿动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求⽐较⾼的场景.

当autoAck参数置为false, 对于RabbitMQ服务端⽽⾔, 队列中的消息分成了两个部分:

⼀是等待投递给消费者的消息。
⼆是已经投递给消费者, 但是还没有收到消费者确认信号的消息。

如果RabbitMQ⼀直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者.

在这里插入图片描述
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数

在这里插入图片描述
Ready: 等待投递给消费者的消息数

Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息数

1.2 代码示例

链接

2. 持久化

我们在前⾯讲了消费端处理消息时, 消息如何不丢失, 但是如何保证当RabbitMQ服务停掉以后, ⽣产者发送的消息不丢失呢. 默认情况下, RabbitMQ 退出或者由于某种原因崩溃时, 会忽视队列和消息, 除⾮告知他不要这么做.

RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化.

2.1 交换机持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在.

如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()

2.2 队列持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true实现的.

如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没有了, 消息也⽆处可存了)

队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化.

咱们前⾯⽤的创建队列的⽅式都是持久化的

QueueBuilder.durable(Constant.ACK_QUEUE).build();

点进去看源码会发现,该⽅法默认durable 是true

通过下⾯代码,可以创建⾮持久化的队列

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();在这里插入图片描述

2.3 消息持久化

消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT

在这里插入图片描述

设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继⽽消息也丢失. 所以单单设置消息持久化⽽不设置队列的持久化显得毫⽆意义.

在这里插入图片描述

MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性

在这里插入图片描述
如果使⽤RabbitTemplate 发送持久化消息, 代码如下:

在这里插入图片描述

RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为⾮持久化,或者消息在发送时被标记为⾮持久化

在这里插入图片描述
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的.

  1. 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之后, 还没来得及处理就宕机了, 这样也算数据居丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进⾏⼿动确认,详细可以参考[消息确认]章节.
  2. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理, 可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失.

这个问题怎么解决呢?

  1. 引⼊RabbitMQ的仲裁队列, 如果主节点(master)在此特殊时间内挂掉, 可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性, 除⾮整个集群都挂掉(此⽅法也不能保证100%可靠, 但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多, 实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
  2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中

3. 发送⽅确认

在使⽤ RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ为我们提供了两种解决⽅案:

a. 通过事务机制实现
b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多, 咱们主要介绍confirm机制来实现发送⽅的确认.

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递

  1. confirm确认模式
  2. return退回模式

3.1 confirm确认模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达Exchange, 这个监听都会被执⾏, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false.

具体代码

关键代码
在这里插入图片描述

3.2 return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理

在配置 RabbitTemplate 的时候要设置这两个属性

        // 设置 return 模式rabbitTemplate.setMandatory(true); // 只有设置为true, 才会回调ReturnCallbackrabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回: " + returned.getMessage());}});

回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:

在这里插入图片描述

3.3 问题: 如何保证RabbitMQ消息的可靠传输?

先放⼀张RabbitMQ消息传递图

在这里插入图片描述

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:

  1. ⽣产者将消息发送到 RabbitMQ失败
    a. 可能原因: ⽹络问题等
    b. 解决办法: 参考本章节[发送⽅确认-confirm确认模式]
  2. 消息在交换机中⽆法路由到指定队列
    a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
    b. 解决办法: 参考本章节[发送⽅确认-return模式]
  3. 消息队列⾃⾝数据丢失
    a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失
    b. 解决办法: 参考本章节[持久性]. 开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会⾃动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的⽅式提⾼可靠性)
  4. 消费者异常, 导致消息丢失
    a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
    b. 解决办法: 参考本章节[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认, 当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制(参考下⼀章节), 当消息消费异常时, 通过消息重试确保消息的可靠性

4. 重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.

但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数

在这里插入图片描述

5. TTL

TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.

当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除

在这里插入图片描述

5.1 设置消息的TTL

⽬前有两种⽅法可以设置消息的TTL.

⼀是设置队列的TTL, 队列中所有消息都有相同的过期时间. ⼆是对消息本⾝进⾏单独设置, 每条消息的TTL可以不同. 如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.

5.2 设置队列的TTL

在这里插入图片描述

5.3 两者区别

设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除

设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的

为什么这两种⽅法处理的⽅式不⼀样?

因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可

⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.

6. 死信队列

6.1 死信的概念

死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.

有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(DeadLetter Queue,简称DLQ).

在这里插入图片描述

消息变成死信⼀般是由于以下⼏种情况:

  1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
  2. 消息过期.
  3. 队列达到最⼤⻓度

6.2 代码示例

队列和交换机的配置

@Configuration
public class DLXConfig {// 死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {return ExchangeBuilder.directExchange(Constants.DLX_EXCHANGE_NAME).durable(true).build();}// 死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constants.DLX_QUEUE_NAME).build();}// 绑定@Bean("dlxBind")public Binding dlxBind(@Qualifier("dlxQueue") Queue queue, @Qualifier("dlxExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(Constants.DLX_ROUTING_KEY).noargs();}// 正常交换机@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE_NAME).durable(true).build();}// 正常队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE_NAME).deadLetterExchange(Constants.DLX_EXCHANGE_NAME).deadLetterRoutingKey(Constants.DLX_ROUTING_KEY) // 正常队列绑定死信交换机.ttl(10 * 1000).maxLength(10L) // 制造死信产⽣的条件: 10s后消息变成死信, 队列最多存10条消息.build();}// 绑定@Bean("normalBind")public Binding normalBind(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(Constants.NORMAL_ROUTING_KEY).noargs();}
}

6.3 常⻅问题

  1. 死信队列的概念

死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息

  1. 死信的来源
  1. 消息过期: 消息在队列中存活的时间超过了设定的TTL

  2. 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.

  3. 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.

  1. 死信队列的应⽤场景

对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.

⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态

为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).

场景的应⽤场景还有:

  • 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
  • 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
  • ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.

7. 延迟队列

7.1 概念

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.

7.2 应⽤场景

延迟队列的使⽤场景有很多, ⽐如:

  1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
  2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
  3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等

RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.

假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息.

在这里插入图片描述

7.3 TTL+死信队列实现

生产者

    @RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE_NAME,Constants.NORMAL_ROUTING_KEY, "delay test 5s... " + new Date(), message -> {message.getMessageProperties().setExpiration("5000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE_NAME,Constants.NORMAL_ROUTING_KEY, "delay test 10s... " + new Date(), message -> {message.getMessageProperties().setExpiration("10000");return message;});return "delay";}

消费者

    @RabbitListener(queues = Constants.DLX_QUEUE_NAME)public void listenerDLXQueue(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n",new Date(), new String(message.getBody(), StandardCharsets.UTF_8),message.getMessageProperties().getDeliveryTag());// 手动确认channel.basicAck(deliveryTag, true);}

运行结果

在这里插入图片描述

存在问题

接下来把⽣产消息的顺序修改⼀下
先发送20s过期数据, 再发送10s过期数据

在这里插入图片描述

运行结果

在这里插入图片描述
这时会发现: 10s过期的消息, 也是在20s后才进⼊到死信队列.

消息过期之后, 不⼀定会被⻢上丢弃. 因为RabbitMQ只会检查队⾸消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很⻓, 第⼆个消息的延时时间很短, 那第⼆个消息并不会优先得到执⾏.

所以在考虑使⽤TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列.

7.4 延迟队列插件

安装

docker compose 安装队列插件的 rabbitmq

交换机和队列声明并绑定

在这里插入图片描述

生产者

在这里插入图片描述
消费者

在这里插入图片描述
运行结果

在这里插入图片描述
从结果可以看出, 使⽤延迟队列, 可以保证消息按照延迟时间到达消费者.

7.5 常⻅问题

介绍下RabbitMQ的延迟队列

延迟队列是⼀个特殊的队列, 消息发送之后, 并不⽴即给消费者, ⽽是等待特定的时间, 才发送给消费者.

延迟队列的应⽤场景有很多, ⽐如:

  1. 订单在⼗分钟内未⽀付⾃动取消
  2. ⽤⼾注册成功后, 3天后发调查问卷
  3. ⽤⼾发起退款, 24⼩时后商家未处理, 则默认同意, ⾃动退款

但RabbitMQ本⾝并没直接实现延迟队列, 通常有两种⽅法:

  1. TTL+死信队列组合的⽅式
  2. 使⽤官⽅提供的延迟插件实现延迟功能

⼆者对⽐:

  1. 基于死信实现的延迟队列
    a. 优点: 1) 灵活不需要额外的插件⽀持
    b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
  2. 基于插件实现的延迟队列
    a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
    b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本

8. 事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败.

配置类

在这里插入图片描述
生产者

在这里插入图片描述

9. 消息分发

9.1 概念

RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者. 这种⽅式⾮常适合扩展, 如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可.

默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降.

如何处理呢? 我们可以使⽤前⾯章节讲到的channel.basicQos(int prefetchCount) ⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量

⽐如: 消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗⼝".

prefetchCount 设置为0时表⽰没有上限.
basicQos 对拉模式的消费⽆效

9.2 应⽤场景

消息分发的常⻅应⽤场景有如下:

  1. 限流
  2. ⾮公平分发

9.2.1 限流

如下使⽤场景:

订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求

但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮.

在这里插入图片描述
RabbitMQ提供了限流机制, 可以控制消费端⼀次只拉取N个请求

通过设置prefetchCount参数, 同时也必须要设置消息应答⽅式为⼿动应答

prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡.

9.2.2 负载均衡

我们也可以⽤此配置,来实现"负载均衡"

如下图所⽰, 在有两个消费者的情况下,⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量.

在这里插入图片描述
我们可以使⽤设置prefetch=1 的⽅式, 告诉 RabbitMQ ⼀次只给⼀个消费者⼀条消息, 也就是说, 在处理并确认前⼀条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下⼀个不忙的消费者.

消费者

在这里插入图片描述

运行结果

在这里插入图片描述

deliveryTag 有重复是因为两个消费者使⽤的是不同的Channel, 每个 Channel 上的deliveryTag 是独⽴计数的.

代码演示获取

代码演示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/429037.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3集成google第三方登陆

网上资源很多,但乱七八糟,踩坑几小时后,发现下面的方式没问题。 npm install vue3-google-login 插件文档:vue3-google-登录 (devbaji.github.io) 修改main.js import ./assets/main.css import { createApp } from vue impor…

医院伤员消费点餐限制———未来之窗行业应用跨平台架构

一、点餐上限 医院点餐上限具有以下几方面的意义: 1. 控制成本 - 有助于医院合理规划餐饮预算,避免食物的过度供应造成浪费,从而降低餐饮成本。 2. 保障饮食均衡 - 防止患者或陪护人员过度点餐某一类食物,有利于引导合…

【AcWing】873. 欧拉函数

#include<iostream> using namespace std;int main(){int n;cin>>n;while(n--){int x;cin>>x;int resx;for(int i2;i<x/i;i){if(x%i0){//resres*(1-1/i);整数1/i等于0&#xff0c;算不对且会溢出//以下几种都能ac//resres/i*(i-1);i*(1-1/i)i-1&#xff0…

[JavaEE] TCP协议

目录 一、TCP协议段格式 二、TCP确保传输可靠的机制 2.1 确认应答 2.2 超时重传 2.3 连接管理 2.3.1 三次握手 2.3.2 四次挥手 2.4 滑动窗口 2.4.1 基础知识 2.4.2 两种丢包情况 2.4.2.1 数据报已经抵达&#xff0c;ACK丢包 2.4.2.2 数据包丢包 2.5 流量控制…

音视频入门基础:AAC专题(7)——FFmpeg源码中计算AAC裸流每个packet的size值的实现

音视频入门基础&#xff1a;AAC专题系列文章&#xff1a; 音视频入门基础&#xff1a;AAC专题&#xff08;1&#xff09;——AAC官方文档下载 音视频入门基础&#xff1a;AAC专题&#xff08;2&#xff09;——使用FFmpeg命令生成AAC裸流文件 音视频入门基础&#xff1a;AAC…

Vue3:v-model实现组件通信

目录 一.性质 1.双向绑定 2.语法糖 3.响应式系统 4.灵活性 5.可配置性 6.多属性绑定 7.修饰符支持 8.defineModel使用 二.使用 1.父组件 2.子组件 三.代码 1.父组件代码 2.子组件代码 四.效果 一.性质 在Vue3中&#xff0c;v-model指令的性质和作用主要体现在…

AI健身之俯卧撑计数和姿态矫正-角度估计

在本项目中&#xff0c;实现了Yolov7-Pose用于人体姿态估计。以下是如何在Windows 11操作系统上设置和运行该项目的详细步骤。 环境准备 首先&#xff0c;确保您的计算机已经安装了Anaconda。Anaconda是一个开源的Python发行版本&#xff0c;它包含了conda、Python以及众多科…

【滑动窗口】算法总结

文章目录 滑动窗口算法总结1.暴力求解vs滑动窗口2.需要注意的细节问题 2.滑动窗口的基本模板1.非固定窗口大小的滑动窗口2.固定窗口大小的滑动窗口细节 滑动窗口算法总结 1.暴力求解vs滑动窗口 遇到那些可以转化成一个子数组的长度的问题时&#xff0c;往往需要用到双指针。 …

安全基础学习-AES128加密算法

前言 AES&#xff08;Advanced Encryption Standard&#xff09;是对称加密算法的一个标准&#xff0c;主要用于保护电子数据的安全。AES 支持128、192、和256位密钥长度&#xff0c;其中AES-128是最常用的一种&#xff0c;它使用128位&#xff08;16字节&#xff09;的密钥进…

探索人工智能绘制宇宙地图的实现

人工智能 (AI) 已成为了解世界的重要工具。现在&#xff0c;随着人们对太空探索的兴趣重新升温&#xff0c;人工智能也可能对其他世界产生同样的影响。 尽管经过了几十年的研究&#xff0c;科学家们对地球大气层以外的宇宙仍然知之甚少。绘制行星、恒星、星系及其在太空中的运…

python爬虫初体验(一)

文章目录 1. 什么是爬虫&#xff1f;2. 为什么选择 Python&#xff1f;3. 爬虫小案例3.1 安装python3.2 安装依赖3.3 requests请求设置3.4 完整代码 4. 总结 1. 什么是爬虫&#xff1f; 爬虫&#xff08;Web Scraping&#xff09;是一种从网站自动提取数据的技术。简单来说&am…

安卓14剖析SystemUI的ShadeLogger/LogBuffer日志动态控制输出dumpsy机制

背景&#xff1a; 看SystemUI的锁屏相关代码时候发现SystemUI有一个日志打印相关的方法调用&#xff0c;相比于常规的Log.i直接可以logcat查看方式还是比较新颖。 具体日志打印代码如下&#xff1a; 下面就来介绍一下这个ShadeLogger到底是如何打印的。 分析源码&#xff1…

前端JavaScript导出excel,并用excel分析数据,使用SheetJS导出excel

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f…

OpenWrt 定时重启

问题 想设置 OpenWrt 定时重启&#xff0c;避免因长期不重启导致的问题。 方法 参考 Scheduling tasks with cronopenwrt设置定时重启&#xff08;天/周/月&#xff09;定时重启openwrt (istoreos) 软路由系统 设置 cron 自启动 System - Start up - 找到 cron - 设置成自…

stm32单片机个人学习笔记5(OLED调试工具)

前言 本篇文章属于stm32单片机&#xff08;以下简称单片机&#xff09;的学习笔记&#xff0c;来源于B站教学视频。下面是这位up主的视频链接。本文为个人学习笔记&#xff0c;只能做参考&#xff0c;细节方面建议观看视频&#xff0c;肯定受益匪浅。 STM32入门教程-2023版 细…

深入探究HTTP网络协议栈:互联网通信的基石

在我们日常使用互联网的过程中&#xff0c;HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;扮演着至关重要的角色。无论是浏览网页、下载文件&#xff0c;还是进行在线购物&#xff0c;HTTP协议都在背后默默地支持着这些操作。今天&#x…

go语言中的切片详解

1.概念 在Go语言中&#xff0c;切片&#xff08;Slice&#xff09;是一种基于数组的更高级的数据结构&#xff0c;它提供了一种灵活、动态的方式来处理序列数据。切片在Go中非常常用&#xff0c;因为它们可以动态地增长和缩小&#xff0c;这使得它们比固定大小的数组更加灵活。…

芯片开发(1)---BQ76905---底层参数配置

主要开发思路:AFE主要是采集、保护功能、均衡&#xff0c;所以要逐一去配置芯片的寄存器 采集、均衡功能主要是配置引脚 保护功能主要是参数寄存器配置&#xff0c;至于如何使用命令修改寄存器参数该系列芯片提供了子命令和直接命令两种方式 BQ76905的管脚配置 I、参数配置 …

使用Renesas R7FA8D1BH (Cortex®-M85)和微信小程序App数据传输

目录 概述 1 系统架构 1.1 系统结构 1.2 系统硬件框架结构 1.3 蓝牙模块介绍 2 微信小程序实现 2.1 UI介绍 2.2 代码实现 3 上位机功能实现 3.1 通信协议 3.2 系统测试 4 下位机功能实现 4.1 功能介绍 4.2 代码实现 4.3 源代码文件 5 测试 5.1 编译和下载代码…

Codeforces Round 974 (Div. 3) A-F

封面原图 画师礼島れいあ 下午的ICPC网络赛的难受一晚上全都给我打没了 手速拉满再加上秒杀线段树 这场简直了啊 唯一可惜的是最后还是掉出了1000名 一把上蓝应该没啥希望了吧 A - Robin Helps 题意 侠盗罗宾因劫富济贫而闻名于世 罗宾遇到的 n n n 人&#xff0c;从 1 s …