RabbitMQ实现延迟消息发送——实战篇

在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章跟着练手哦~

需求背景

我这儿的一个需求背景大概是干部添加完活动后,由管理员进行审批,审批通过后,该活动id连同设置的过期时间会被放入消息队列中,等到活动结束时间到的时候,自动将活动的状态设置为已完成,这里华丽一个活动图,各位参考一下。

ce9bd03466514d6294a1a1de81f7772d.png

需求了解完之后我们就可以开始的写代码啦~(手动微笑)

相关知识点拓展

这里还是简单提一下MQ实现延迟队列的一个方法,一种是用插件,还有一种是使用死信队列,当然本文我们使用的就是通过死信队列来实现的。

当我们的一个正常消息因为设置了过期时间或者被消费者拒绝消费的时候,这条消息就会被放入死信队列中,然后死信队列再进行消费。

然后啰嗦一下,说一下MQ的交换机类型,以及死信交换机一般选用哪种:

1. Direct Exchange(直连交换机)

  • 特点
    • 根据消息的 Routing Key 精确匹配队列的 Binding Key
    • 完全匹配时,消息才会被路由到对应的队列。
  • 适用场景
    • 点对点消息传递,消息需要精确路由到特定队列。
  • 示例
    • 消息的 Routing Key 为 order.created,队列的 Binding Key 也为 order.created,则消息会被路由到该队列。

2. Fanout Exchange(扇出交换机)

  • 特点
    • 将消息广播到所有绑定到该交换机的队列,忽略 Routing Key。
  • 适用场景
    • 广播消息,消息需要发送到多个队列。
  • 示例
    • 消息发送到 Fanout Exchange,所有绑定到该交换机的队列都会收到消息。

3. Topic Exchange(主题交换机)

  • 特点
    • 根据消息的 Routing Key 和队列的 Binding Key 进行模式匹配。
    • Binding Key 支持通配符:
      • *:匹配一个单词。
      • #:匹配零个或多个单词。
  • 适用场景
    • 消息需要根据模式路由到多个队列。
  • 示例
    • 消息的 Routing Key 为 order.created.us,队列的 Binding Key 为 order.created.*,则消息会被路由到该队列。

4. Headers Exchange(头交换机)

  • 特点
    • 根据消息的 Headers(键值对)匹配队列的 Binding Arguments。
    • 忽略 Routing Key。
  • 适用场景
    • 消息需要根据复杂的条件路由到队列。
  • 示例
    • 消息的 Headers 包含 type=order 和 region=us,队列的 Binding Arguments 要求 x-match=all 且 type=order,则消息会被路由到该队列。

5. Default Exchange(默认交换机)

  • 特点
    • RabbitMQ 默认创建的交换机,类型为 Direct Exchange。
    • 每个队列都会自动绑定到默认交换机,Binding Key 为队列名称。
  • 适用场景
    • 默认情况下,消息可以直接发送到队列。

死信交换机适合使用哪种类型?

死信交换机(DLX, Dead Letter Exchange)的类型选择取决于你的业务需求。以下是常见的选择:

1. Direct Exchange

  • 适用场景
    • 死信消息需要精确路由到特定的死信队列。
  • 示例
    • 将死信消息路由到 dlx-queue,用于统一处理所有死信消息。

2. Topic Exchange

  • 适用场景
    • 死信消息需要根据不同的 Routing Key 路由到不同的死信队列。
  • 示例
    • 将死信消息根据业务类型(如 order.deadpayment.dead)路由到不同的死信队列。

3. Fanout Exchange

  • 适用场景
    • 死信消息需要广播到多个死信队列。
  • 示例
    • 将死信消息同时发送到日志队列和报警队列。

推荐选择

  • 大多数情况下,死信交换机使用 Direct Exchange,因为死信消息通常需要精确路由到一个死信队列,用于统一处理。
  • 如果死信消息需要根据不同的条件路由到多个队列,可以使用 Topic Exchange

代码部分

首先,我们需要定义一个死信交换机和死信队列,用来接收来自普通队列的消息。

//    创建死信交换机,处理延迟消息通知@Bean("dead_letter_exchange")public DirectExchange delayExchange(){return new DirectExchange("dead_letter_exchange",true,false);}
//    创建死信队列public Queue deadLetterQueue(){Queue queue = new Queue("dead_letter_queue", true);rabbitAdmin.declareQueue(queue);log.info("死信队列声明成功:" + queue.getName());return queue;    }

然后,我们需要配置一个普通的消息队列和一个普通的交换机,这个消息队列需要设置对应的死信交换机和死信路由,同时我们这个普通队列需要接收一个过期时间,保证一到过期时间消息就会被发送到死信队列当中。

