Rabbitmq入门与应用(五)-延迟队列的设计与实现

延迟队列设计

在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。
常规使用rabbitmq设计延迟队列有两种方式

  1. 使用创建一个延迟队列阻塞消息
  2. 使用延迟队列插件

Dead Letter Exchanges — RabbitMQ

image-20231119180143512

image-20230619235935374

配置

  1. To set the DLX for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
  2. You may also specify a routing key to use when the messages are being dead-lettered. If the routing key is not set, the message’s own routing keys are used. args.put("x-dead-letter-routing-key", “some-routing-key”);
package com.wnhz.mq.common.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DlxConfig {@Beanpublic Queue dlxQueue(){return new Queue("dlx_queue_test");}@Beanpublic DirectExchange dlxExchange(){return new DirectExchange("dlx_exchange_test");}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key");}@Beanpublic Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange", "dlx_exchange_test");map.put("x-dead-letter-routing-key","dlx_routing_key");return new Queue("normal_queue_test",true,false,false,map);}@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal_exchange_test");}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_test");}}
server:port: 10005spring:application:name: book-consumerautoconfigure:exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfigurationrabbitmq:host: 192.168.198.130port: 5672username: adminpassword: 123publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:prefetch: 1acknowledge-mode: auto
logging:level:com.wnhz.mq.consumer: debug

生产者发送信息

image-20230723161609896

    @Overridepublic void delaySendMessage() {String uuid = UUID.randomUUID().toString();CorrelationData data = new CorrelationData(uuid);String msg = "hello delay";int delayTime =5000;rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,p -> {p.getMessageProperties().setExpiration(String.valueOf(delayTime ));return p;});log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);}
}

消费者消费

   @RabbitListener(queues = "dlx_queue_test")public void delayConsume(Message message){log.debug("消费者消费信息:{},当前时间:{}",message.getBody(),new Date());}

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

image-20230619214424612

image-20230619214539126

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...Configured: E = explicitly enabled; e = implicitly enabled| Status: * = running on rabbit@6d2342d51b11|/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11

下载支持3.9.x的插件

image-20230619215207816

退出容器:

root@6d2342d51b11:/plugins# exit
exit

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins
image-20230619215427865

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

进入容器安装插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

打开管理页面

进入Exchange页面,下拉Type看是否已经安装成功。

image-20230619220041631

代码实现

