RabbitMQ 之 延迟队列

目录

​编辑一、延迟队列概念

二、延迟队列使用场景

三、整合 SpringBoot

1、创建项目

2、添加依赖

3、修改配置文件

4、添加 Swagger 配置类

四、队列 TTL

1、代码架构图

2、配置文件代码类

3、生产者

4、消费者

5、结果展示

五、延时队列优化

1、代码架构图

2、配置文件

3、生产者​编辑

4、结果展示

六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

2、代码架构图

 3、配置文件类代码

4、生产者

5、消费者

6、结果展示


一、延迟队列概念

延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


三、整合 SpringBoot

1、创建项目

2、添加依赖

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

3、修改配置文件

spring.rabbitmq.host=111.229.153.16
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

4、添加 Swagger 配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo(){return new ApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(new Contact("馒头警告", "www.baidu.com","2714858327@qq.com")).build();}
}

四、队列 TTL

1、代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:


2、配置文件代码类

// TTL 队列,配置文件类代码
@Configuration
public class TTLQueueConfig {// 普通交换机的名称public static final String X_EXCHANGE = "X";// 死信交换机名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";// 普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";// 死信队列名称public static final String DEAD_LETTER_QUEUE_D = "QD";// 声明 X 交换机@Bean(value = "xExchange") // 相当于别名public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}// 声明 Y 交换机@Bean(value = "yExchange") // 相当于别名public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明普通队列 TTL 为 10s@Bean(value = "queueA") // 相当于别名public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","YD");// 设置 TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明普通队列 TTL 为 10s@Bean(value = "queueB") // 相当于别名public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","YD");// 设置 TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明死信队列@Bean(value = "queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();}// 绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA ,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}// 绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB ,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}// 绑定@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD ,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

3、生产者

// 发送延迟消息
@Slf4j
@Configuration
@RequestMapping("/ttl")
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;// 开始发消息@GetMapping("/sendmsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自 TTL 为 10s 的队列" + message);rabbitTemplate.convertAndSend("X","XB","消息来自 TTL 为 40s 的队列" + message);}
}

4、消费者

// 队列 TTL
@Slf4j
@Component
public class DeadLetterQueueConsumer {// 接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel){String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

5、结果展示


五、延时队列优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?


1、代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间


2、配置文件

 


3、生产者


4、结果展示

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。


六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

大家可以参考网上的教程自行去下载安装,这里就不再进行描述了

如果安装好了,就会有一个叫做  x-delayed-message 的类型

一旦装完插件之后,消息延迟的位置就换到交换机了,相当于生产者发消息,这个消息停留在交换机这个位置,再将消息发给队列


2、代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

一个生产者,要走延迟的交换机,延迟的交换机根据 RoutingKey ,路由到延迟的队列,而延迟的位置是在交换机的位置,队列被消费者所消费


