或许是全网最全的延迟队列

什么是延迟队列

作用:用来存储延迟消息
延迟消息:生产者发送一个消息给mq,然后mq会经过一段时间(延迟时间),然后在把这个消息发送给消费者

应用场景

  1. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
  2. 推送某些数据的定时任务
  3. 微信公众号文章的延迟发布
  4. 订单超时未支付自动取消订单

实现延迟队列

在rabbitmq中没有提供真正意义上的延迟队列。要实现延迟队列有两套方案

  1. 方案一:基于死信队列中的消息TTL过期模式的进行改造,不监听对应队列,使消息过期后全部进入死信队列以达成延时效果,主要有队列TTL消息TTL两种
  2. 方案二:使用延时队列插件,让交换机管理消息延时时间(常用)

创建工程

创建springBoot工程,勾选需要的依赖
image.png
添加RabbitMQ配置

spring.rabbitmq.host=xxxx
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=DeadQueue

使用TTL+死信队列

队列TTL案例

对队列QA设置过期时间 10S,队列QB设置过期时间 40S,不监听QA、QB队列,使消息进入队列后不被消费导致TTL超时进入QD延迟队列

Y是死信交换机,QD是死信队列

对队列设置TTL
缺点:每增加一个新的时间需求,就要新增一个队列
创建RabbitMQ配置文件

package com.dmbjz.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* RabbitMQ的交换机、队列配置文件 */
@Configuration
public class ExchangeQueueConfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String DEAD_LETTER_QUEUE = "QD";/*创建X交换机*/@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}/*创建死信交换机*/@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")public Queue queueA(){Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);      //声明当前队列绑定的死信交换机args.put("x-dead-letter-routing-key", "YD");                     //声明当前队列的死信路由 keyargs.put("x-message-ttl", 10000);                                //声明队列的 TTLreturn QueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 声明队列 A 绑定 X 交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);         //声明当前队列绑定的死信交换机args.put("x-dead-letter-routing-key", "YD");                        //声明当前队列的死信路由 keyargs.put("x-message-ttl", 40000);                                   //声明队列的 TTLreturn QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queue1B).to(xExchange).with("XB");}//声明死信队列 QD@Bean("queueD")public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码:

package com.dmbjz.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;/* 生产者发送消息Controller */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/sendMessage/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列,消息内容:{}",new Date(),message);rabbitTemplate.convertAndSend("X","XA",message.getBytes(StandardCharsets.UTF_8));rabbitTemplate.convertAndSend("X","XB",message.getBytes(StandardCharsets.UTF_8));}
}

消费者代码:

package com.dmbjz.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/* 队列TTL消费者 */
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel)throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);}
}

浏览器访问地址测试:

http://localhost:8080/ttl/sendMessage/测试消息TTL

image.png


消息TTL案例

对消息设置过期时间,不监听QC队列,消息超时后自动进入QD延迟队列
缺点:如果积压在队列前面的消息延时时长很长,而后面积压的消息延时时长很短,积压时间短的消息并不会被提前放入死信队列;如果QC恰好又设置了积压上限,无法被积压的消息将直接进入延时队列,达不到延时效果
对消息设置TTL
修改配置文件:

    //声明队列 QC@Beanpublic Queue queueC(){Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);      //声明当前队列绑定的死信交换机args.put("x-dead-letter-routing-key", "YD");                     //声明当前队列的死信路由 keyreturn QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 QC 绑定 X 交换机@Beanpublic Binding queuebCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC()).to(xExchange).with("XC");}

生产者代码:

    //声明队列 QC@Beanpublic Queue queueC(){Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);      //声明当前队列绑定的死信交换机args.put("x-dead-letter-routing-key", "YD");                     //声明当前队列的死信路由 keyreturn QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 QC 绑定 X 交换机@Beanpublic Binding queuebCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC()).to(xExchange).with("XC");}

浏览器访问地址进行测试:

http://localhost:8080/ttl/sendMessagExpira/测试消息1/10000
http://localhost:8080/ttl/sendMessagExpira/测试消息2/1000

延时插件

使用延时队列插件实现延时队列功能,原理为交换机管理消息延时时间
插件版本需要兼容 RabbitMQ 版本,具体参考其发布说明**,**延时队列插件下载:github
插件安装步骤

1.将安装目录的延时队列插件拷贝到RabbitMQ插件目录cp rabbitmq_delayed_message_exchange-3.8.0.ez /root/rabbitmq_server-3.8.8/plugins2.安装延时队列插件   rabbitmq-plugins enable rabbitmq_delayed_message_exchange3、重启RabbitMQ服务systemctl restart rabbitmq-server

延时队列插件安装完成
重启服务后交换机多了延迟类型
案例演示:
延时队列插件实际落地固定为图中架构模式
延时队列插件架构图
创建配置文件:

package com.dmbjz.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* 延时队列插件案例 RabbitMQ配置类 */
@Configuration
public class DelayedQueueConfig {private static final String delayed_queue_name = "delayed.queue";private static final String delayed_exchange_name = "delayed.exchange";private static final String delayed_routingkey = "delayed.routingkey";/*创建延时插件的交换机,需要使用自定义方法进行创建*   插件版非死信队列,不需要路由到不同的交换机进行指定过期时间,所以固定为 direct 类型交换机* */@Beanpublic CustomExchange delayedExchange(){Map<String,Object> map = new HashMap<>(1);map.put("x-delayed-type","direct");       //延迟队列类型,固定值return new CustomExchange(delayed_exchange_name,"x-delayed-message",true,false,map);}/*队列*/@Beanpublic Queue delayQueue(){return QueueBuilder.durable(delayed_queue_name).build();}/*绑定,自定义交换机绑定多一个 noargs方法 */@Beanpublic Binding delayBing(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayQueue).to(delayedExchange).with(delayed_routingkey).noargs();}
}

生产者代码:

    /*延时插件案例*/@RequestMapping("/sendMessagPlugin/{message}/{time}")public void sendMsgPlugin(@PathVariable String message,@PathVariable Integer time){MessageProperties properties = new MessageProperties();properties.setDelay(time);      //设置延时时间Message msg = new Message(message.getBytes(StandardCharsets.UTF_8),properties);log.info("当前时间:{},发送具有过期时间为{}毫秒的信息给延时插件队列,消息内容:{}",new Date(),time,message);rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",msg);}

消费者代码:

package com.dmbjz.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/* 延时队列插件 消费者 */
@Component
@Slf4j
public class DelayQueueConsumer {@RabbitListener(queues = "delayed.queue")public void receiveDelayQueue(Message message, Channel channel)throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);}
}

浏览器访问地址进行测试:

http://localhost:8080/ttl/sendMessagPlugin/测试消息1/10000
http://localhost:8080/ttl/sendMessagPlugin/测试消息2/1000

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/217969.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是数据可视化?数据可视化的优势、方法及示例

前言 在当今的数字时代&#xff0c;数据是企业和组织的命脉&#xff0c;生成的数据量呈指数级增长。这种被称为大数据的海量数据在洞察力和决策方面具有巨大的潜力。然而&#xff0c;如果没有一种有效的方法来分析和理解这些数据&#xff0c;它就会变得毫无意义和难以管理。这就…

MyBatis--07--启动过程分析、SqlSession安全问题、拦截器

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 谈谈MyBatis的启动过程具体的操作过程如下&#xff1a;实现测试类,并测试SqlSessionFactorySqlSession SqlSession有数据安全问题?在MyBatis中&#xff0c;SqlSess…

BERT大模型:英语NLP的里程碑

BERT的诞生与重要性 BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;大模型标志着自然语言处理&#xff08;NLP&#xff09;领域的一个重要转折点。作为首个利用掩蔽语言模型&#xff08;MLM&#xff09;在英语语言上进行预训练的模型&…

深入分析ClassLocader工作机制

文章目录 一、ClassLoader简介1. 概念2. ClassLoader类结构分析 二、ClassLoader的双亲委派机制三、Class文件的加载流程1. 简介2. 加载字节码到内存3. 验证与解析4. 初始化Class对象 四、常见加载类错误分析1. ClassNotFoundException2. NoClassDefFoundError3. UnsatisfiledL…

RK3568/RV1126/RV1109/RV1106 ISP调试方案

最近一直在做瑞芯微rv1126的开发&#xff0c;由于项目性质&#xff0c;与camera打的交道比较多&#xff0c;包括图像的采集&#xff0c;ISP处理&#xff0c;图像处理&#xff0c;H.264/H.265编解码等各个方面吧。学到了不少&#xff0c;在学习的过程中&#xff0c;也得到了不少…

人工智能中的顺序学习:概念、应用和未来方向

一、介绍 人工智能 &#xff08;AI&#xff09; 中的顺序学习是一个关键研究领域&#xff0c;近年来引起了人们的极大兴趣。它指的是人工智能系统从数据序列中学习的能力&#xff0c;其中数据点的顺序至关重要。本文将探讨人工智能中顺序学习的概念、其重要性、应用、方法、挑战…

el-table 表格多选(后端接口搜索分页)实现已选中的记忆功能。实现表格数据和已选数据(前端分页)动态同步更新。

实现效果&#xff1a;&#xff08;可拉代码下来看&#xff1a;vue-demo: vueDemo&#xff09; 左侧表格为点击查询调用接口查询出来的数据&#xff0c;右侧表格为左侧表格所有选择的数据&#xff0c;由前端实现分页。 两个el-table勾选数据联动更新 实现逻辑&#xff1a; el-…

IDEA 出现问题:Idea-操作多次commit,如何合并为一个并push解决方案

