RabbitMQ 是如何做延迟消息的 ?——Java全栈知识(15)

RabbitMQ 是如何做延迟消息的 ?

1、什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
    死信交换机有什么作用呢?
  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因 TTL(有效期)到期的消息

2、死信队列

架构:
image.png
由于第一个队列没有消费者,所以可以在第一个队列中设置 TTL,当消息过期的时候,这个消息就变成了死信,被丢掉私信交换机中,以此实现延迟任务功能。

3、延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景: 如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒: image.png注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。
消息肯定会被投递到 ttl.queue 之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信: image.png 死信被再次投递到死信交换机 hmall.direct,并沿用之前的 RoutingKey,也就是 blueimage.png 由于 direct.queue1hmall.direct 绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1,如果此时有消费者与 direct.queue1 绑定,也就能成功消费消息了。但此时已经是5秒钟以后了: image.png 也就是说,publisher 发送了一条消息,但最终 consumer 在5秒后才收到消息。我们成功实现了延迟消息

[!info]
而且,RabbitMQ 中的这个 TTL 是可以设置任意时长的,这相比于 RocketMQ 只支持一些固定的时长而显得更加灵活一些。

死信队列消息堆积问题

[!danger] 死信队列消息堆积问题
但是,死信队列的实现方式存在一个问题,那就是可能造成队头阻塞。RabbitMQ 会定期扫描队列的头部检查队首的消息是否过期。如果队首消息过期了,它会被放到死信队列中。然而,RabbitMQ 不会逐个检查队列中的所有消息是否过期,而是仅检查队首消息。这样,如果队列的队头消息未过期,而它后面的消息已过期,这些后续消息将无法被单独移除,直到队头的消息被消费或过期。
因为队列是先进先出的,在普通队列中的消息,每次只会判断邢队头的消息是否过期,那么,如果队头的消息时间很长,一直都不过期,那么就会阻塞整个队列,这时候即使排在他后面的消息过期了,那么也会被一直阻塞。

基于 RabbitMQ 的死信队列,可以实现延迟消息,非常灵活的实现定时关单,并且借助 RabbitMQ 的集群扩展性,可以实现高可用,以及处理大并发量。他的缺点第一是可能存在消息阻塞的问题,还有就是方案比较复杂,不仅要依赖 RabbitMQ, 而目还需要声明很多队列出来,增加系统的复杂度

3、DelayExchange 插件

前面我们提到的基于死信队列的方式,是消息先会投递到一个正常队列,在 TTL 过期后进入死信队列。但是基于插件的这种方式,消息并不会立即进入队列,而是先把他们保存在一个基于 Erlang 开发的 Mnesia 数据库中,然后通过一个定时器去查询需要被投递的消息,再把他们投递到 x-delayed-message 交换机中。
基于 RabbitMQ 插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1 毫秒,大约 49 天,超过这个时间就会被立即消费。

插件下载地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的 MQ 是 3.8 版本,因此这里下载 3.8.17 版本:
image.png|600px
附件:![[rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez]]

4.2.2. 安装

因为我们是基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[  {  "CreatedAt": "2024-06-19T09:22:59+08:00",  "Driver": "local",  "Labels": null,  "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",  "Name": "mq-plugins",  "Options": null,  "Scope": "local"  }  
]  

插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data 这个目录,我们上传插件到该目录下

注意上传插件

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
image.png

4.2.3. 声明延迟交换机

image.png

根据

1、创建交换机:
image.png
2、创建队列
image.png
3、根据 bandingKey 绑定队列:
image.png|500

基于注解方式:

@RabbitListener(bindings = @QueueBinding(  value = @Queue(name = "delay.queue", durable = "true"),  exchange = @Exchange(name = "delay.direct", delayed = "true"),  key = "delay"  
))  
public void listenDelayMessage(String msg){  log.info("接收到delay.queue的延迟消息:{}", msg);  
}

