rabbit MQ的延迟队列处理模型示例(基于SpringBoot死信模式)

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL延迟队列配置文件类**/
@Configuration
public class TtlQueueConfig {////普通交换机的名称 Xpublic static final String X_EXCHANGE = "X";//死信交换机名称 Ypublic static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列QA QBpublic static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列名称QDpublic static final String DEAD_LETTER_QUEUE = "QD";////声明X_EXCHANGE@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换Y_DEAD_LETTER_EXCHANGE@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 QA@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明队列 QB@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列QD@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//捆绑//绑定队列QA与交换机X_EXCHANGE@Beanpublic Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列QB与交换机X_EXCHANGE@Beanpublic Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列QD与交换机Y_Exchange@Beanpublic Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;//发送延迟消息import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/senMsg/{message}")public void sendMes(@PathVariable String message){log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}
消费者与死信队列创建一个监听者示例:
package com.esint.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL消费者*/@Slf4j
@Component
public class DeadLetterQueueConsumer {//接受消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);}
}

rabbitmq的配置文件:

spring:rabbitmq:host: *.*.*.*port: 5672username: guestpassword: guest
接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述


拓展:是不是有一种可能,如果再队列中不设置过期时间,在生产者发送消息时设置过期时间 来实现过期时间自由设定,而延迟自由?

结论是不能:
rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。

示例验证:

增加一个无过期时间约束的队列,以routing-key为XC绑定X交换机,过期后以routing-key为YD绑定Y交换机。
过期时间放生产者发送时设定。

在的rabbitMQ配置类中增加QC 绑定前(X routing-key=XC)后(Y routing-key=YD)交换机:

