MQ高级(二):死信交换机--延迟消息及DelayExchange插件--超时订单案例实现

目录

1.延迟消息

1.1.死信交换机和延迟消息

1.1.1.死信交换机

1.1.2.延迟消息

1.2.DelayExchange插件

1.2.1.下载

1.2.2.安装

1.2.3.声明延迟交换机

1.2.4.发送延迟消息

1.3.超时订单问题

1.3.1.定义常量

1.3.2.配置MQ

1.3.3.改造下单业务,发送延迟消息

1.3.4.编写查询支付状态接口

1.3.5.监听消息,查询支付状态


1.延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

RabbitMQ中实现延迟消息也有两种方案:
- 死信交换机+TTL
- 延迟消息插件

1.1.死信交换机和延迟消息

首先我们来学习一下基于死信交换机的延迟消息方案。

1.1.1.死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

1.1.2.延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景:

如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:

假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒:

注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。

消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:
 
死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue

由于direct.queue1hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:

也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息

注意:

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

1.2.DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
这个插件可以将普通交换机改造为支持延迟消息的交换机,当消息投递到交换机后可以暂存一段时间,到期后再投递到队列。

官方文档说明:Scheduling Messages with RabbitMQ | RabbitMQ


1.2.1.下载

插件下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

由于我们安装的MQ是3.8版本,因此这里下载3.8.17版本:

 

1.2.2.安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:

1.2.3.声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),//delayed要设置为truekey = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式: 

package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

1.2.4.发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

注意:

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

 


1.3.超时订单问题

接下来,我们就在交易服务中利用延迟消息实现订单超时取消功能。其大概思路如下:

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。

1.3.1.定义常量

无论是消息发送还是接收都是在交易服务完成,因此我们在trade-service中定义一个常量类,用于记录交换机、队列、RoutingKey等常量:

内容如下:

package com.hmall.trade.constants;public interface MQConstants {String DELAY_EXCHANGE_NAME = "trade.delay.direct";String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";String DELAY_ORDER_KEY = "delay.order.query";
}

1.3.2.配置MQ

trade-service模块的pom.xml中引入amqp的依赖:

  <!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

trade-serviceapplication.yaml中添加MQ的配置:

spring:rabbitmq:host: 192.168.52.135   #虚拟机地址port: 5672virtual-host: /hmallusername: hmallpassword: 123

1.3.3.改造下单业务,发送延迟消息

接下来,我们改造下单业务,在下单完成后,发送延迟消息,查询支付状态。

修改trade-service模块的com.hmall.trade.service.impl.OrderServiceImpl类的createOrder方法,添加消息发送的代码:

这里延迟消息的时间应该是15分钟,不过我们为了测试方便,改成10秒。

1.3.4.编写查询支付状态接口

由于MQ消息处理时需要查询支付状态,因此我们要在pay-service模块定义一个这样的接口,并提供对应的FeignClient.

首先,在hm-api模块定义三个类:

说明:
- PayOrderDTO:支付单的数据传输实体
- PayClient:支付系统的Feign客户端
- PayClientFallback:支付系统的fallback逻辑


PayOrderDTO代码如下:

package com.hmall.api.dto;import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDateTime;/*** <p>* 支付订单* </p>*/
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("业务订单号")private Long bizOrderNo;@ApiModelProperty("支付单号")private Long payOrderNo;@ApiModelProperty("支付用户id")private Long bizUserId;@ApiModelProperty("支付渠道编码")private String payChannelCode;@ApiModelProperty("支付金额,单位分")private Integer amount;@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")private Integer payType;@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")private String expandJson;@ApiModelProperty("第三方返回业务码")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功时间")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超时时间")private LocalDateTime payOverTime;@ApiModelProperty("支付二维码链接")private String qrCodeUrl;@ApiModelProperty("创建时间")private LocalDateTime createTime;@ApiModelProperty("更新时间")private LocalDateTime updateTime;
}

PayClient代码如下:

package com.hmall.api.client;import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根据交易订单id查询支付单* @param id 业务订单id* @return 支付单信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}

PayClientFallback代码如下:

package com.hmall.api.client.fallback;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}

最后,在pay-service模块的PayController中实现该接口:

@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

1.3.5.监听消息,查询支付状态

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:

代码如下:

package com.hmall.trade.listener;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.trade.constants.MQConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class OrderDelayMessageListener {private final IOrderService orderService;private final PayClient payClient;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),key = MQConstants.DELAY_ORDER_KEY))public void listenOrderDelayMessage(Long orderId){// 1.查询订单Order order = orderService.getById(orderId);// 2.检测订单状态,判断是否已支付if(order == null || order.getStatus() != 1){// 订单不存在或者已经支付return;}// 3.未支付,需要查询支付流水状态PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);// 4.判断是否支付if(payOrder != null && payOrder.getStatus() == 3){// 4.1.已支付,标记订单状态为已支付orderService.markOrderPaySuccess(orderId);}else{// TODO 4.2.未支付,取消订单,回复库存orderService.cancelOrder(orderId);}}
}

注意,这里要在OrderServiceImpl中实现cancelOrder方法..

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/430869.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux篇】TCP/IP协议(笔记)

目录 一、TCP/IP协议族体系结构 1. 数据链路层 &#xff08;1&#xff09;介绍 &#xff08;2&#xff09;常用协议 ① ARP协议&#xff08;Address Resolve Protocol&#xff0c;地址解析协议&#xff09; ② RARP协议&#xff08;Reverse Address Resolve Protocol&…

详解Web测试和APP测试的区别

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 最近听到有些朋友说&#xff0c;移动端要比web端稍微难一些&#xff0c;涉及到的细节笔记要多&#xff0c;有转去做web测试的想法&#xff0c;看看在具体测试的…

秋招面试准备:《小米2024数字芯片岗面试题》

在数字芯片设计的浪潮中&#xff0c;验证工程师的角色愈发重要。他们如同守门人&#xff0c;确保每一块芯片在投入市场前都能稳定、高效地运行。小米&#xff0c;作为全球知名的智能设备制造商&#xff0c;对数字芯片岗位的人才选拔尤为严格。 本文分享《小米2024数字芯片岗面…

专属文生图助手——SD3+ComfyUI文生图部署步骤

SD3ComfyUI文生图部署步骤 我们使用DAMODEL来实现文生图的部署。 根据提供的操作步骤与代码段落&#xff0c;本文旨在介绍如何下载并部署 Stable Diffusion 3 模型&#xff0c;并通过 ComfyUI 架构实现基于 Web 界面的图像生成应用。本文将剖析各个步骤&#xff0c;并详细解释…

[Redis][Hash]详细讲解

目录 0.前言1.常见命令1.HSET2.HGET3.HEXISTS4.HDEL5.HKEYS6.HVALS7.HGETALL8.HMGET9.HLEN10.HSETNX11.HINCRBY12.HINCRBYFLOAT 2.内部编码1.ziplist(压缩链表)2.hashtable(哈希表) 3.使用场景4.缓存方式对比1.原⽣字符串类型2.序列化字符串类型3.哈希类型 0.前言 在Redis中&am…

同一网络下两台电脑IP一样吗?探究局域网内的IP分配机制

在日常生活和工作中&#xff0c;我们经常会在同一网络环境下使用多台电脑。这时&#xff0c;一个常见的问题就会浮现&#xff1a;同一网络下两台电脑IP一样吗&#xff1f;这个问题看似简单&#xff0c;但实际上涉及到局域网内的IP分配机制。本文将深入探讨这一问题&#xff0c;…

Linux使用Clash,clash-for-linux

文件下载 clash-for-linuxhttps://link.zhihu.com/?targethttps%3A//zywang.lanzn.com/ijE2a1m7h6mb&#xff08;百度和阿里云盘都不支持这个文件分享&#xff09;。 使用须知 - 此项目不提供任何订阅信息&#xff0c;请自行准备Clash订阅地址。 - 运行前请手动更改.env文件…

使用四叉树碰撞的游戏 显微镜RPG

实现四叉树碰撞检测 //author bilibili 民用级脑的研发记录 // 开发环境 小熊猫c 2.25.1 raylib 版本 4.5 // 2024-7-14 // AABB 碰撞检测 在拖拽&#xff0c;绘制&#xff0c;放大缩小中 // 2024-7-20 // 直线改每帧打印一个点&#xff0c;生长的直线&#xff0c;直线炮弹 /…

