RabbitMQ 延时消息实现

1. 实现方式

1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
  • 导入Spring 集成RabbitMQ MAEVN
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.5.RELEASE</version>
</dependency>

2. 设置队列过期时间:延迟队列消息过期 + 死信队列

推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列

2.1. MQ配置信息

2.1.1. 自定义队列配置

…/bootstrap.yml

# rabbitmq自定义配置
rabbitmq:ttlExchange: medical_dev_ttl_topic_changettlKey: dev_ttlttlQueue: medical.dev.ttl.topic.queuedelayExpireTime: 600ttlQueueSize: 10000deadExchange: medical_dev_dead_topic_changedeadKey: dev_deaddeadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/*** amqp配置文件*/
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {/*** 延迟队列*/public String ttlExchange;public String ttlKey;public String ttlQueue;private Integer delayExpireTime;public Integer ttlQueueSize;/*** 死信队列*/public String deadExchange;public String deadKey;public String deadQueue;}

2.2. 配置文件自动生成队列

2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;/*** 延迟队列配置文件* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigTTL {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic TopicExchange ttlTopicExchange(){return new TopicExchange(myConfigProperties.getTtlExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue ttlTopicduanxinQueue(){HashMap<String, Object> args = new HashMap<>();// 给队列设置消息过期时间:毫秒值args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);// 设置队列最大长度args.put("x-max-length", myConfigProperties.getTtlQueueSize());// 设置死信队列交换机名称// 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列// 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());// 设置死信队列路由keyargs.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);}// 3: 绑定对用关系@Beanpublic Binding ttlTopicsmsBinding(){return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());}}
2.2.2. 死信队列

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;/*** 死信队列配置文件* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigDead {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic TopicExchange deadTopicExchange(){return new TopicExchange(myConfigProperties.getDeadExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue deadTopicduanxinQueue(){return new Queue(myConfigProperties.getDeadQueue(), true);}// 3: 绑定对用关系@Beanpublic Binding deadTopicsmsBinding(){return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());}}

2.3. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** RabbitMQ生产者推送消息类* * @author xiemingan*/
@Component
@Slf4j
public class RabbitmqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfigProperties myConfigProperties;/*** @param pushMessage 推送消息体*/public void pushTtlMessage(String pushMessage) {// 推送消息至交换机,并指定路由keyrabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);}}

2.4. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** @author mingAn.xie*/
@Log4j2
@Component
public class RabbitmqConsumer {/*** 消费死信队列* @param message 消息体*/@RabbitListener(queues = "${rabbitmq.deadQueue}")public void pushMessages(Message message) {String body = new String(message.getBody()).trim();if (StringUtils.isEmpty(body)){return;}log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}

3. 设置消息的过期时间

设置交换机类型为 x-delayed-type,推送消息至交换机,直连队列消费

3.1. 安装插件 rabbitmq_delayed_message_exchange

前言:这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ

具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件

安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本

插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  • 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 交换机类型中出现 x-delayed-type 表示安装成功

3.2. MQ配置信息

3.2.1. 自定义队列配置

…/bootstrap.yml

#mq队列自定义配置
rabbitmq:saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchangesaveTaskTtlKey: ey240001_pro_save_task_ttlsaveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queuesaveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/*** amqp配置文件** @author mingAn.xie*/
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {/*** 任务待办生成延时队列*/public String saveTaskTtlExchange;public String saveTaskTtlKey;public String saveTaskTtlQueue;public Integer saveTaskTtlQueueSize;}

3.3. 配置文件生成 x-delayed-type 交换机

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** x-delayed-type 交换机延迟队列配置* * @author mingAn.xie*/
@Configuration
public class RabbitMQConfigSaveTaskTtl {@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@Beanpublic CustomExchange saveTaskTopicExchange() {Map<String, Object> args = new HashMap<>();// 设置延迟队列插件类型:按过期时间消费args.put("x-delayed-type", "direct");// 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@Beanpublic Queue saveTaskTopicduanxinQueue() {return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);}// 3: 绑定对用关系@Beanpublic Binding saveTaskTopicsmsBinding() {return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();}}

3.4. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 生产者推送消息类* * @author xiemingan*/
@Component
@Slf4j
public class RabbitmqProducer {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyConfigProperties myConfigProperties;/*** @param pushMessage 推送消息体* @param ttlTime     延时时间(毫秒值)*/public void pushTtlMessage(String pushMessage, long ttlTime) {ttlTime = ttlTime <= 0 ? 1000 : ttlTime;// 3.1.推送MQ延迟消息队列long finalTtlTime = ttlTime;MessagePostProcessor messagePostProcessor = message -> {// 设置延迟时间message.getMessageProperties().setDelay((int) finalTtlTime);return message;};rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);}}

3.5. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** @author mingAn.xie*/
@Log4j2
@Component
public class RabbitmqConsumer {/*** 消费延时消息* @param message 消息体*/@RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")public void pushMessages(Message message) {String body = new String(message.getBody()).trim();if (StringUtils.isEmpty(body)) {return;}log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/291560.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++从入门到精通——缺省参数

缺省参数 前言一、缺省参数概念二、缺省参数分类位置参数的缺省参数全缺省参数半缺省参数 关键字参数的缺省参数函数指针的缺省参数lambda表达式 三、缺省参数的具体代码展示main.cpp 前言 缺省参数是在函数定义时指定的默认值&#xff0c;当调用函数时未提供该参数的值时&…

暴雨服务器X7740赋能大模型训练

数字经济浪潮愈演愈烈,大模型训练对服务器的要求也越来越高。在此背景下,暴雨信息发布专门为大规模模型训练而设计的全新旗舰GPU服务器—X7740,以卓越的计算性能、高速网络通信能力以及创新的能效表现,有效赋能大模型训练。 X7740 搭载了暴雨信息最新一代的英特尔至强可扩展处理…

如何划分训练集、测试集、验证集

训练集、测试集和验证集是在机器学习和数据科学中常用的术语&#xff0c;用于评估和验证模型的性能。它们通常用于监督学习任务中。 1. 训练集&#xff08;Training Set&#xff09;&#xff1a;训练集是用于训练机器学习模型的数据集。在训练期间&#xff0c;模型使用训练集中…

Leetcode - 周赛390

目录 一&#xff0c;3090. 每个字符最多出现两次的最长子字符串 二&#xff0c;3091. 执行操作使数据元素之和大于等于 K 三&#xff0c;3092. 最高频率的 ID 四&#xff0c;3093. 最长公共后缀查询 一&#xff0c;3090. 每个字符最多出现两次的最长子字符串 本题是一道标准…

嵌入式C语言中头文件计设规则方法

我是阿梁,最近在负责的项目代码,也算是祖传代码了,里面有很多头文件嵌套的情况,即a.h包含b.h,b.h又包含c.h,c.h又包含d.h......遂找到一份华子的C语言编程规范学习一下,并结合自己的理解写成这篇文章,以规范自己的代码。 1. 头文件嵌套的缺点 依赖:若x.h包含了y.h,则…

启信宝商业大数据助力全国经济普查

近日&#xff0c;合合信息旗下启信宝收到中国青年创业就业基金会感谢信&#xff0c;对启信宝协同助力全国经济普查和服务青年创业就业研究表达感谢。 第五次全国经济普查是新时代新征程上一次重大国情国力调查&#xff0c;是对国民经济“全面体检”和“集中盘点”&#xff0c;…

武汉星起航:亚马逊受惠国家政策,企业成长与行业发展齐头并进

亚马逊电商平台作为国际知名的跨境电商巨头&#xff0c;在中国市场也展现出了强劲的发展势头。近年来&#xff0c;国家政策对亚马逊电商平台的支持力度不断加大&#xff0c;为企业提供了良好的发展环境和机遇。武汉星起航将探讨国家政策对亚马逊电商平台的重要影响&#xff0c;…

深度思考:雪花算法snowflake分布式id生成原理详解

雪花算法snowflake是一种优秀的分布式ID生成方案&#xff0c;其优点突出&#xff1a;它能生成全局唯一且递增的ID&#xff0c;确保了数据的一致性和准确性&#xff1b;同时&#xff0c;该算法灵活性强&#xff0c;可自定义各部分bit位&#xff0c;满足不同业务场景的需求&#…

QT使用数据库

数据库就是保存数据的文件。可以存储大量数据&#xff0c;包括插入数据、更新数据、截取数据等。用专业术语来说&#xff0c;数据库是“按照数据结构来组织、存储和管理数据的仓库”。 什么时候需要数据库&#xff1f;在嵌入式里&#xff0c;存储大量数据&#xff0c;或者记录数…

【项目技术介绍篇】若依开源项目RuoYi-Cloud后端技术介绍

作者介绍&#xff1a;本人笔名姑苏老陈&#xff0c;从事JAVA开发工作十多年了&#xff0c;带过大学刚毕业的实习生&#xff0c;也带过技术团队。最近有个朋友的表弟&#xff0c;马上要大学毕业了&#xff0c;想从事JAVA开发工作&#xff0c;但不知道从何处入手。于是&#xff0…

双端队列deque和vector以及list的优缺点比较

参考:https://blog.csdn.net/TWRenHao/article/details/123483085 一、vector vector具体用法详情点这里 优点&#xff1a; 支持随机访问 CPU高速环缓存命中率很高 缺点&#xff1a; 空间不够&#xff0c;便需要增容。而增容代价很大&#xff0c;还存在一定的空间浪费。 头部…

在同一个网站上自动下载多个子页面内容

一、问题现象 第一次遇到这样的问题&#xff0c;如下图&#xff1a; 即在同一个网站上下载多个内容时&#xff0c;第一个内容明明已经正常get到了&#xff0c;但开始第二个页面的查询 以后&#xff0c;原来已经查出的内容就找不到了。 二、解决办法 我不知道大家是不是遇到…

C++项目——集群聊天服务器项目(七)Model层设计、注册业务实现

在前几节的研究中&#xff0c;我们已经实现网络层与业务层分离&#xff0c;本节实现数据层与业务层分离&#xff0c;降低各层之间的耦合性&#xff0c;同时实现用户注册业务。 网络层专注于处理网络通信与读写事件 业务层专注于处理读写事件到来时所需求的各项业务 数据层专…

msvcr110.dll文件丢失要怎么办?教你多种解决msvcr110.dll文件的方法

面对“程序无法启动&#xff0c;因为电脑中缺失msvcr110.dll”的错误提示&#xff0c;你可能会觉得你的工作或者休闲时间被意外中断了&#xff0c;这确实很让人烦恼。这种问题对于很多Windows用户来说并不陌生&#xff0c;但幸运的是&#xff0c;它通常可以通过几个简单的步骤得…

node.js学习(2)

版权声明 以下文章为尚硅谷PDF资料&#xff0c;B站视频链接&#xff1a;【尚硅谷Node.js零基础视频教程&#xff0c;nodejs新手到高手】仅供个人学习交流使用。如涉及侵权问题&#xff0c;请立即与本人联系&#xff0c;本人将积极配合删除相关内容。感谢理解和支持&#xff0c;…

Python3:ModuleNotFoundError: No module named ‘elftools‘

问题背景 问题 ModuleNotFoundError: No module named ‘elftools’ 解决方法 pip3 install pyelftools 成功&#xff01;&#xff01;&#xff01;

哲♂学家带你深♂入理解c语言的编译与链接

目录 前言&#xff1a; 一、翻译环境 二、预处理 三、编译 1.词法分析 2.语法分析 3、语义分析 四、汇编 五、链接 总结&#xff1a; 前言&#xff1a; 编译和链接能够帮我们更好的理解c语言中程序执行前是如何运行的&#xff0c;今天由本哲学家带你深入理解c语言的编译…

iOS - Runloop的运行逻辑

文章目录 iOS - Runloop的运行逻辑1. 苹果官方的Runloop执行图2. Mode里面的东西2.1 Source02.2 Source12.3 Timers2.4 Observers 3. 执行流程3.1 注意点 4. Runloop休眠 iOS - Runloop的运行逻辑 1. 苹果官方的Runloop执行图 2. Mode里面的东西 2.1 Source0 触摸事件处理pe…

Unity Mesh 生成图形(一)

目录 一、概述 二、获取顶点坐标和索引 三、绘制正方形 1.显示顶点坐标 2.顶点坐标的顺序 3.顶点排序 4.绘制最终效果 结束 一、概述 Unity 的 Mesh 是用于表示三维物体的网格数据结构。它是由一系列顶点和三角形组成的网格&#xff0c;用于描述物体的形状和外观。 M…

Java8之接口默认方法

Java8之接口默认方法 一、介绍二、代码1、接口2、实现类3、测试代码4、效果 一、介绍 在Java8中&#xff0c;允许为接口方法提供一个默认的实现。必须用default修饰符标记这样一个方法。默认方法也可以调用其他方法 二、代码 1、接口 public interface PersonService {void…