rabbit MQ的延迟队列处理模型示例(基于SpringBoot)

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL延迟队列配置文件类**/
@Configuration
public class TtlQueueConfig {////普通交换机的名称 Xpublic static final String X_EXCHANGE = "X";//死信交换机名称 Ypublic static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列QA QBpublic static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列名称QDpublic static final String DEAD_LETTER_QUEUE = "QD";////声明X_EXCHANGE@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换Y_DEAD_LETTER_EXCHANGE@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 QA@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明队列 QB@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列QD@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//捆绑//绑定队列QA与交换机X_EXCHANGE@Beanpublic Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列QB与交换机X_EXCHANGE@Beanpublic Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列QD与交换机Y_Exchange@Beanpublic Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;//发送延迟消息import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/senMsg/{message}")public void sendMes(@PathVariable String message){log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}
消费者与死信队列创建一个监听者示例:
package com.esint.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL消费者*/@Slf4j
@Component
public class DeadLetterQueueConsumer {//接受消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);}
}

rabbitmq的配置文件:

spring:rabbitmq:host: *.*.*.*port: 5672username: guestpassword: guest
接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/200487.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3.计算机网络

1.重点概念 MSL&#xff08;Maximum segment lifetime&#xff09;&#xff1a;TCP 报⽂最⼤⽣存时间。它是任何 TCP 报⽂在⽹络上存在的 最⻓时间&#xff0c;超过这个时间报⽂将被丢弃。实际应⽤中常⽤的设置是 30 秒&#xff0c;1 分钟和 2 分钟。 TTL&#xff08;Time to …

webpack 配置

1、基础配置 // node js核心模塊 const path require(path) // 插件是需要引入使用的 const ESLintPlugin require(eslint-webpack-plugin) // 自动生成index.html const HtmlWebpackPlugin require(html-webpack-plugin); // 将css文件单独打包&#xff0c;在index.html中…

2023年【四川省安全员A证】复审考试及四川省安全员A证考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 四川省安全员A证复审考试根据新四川省安全员A证考试大纲要求&#xff0c;安全生产模拟考试一点通将四川省安全员A证模拟考试试题进行汇编&#xff0c;组成一套四川省安全员A证全真模拟考试试题&#xff0c;学员可通过…

LVS+Keepalived 高可用群集

一、一.Keepalived工具介绍 专为LVS和HA设计的一款健康检查工具 • 支持故障自动切换&#xff08;Failover&#xff09; • 支持节点健康状态检查&#xff08;Health Checking&#xff09; • 官方网站&#xff1a;http://www.keepalived.org/ 二、Keepalived工作原理 • …

区块链技术与应用 【全国职业院校技能大赛国赛题目解析】第五套智能合约安全漏洞测试

第五套题的智能合约安全漏洞测试题目 环境 : ubuntu20 Truffle v5.8.3 (core: 5.8.3) Ganache v7.8.0 Solidity v0.8.3 Node v18.16.0 Web3.js v1.8.2 前言 请在测试的时候开启ganache打开,并且在truffle的配置文件配好ganache,之前两个帖子忘说了/(ㄒoㄒ)/~~ truffle-con…

SVN 修改版本库地址url路径

一、win11用户 1. win11系统右链菜单比较优秀&#xff0c;如果菜单中选择“TortoiseSVN”找不到“重新定位”&#xff0c;如下图所示&#xff0c;则需要添加右键菜单&#xff1a; 2.添加右键菜单&#xff1a;选择“TortoiseSVN”&#xff0c;点击设置&#xff0c;如下图所示&a…

云原生Docker系列 | Docker私有镜像仓库公有镜像仓库使用

云原生Docker系列 | Docker私有镜像仓库&公有镜像仓库使用 1. 使用公有云镜像仓库1.1. 阿里云镜像仓库1.2. 华为云镜像仓库1.3. 腾讯云镜像仓库2. 使用Docker Hub镜像仓库3. 使用Harbor构建私有镜像仓库4. 搭建本地Registry镜像仓库1. 使用公有云镜像仓库 1.1. 阿里云镜像…

SpringCloud原理-OpenFeign篇(三、FeignClient的动态代理原理)

文章目录 前言正文一、前戏&#xff0c;FeignClientFactoryBean入口方法的分析1.1 从BeanFactory入手1.2 AbstractBeanFactory#doGetBean(...)中对FactoryBean的处理1.3 结论 FactoryBean#getObject() 二、FeignClientFactoryBean实现的getObject()2.1 FeignClientFactoryBean#…

hadoop 配置历史服务器 开启历史服务器查看 hadoop (十)

1. 配置了三台服务器&#xff0c;hadoop22, hadoop23, hadoop24 2. hadoop文件路径: /opt/module/hadoop-3.3.4 3. hadoop22机器配置历史服务器的配置文件&#xff1a; 文件路径&#xff1a;/opt/module/hadoop-3.3.4/etc/hadoop 文件名称&#xff1a;mapred-size.xml 新增历…

用 HLS 实现 UART

用 HLS 实现 UART 介绍 UART 是一种旧的串行通信机制&#xff0c;但仍在很多平台中使用。它在 HDL 语言中的实现并不棘手&#xff0c;可以被视为本科生的作业。在这里&#xff0c;我将通过这个例子来展示在 HLS 中实现它是多么容易和有趣。 因此&#xff0c;从概念上讲&#xf…

【日常总结】Swagger-ui 导入 showdoc (优雅升级Swagger 2 升至 3.0)

一、场景 环境&#xff1a; 二、存在问题 三、解决方案 四、实战 - Swagger 2 升至 3.0 &#xff08;Open API 3.0&#xff09; Stage 1&#xff1a;引入Maven依赖 Stage 2&#xff1a;Swagger 配置类 Stage 3&#xff1a;访问 Swagger 3.0 Stage 4&#xff1a;获取 js…

Windows系统搭建VisualSVN服务并结合内网穿透实现公网访问

目录 前言 1. VisualSVN安装与配置 2. VisualSVN Server管理界面配置 3. 安装cpolar内网穿透 3.1 注册账号 3.2 下载cpolar客户端 3.3 登录cpolar web ui管理界面 3.4 创建公网地址 4. 固定公网地址访问 总结 前言 SVN 是 subversion 的缩写&#xff0c;是一个开放源…

转录组学习第四弹-数据质控

数据质控 将SRR转为fastq之后&#xff0c;我们需要对fastq进行质量检查&#xff0c;排除质量不好的数据 1.质量检查&#xff0c;生成报告文件 ls *fastq.gz|while read id;do fastqc $id;done并行处理 ls *fastq.gz|xargs fastqc -t 102.生成 html 报告文件和对应的 zip 压缩…

electron使用electron-builder macOS windows 打包 签名 更新 上架

0. 前言 0.1 项目工程 看清目录结构&#xff0c;以便您阅读后续内容 0.2 参考资料 &#xff08;1&#xff09;macOS开发 证书等配置/打包后导出及上架 https://www.jianshu.com/p/c9c71f2f6eac首先需要为Mac App创建App ID&#xff1a; 填写信息如下—Description为"P…

(02)vite环境变量配置

文章目录 将开发环境和生产环境区分开环境变量vite处理环境变量loadEnv 业务代码需要使用环境变量.env.env.development.env.test修改VITE_前缀 将开发环境和生产环境区分开 分别创建三个vite 的配置文件&#xff0c;并将它们引入vite.config.js vite.base.config.js import…

Kubernetes Gateway API 攻略:解锁集群流量服务新维度!

Kubernetes Gateway API 刚刚 GA&#xff0c;旨在改进将集群服务暴露给外部的过程。这其中包括一套更标准、更强大的 API资源&#xff0c;用于管理已暴露的服务。在这篇文章中&#xff0c;我将介绍 Gateway API 资源&#xff0c;并以 Istio 为例来展示这些资源是如何关联的。通…

C语言scanf_s函数的使用

因为scanf函数存在缓冲区溢出的可能性&#xff1b;提供了scanf_s函数&#xff1b;增加一个参数&#xff1b; scanf_s最后一个参数是缓冲区的大小&#xff0c;表示最多读取n-1个字符&#xff1b; 下图代码&#xff1b; 读取整型数可以不指定长度&#xff1b;读取char&#xf…

VMware安装kali(详细版)

如果不详细&#xff0c;你就留言骂我&#xff01; 文章目录 前言一、安装VMware二、安装KALI安装KALI配置网络总结 前言 今天更VMware安装kali 一、安装VMware VMware网址 安装之前&#xff0c;建议先退出360、电脑管家等杀毒软件&#xff0c;Win10操作系统好像还需要检查一…

HTML5生成二维码

H5生成二维码 前言二维码实现过程页面实现关键点全部源码 前言 本文主要讲解如何通过原生HTML、CSS、Js中的qrcodejs二维码生成库&#xff0c;实现一个输入URL按下回车后输出URL。文章底部有全部源码&#xff0c;需要可以自取。 实现效果图&#xff1a; 上述实现效果为&#…

TensorFlow实战教程(十八)-Keras搭建卷积神经网络及CNN原理详解

从本专栏开始,作者正式研究Python深度学习、神经网络及人工智能相关知识。前一篇文章详细讲解了Keras实现分类学习,以MNIST数字图片为例进行讲解。本篇文章详细讲解了卷积神经网络CNN原理,并通过Keras编写CNN实现了MNIST分类学习案例。基础性文章,希望对您有所帮助! 一…