Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

目录

  • 一、序言
  • 二、死信交换机和消息TTL实现延迟消息
    • 1、死信队列介绍
    • 2、代码示例
      • (1) 死信交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 三、延迟消息交换机实现延迟消息
    • 1、安装延时消息插件
    • 2、代码示例
      • (1) 延时消息交换机配置
      • (2) 消息生产者
      • (3) 消息消费者
    • 3、测试用例
  • 四、两种实现方式优缺点
    • 1、延时消息插件
    • 2、TLL&死信交换机

一、序言

业务开发中有很多延时操作的场景,比如最常见的超时订单自动关闭延时异步处理,我们常用的实现方式有:

  • 定时任务轮询(有延时)。
  • 借助Redission的延时队列
  • Redis的key过期事件通知机制(需开启key过期事件通知,对Redis有性能损耗)。
  • RocketMQ中定时消息推送(支持的时间间隔固定,不支持自定义)。
  • RabbitMQ中的死信队列和延迟消息交换机

其中用的最多的也是借助Redisson实现的数据结构延迟队列RabbitMQ中的死信队列来实现,今天我们通过RabbitMQ死信队列和延迟消息交换机(新特性)来实现延时消息推送。


二、死信交换机和消息TTL实现延迟消息

1、死信队列介绍

这种方式主要通过结合消息过期和私信交换机来实现延迟消息推送,首先先了解下哪些消息会进入死信队列:

  • 被消费者nack(negatively acknowleged)的消息。
  • TTL过期后未被消费的消息。
  • 超过队列长度限制后被丢弃的消息。

备注:更多信息请参考RabbitMQ中的 Dead Letter Exchange。

2、代码示例

(1) 死信交换机配置