//    创建一个普通队列,接受一个过期时间,出列活动结束后,发送到死信队列public Queue normalQueue(Long expireTime){Map<String,Object> args = new HashMap<>();if (expireTime != null && expireTime > 0) {  // 确保 TTL 是正数args.put("x-message-ttl", expireTime);}// 设置死信交换机args.put("x-dead-letter-exchange",deadLetterExchange);// 设置死信路由键args.put("x-dead-letter-routing-key","dead_letter_routing_key");Queue queue = new Queue("normal_queue", true, false, false, args);log.info("普通队列声明成功:" + queue.getName());return queue;    }
//    创建一个普通交换机,处理活动结束自动设置活动状态为结束@Bean("activity_end_exchange")public DirectExchange activityEndExchange(){return new DirectExchange("activity_end_exchange");}

然后我们需要分别将死信交换机和死信队列,普通交换机和普通队列分别进行绑定。

//    将死信队列和死信交换机进行绑定public void bindDeadLetterRouting(){Queue queue=queueDeclareConfig.deadLetterQueue();Binding binding = BindingBuilder.bind(queue).to(deadLetterExchange).with("dead_letter_routing_key");rabbitAdmin.declareBinding(binding);log.info("死信队列绑定成功,死信队列名称----》" + queue.getName() + ",死信交换机名称----》" + deadLetterExchange.getName());}//    绑定活动结束交换机和普通队列public void bindActivityEndRouting(Long expireTime) {Queue queue = queueDeclareConfig.normalQueue(expireTime);Binding binding = BindingBuilder.bind(queue).to(activityEndExchange).with("activity_end_routing_key");rabbitAdmin.declareBinding(binding);}

当然,我们还需要配置生产者来发送消息到交换机里面

//活动结束后,发送消息到死信队列,自动设置活动结束状态public void sendActivityEndMessage(Long expireTime, Integer activityId) {rabbitMQBindRoutingConfig.bindDeadLetterRouting();rabbitMQBindRoutingConfig.bindActivityEndRouting(expireTime);try {// 将消息发送到普通队列,等待消息过期发送到死信交换机rabbitTemplate.convertAndSend("activity_end_exchange", "activity_end_routing_key", activityId, msg -> {msg.getMessageProperties().setExpiration(expireTime.toString());return msg;});} catch (Exception e) {log.error("发送消息失败------->" + activityId);throw new RuntimeException("发送消息失败---->" + activityId);}}

这里生产者的代码可以根据你的业务逻辑具体进行更改~

消费者逻辑也需要进行编写一下

//    使用MQ延迟队列,活动结束,修改活动状态@RabbitListener(queues = "dead_letter_queue")public void updatePlaceOccupyStatus(Message message, Channel channel){try {String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);Integer activityId = Integer.parseInt(messageBody);ActivityInfo activityInfo = baseMapper.selectById(activityId);LambdaUpdateWrapper<ActivityInfo> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(ActivityInfo::getActivityId,activityId).set(ActivityInfo::getProgress,StatusConstant.FINISH);if(baseMapper.update(activityInfo,wrapper)>0){channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {log.error("处理消息时发生错误:" + e.getMessage());try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} catch (IOException ioException) {ioException.printStackTrace();}}

消费者这边需要注意的是如果你选择的提交类型不是自动提交的话,在处理完消息之后需要手动ack一下消息,不然消费的消息不会被认为已经消费,从而导致消息积压,也会在之后的消费中重复进行消费,因此你需要告诉生产者这条消息已经被消费了。

当然,如果在消费的过程中出现了什么问题,可以设置以下这行代码:

419e93610db14c019fcfb49ad0d23703.png

basicNack方法接收三个参数:
deliveryTag: 消息的标识符。
multiple: 是否对多个消息进行否定确认。
requeue: 是否将消息重新放入队列。 

可以根据你的需求进行设定~

然后的然后,我们需要再application.yml当中进行配置相关信息:

rabbitmq:host: localhostport: 5672username: guestpassword: guest
#    确认消息发送到交换机上publisher-confirm-type: correlated#    消息发送到队列确认,失败回调publisher-returns: truelistener:direct:acknowledge-mode: manualretry:enabled: true
#          重试的时间间隔为1sinitial-interval: 1000ms
#          最大重试3次max-attempts: 3
#          最大的重试时间间隔为2smax-interval: 2000ms
#          每次重试时间间隔为1s,每次重试时间间隔倍数multiplier: 1.0#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)default-requeue-rejected: falsesimple:default-requeue-rejected: falseacknowledge-mode: manual
#        最小消费者数量concurrency: 1
#        最大消费者数量max-concurrency: 10retry:enabled: trueinitial-interval: 1000msmax-attempts: 3max-interval: 2000msmultiplier: 1.0