 3、配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {// 交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";// 队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";// RoutingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 声明交换机 基于插件的@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}// 声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}// 绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

4、生产者

    // 开始发消息  基于插件的  消息及延迟的时间@GetMapping("/senddelaymsg/{message}/{delaytime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delaytime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delaytime,message);rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",message, msg ->{// 设置发送消息时候的延迟时长msg.getMessageProperties().setDelay(delaytime);return msg;});}

5、消费者

// 消费者 基于插件的延迟消息
@Slf4j
@Component
public class DelayQueueConsumer {// 监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiverDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}

6、结果展示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/366108.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS中使用SSH远程登录

CentOS中使用SSH远程登录 准备工作SSH概述SSH服务的安装与启动建立SSH连接SSH配置文件修改SSH默认端口SSH文件传输 准备工作 两台安装CentOS系统的虚拟机 客户机&#xff08;192.168.239.128&#xff09; 服务器&#xff08;192.168.239.129&#xff09; SSH概述 Secure S…

叮!云原生虚拟数仓 PieCloudDB Database 动态包裹已送达

第一部分 PieCloudDB Database 最新动态 支持动态配置查询簇 PieCloudDB 最新内核版本 v2.14.0 新增动态配置查询簇功能。PieCloudDB 动态配置查询簇功能实现可伸缩的并行化查询&#xff0c;可提升单个查询并行使用底层资源的能力&#xff0c;同时加快查询响应速度。 动态配…

基于隐马尔可夫模型的股票预测【HMM】

基于机器学习方法的股票预测系列文章目录 一、基于强化学习DQN的股票预测【股票交易】 二、基于CNN的股票预测方法【卷积神经网络】 三、基于隐马尔可夫模型的股票预测【HMM】 文章目录 基于机器学习方法的股票预测系列文章目录一、HMM模型简介&#xff08;1&#xff09;前向后…

学生管理系统

一、登录 用户类&#xff1a;属性&#xff1a;用户名、密码、身份证号码、手机号码 1、欢迎页面 System.out.println("欢迎来到学生管理系统"); System.out.println("请选择操作1登录 2注册 3忘记密码"); 代码实现&#xff1a; //欢迎页面public static…

Rabbitmq部署

环境 操作系统CentOS7 安装 准备安装包 # rabbitmq基于erlang语言开发&#xff0c;需先安装erlang语言解释器 [rootnode2 ~]# ls erlang-21.3-1.el7.x86_64.rpm rabbitmq-server-3.8.8-1.el7.noarch.rpm [rootnode2 ~]# rpm -ivh erlang-21.3-1.el7.x86_64.rpm #安装soca…

【嵌入式】探索嵌入式世界:在ARM上构建俄罗斯方块游戏的奇妙之旅

文章目录 前言&#xff1a;1. 简介2. 总体设计思路及功能描述2.1 设计思路2.2 功能描述2.3 程序流程图 3. 各部分程序功能及详细说明3.1 游戏界面函数3.1.1 游戏界面中的图片显示3.1.2 游戏开始界面3.1.3 游戏主界面3.1.4 游戏结束广告界面3.1.5 游戏界面中的触摸反馈3.1.6 游戏…

全球首款搭载Google Gemini和GPT-4o的智能眼镜发布

智能眼镜仍然是一个尚未完全成熟的未来概念&#xff0c;但生成式人工智能的到来显著提升了这些设备的能力。Meta 的 Ray-Ban 智能眼镜被许多人视为当今最好的选择之一&#xff0c;而现在 Solos AirGo Vision 正在为其带来竞争&#xff0c;这款眼镜还集成了 Google Gemini 支持。…

python代码报错

1.报错信息&#xff1a; asyncio.WindowsSelectorEventLoopPolicy()^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AttributeError: module asyncio has no attribute WindowsSelectorEventLoopPolicy 2.解决办法&#xff1a; if __name__ "__main__": # 移除或注…

❤ Gitee平台的使用

Gitee平台的使用 文章目录 Gitee平台的使用一、Gitee的注册1、注册2、添加邮箱 二、仓库的创建 和 团队成员的添加1、单击右上角的 **&#xff0b;** 号 、创建仓库2、如下填写即可 三、仓库克隆到本地1、安装好git 和 小乌龟&#xff08;TortoiseGit&#xff09;2、打开仓库 复…

(超详细)数据结构——“队列”的深度解析

目录 前言&#xff1a; 1.队列的概念 2.队列的实现 3.代码实现队列 3.1 队列的初始化 3.2 插入 3.3 删除 3.4 队列的队头&#xff0c;队尾和大小 3.5 判空 3.6 销毁 3.7 测试 前言&#xff1a; 队列与栈都是线性表&#xff0c;它们的结构也非常类似&#…

“论单元测试方法及应用”写作框架,软考高级论文,系统架构设计师论文

论文真题 1、概要叙述你参与管理和开发的软件项目,以吸你所担的主要工作。 2、结给你参与管理和开发的软件项目&#xff0c;简要叙述单元测试中静态测试和动态测试方法的基本内容。 3、结给你惨与管理和研发的软件项目,体阐述在玩测试过程中,如何确定白盒测试的覆盖标准,及如…

Three.js机器人与星系动态场景:实现3D渲染与交互式控制

内容摘要&#xff1a;使用Three.js库构建了一个交互式的3D场景。组件中创建了一个机器人模型&#xff0c;包括头部、眼睛、触角、身体和四肢&#xff0c;以及两个相同的机器人实例以实现动态效果。场景中还加入了粒子效果&#xff0c;模拟星系环境&#xff0c;增强了视觉效果。…

设备调试上位机GUI

C Fast Qt C 前端 原来真的不需要在 design 上画来画去&#xff0c;有chat-gpt 那里不知道问哪里 全是组件拼起来的,不需要画,最后发现其实也是定式模式,跟着AI 学套路

JavaScript将参数传递给事件处理程序

本篇文件我们将实现导航栏中&#xff0c;选中时候&#xff0c;会将您选中的进行高亮显示&#xff1b; ● 首先我们来获取我们想要的HTML元素 const nav document.querySelector(.nav);● 接着我们来写选中的高亮显示 nav.addEventListener(mouseover, function (e) { //鼠…

Python系统教程01

Python 是一门解释性语言&#xff0c;相对更简单、易学&#xff0c;它可以用于解决数学问题、获取与分 析数据、爬虫爬取网络数据、实现复制数学算法等等。 1、print()函数&#xff1a; print()书写时注意所有的符号都是英文符号。print()输出内容时&#xff0c;若要输出字符…

安卓实现微信聊天气泡

一搜没一个能用的&#xff0c;我来&#xff1a; 布局文件&#xff1a; <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schemas.android.com/apk/res/android"xml…

【MySQL】数据库事务详解

文章目录 前言1. 事务的定义2. 事务的四个特性2.1 原子性2.2 一致性2.3 隔离性2.4 持久性 3. 事务的并发问题3.1 脏读3.2 不可重复读3.3 幻读3.4 更新丢失 4. 事务的隔离级别5. 事务的使用结语 前言 假设我们现在需要操作数据库进行转账&#xff0c;A 给 B 转账 100 块钱&…

掌握React与TypeScript:从零开始绘制中国地图

最近我需要使用reactts绘制一个界面&#xff0c;里面需要以中国地图的形式展示区块链从2019-2024年这五年的备案以及注销情况&#xff0c;所以研究了一下这方面的工作&#xff0c;初步有了一些成果&#xff0c;所以现在做一些分享&#xff0c;希望对大家有帮助&#xff01; 在这…

【Kotlin】Kotlin 基础语法指南

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

基于TCP/QT/C++的网盘系统测试报告

目录 一、项目介绍 1、项目描述 2、项目组成模块 3、项目技术要点 二、用户功能测试 1、查看在线用户测试 1.1、运行服务器 1.2、登录两个账号 1.3、点击显示在线用户&#xff0c;可以看到jack和lucy 2、搜索用户测试 2.1、打开服务器&#xff0c;登录两个账号jack,lucy 2.2、在…