@Configuration
protected static class DeadLetterExchangeConfig {@Beanpublic Queue deadLetterQueue(){return QueueBuilder.durable("dead-letter-queue").build();}@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange("dead-letter-exchange").build();}@Beanpublic Binding bindQueueToDeadLetterExchange(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead-letter-routing-key").build();}}

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToDeadLetterExchange(String body, int timeoutInMillSeconds) {log.info("开始发送消息到dead letter exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("normal-queue", message);}}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "dead-letter-queue")public void handleMsgFromDeadLetterQueue(String msg) {log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/dead-letter")public ResponseEntity<String> sendMsgToDeadLetterExchange(String body, int timeout) {rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);return ResponseEntity.ok("消息发送到死信交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 11:50:33.041  INFO 19152 --- [nio-8080-exec-7] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到dead letter exchange 消息体:hello, 消息延迟:5000ms, 当前时间:2023-11-26T11:50:33.041
2023-11-26 11:50:38.054  INFO 19152 --- [ntContainer#4-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054

三、延迟消息交换机实现延迟消息

上面通过消息TTL和死信交换机实现延迟消息的解决方案是由一个叫James Carr的人提出来的,后来RabbitMQ提供了一个开箱即用的解决方案,通过延时消息插件来实现。

该插件以前被当做是试验性产品,但是现在已经可以投产使用了。(PS:2015年4月16号就已经有该插件文档)

Spring AMQP中,同样提供了对该延时消息插件的支持,并且在RabbitMQ 3.6.0版本就已经测试通过。

1、安装延时消息插件

该延时消息插件为社区插件,因此需要自己手动下载安装的RabbMQ版本对应的插件,下载地址:RabbitMQ延时消息插件releases。

我安装的RabbitMQ版本为3.9.9,3.9.0版本的插件对所有3.9.x版本的RabbitMQ都支持。

在这里插入图片描述
下载完后把.ez结尾的插件复制RabbitMQ的插件目录下,插件目录为/usr/lib/rabbitmq/plugins

在这里插入图片描述
通过命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装该插件,通过命令rabbitmq-plugins list查看插件列表,可以看到该延时消息插件已经成功安装。

在这里插入图片描述

2、代码示例

(1) 延时消息交换机配置

@Configuration
protected static class DelayedMsgExchangePluginConfig {@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed-queue").build();}@Beanpublic DirectExchange delayedExchange() {return ExchangeBuilder.directExchange("delayed-exchange").delayed().build();}@Beanpublic Binding bindDelayedQueueToDelayedChange(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");}
}

备注:延时交换机的类型可以为DirectExchage、TopicExcahgeFanoutExchange,这些都支持。

(2) 消息生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendDelayedMsg(String body, int timeoutInMillSeconds) {log.info("开始发送消息到delayed-exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds, LocalDateTime.now());MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(timeoutInMillSeconds);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("delayed-exchange", "delayed-routing-key", message);}
}

(3) 消息消费者

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "delayed-queue")public void handleMsgFromDelayedQueue(String msg) {log.info("Message received from delayed-queue, message body: {}, current time:{}", msg, LocalDateTime.now());}
}

3、测试用例

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/delayed")public ResponseEntity<String> sendMsgToHeadersExchange(String body, int timeout) {rabbitMqProducer.sendDelayedMsg(body, timeout);return ResponseEntity.ok("消息发送到延迟交换机成功");}}

浏览器访问http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000,可以看到消息被延迟5s处理。

2023-11-26 13:02:07.816  INFO 26524 --- [nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到delayed-exchange 消息体:Hello, 消息延迟:5000ms, 当前时间:2023-11-26T13:02:07.816
2023-11-26 13:02:12.830  INFO 26524 --- [ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829

四、两种实现方式优缺点

1、延时消息插件

  • 优点:配置更加简单,少配置1个过期消息接收队列,且语义更明确,容易定位消息出入口。
  • 缺点:延时消息插件对RabbitMQ版本有要求,只有RabbitMQ 3.8.x及以上版本支持。

2、TLL&死信交换机

  • 优点:基本适用于所有RabbitMQ版本。
  • 缺点:配置相对来说复杂一些,还有就是我们最开始提到的,不只是TTL过期的消息才会进入死信队列,还有超出队列限制nack的消息也会进入死信队列,触发的条件没那么纯粹。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/206921.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

set和map + multiset和multimap(使用+封装(RBTree))

set和map 前言一、使用1. set(1)、模板参数列表(2)、常见构造(3)、find和count(4)、insert和erase(5)、iterator(6)、lower_bound和upper_bound 2. multiset3. map(1)、模板参数列表(2)、构造(3)、modifiers和operations(4)、operator[] 4. multimap 二、封装RBTree迭代器原理R…

科技与教育:未来教育的新趋势

在21世纪&#xff0c;科技的快速发展正在深刻地改变教育行业。从在线学习平台到虚拟现实教室&#xff0c;科技为教育带来了革命性的变化。本文将探讨科技如何影响现代教育&#xff0c;并预测未来教育的发展趋势。 一、科技在教育中的应用 在线学习平台&#xff1a;通过平台如C…

JSON 与 FastJSON

JSON 与 FastJSON JSON JavaScript Object Notation&#xff08;JavaScript 对象表示法&#xff09;是目前最常用的执行对象序列化的方式。 虽然 json 最初是为了在 JavaScript 语言中使用的&#xff0c;但实际上 json 本身跟语言没有任何关系&#xff0c;各种编程语言都可以使…

微服务--08--Seata XA模式 AT模式

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 分布式事务Seata 1.XA模式1.1.两阶段提交1.2.Seata的XA模型1.3.优缺点 AT模式2.1.Seata的AT模型2.2.流程梳理2.3.AT与XA的区别 分布式事务 > 事务–01—CAP理论…

Flutter使用flutter_gen管理资源文件

pub地址&#xff1a; https://pub.dev/packages/flutter_gen 1.添加依赖 在你的pubspec.yaml文件中添加flutter_gen作为开发依赖 dependencies:build_runner:flutter_gen_runner: 2.配置pubspec.yaml 在pubspec.yaml文件中&#xff0c;配置flutter_gen的参数。指定输出路…

msvcp140.dll的解决方法有哪些。详细解析五种可以修复msvcp140.dll丢失的方法

引言&#xff1a; 在日常使用电脑的过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是“msvcp140.dll丢失”。那么&#xff0c;什么是msvcp140.dll文件&#xff1f;它的作用是什么&#xff1f;当它丢失时会对电脑产生什么影响&#xff1f;本文将详细介绍…

使用elementPlus去除下拉框蓝色边框

// 下拉框去除蓝色边框 .el-select {--el-select-input-focus-border-color: none !important; }

仅仅通过提示词,GPT-4可以被引导成为多个领域的特定专家

The Power of Prompting&#xff1a;提示的力量&#xff0c;仅通过提示&#xff0c;GPT-4可以被引导成为多个领域的特定专家。微软研究院发布了一项研究&#xff0c;展示了在仅使用提策略的情况下让GPT 4在医学基准测试中表现得像一个专家。研究显示&#xff0c;GPT-4在相同的基…

【bug篇】Tomcat一直报错,但是代码没问题

代码都没有问题&#xff0c;就是报404错误&#xff0c;原因竟然是版本不兼容&#xff0c;搞了我好长时间&#xff0c;简直麻了&#xff01;&#xff01;&#xff01; 因为我的Tomcat是11版本的&#xff0c;所以导入的servlet和jsp依赖应该是下面这些&#xff1a; <!-- Serv…

c++之STL

首先我们来仔细研究string 首先我们需要实现string的构造函数和析构函数。有new就有delete. 然后我们实现size()和c_str()&#xff0c;其中c_str就是可以将string类型转换为char*类型返回。 通过运算符重载&#xff0c;我们就可以实现string的[]访问。 然后我们实现和append。 …

【机器学习】平滑滤波

平滑滤波技术 平滑滤波&#xff0c;顾名思义就是对信号进行处理使之整体显得更加平滑&#xff0c;降低噪声影响&#xff0c;提高信号质量&#xff0c;它常见于数字信号处理和图像处理&#xff0c;一般意义上的数字信号多体现于一维数据&#xff0c;图像信号多体现于二维数据。…

mysql:免费的GUI客户端工具推荐并介绍常用的操作

给大家推荐几个常用的 mysql 数据库客户端 sequel-pro sequel-ace 官网下载地址 免费 sequel-ace 可以理解为 Sequel Pro 的升级版&#xff0c;由于Sequel Pro官方不维护了&#xff0c;特别是对 MySQL 8.0 支持不好&#xff0c;所以现在由社区维护了新分支 sequel-ace&#x…

人力资源管理后台 === 角色管理

目录 1.组织架构-编辑部门-弹出层获取数据 2.组织架构-编辑部门-编辑表单校验 3.组织架构-编辑部门-确认取消 4.组织架构-删除部门 5.角色管理-搭建页面结构 6.角色管理-获取数据 7.角色管理-表格自定义结构 8.角色管理-分页功能 9.角色管理-新增功能弹层 10.角色管理…

Linux 中的 ls 命令使用教程

目录 前言 如何运用 ls 命令 1、列出带有所有权的文件和目录 2、获取以人类可读的方式显示的信息 3、列出隐藏文件 4、递归列出文件 5、在使用 ls 时对文件和目录做区分 6、列出指定扩展名的文件 7、基于大小对输出内容排序 8、根据日期和时间排序文件 让我们来总结…

Ansible的重用(include和import)

环境 管理节点&#xff1a;Ubuntu 22.04控制节点&#xff1a;CentOS 8Ansible&#xff1a;2.15.6 重用 Ansible提供四种可重用的工件&#xff1a; variable文件&#xff1a;只包含变量的文件task文件&#xff1a;只包含task的文件playbook&#xff1a;可包含play、变量、ta…

MVVM 模式与 MVC 模式:构建高效应用的选择

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

代码随想录算法训练营第六十天| 84.柱状图中最大的矩形

文档讲解&#xff1a;代码随想录 视频讲解&#xff1a;代码随想录B站账号 状态&#xff1a;看了视频题解和文章解析后做出来了 84.柱状图中最大的矩形 class Solution:def largestRectangleArea(self, heights: List[int]) -> int:heights.insert(0, 0)heights.append(0)st…

java操作windows系统功能案例(二)

1、打印指定文件 可以使用Java提供的Runtime类和Process类来打印指定文件。以下是一个示例代码&#xff1a; import java.io.File; import java.io.IOException;public class PrintFile {public static void main(String[] args) {if (args.length ! 1) {System.out.println(…

NFT Insider115:The Sandbox开设元宇宙Diorama快闪店,​YGG Web3 游戏峰会已开幕

引言&#xff1a;NFT Insider由NFT收藏组织WHALE Members、BeepCrypto联合出品&#xff0c;浓缩每周NFT新闻&#xff0c;为大家带来关于NFT最全面、最新鲜、最有价值的讯息。每期周报将从NFT市场数据&#xff0c;艺术新闻类&#xff0c;游戏新闻类&#xff0c;虚拟世界类&#…

时间序列预测实战(二十一)PyTorch实现TCN卷积进行时间序列预测(专为新手编写的自研架构)

一、本文介绍 本篇文章给大家带来的是利用我个人编写的架构进行TCN时间序列卷积进行时间序列建模&#xff08;专门为了时间序列领域新人编写的架构&#xff0c;简单不同于市面上大家用GPT写的代码&#xff09;&#xff0c;包括结果可视化、支持单元预测、多元预测、模型拟合效…