配置类
package com.wnhz.rabbitmq.mq.config;public interface RabbitmqConstants {String DELAYX_QUEUE = "mq_delayx__queue";String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";String DELAYX_EXCHANGE = "mq_delayx__exchange";String DELAYX_EXCHANGE_TYPE = "x-delayed-message";
}
package com.wnhz.rabbitmq.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;import java.util.HashMap;@Configuration
@Slf4j
public class RabbitmqConfig {@Beanpublic Queue delayxQueue() {return new Queue(RabbitmqConstants.DELAYX_QUEUE);}@Beanpublic CustomExchange delayRoutingExchange() {return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_EXCHANGE_TYPE,true,false,new HashMap<String, Object>() {{put("x-delayed-type","direct");}});}@Beanpublic Binding delayxBinding() {return BindingBuilder.bind(delayxQueue()).to(delayRoutingExchange()).with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());log.debug("rabbitmq配置:{}完成", rabbitTemplate);return rabbitTemplate;}
}
生产者
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendDelayxUser(User user) {int delayTime = 10000;rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_ROUTING_KEY,user, mpp -> {mpp.getMessageProperties().setDelay(delayTime);return mpp;});log.debug("发送消息:{},发送时间:{},延迟:{}秒", user,new Date(),delayTime/1000);}
}
消费者
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {@RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)@Overridepublic void receiveDelayxUser(User user) {log.debug("消费者:接收到消息-->{},接收时间:{}",user,new Date());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/259669.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Aster实现一台电脑当两台使——副屏使用独立win账号

前言&#xff1a;笔者每年回家&#xff0c;都面临着想要和小伙伴一起玩游戏&#xff0c;但小伙伴没有电脑/只有低配电脑的问题。与此同时&#xff0c;笔者自身的电脑是高配置的电脑&#xff0c;因此笔者想到&#xff0c;能否在自己的电脑上运行游戏&#xff0c;在小伙伴的电脑上…

如何图片无损放大?几个无损放大图片分享

在数字化时代&#xff0c;图片已经成为我们生活中不可或缺的一部分。从社交媒体上的分享&#xff0c;到专业摄影作品的展示&#xff0c;再到网页设计和平面广告的制作&#xff0c;图片的质量往往直接影响到我们的视觉体验和信息传达的效果。然而&#xff0c;有时候&#xff0c;…

2024-02-11 Unity 编辑器开发之编辑器拓展2 —— 自定义窗口

文章目录 1 创建窗口类2 显示窗口3 窗口事件回调函数4 窗口中常用的生命周期函数5 编辑器窗口类中的常用成员6 小结 1 创建窗口类 ​ 当想为 Unity 拓展一个自定义窗口时&#xff0c;只需实现继承 EditorWindow 的类即可&#xff0c;并在该类的 OnGUI 函数中编写面板控件相关的…

开发知识点-JAVA-Jeecgboot

Jeecgboot 介绍fofa语法:漏洞列表jeecg-boot-getDictItemsByTable-sqlinuclei yamljeecg-boot-queryTableData-sqlijeecg-boot-sqlijeecg-queryFieldBySql-rcejeecg-register-login-bypass修复建议介绍 「企业级低代码平台」 前后端分离架构SpringBoot 2.x3.x,SpringCloud,…

【漏洞复现】蓝网科技临床浏览系统信息泄露漏洞

Nx01 产品简介 蓝网科技临床浏览系统是一个专门用于医疗行业的软件系统&#xff0c;主要用于医生、护士和其他医疗专业人员在临床工作中进行信息浏览、查询和管理。 Nx02 漏洞描述 蓝网科技临床浏览系统存在信息泄露漏洞&#xff0c;攻击者可以利用该漏洞获取敏感信息。 Nx03…

《C++ Primer Plus》《4、复合类型》

文章目录 前言&#xff1a;1 数组1.1数组的初始化规则1.2 C11的数组初始化方法 2 字符串2.1 拼接字符串常量2.2在数组中使用字符串2.3 字符串输入2.4 每次读取一行字符串输入2.5 混合输入字符串和数字 3 string类简介3.1 C11字符串初始化3.2 赋值、拼接、附加3.3 string类的其他…

CPU是如何工作的?什么是冯·诺依曼架构和哈弗架构?

《嵌入式工程师自我修养/C语言》系列——CPU是如何工作的&#xff1f;什么是冯诺依曼架构和哈弗架构&#xff1f; 一、CPU内部结构及工作原理1.1 CPU的结构1.2 CPU工作流程举例 二、计算机体系结构2.1 冯诺依曼架构2.2 哈弗架构 三、总结 快速学习嵌入式开发其他基础知识&#…

SpringBoot3 + Vue3 由浅入深的交互 基础交互教学

说明&#xff1a;这篇文章是适用于已经学过SpringBoot3和Vue3理论知识&#xff0c;但不会具体如何实操的过程的朋友&#xff0c;那么我将手把手从教大家从后端与前端交互的过程教学。 目录 一、创建一个SpringBoot3项目的和Vue3项目并进行配置 1.1后端配置: 1.1.1applicatio…

notepad++打开文本文件乱码的解决办法

目录 第一步 在编码菜单栏下选择GB2312中文。如果已经选了忽略这一步 第二步 点击编码&#xff0c;红框圈出来的一个个试。我切换到UTF-8编码就正常了。 乱码如图。下面分享我的解决办法 第一步 在编码菜单栏下选择GB2312中文。如果已经选了忽略这一步 第二步 点击编码&#…

基于ORB-SLAM2与YOLOv8剔除动态特征点

基于ORB-SLAM2与YOLOv8剔除动态特征点 以下方法以https://cvg.cit.tum.de/data/datasets/rgbd-dataset/download#freiburg3_walking_xyz数据集进行实验测试APE 首先在不剔除动态特征点的情况下进行测试&#xff1a; 方法1:segment坐标点集合逐一排查剔除 利用YOLOv8的segm…

自定义Linux登录自动提示语

设置提示语的方式 在Linux系统中&#xff0c;可以通过修改几个特定的文件来实现在用户登录时自动弹出提示语。以下是几个常用的方法&#xff1a; 1. 修改/etc/issue文件&#xff1a; 这个文件用于显示本地登录前的提示信息 sudo vi /etc/issue在项目合作的时候&#xff0c;…

VMware虚拟机安装CentOS7

对于系统开发来说&#xff0c;开发者时常会需要涉及到不同的操作系统&#xff0c;比如Windows系统、Mac系统、Linux系统、Chrome OS系统、UNIX操作系统等。由于在同一台计算机上安装多个系统会占据我们大量的存储空间&#xff0c;所以虚拟机概念应运而生。本篇将介绍如何下载安…

鉴源论坛 · 观模丨形式化工程方法之需求建模(上)

作者 | 杨坤 上海控安可信软件创新研究院系统建模组 版块 | 鉴源论坛 观模 社群 | 添加微信号“TICPShanghai”加入“上海控安51fusa安全社区” 引言&#xff1a;需求建模是整个软件开发、测试验证与维护的基础。经过长期研究与实践&#xff0c;工业界与学术界均意识到&…

Linux之Shell

第 1 章 Shell 概述 1&#xff09;Linux 提供的 Shell 解析器有 [zhaohadoop101 ~]$ cat /etc/shells /bin/sh /bin/bash /usr/bin/sh /usr/bin/bash /bin/tcsh /bin/csh2&#xff09;bash 和 sh 的关系 [zhaohadoop101 bin]$ ll | grep bash -rwxr-xr-x. 1 root root 941880…

JVM--- 垃圾收集器详细整理

目录 一、垃圾收集需要考虑的三个事情&#xff1a; 二、垃圾回收针对的区域 三、如何判断对象已死 1.引用计数算法&#xff1a; 2.可达性分析算法 四、引用 五、生存还是死亡&#xff1f; 六、回收方法区 七、垃圾收集算法 1.分代收集理论 2.标记-清除算法 3.标记-复制算…

【qt创建线程两种方式】

QT使用线程的两种方式 1.案例进度条 案例解析&#xff1a; 如图由组件一个进度条和三个按钮组成&#xff0c;当点击开始的时候进度条由0%到100%&#xff0c;点击暂停&#xff0c;进度条保持之前进度&#xff0c;再次点击暂停变为继续&#xff0c;点击停止按钮进度条停止。 案…

案例:CentOS8 在 MySQL8.0 实现半同步复制

异步复制 MySQL 默认的复制即是异步的&#xff0c;主库在执行完客户端提交的事务后会立即将结果返给给客户端&#xff0c;并不关心从库是否已经接收并处理&#xff0c;这样就会有一个问题&#xff0c;主节点如果 crash 掉了&#xff0c;此时主节点上已经提交的事务可能并没有传…

.NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库

一、效果 记录日志为文档 记录日志到数据库 二、添加NuGet包 三、log4net.config代码配置 <?xml version"1.0" encoding"utf-8" ?> <log4net><!-- Debug日志 --><appender name"RollingFileDebug" type"log4net…

Java学习24--异常

异常 软件运行过程中的各种意料之外叫做Exception&#xff0c;比如要读取的文件找不到&#xff0c;准备联网发现没网&#xff0c;等着int参数来了个String 注意Error和exception不一样&#xff0c;error错的比较猛&#xff0c;一般是直接把JAVA整个搞崩了&#xff0c;比如内存…

数据结构通讲

目录 集合源码详解 一、常见数据结构讲解 1. 线性数据结构 1.1 数组 1.2 队列 1.3 链表 1.3.1 单向链表 1.3.2 双向链表 1.4 栈 2. 非线性数据结构 2.1 树 2.2 二叉树 2.2.1 概念介绍 2.2.2 遍历操作 2.2.3 删除节点 2.2.4 查找局限性 2.2.5 AVL&#xff08; …