❤️作者主页&#xff1a;小虚竹 ❤️作者简介&#xff1a;大家好,我是小虚竹。2022年度博客之星评选TOP 10&#x1f3c6;&#xff0c;Java领域优质创作者&#x1f3c6;&#xff0c;CSDN博客专家&#x1f3c6;&#xff0c;华为云享专家&#x1f3c6;&#xff0c;掘金年度人气作…

为什么需要分库分表,如何实现?

本文我们主要讲解“为什么需要分库分表&#xff0c;如何实现”。 在前文中讲到了读写分离&#xff0c;读写分离优化了互联网读多写少场景下的性能问题&#xff0c;考虑一个业务场景&#xff0c;如果读库的数据规模非常大&#xff0c;除了增加多个从库之外&#xff0c;还有其他…

RabbitMQ插件详解:rabbitmq_web_stomp【RabbitMQ 六】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 《RabbitMQ Web STOMP&#xff1a;打破界限的消息传递之舞》 前言STOMP协议简介STOMP&#xff08;Simple Text Oriented Messaging Protocol&#xff09;协议简介STOMP与WebSocket的关系 WebSocket和R…

学习JVM

java虚拟机 流程&#xff1a;helloworld.java----(javac编译)----helloworld.class-------(java运行)——JVM——机器码JVM功能 *解释和运行 *内存管理 *即时编译&#xff08;跨平台-慢一点&#xff09;jit &#xff08;反复用到的代码 解释保存再内存里面&#xff09;…

anolisos8.8安装显卡+CUDA工具+容器运行时支持(containerd/docker)+k8s部署GPU插件

anolisos8.8安装显卡及cuda工具 一、目录 1、测试环境 2、安装显卡驱动 3、安装cuda工具 4、配置容器运行时 5、K8S集群安装nvidia插件 二、测试环境 操作系统&#xff1a;Anolis OS 8.8 内核版本&#xff1a;5.10.134-13.an8.x86_64 显卡安装版本&#xff1a;525.147.05 c…

【docker】docker入门与安装

Docker 一、入门 Docker的主要目标是&#xff1a;Build, Ship and Run Any App, Anywhere&#xff0c;也就是通过对应用组件的封装、分发、部署、运行等生命周期的管理&#xff0c;使用户的APP及其运行环境能做到一次镜像,处处运行。 Docker运行速度快的原因 Docker有比虚拟…

Spark编程实验一:Spark和Hadoop的安装使用

一、目的与要求 1、掌握在Linux虚拟机中安装Hadoop和Spark的方法&#xff1b; 2、熟悉HDFS的基本使用方法&#xff1b; 3、掌握使用Spark访问本地文件和HDFS文件的方法。 二、实验内容 1、安装Hadoop和Spark 进入Linux系统&#xff0c;完成Hadoop伪分布式模式的安装。完成Ha…

Ignoring query to other database

登录数据库执行查看database的脚本提示 仔细观察才发现&#xff0c;登录的时候我写的是&#xff0c;没写 -u 退出重新登录&#xff0c;好了~

死锁的预防、避免、检测和消除

一、预防死锁 1. 破坏互斥条件 2. 破坏不剥夺条件 3.破坏请求和保持条件 4.破坏循环等待条件 二、避免死锁 避免死锁的一种方法是使用银行家算法&#xff0c;它涉及到安全序列的概念。银行家算法是一种资源分配和死锁避免的算法&#xff0c;它确保系统能够分配资源而不会导致死…

c/c++ 结构体、联合体、枚举

结构体 结构体内存对齐规则&#xff1a; 1、结构体的第一个成员对齐到结构体变量起始位置偏移量为0的地址处 2、其他成员变量要对齐到某个数字&#xff08;对齐数&#xff09;的整数倍的地址处。 对齐数&#xff1a;编译器默认的一个对齐数与该成员变量大小的较小值。 vs 中…

JMeter下载与安装

文章目录 前言一、安装java环境&#xff08;JDK下载与安装&#xff09;二、JMeter下载三、JMeter安装1.解压缩2.配置环境变量 四、JMeter启动&#xff08;启动成功则代表JMeter安装成功&#xff09;五、JMeter汉化&#xff08;将JMeter修改成中文&#xff09;1.方法一&#xff…

浅谈5G基站节能及数字化管理解决方案的设计与应用-安科瑞 蒋静

截至2023年10月&#xff0c;我国5G基站总数达321.5万个&#xff0c;占全国通信基站总数的28.1%。然而&#xff0c;随着5G基站数量的快速增长&#xff0c;基站的能耗问题也逐渐日益凸显&#xff0c;基站的用电给运营商带来了巨大的电费开支压力&#xff0c;降低5G基站的能耗成为…

用Bat文件调用小牛翻译api快速翻译

为了帮助大家更加轻松地调用机器翻译api&#xff0c;本人探索实现了一种可以通过BAT文件来调用机器翻译api&#xff0c;对粘贴板中的文本进行翻译&#xff0c;并将翻译结果保存为txt文件。下面把实现步骤简要说明如下&#xff1a; 第一步&#xff1a;获取小牛机器翻译api 进入…