【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码

参考资料

  • RabbitMQ官方网站
  • RabbitMQ官方文档
  • 噼咔噼咔-动力节点教程

文章目录

    • 七、延迟队列
      • 7.1 什么是延迟队列
      • 7.2 延迟队列的解决方案
        • 7.2.1 定时任务
        • 7.2.2 **被动取消**
        • 7.2.3 JDK的延迟队列
        • 7.2.3 采用消息中间件(rabbitMQ
          • 7.2.3.1 适用专门优化后的死信队列实现延迟队列
          • 7.2.3.2 :star:实例代码
          • 7.2.3.2 测试结果
        • 7.2.4 使用rabbitmq_delayed_message_exchange插件.
          • 7.2.4.1 插件下载
          • 7.2.4.2 :star:如何在docker环境下安装插件
          • 7.2.4.3 :star: 代码示例:如何使用该插件
          • 7.2.4.4 测试结果
      • 7.3 问题:多个消息的延迟时间不同该如何解决?
        • 7.3.1 解决方案一:用延迟队列区分
        • 7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

七、延迟队列

7.1 什么是延迟队列

正常的MQ应用场景中,我们希望消息可以快速稳定的传递。但是有一些场景中,希望在指定的延迟后再消费信息,比如订单支付场景(订单15部分内未支付则关闭订单)。

这类实现延迟任务的场景,就可以采用延迟队列来实现。

以下介绍一下其他的一些方法。

7.2 延迟队列的解决方案

7.2.1 定时任务

每隔n秒扫描一次数据库,查询数据库装为过期的订单进行处理。

实现方式

spring schedule、quartz、xxljob等

优点

简单,容易实现;

缺点

  1. 存在延迟(受定时器延迟时间限制
  2. 性能较差,每次扫描数据库,如果订单量交大,会给数据库造成较大压力。
7.2.2 被动取消

当用户主动查询订单时,判断订单是否超时,超时则取消

  • 优点:服务器压力小
  • 缺点:如果用户长时间不查询,则会造成统计异常;而且用户打开订单页面会变慢,严重的话会影响用户体验
7.2.3 JDK的延迟队列

DelayedQueue:无界阻塞队列,该队列只有在延迟期满后,才能从中获取元素。

优点

实现简单,任务的延迟低。

缺点

  • 服务器重启宕机,数据会丢失
  • 只适用于单机版
  • 订单量大时,可能会造成内存不足:OOM
7.2.3 采用消息中间件(rabbitMQ

RabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递(前面提到的死信队列)。.

image-20231017141210411

把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费者可以从DLX的队列中取走消息。

7.2.3.1 适用专门优化后的死信队列实现延迟队列

在上面的mq方案中,存在两个不同的交换机,我们可以利用直连交换机的特性,将交换机优化成一个交换机,同时通过不同的routingKey指定普通队列和死信队列。

image-20231017141445269

思路解释

  1. 生产者发送消息到交换机X,并指定ttl的key
  2. 消息被交换机传递到ttl队列中(指定了消息过期时间的队列
  3. 同时,ttl队列还指定的死信交换机DLX为自身的交换机X,但是指定的routingKey为死信队列的key
  4. 这样,当消息在ttl队列中到期后,这条消息就会被传递到死信队列中,提供给消费者
7.2.3.2 ⭐️实例代码

为了便于测试,将发送和接收写在同一个服务中

配置信息

@Configuration
public class DelayExchangeConfig {public static String exchangeName = "order.ttl.exchange";public static String orderQ = "order.ttl.queue";public static String dlxQ = "order.dlx.queue";@Beanpublic DirectExchange delayedExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}@Beanpublic Queue orderQueue(){// 指定该队列的过期时间和死信队列Map<String , Object> properties = new HashMap<>();properties.put("x-message-ttl" , 15000);properties.put("x-dead-letter-exchange" , exchangeName);properties.put("x-dead-letter-routing-key" , "dead-letter");return QueueBuilder.durable(orderQ).withArguments(properties).build();}@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(dlxQ).build();}@Beanpublic Binding dlxBinding1(){return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("dead-letter");}@Beanpublic Binding ttlBinding1(){return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("order");}}

测试代码

@RestController
@RequestMapping("/delay")
@Slf4j
public class DelayedController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/{msg}")public void sentErrorMsg(@PathVariable("msg") String msg) {log.info("(延迟队列)准备发送的信息:{} , 路由键 :{}", msg, "order");// 发送到普通的延时列表中rabbitTemplate.convertAndSend(exchangeName, "order", msg.getBytes(StandardCharsets.UTF_8));log.info("(延迟队列)成功发送!发送时间{}" , LocalDateTimeUtil.now());}@RabbitListener(queues = "order.dlx.queue")public void receiveDelayedMsg(Message message){log.info("(延迟队列)接受到的消息是:{}" , new String(message.getBody()));}
}
7.2.3.2 测试结果

配置正确

image-20231017144033384

控制台打印正确:15秒后接收到的了之前发送的信息

image-20231017144116843


7.2.4 使用rabbitmq_delayed_message_exchange插件.
7.2.4.1 插件下载

插件下载地址

  • https://www.rabbitmq.com/community-plugins.html
  • https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    • 根据自己的rabbit版本,我这里用的是3.9
7.2.4.2 ⭐️如何在docker环境下安装插件

参考文章:https://juejin.cn/post/7138717546894589966

  1. 将下载到的文件,移动到容器内

    docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
    

image-20231017153230781

  1. 进入容器bash指令,并启动插件

    docker exec -it rabbitmq bashroot@rabbit:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 使用下面的指令查看插件列表
    rabbitmq-plugins list
    

image-20231017153257970

进入控制台新建交换机,能查看到新的交换机类型

image-20231017154024943

7.2.4.3 ⭐️ 代码示例:如何使用该插件

官方说明文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#usage

image-20231017153803323

理解原理:delay exchange在接受到消息后,会先存在内部数据库中,检查x-delay延迟时间(头部

image-20231017154940504

代码使用思路

  1. 要创建自定义的交换机类型,要使用CustomExchange()来创建。几个参数的解释如下:

    • name:rabbit中交换机的名称
    • type:交换机类型 (x-delayed-message)
    • durable:是否持久
    • autoDelete:是否自动删除
    • arguments:参数信息
  2. arguments:参数信息从官方文档中获取

    // ... elided code ...
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
    // ... more code ...
    
  3. 交换机创建好后,只需要创建一条队列即可,并进行绑定

  4. 注意:消息发送需要在头部存放信息headers.put("x-delay", 延迟时间)。不需要使用自带的expiration来控制延迟时间了

配置类

@Configuration
public class DelayPluginConfig {public static String exchangeName = "delay-x-plugin.x";public static String key = "demo";@Beanpublic CustomExchange customExchange(){// 参考官方文档,创建插件提供的自定义交换机Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");// public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new CustomExchange(exchangeName, "x-delayed-message" , true , false , args);}@Beanpublic Queue delayDemoQueue(){return QueueBuilder.durable("delay-x-plugin.queue.demo").build();}@Beanpublic Binding delayPluginBinding(){return BindingBuilder.bind(delayDemoQueue()).to(customExchange()).with(key).noargs();}
}

生产者

@RestController
@RequestMapping("/delay/plugin")
@Slf4j
public class DelayedPluginController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/{delay}/{msg}")public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("delay") Long delay) {log.info("(延迟插件队列)准备发送的信息:{} ,延迟时间:{} 路由键 :{}", msg, delay , "demo");// 在头部设置过期时间MessageProperties properties = new MessageProperties();properties.setHeader("x-delay", delay);Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();// 发送信息rabbitTemplate.convertAndSend(exchangeName, "demo", message);log.info("(延迟插件队列)成功发送!发送时间:{}", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()));}@RabbitListener(queues = "delay-x-plugin.queue.demo")public void receiveDelayedMsg(Message message) {log.info("(延迟插件队列)接受到的消息是:{}", new String(message.getBody()));}
}
7.2.4.4 测试结果

生成交换机和队列

image-20231017160126659image-20231017160147125

访问路径/delay/plugin/25000/一条25秒过期的信息:查看日志打印:成功

image-20231017160422203

7.3 问题:多个消息的延迟时间不同该如何解决?

由于队列先进先出的特性,如果不同消息的延迟时间不同,一旦出现后进的消息延迟时间小于先进的队列,那么消息过期的时间就会出错。

7.3.1 解决方案一:用延迟队列区分

要解决这个问题,就需要将队列的延迟时间统一,将不同的延迟的消息发送到对应延迟的队列中。

保证队列的延迟时间和消息的延迟时间是一样的即可。

如下

image-20231017144817671

7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange

由于该插件的原理并不是单纯的队列实现,而是使用rabbit内部数据库时间,所以可以很好的解决问题。

可以进行一个简单测试验证:

  • 先发送一条25秒过期的信息,再发送3条5秒过期的信息

  • 查看结果:正常消费,解决问题

    image-20231017160917110

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/163258.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

攻防世界web篇-unserialize3

得出php代码残篇 将代码补全后再在线php运行工具中进行运行 在浏览器输入后得到下面的界面 这里需要将O:4:“xctf”:1:{s:4:“flag”;s:3:“111”;} 改为 O:4:“xctf”:2:{s:4:“flag”;s:3:“111”;}

单片机入门后该怎么学习进一步提升?

单片机入门后该怎么学习进一步提升&#xff1f; 可以将你目前会的单片机基础先整理一下&#xff0c;你看看运用这些基本的外设或者一些入门知识能做个什么东西&#xff0c;最近很多小伙伴找我&#xff0c;说想要一些单片机资料&#xff0c;然后我根据自己从业十年经验&#xff…

查询企业信息的四种方法

在工作中或者对于找工作的求职人来说&#xff0c;怎么查看企业的信息呢&#xff1f;可能很多人会想到各种查查类软件&#xff0c;但是这类软件需要会员或者付费才能查看&#xff0c;对于没有会员的人来说&#xff0c;有没有其他查询企业的方法呢&#xff1f;答案肯定是有的&…

Python 爬虫实战之爬淘宝商品并做数据分析

前言 是这样的&#xff0c;之前接了一个金主的单子&#xff0c;他想在淘宝开个小鱼零食的网店&#xff0c;想对目前这个市场上的商品做一些分析&#xff0c;本来手动去做统计和分析也是可以的&#xff0c;这些信息都是对外展示的&#xff0c;只是手动比较麻烦&#xff0c;所以…

IDC:到2027年,全球生成式AI支出将达到1430亿美元

全球著名信息调查咨询机构IDC在官网公布了一项调查&#xff0c;到2027年&#xff0c;全球生成式AI&#xff08;Generative AI&#xff0c;简称Gen AI&#xff09;支出将达到1430亿美元&#xff0c;5年复合年增长率为73.3%。 该支出包括&#xff1a;生成式AI的软件以及相关基础…

报错:AttributeError: module ‘tensorflow‘ has no attribute ‘flags‘

改成如下&#xff1a; 报错原因&#xff1a;tensorflow1.x与2.x版本问题不兼容

3D Web轻量化工具HOOPS Web Platform助力Rapid DCS快速上市碳估算产品!

总部位于英国的Rapid DCS提供全面的交钥匙解决方案和服务&#xff0c;帮助建筑环境领域的客户充分利用数字化的优势。 Rapid DCS技术总监James Hunter表示&#xff1a;“如今的建筑项目需要一套与20甚至10年前的建筑项目不同的功能。” “例如&#xff0c;虽然成本规划一直很重…

性能监控软件是什么?有哪些优势?

在现代科技驱动的世界中&#xff0c;计算机系统的性能对于企业和个人用户都至关重要。性能监控软件是一种不可或缺的工具&#xff0c;可以帮助我们实时跟踪、分析和优化系统的性能。本文将介绍性能监控软件的概念、其重要性以及如何选择和使用这些工具来提高系统效率。 一、性能…

【微服务 SpringCloud】实用篇 · Eureka注册中心

微服务&#xff08;3&#xff09; 文章目录 微服务&#xff08;3&#xff09;1. Eureka的结构和作用2. 搭建eureka-server2.1 创建eureka-server服务2.2 引入eureka依赖2.3 编写启动类2.4 编写配置文件2.5 启动服务 3. 服务注册1&#xff09;引入依赖2&#xff09;配置文件3&am…

如何让你的Node.js应用程序处理数百万的API请求

目录 一、了解 Node.JS 和 API 请求 二、优化 NodeJS 以实现高性能 1.使用异步操作 2.实现缓存 3.优化数据库查询 4.负载平衡 5.水平扩展 三、测试和监控性能 1.负载测试 2.应用程序性能监控(APM) 四、结论 一、了解 Node.JS 和 API 请求 在我们深入研究这些技术之…

29栈与队列——优先队列

目录 LeetCode之路——347. 前 K 个高频元素 分析 优先队列 简单示例 运行结果 源码简析 LeetCode之路——347. 前 K 个高频元素 给你一个整数数组 nums 和一个整数 k &#xff0c;请你返回其中出现频率前 k 高的元素。你可以按 任意顺序 返回答案。 示例 1: 输入: num…

Java基础(三)

1. 异常 Java 异常类层次结构图概览&#xff1a; 1.1 Exception 和 Error 有什么区别&#xff1f; 在 Java 中&#xff0c;所有的异常都有一个共同的祖先 java.lang 包中的 Throwable 类。Throwable 类有两个重要的子类: Exception :程序本身可以处理的异常&#xff0c;可以…

uniapp 小程序实现图片宽度100%、高度自适应的效果

因为image组件默认是有宽度跟高度的&#xff0c;所以这个高度不怎么好写 通过load事件来控制图片的高度 话不多说&#xff0c;直接上代码&#xff0c; <image class"img" src"/static/image.png" :style"{ height: imgHeight px }"mode&q…

Linux-ssh

文章目录 远程登录服务器配置远程服务器相关信息创建config文件配置config文件 配置密钥登陆先创建密钥配置密钥文件 执行命令scp传文件copy文件copy文件夹配置我们的vim和tmux 远程登录服务器 ssh userhostnameuser:用户名hostname&#xff1a;IP地址或域名 第一次登陆会显示…

SI基础知识:说一说玻纤布规格(如1078)的具体含义,以及等效Dk计算

玻纤布的编织包含经向和纬向两个不同的方向&#xff0c;这些玻璃布并没有被紧密放置在一起&#xff0c;在玻纤布上会有开窗&#xff0c;而且经向开窗和纬向开窗大小不同。 IPC定义了每种玻纤布的编织密度以及所用玻璃丝的规格&#xff0c;如下图所示。 看上面的表格&#xff0c…

会议OA项目-其它页面->自定义组件应用,其它界面的布局

1.自定义组件应用 文档参考:https://developers.weixin.qq.com/miniprogram/dev/framework/custom-component/ //oamin\project.config.json {"description": "项目配置文件","packOptions": {"ignore": [],"include": []},…

ESP32集成开发环境Espressif-IDE安装 – Windows

陈拓 2023/10/15-2023/10/16 1. 概述 Espressif IDE是一个基于Eclipse CDT的集成开发环境&#xff08;IDE&#xff09;&#xff0c;用于使用ESP-IDF框架开发物联网应用程序。这是一个专门为ESP-IDF构建的独立定制IDE。Espressif IDE附带了IDF Eclipse插件、重要的Eclipse CDT插…

基于 KubeSphere 部署 KubeBlocks 实现数据库自由

作者&#xff1a;尹珉&#xff0c; KubeSphere Contributor & Ambassador&#xff0c;KubeSphere 社区用户委员会杭州站站长。 KubeSphere 是什么&#xff1f; KubeSphere 是在 Kubernetes 之上构建的面向云原生应用的分布式操作系统&#xff0c;完全开源&#xff0c;支持…

最新最全网络安全专业毕业设计选题精华汇总-持续更新中

文章目录 0 前言1 网络安全(信息安全)毕设选题推荐2 开题指导3 最后 0 前言 Hi&#xff0c;大家好&#xff0c;随着毕业季的临近&#xff0c;许多同学开始向学长咨询关于选题和开题的问题。在这里&#xff0c;学长分享一些关于网络安全(信息安全)毕业设计选题的内容。 以下为…

插入排序改进 将交换变成赋值语句 优点适用于近乎有序的序列

效果非常的明显 下面给出代码截图 再给出原代码 #include<iostream> #include<string> #include "Student.h" #include "sorttesthelper.h" using namespace std;template<typename T >void selectionSort( T arr[], int n){for(int i…