上面给出了一个比较全的配置,你可以根据你的需求进行选择,但是需要注意的是default-requeue-rejected: false这一行配置一定要先配置,不然你的消息在普通队列中过期了,是不会发送到死信队列当中进行消费的~

到这儿,基本上所有的代码都写的差不多了,当然我们还需要再rabbitmq控制平台上分别建一个普通交换机和一个死信交换机,一个普通队列和一个私信队列,然后分别绑定就可以了。

注意的是,普通交换机也需要在平台上配置一次死信队列和死信路由:

5f1c7325f13a4910bd2d28e8b62a7f60.png

1d9202ed97554cc4ab1601d6f839b0ef.png

到这儿,如果没有什么问题的话基本上已经可以直接运行了,所以我的这篇文章到这儿基本上也已经结束了,如果你有什么问题,可以评论区留言,我们相互学习~

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/4466.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IoTDB 常见问题 QA 第四期

关于 IoTDB 的 Q & A IoTDB Q&A 第四期来啦&#xff01;我们将定期汇总我们将定期汇总社区讨论频繁的问题&#xff0c;并展开进行详细回答&#xff0c;通过积累常见问题“小百科”&#xff0c;方便大家使用 IoTDB。 Q1&#xff1a;Java 中如何使用 SSL 连接 IoTDB 问题…

【STM32-学习笔记-14-】FLASH闪存

文章目录 FALSH闪存一、FLASH简介二、FLASH基本结构三、FLASH解锁四、使用指针访问存储器五、FLASH擦除以及编程流程Ⅰ、程序存储器全擦除1. 读取FLASH_CR的LOCK位2. 检查LOCK位是否为13. 设置FLASH_CR的MER 1和STRT 1&#xff08;如果LOCK位0&#xff09;4. 检查FLASH_SR的B…

CamemBERT:一款出色的法语语言模型

摘要 预训练语言模型在自然语言处理中已无处不在。尽管这些模型取得了成功&#xff0c;但大多数可用模型要么是在英语数据上训练的&#xff0c;要么是在多种语言数据拼接的基础上训练的。这使得这些模型在除英语以外的所有语言中的实际应用非常有限。本文探讨了为其他语言训练…

线性代数概述

矩阵与线性代数的关系 矩阵是线性代数的研究对象之一&#xff1a; 矩阵&#xff08;Matrix&#xff09;是一个按照长方阵列排列的复数或实数集合&#xff0c;是线性代数中的核心概念之一。矩阵的定义和性质构成了线性代数中矩阵理论的基础&#xff0c;而矩阵运算则简洁地表示和…

金仓Kingbase客户端KStudio报OOM:Java heap space socketTimeout

找到Kingbase\ES\V8\KESRealPro\V008R006C006B0021\ClientTools\guitools\KStudio\KStudio.ini 修改JVM参数&#xff1a; 默认值&#xff1a; -Xms512m -Xmx1024m 改为&#xff1a; -Xms1024m -Xmx2048m -XX:MaxPermSize512m SQL查询报错&#xff1a;An I/O error occurred …

Spring6.0新特性-HTTP接口:使用@HttpExchange实现更优雅的Http客户端

文章目录 一、概述二、使用1、创建接口HttpExchange方法2、创建一个在调用方法时执行请求的代理3、方法参数4、返回值5、错误处理&#xff08;1&#xff09;为RestClient&#xff08;2&#xff09;为WebClient&#xff08;3&#xff09;为RestTemplate 注意 一、概述 官方文档…

kubernetes学习-Service(七)

一、Service-pod-endpoint关系 # 查看endpoints [rootk8s-master deployments]# kubectl get endpoints NAME ENDPOINTS AGE kubernetes 192.168.129.136:6443 90m nginx-svc 10.109.131.1:80,10.111.156.65:80 22m # …

Python数据分析案例70——基于神经网络的时间序列预测(滞后性的效果,预测中存在的问题)

背景 这篇文章可以说是基于 现代的一些神经网络的方法去做时间序列预测的一个介绍科普&#xff0c;也可以说是一个各种模型对比的案例&#xff0c;但也会谈一谈自己做了这么久关于神经网络的时间序列预测的论文&#xff0c;其中一些常见的模式及它们存在的问题以及效果&#x…

opencv笔记2

