延迟队列的理解与使用

目录

一、场景引入

二、延迟队列的三种场景

1、TTL对队列进行延迟

2、创建通用延时消息对消息延迟

3、使用rabbitmq的延时队列插件

x-delayed-message使用

父pom文件

pom文件

配置文件

config

生产者

消费者

结果


一、场景引入

我们知道可以通过TTL来对队列进行设置过期时间;通过后置处理器MessagePostProcessor对消息进行设置过期时间;

 那么根据TTL及MessagePostProcessor机制可以处理关于延迟方面的问题。

比如:秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。

比如:餐厅订座,A用户早上8点预定的某家餐厅下午6点的座位,B用户中午12点预定的下午5点的座位;根据场景我们需要的时先让B用户进行消费,然后A用户再消费;这时TTL和MessagePostProcessor延迟就已经不能满足订餐的场景了;

因为TTL是对队列进行延迟,MessagePostProcessor是对消息进行延迟,但是MessagePostProcessor对消息延迟是不能根据订座的时间去排序消费的;

/*** 比如当我们发送第一个消息时延迟的时间时50s,而发送的第二个消息延迟时间是30s,虽然延迟30s的消息比延迟50s发送的晚* 但按照我们设想的情况,延迟30s的消息应该先消费;可是实际情况却不是这样,而是延迟50s的消息到达时间后 30s的才能消费!(队列先进先出)* 那这样此方式的不足就出现了!* 场景:*      A用户和B用户预定餐厅,A用户先开始预定的,预定的是下午6点。B用户比A用户预定操作晚一些,但是B用户预定的时间是下午5点。通过此场景*      我们希望的是B用户先进行用餐(因为他预定的吃饭时间比A早一些,需要先安排吃饭。不能说A用户没到6点B用户预定5点的吃不了。),根据此*      场景 之前的队列延迟还是消息延迟都不能满足场景需求了,这样就需要另一种延迟方式进行解决了! ->使用rabbitmq的延时队列插件*//*** 注意:*      TTL是对队列进行延迟,只要是在此队列中的消息都会按照TTL设置时间进行延迟;*      MessagePostProcessor是对消息进行延迟;**      如果我们不仅使用了消息延迟,而且还使用了队列延迟,那么延迟的时间就会以小的时间为准!*   理解:*      如果a消息设置的消息延迟是30s,b消息设置的延迟时间是90s,队列设置的延迟是60s。那么a消息最终的延迟是30s(a的消息延迟与队列延迟*      比对以延迟时间小的为准!),b消息最终延迟的时间是60s(b的消息延迟与队列延迟比对以延迟的时间小的为准!)*/

 二、延迟队列的三种场景

1、TTL对队列进行延迟

   @Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("业务队列名称").deadLetterExchange("死信交换机名称").deadLetterRoutingKey("死信队列 RoutingKey").ttl(20000) // 消息停留时间//.maxLength(500).build();}

监听死信队列,即可处理超时的消息队列

缺点:

上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。

 2、创建通用延时消息对消息延迟

rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
message => {message.getMessageProperties().setExpiration(String.valueOf(5000))// 设置消息的持久化属性//这样发送的消息就会被持久化到 RabbitMQ 中,即使 RabbitMQ 重启,消息也不会丢失。                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});

缺点:

该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

 3、使用rabbitmq的延时队列插件

使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

1、下载延迟插件

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、安装插件并启用

下载完成后直接把插件放在 /home/rabbitmq 目录,然后拷贝到容器内plugins目录下(my-rabbit是容器的name,也可以使用容器id)

docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

 进入 Docker 容器

docker exec -it rabbit /bin/bash

 在plugins内启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 退出容器

exit

重启 RabbitMQ

docker restart my-rabbit

 貌似不重启也能生效!

 结果:

 就多了一个x-delayed-message交换机

x-delayed-message使用

父pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version>
<!--        <version>2.2.5.RELEASE</version>--><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-rabbitmq</name><properties><java.version>8</java.version><hutool.version>5.8.3</hutool.version><lombok.version>1.18.24</lombok.version></properties><description>spring-boot-rabbitmq</description><packaging>pom</packaging><modules><module>direct-exchange</module><module>fanout-exchange</module><module>topic-exchange</module><module>game-exchange</module><module>dead-letter-queue</module><module>delay-queue</module><module>delay-queue2</module></modules><dependencyManagement><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies></dependencyManagement></project>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.chensir</groupId><artifactId>spring-boot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><relativePath>../pom.xml </relativePath></parent><artifactId>delay-queue2</artifactId><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

logging.level.com.chensir = debug
server.port=8086
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

config

package com.chensir.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}@Beanpublic CustomExchange customExchange(){Map<String,Object> args = new HashMap<>();//延迟时以direct直连方式绑定args.put("x-delayed-type","direct");// name:交换机名称 type:类型 固定值x-delayed-messagereturn new CustomExchange("chen-x-delayedExchange","x-delayed-message",true,false,args);}@Beanpublic Queue directQueueLong(){return   QueueBuilder.durable("chen-DirectQueue").build();}@Beanpublic Binding binding(){return  BindingBuilder.bind(directQueueLong()).to(customExchange()).with("direct123").noargs();}
}