    // 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时public static final  String QUEUE_C ="QC";//声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时@Bean("queueC")public Queue queueC(){Map<String,Object> arguments = new HashMap<>(2);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置routing-keyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}
//绑定队列QC与交换机X_EXCHANGE   优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时@Beanpublic Binding queueCBindXExchange(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}

生产者:

@GetMapping("/sendttl/{message}/{ttlTime}")public void sendMes(@PathVariable String message,@PathVariable String ttlTime){
/*** 死信队列做延迟时的缺陷:* rabbitMQ只会检查第一个消息是否过期带来的问题就是,如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。*/log.info("当前时间:{},发送一条ttl为{}ms的消息给QC队列:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,mes->{mes.getMessageProperties().setExpiration(ttlTime);return mes;});}

消费者不变,启动服务!

生产者发送消息:第一条 3000ms
http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000
http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000

在这里插入图片描述
结论:第二条虽然早早过期,它依然需要等待第一条过期后,才能排到他。rabbitMQ的队列过期检查机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/200986.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开发上门按摩系统对技师如何管理,薪资结构怎么设计

开发完上门按摩系统平台上线之后&#xff0c;对技师的管理和薪资结构是非常重要的环节&#xff0c;关乎着平台的服务能力和服务质量&#xff0c;那么应该如何去管理和设计薪资结构呢 首先说技师管理&#xff1a; 一、培训和认证&#xff1a;平台应对技师进行全面的培训&#xf…

电脑磁盘怎么设置密码?磁盘加密软件哪个好?

电脑磁盘经常被用于存放各种重要数据&#xff0c;而为了避免数据泄露&#xff0c;我们需要为磁盘设置密码&#xff0c;以防止未授权人员使用磁盘。那么&#xff0c;电脑磁盘怎么设置密码呢&#xff1f;下面我们就一起来了解一下。 如何设置磁盘密码&#xff1f; 想要为磁盘设置…

释放搜索潜力:基于Docker快速搭建ES语义检索系统(快速版),让信息尽在掌握

搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术细节以及项目实战(含码源) 专栏详细介绍:搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术…

某高品质房产企业:借助NineData平台,统一数据库访问权限,保障业务安全

该企业是中国领先的优质房产品开发及生活综合服务供应商。在 2022 年取得了亮眼的业绩表现&#xff0c;销售额市场占有率跻身全国前五。业务涵盖房产开发、房产代建、城市更新、科技装修等多个领域。 2023 年&#xff0c;该企业和玖章算术&#xff08;浙江&#xff09;科技有限…

动态规划求 x 轴上相距最远的两个相邻点 java 代码实现

如图为某一状态下 x 轴上的情况&#xff0c;此时 E、F相距最远&#xff0c;现在加入一个点H&#xff0c;如果H位于点A的左边的话&#xff0c;只需要比较 A、H 的距离 和 E、F 的距离&#xff1b;如果点H位于点G的右边&#xff0c;则值需要比较 G、H 的距离 和 E、F 的距离&…

OceanBase:OBServer节点管理

目录 1.查看节点 2.添加节点 2.1 创建数据目录 2.2.OceanBase 运行时所依赖的部分三方动态库 2.3.安装 OceanBase 数据库的 RPM 包 2.4.启动节点 observer 进程 2.5.向集群中添加节点 3.隔离节点 4.重启节点 4.1 停止服务 4.2 转储 4.3 关闭进程 4.4 启动进程 4.…

《多GPU大模型训练与微调手册》

多GPU微调预备知识 1. 参数数据类型 torch.dtype 1.1 半精度 half-precision torch.float16&#xff1a;fp16 就是 float16&#xff0c;1个 sign&#xff08;符号位&#xff09;&#xff0c;5个 exponent bits(指数位)&#xff0c;10个 mantissa bits(小数位) torch.bfloat16…

PHP手动为第三方类添加composer自动加载

有时候我们要使用的第三方的类库&#xff08;SDK&#xff09;没用composer封装好&#xff0c;无法用composer进行安装&#xff0c;怎么办呢&#xff1f;&#xff1f;&#xff1f; 步骤如下&#xff1a; 第一步、下载需要的SDK文件包&#xff0c;把它放在vendor目录下 第二步、…

ubuntu22.04安装网易云音乐

附件&#xff1a; https://download.csdn.net/download/weixin_44503976/88557248 wget https://d1.music.126.net/dmusic/netease-cloud-music_1.2.1_amd64_ubuntu_20190428.deb wget -O patch.c https://aur.archlinux.org/cgit/aur.git/plain/patch.c?hnetease-cloud-m…

windows11记事本应用程序无法打开,未响应,崩溃,卡死

windows11记事本应用程序无法打开&#xff0c;未响应&#xff0c;崩溃&#xff0c;卡死 文章目录 问题描述搜索引擎&#xff08;度娘&#xff09;卸载后如何安装问题未解决另一个解决方案&#xff1a;步骤&#xff1a;1.设置 → 语音和区域 → 输入2.选择“高级键盘设置”3.替…

基于springboot实现班级综合测评管理系统项目【项目源码+论文说明】

基于springboot实现班级综合测评管理系统演示 摘要 随着互联网技术的高速发展&#xff0c;人们生活的各方面都受到互联网技术的影响。现在人们可以通过互联网技术就能实现不出家门就可以通过网络进行系统管理&#xff0c;交易等&#xff0c;而且过程简单、快捷。同样的&#x…

Redis的性能管理

一、Redis性能管理 1.1 查看redis的内存使用情况 redis-cli info memory 或 redis-cli 127.0.0.1:6379> info memoryused_memory&#xff1a;redis中的数据占用的内存。 used_memory_rss&#xff1a;是redis向操作系统申请的内存。 used_memory_peak&#xff1a;redis内存…

数据结构-插入排序+希尔排序+选择排序

目录 1.插入排序 插入排序的时间复杂度&#xff1a; 2.希尔排序 希尔排序的时间复杂度&#xff1a; 3.选择排序 选择排序的时间复杂度&#xff1a; 所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的…

多目标应用:基于非支配排序的鲸鱼优化算法NSWOA求解微电网多目标优化调度(MATLAB代码)

一、微网系统运行优化模型 微电网优化模型介绍&#xff1a; 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 二、基于非支配排序的鲸鱼优化算法NSWOA 基于非支配排序的鲸鱼优化算法NSWOA简介&#xff1a; 三、基于非支配排序的鲸鱼优化算法NSWOA求解微电网多目标优化…

前端js调取摄像头并实现拍照功能

前言 最近接到的一个需求十分有意思&#xff0c;设计整体实现了前端仿 微信扫一扫 的功能。整理了一下思路&#xff0c;做一个分享。 tips: 如果想要实现完整扫一扫的功能&#xff0c;你需要掌握一些前置知识&#xff0c;这次我们先讲如何实现拍照并且保存的功能。 一. windo…

腾讯云轻量数据库试用初体验

腾讯云轻量数据库1核1G开箱测评&#xff0c;轻量数据库服务采用腾讯云自研的新一代云原生数据库TDSQL-C&#xff0c;轻量数据库兼100%兼容MySQL数据库&#xff0c;实现超百万级 QPS 的高吞吐&#xff0c;128TB海量分布式智能存储&#xff0c;虽然轻量数据库为单节点架构&#x…

vue3+vite+SQL.js 读取db3文件数据

前言&#xff1a;好久没写博客了&#xff0c;最近一直在忙&#xff0c;没时间梳理。最近遇到一个需求是读取本地SQLite文件&#xff0c;还是花费了点时间才实现&#xff0c;没怎么看到vite方面写这个的文章&#xff0c;现在分享出来完整流程。 Tips:解决了打包后wasm文件404问…

Linux操作系统使用及C高级编程-D9D10Linux 服务搭建与使用

TFTP服务器 TFTP&#xff08;Trivial File Transfer Protocol&#xff09;即简单文件传输协议&#xff0c;是TCP/IP协议中一个用来在客户机与服务器之间进行简单文件传输的协议&#xff0c;提供不复杂、开销不大的文件传输服务。端口号为69 1、使用客户服务器方式和使用UDP数据…

jenkins springCloud项目优雅下线

文章目录 场景解决下线请求效果如图贴一个可用的部署脚本 场景 在 Spring Cloud 项目的微服务实例关闭时&#xff0c;需要首先从注册中心设置为下线&#xff0c;避免该服务的消费者继续请求该服务实例&#xff0c;导致请求失败如果我们在服务实例从注册中心取消注册后&#xff…

银河麒麟安装Docker

# 配置阿里云 Centos8 镜像源&#xff0c;需要额外的一些依赖&#xff0c;而这些依赖在麒麟官方的源里面是没有的 sudo curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-8.repo# 配置阿里云 docker 镜像源 sudo yum-config-manager --add-r…