图像灰度 彩色图像转化为灰度图像的过程是图像的灰度化处理。彩色图像中的每个像素的颜色由R&#xff0c;G&#xff0c;B三个分量决定&#xff0c;而每个分量中可取值0-255&#xff0c;这样一个像素点可以有256*256*256变化。而灰度图像是R&#xff0c;G&#xff0c;B三个分量…

LeetCode:2266. 统计打字方案数(DP Java)

目录 2266. 统计打字方案数 题目描述&#xff1a; 实现代码与解析&#xff1a; 动态规划 原理思路&#xff1a; 2266. 统计打字方案数 题目描述&#xff1a; Alice 在给 Bob 用手机打字。数字到字母的 对应 如下图所示。 为了 打出 一个字母&#xff0c;Alice 需要 按 对…

http://noi.openjudge.cn/——4.7算法之搜索——【169:The Buses】

题目 169:The Buses 总时间限制: 5000ms 内存限制: 65536kB 描述 A man arrives at a bus stop at 12:00. He remains there during 12:00-12:59. The bus stop is used by a number of bus routes. The man notes the times of arriving buses. The times when buses arrive …

java基础概念59-File

一、路径 二、File类 2-1、常见的构造方法 示例&#xff1a; 【注意】&#xff1a; 一般不自己用分割符把父路径和子路径拼接起来&#xff0c;因为&#xff0c;不用的操作系统&#xff0c;分隔符不同。 2-2、小结 2-3、File中常见的成员方法 示例&#xff1a; 【注意】&#…

PortSwigger靶场练习---第二关-查找和利用未使用的 API 端点

第二关&#xff1a;Finding and exploiting an unused API endpoint 实验&#xff1a;查找和利用未使用的 API 端点 PortSwigger靶场地址&#xff1a; Dashboard | Web Security Academy - PortSwigger 题目&#xff1a; 官方提示&#xff1a; 在 Burp 的浏览器中&#xff0c…

软路由系统iStoreOS 一键安装 docker compose

一键安装命令 大家好&#xff01;今天我来分享一个快速安装 docker-compose 的方法。以下是我常用的命令&#xff0c;当前版本是 V2.32.4。如果你需要最新版本&#xff0c;可以查看获取docker compose最新版本号 部分&#xff0c;获取最新版本号后替换命令中的版本号即可。 w…

SpringCloud nacos 2.0.0 + seata 2.0.0

NACOS 下载nacos https://github.com/alibaba/nacos/releases/tag/2.2.0 启动nacos startup.cmd -m standalone SEATA 下载seata https://seata.apache.org/release-history/seata-server 新建数据库-seata CREATE TABLE branch_table (branch_id bigint NOT NULL,xid …

springboot音乐播放器系统

Spring Boot音乐播放器系统是一个基于Spring Boot框架开发的音乐播放平台&#xff0c;旨在为用户提供高效、便捷的音乐播放体验。 一、系统背景与意义 随着互联网的飞速发展和人们对音乐娱乐需求的不断增长&#xff0c;音乐播放器已经成为人们日常生活中不可或缺的一部分。传…

奉加微PHY6230兼容性:部分手机不兼容

从事嵌入式单片机的工作算是符合我个人兴趣爱好的,当面对一个新的芯片我即想把芯片尽快搞懂完成项目赚钱,也想着能够把自己遇到的坑和注意事项记录下来,即方便自己后面查阅也可以分享给大家,这是一种冲动,但是这个或许并不是原厂希望的,尽管这样有可能会牺牲一些时间也有哪天原…

Go-知识 版本演进

Go-知识 版本演进 Go release notesr56(2011/03/16)r57(2011/05/03)Gofix 工具语言包工具小修订 r58(2011/06/29)语言包工具小修订 r59(2011/08/01)语言包工具 r60(2011/09/07)语言包工具 [go1 2012-03-28](https://golang.google.cn/doc/devel/release#go1)[go1.1 2013-05-13]…

C#,入门教程(02)—— Visual Studio 2022开发环境搭建图文教程

如果这是您阅读的本专栏的第一篇博文&#xff0c;建议先阅读如何安装Visual Studio 2022。 C#&#xff0c;入门教程(01)—— Visual Studio 2022 免费安装的详细图文与动画教程https://blog.csdn.net/beijinghorn/article/details/123350910 一、简单准备 开始学习、编写程序…

数字艺术类专业人才供需数据获取和分析研究

本文章所用数据集&#xff1a;数据集 本文章所用源代码&#xff1a;源代码和训练好的模型 第1章 绪论 1.1研究背景及意义 随着社会经济的迅速发展和科技的飞速进步&#xff0c;数字艺术类专业正逐渐崛起&#xff0c;并呈现出蓬勃发展的势头。数字艺术作为创作、设计和表现形式的…