The NCCoE’s Automation of the CMVP

Earlier today at the ICMC24, we heard from a panel about the US National Cybersecurity Center of Excellence’s (NCCoE) work on the Automated Cryptographic Module Validation Program (ACMVP), which intends to tackle the troublingly long queue times we’ve se…

Flink 与 Kubernetes (K8s)、YARN 和 Mesos集成对比

Flink 与 Kubernetes (K8s)、YARN 和 Mesos 的紧密集成&#xff0c;是 Flink 能够在不同分布式环境中高效运行的关键特性。 Flink 提供了与这些资源管理系统的深度集成&#xff0c;以便在多种集群管理环境下提交、运行和管理 Flink 作业。Flink 与 K8s、YARN 和 Mesos 集成的详…

百度Android IM SDK组件能力建设及应用

作者 | 星途 导读 移动互联网时代&#xff0c;随着社交媒体、移动支付、线上购物等行业的快速发展&#xff0c;对即时通讯功能的需求不断增加。对于各APP而言&#xff0c;接入IM SDK&#xff08;即时通讯软件开发工具包&#xff09;能够大大降低开发成本、提高开发效率&#…

数据结构:(OJ141)环形列表

给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#xff08;…

C++ | Leetcode C++题解之第420题强密码检验器

题目&#xff1a; 题解&#xff1a; class Solution { public:int strongPasswordChecker(string password) {int n password.size();bool has_lower false, has_upper false, has_digit false;for (char ch: password) {if (islower(ch)) {has_lower true;}else if (isu…

渗透测试综合靶场 DC-2 通关详解

一、准备阶段 准备工具如Kali Linux&#xff0c;下载并设置DC-2靶场机。确保攻击机和靶机在同一网络段&#xff0c;通常设置为桥接模式或NAT模式。 1.1 靶机描述 Much like DC-1, DC-2 is another purposely built vulnerable lab for the purpose of gaining experience in …

面试知识点总结篇二

一、makefile链接库参数 LIBS -L/path/to/lib -lmylib//&#xff0c;-lmylib会链接名为libmylib.so或libmylib.a的库。all: myprogrammyprogram: myprogram.ogcc -o myprogram myprogram.o $(LIBS)//此处使用myprogram.o: myprogram.cgcc -c myprogram.c二、shell指令 Shell…

高性能分布式搜索引擎Elasticsearch详解

♥️作者&#xff1a;小宋1021 &#x1f935;‍♂️个人主页&#xff1a;小宋1021主页 ♥️坚持分析平时学习到的项目以及学习到的软件开发知识&#xff0c;和大家一起努力呀&#xff01;&#xff01;&#xff01; &#x1f388;&#x1f388;加油&#xff01; 加油&#xff01…

在线相亲交友系统:寻找另一半的新方式

在这个快节奏的时代里&#xff0c;越来越多的单身男女发现&#xff0c;传统意义上的相亲方式已经难以满足他们的需求。与此同时&#xff0c;互联网技术的迅猛发展为人们提供了新的社交渠道——在线相亲交友系统作者h17711347205。本文将探讨在线相亲交友系统如何成为一种寻找另…

MYSQL基础语法

1-什么是数据库 数据库就是保留数据的仓库&#xff0c;体现在电脑当中&#xff0c;是一个软件或者是文件系统。然后把这些数据都保存在特殊的文件中&#xff0c;然后使用固定的语言&#xff08;SQL语句&#xff09;去操作文件中的数据。 2-数据库的优点 数据库是按照特定的格…

Spring MVC 基本配置步骤 总结

1.简介 本文记录Spring MVC基本项目拉起配置步骤。 2.步骤 在pom.xml中导入依赖&#xff1a; <dependency><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId><version>6.0.6</version><scope>…

Activiti7《第九式:破气式》——流畅驱动工作流进程。面试题大全

冲冲冲&#xff01;开干 这篇文章将分为九个篇章&#xff0c;带你逐步掌握工作流的核心知识。“破气式”&#xff0c;代表着工作流中的 无形之力&#xff0c;它是贯穿整个流程的 关键驱动 不知不觉已经到了独孤九剑最后一式了&#xff0c;我相信到这里之后各位都已经出神入化…