基于 @Bean 的方式:

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;@Slf4j  
@Configuration  
public class DelayExchangeConfig {@Bean  public DirectExchange delayExchange(){  return ExchangeBuilder  .directExchange("delay.direct") // 指定交换机类型和名称  .delayed() // 设置delay的属性为true  .durable(true) // 持久化  .build();  }@Bean  public Queue delayedQueue(){  return new Queue("delay.queue");  }  @Bean  public Binding delayQueueBinding(){  return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");  }  
}  

4.2.4. 发送延迟消息

发送消息时,必须通过 x-delay 属性设定延迟时间:

@Test  
void testPublisherDelayMessage() {  // 1.创建消息  String message = "hello, delayed message";  // 2.发送消息,利用消息后置处理器添加消息头  rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {  @Override  public Message postProcessMessage(Message message) throws AmqpException {  // 添加延迟消息属性  message.getMessageProperties().setDelay(5000);  return message;  }  });  
}

warning 注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/322969.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

推荐5个免费的国内平替版GPT

提起AI,大家第一个想到的就是GPT。 虽然它确实很厉害,但奈何于我们水土不服,使用门槛有些高。 不过随着GPT的爆火,现在AI智能工具已经遍布到各行各业了,随着时间的推移,国内的AI工具也已经“百花盛放”了…

如何添加、编辑、调整WordPress菜单

我们最近在使用WordPress建站建设公司网站。我们是使用的hostease的主机产品建设的WordPress网站。在建设网站使用遇到了一些WordPress菜单使用方面的问题。好在hostease提供了不少帮助。 下面把WordPress菜单使用心得分享一下。 本文将详细介绍WordPress菜单的各种功能&#x…

智能家居|基于SprinBoot+vue的智能家居系统(源码+数据库+文档)

智能家居目录 基于SprinBootvue的智能家居系统 一、前言 二、系统设计 三、系统功能设计 1管理员:个人中心管理功能的详细实现 2管理员:用户信息管理功能的详细实现 3管理员:家具管理功能的详细实现 4管理员:任务管理功能…

QT+串口调试助手+扩展版

前言:此文章是这篇文章的拓展 QT串口调试助手基本版-CSDN博客,如果需要独立完成串口调试助手直接看基本版文章即可,如果需要完成串口调试助手的其他功能,参考拓展版。 一、更新QT串口调试助手UI界面 1、ui串口设置界面 2、ui串口…

【win10 文件夹数量和看到不一致查看隐藏文件已经打开,Thumb文件作妖】

目录 任务介绍:重命名规则修改前修改后 实现思路VB代码实现BUG犯罪现场(眼见不一定为实)破案1:抓顶风作案的反贼!!!破案2:破隐身抓刺客!!!杀器&am…

[1726]java试飞任务规划管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java试飞任务规划管理系统是一套完善的java web信息管理系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql…

【LLM 论文】Least-to-Most Prompting 让 LLM 实现复杂推理

论文:Least-to-Most Prompting Enables Complex Reasoning in Large Language Models ⭐⭐⭐ Google Research, ICLR 2023 论文速读 Chain-of-Thought(CoT) prompting 的方法通过结合 few-show prompt 的思路,让 LLM 能够挑战更具…

我的Transformer专栏来啦

五一节前吹的牛,五一期间没完成,今天忙里偷闲,给完成了。 那就是初步拟定了一个《Transformer最后一公里》的写作大纲。 之前一直想写一系列Transformer架构的算法解析文章,但因为一直在忙(虽然不知道在忙啥&#xf…

Linux0.11中MINIX 文件系统

阅读linux 的源码的时候对minix 文件系统有很多的疑惑,根据自己的认识将这些做一个总结。 MINIX 文件系统由六个部分组成,分别是引导块,超级块,i结点位图,逻辑块位图,i结点,数据块。 引导块&am…

数据结构的堆(c语言版)

一.堆的概念 1.堆的基本概念 在计算机科学中,堆是一种特殊的数据结构,通常用于实现优先队列和动态分配内存。 2.堆的特征 堆是一个完全二叉树,它具有以下两个主要特性: 堆序性:对于最大堆,在堆中的任意节…

wpf转换器

WPF(Windows Presentation Foundation)中的转换器主要是指IValueConverter接口的实现,它用于在数据绑定过程中转换源数据和目标数据的类型或表示形式。这种机制使得开发者能够灵活地处理数据,特别是在用户界面(UI&…

VMP 简单源码分析(.net)

虚拟机 获取CPU的型号 实现了一个指令集解释器,每个操作码对应一个特定的处理函数,用于执行相应的指令操作。在执行字节码时,解释器会根据操作码查找并调用相应的处理函数来执行指令。 截获异常 先由虚拟机处理 处理不了再抛出异常 priva…

基于Springboot的校园疫情防控系统(有报告)。Javaee项目,springboot项目。

演示视频: 基于Springboot的校园疫情防控系统(有报告)。Javaee项目,springboot项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构…

【可实战】被测需求理解(需求文档是啥样的、从哪些角度进行需求评审、需求分析需要分析出哪些内容、如何提高需求分析能力)

产品人员会产出一个需求文档,然后组织一个需求的宣讲。测试人员的任务就是在需求宣讲当中,分析需求有没有存在一些问题,然后在需求宣讲结束之后通过分析需求文档,分析里面的测试点并预估一个排期。 一、需求文档是什么样的&#x…

我独自升级崛起怎么下载 游戏下载教程分享

《我独自升级:崛起》这款游戏核心聚焦于激烈的战斗与角色的持续成长。新加入的玩家首要任务是熟悉基础攻击模式,随后深入探索技能组合策略与连贯招式的艺术,同时掌握防守与躲避技巧,这些都是战斗中不可或缺的关键。随着战斗的持续…

python turtle 升国旗

​一、导语 大家好,前段时间,我们画出了五星红旗,今天我们要用Python的Turtle库来绘制一个五星红旗,并让国旗上升,让我们一起来感受编程与艺术的完美结合吧!领略国家的强大!爱祖国,做一个遵纪守法的好公民。 二、效果展示 升国旗 三、开发过程 一、准备工作 首先我们…

ICode国际青少年编程竞赛- Python-2级训练场-坐标与列表练习

ICode国际青少年编程竞赛- Python-2级训练场-坐标与列表练习 1、 for i in range(6):Spaceship.step(Item[i].x - Spaceship.x)Dev.step(Item[i].y - Dev.y)Dev.step(Spaceship.y - Dev.y)2、 for i in range(5):Spaceship.step(Item[i].x - Spaceship.x)Flyer[i].step(Item[…

7.基于麻雀搜索算法(SSA)优化VMD参数(SSA-VMD)

01.智能优化算法优化VMD参数的使用说明 02.基本原理 麻雀搜索算法(SSA)是一种基于鸟类觅食行为的启发式优化算法,它模拟了麻雀在觅食时的群体行为,通过模拟麻雀的觅食过程来寻找问题的最优解。SSA的基本原理是通过模拟麻雀的搜索…

PyCharm 2024新版图文安装教程(python环境搭建+PyCharm安装+运行测试+汉化+背景图设置)

名人说:一点浩然气,千里快哉风。—— 苏轼《水调歌头》 创作者:Code_流苏(CSDN) 目录 一、Python环境搭建二、PyCharm下载及安装三、解释器配置及项目测试四、PyCharm汉化五、背景图设置 很高兴你打开了这篇博客,如有疑问&#x…

已经有 Prometheus 了,还需要夜莺?

谈起当下监控,Prometheus 无疑是最火的项目,如果只是监控机器、网络设备,Zabbix 尚可一战,如果既要监控设备又要监控应用程序、Kubernetes 等基础设施,Prometheus 就是最佳选择。甚至有些开源项目,已经内置…