生产者

package com.chensir.provider;import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class DirectProvider {@Resourceprivate RabbitTemplate rabbitTemplate;public  void  send(){OrderIngOk orderIngOk = new OrderIngOk();orderIngOk.setOrderNo("202207149687-1");orderIngOk.setId(1);orderIngOk.setUserName("倪海杉前-延迟40秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk,m->{m.getMessageProperties().setDelay(40*1000); //设置延迟时间,对延迟交换机有效// m.getMessageProperties().setExpiration(String.valueOf(30*1000)); 设置过期时间,对队列有效return m;});OrderIngOk orderIngOk2 = new OrderIngOk();orderIngOk2.setOrderNo("202207149687-2");orderIngOk2.setId(2);orderIngOk2.setUserName("倪海杉后-延迟30秒");rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk2,m->{m.getMessageProperties().setDelay(30*1000);return m;});log.debug("消息生产完成");}}

消费者

package com.chensir.consumer;import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class DirectConsumer {@RabbitHandler@RabbitListener(queues = "chen-DirectQueue" )public void  process(OrderIngOk orderIngOk) throws IOException {try {// 处理业务开始log.debug("接受到消息,并正常处理结束,{}", JSONUtil.toJsonStr(orderIngOk));// 处理业务结束} catch (Exception ex){throw ex;}}
}

结果

2023-08-29 18:27:45.983 DEBUG 15568 --- [           main] com.chensir.provider.DirectProvider      : 消息生产完成
2023-08-29 18:28:16.143 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":2,"OrderNo":"202207149687-2","userName":"倪海杉后-延迟30秒"}
2023-08-29 18:28:26.057 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer      : 接受到消息,并正常处理结束,{"id":1,"OrderNo":"202207149687-1","userName":"倪海杉前-延迟40秒"}

可见 延迟40s的是先发送的,但是最终结果是先消费延迟30s的。这样就能达到我们订座的场景需求了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/111432.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Matlab(结构化程式和自定义函数)

目录 1.脚本编辑器 2.脚本流 2.1 控制流 2.2 关系&#xff08;逻辑&#xff09;操作符 3.脚本与函数 1.脚本编辑器 Matlab的命名规则&#xff1a; 常用功能&#xff1a; 智能缩进&#xff1a; 在写代码的时候&#xff0c;有的时候代码看起来并不是那么美观&#xff08;可读性…

栈和队列(详解)

一、栈 1.1、栈的基本概念 1.1.1、栈的定义 栈&#xff08;Stack&#xff09;&#xff1a;是只允许在一端进行插入或删除的线性表。首先栈是一种线性表&#xff0c;但限定这种线性表只能在某一端进行插入和删除操作。 栈顶&#xff08;Top&#xff09;&#xff1a;线性表允许…

iPhone 15 Pro与谷歌Pixel 7 Pro:哪款相机手机更好?

考虑到苹果最近将更多高级功能转移到iPhone Pro设备上的趋势,今年秋天iPhone 15 Pro与谷歌Pixel 7 Pro的对决将是一场特别有趣的对决。去年发布的iPhone 14 Pro确实发生了这种情况,有传言称iPhone 15 Pro再次受到了苹果的大部分关注。 预计iPhone 15系列会有一些变化,例如切…

企业网络安全:威胁情报解决方案

什么是威胁情报 威胁情报是网络安全的关键组成部分&#xff0c;可为潜在的恶意来源提供有价值的见解&#xff0c;这些知识可帮助组织主动识别和防止网络攻击&#xff0c;通过利用 STIX/TAXII 等威胁源&#xff0c;组织可以检测其网络中的潜在攻击&#xff0c;从而促进快速检测…

Flutter Web 项目网络请求报 XMLHttpRequest error 解决方案

使用http库进行简单的网络请求时&#xff0c;运行在Chrome浏览器上&#xff0c;网络请求一直报错 XMLHttpRequest error&#xff0c;而在iOS 模拟器上运行则正常&#xff0c;后面在postman上发送请求&#xff0c;也是正常的。这就是很尴尬了&#xff01;&#xff01;&#xff0…

公有云与私有云,IaaS、PaaS 和 SaaS云服务模型概述

云计算主要分为 4 种类型&#xff1a;私有云、公共云、混合云和多云。同时&#xff0c;云计算服务主要有 3 种&#xff1a;基础架构即服务&#xff08;IaaS&#xff09;、平台即服务&#xff08;PaaS&#xff09;和软件即服务&#xff08;SaaS&#xff09; Saas&#xff08;Sof…

nginx-concat

为了减少tcp请求数量&#xff0c;nginx从上有服务器获取多个静态资源&#xff08;css&#xff0c;js&#xff09;的时候&#xff0c;将多个静态资源合并成一个返回给客户端。 这种前面有两个问号的请求都是用了cancat合并功能。 先到官网下载安装包&#xff0c;拷贝到服务器编译…

UDP 多播(组播)

前言&#xff08;了解分类的IP地址&#xff09; 1.组播&#xff08;多播&#xff09; 单播地址标识单个IP接口&#xff0c;广播地址标识某个子网的所有IP接口&#xff0c;多播地址标识一组IP接口。单播和广播是寻址方案的两个极端&#xff08;要么单个要么全部&#xff09;&am…

微信小程序 实时日志

目录 实时日志 背景 如何使用 如何查看日志 注意事项 实时日志 背景 为帮助小程序开发者快捷地排查小程序漏洞、定位问题&#xff0c;我们推出了实时日志功能。从基础库2.7.1开始&#xff0c;开发者可通过提供的接口打印日志&#xff0c;日志汇聚并实时上报到小程序后台…

【base64】JavaScriptuniapp 将图片转为base64并展示

Base64是一种用于编码二进制数据的方法&#xff0c;它将二进制数据转换为文本字符串。它的主要目的是在网络传输或存储过程中&#xff0c;通过将二进制数据转换为可打印字符的形式进行传输 JavaScript 压缩图片 <html><body><script src"https://code.j…

数学建模:主成分分析法

&#x1f506; 文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 主成分分析法 算法流程 构建原始数据矩阵 X X X &#xff0c;其中矩阵的形状为 x ∗ n x * n x∗n &#xff0c;有 m m m 个对象&#xff0c; n n n 个评价指标。然后进行矩阵的归一化处理。首先计算矩…

Android Looper Handler 机制浅析

最近想写个播放器demo&#xff0c;里面要用到 Looper Handler&#xff0c;看了很多资料都没能理解透彻&#xff0c;于是决定自己看看相关的源码&#xff0c;并在此记录心得体会&#xff0c;希望能够帮助到有需要的人。 本文会以 猜想 log验证 的方式来学习 Android Looper Ha…

第62步 深度学习图像识别:多分类建模(Pytorch)

基于WIN10的64位系统演示 一、写在前面 上期我们基于TensorFlow环境做了图像识别的多分类任务建模。 本期以健康组、肺结核组、COVID-19组、细菌性&#xff08;病毒性&#xff09;肺炎组为数据集&#xff0c;基于Pytorch环境&#xff0c;构建SqueezeNet多分类模型&#xff0…

Android Activity启动过程一:从Intent到Activity创建

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、概览二、应用内启动源码流程 (startActivity)2.1 startActivit…

ADRV9009子卡 设计原理图:FMCJ450-基于ADRV9009的双收双发射频FMC子卡 便携测试设备

FMCJ450-基于ADRV9009的双收双发射频FMC子卡 一、板卡概述 ADRV9009是一款高集成度射频(RF)、捷变收发器&#xff0c;提供双通道发射器和接收器、集成式频率合成器以及数字信号处理功能。北京太速科技&#xff0c;这款IC具备多样化的高性能和低功耗组合&#xff0c;FMC子…

基于亚马逊云科技无服务器服务快速搭建电商平台——部署篇

受疫情影响消费者习惯发生改变&#xff0c;刺激了全球电商行业的快速发展。除了依托第三方电商平台将产品销售给消费者之外&#xff0c;企业通过品牌官网或者自有电商平台销售商品也是近几年电商领域快速发展的商业模式。独立站电商模式可以进行多方面、全渠道的互联网市场拓展…

Git分布式版本控制系统与github

第四阶段提升 时 间&#xff1a;2023年8月29日 参加人&#xff1a;全班人员 内 容&#xff1a; Git分布式版本控制系统与github 目录 一、案例概述 二、版本控制系统 &#xff08;一&#xff09; 本地版本控制 &#xff08;二&#xff09;集中化的版本控制系统 &…

DP读书:鲲鹏处理器 架构与编程(十三)操作系统内核与云基础软件

操作系统内核与云基础软件 鲲鹏软件构成硬件特定软件 鲲鹏软件构成硬件特定软件1. Boot Loader2. SBSA 与 SBBR3. UEFI4. ACPI 操作系统内核Linux系统调用Linux进程调度Linux内存管理Linux虚拟文件系统Linux网络子系统Linux进程间通信Linux可加载内核模块Linux设备驱动程序Linu…

Vue 项目性能优化 — 实践指南

前言 Vue 框架通过数据双向绑定和虚拟 DOM 技术&#xff0c;帮我们处理了前端开发中最脏最累的 DOM 操作部分&#xff0c; 我们不再需要去考虑如何操作 DOM 以及如何最高效地操作 DOM&#xff1b;但 Vue 项目中仍然存在项目首屏优化、Webpack 编译配置优化等问题&#xff0c;所…

自然语言处理(四):全局向量的词嵌入(GloVe)

全局向量的词嵌入&#xff08;GloVe&#xff09; 全局向量的词嵌入&#xff08;Global Vectors for Word Representation&#xff09;&#xff0c;通常简称为GloVe&#xff0c;是一种用于将词语映射到连续向量空间的词嵌入方法。它旨在捕捉词语之间的语义关系和语法关系&#…