007 订单支付超时自动取消订单(rabbitmq死信队列 mybatis)

文章目录

  • 死信队列
    • RabbitMQ 配置类 RabbitMQConfig.java
    • 生产者 OrderTimeoutProducer.java
    • 消费者 OrderTimeoutConsumer.java
    • 应用配置 application.yml
    • pom.xml 依赖
    • 实体类 Order.java(不变)
    • Mapper 接口 OrderMapper.java(不变)
    • 服务层 OrderService.java(不变)
    • 缓存配置 CacheConfig.java(不变)
    • 对账服务 ReconciliationTask.java(不变)
    • 控制器 OrderController.java(不变)

死信队列

在代码中,processedMsgCache的类型是Cache<String, Boolean>,键是消息ID,值是Boolean。当处理成功时,会执行processedMsgCache.put(msgId, true),将消息ID存入缓存,并设置有效期1小时,最大容量10000条。这样,在1小时内,如果同样的消息再次被处理,缓存中已经有记录,就会直接ACK并跳过处理。

不过,这里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默认可能不是唯一的,生产者发送消息时可以设置messageId。如果生产者没有明确设置,可能无法保证唯一性,这会导致幂等性检查失效。因此,需要确保生产者发送消息时设置了唯一的messageId,或者在消费者端使用其他唯一标识,比如消息内容中的orderId结合其他属性。

另外,缓存的过期时间设置为1小时,这可能与业务场景有关。如果消息的存活时间超过1小时,可能会有重复处理的风险。需要根据实际消息的存活时间来调整缓存的过期时间,确保覆盖消息可能被重新投递的时间窗口。

还有一个问题是,缓存是本地缓存,如果消费者有多个实例,每个实例的缓存是独立的。这可能导致不同的实例处理同一条消息,因为一个实例处理过,但另一个实例的缓存中没有记录。这种情况下,本地缓存的幂等性检查可能不够,需要考虑分布式缓存,比如Redis,来保证全局唯一性。但根据当前代码,在单实例或允许短暂重复的场景下使用本地缓存。

总结来说,幂等性检查的逻辑是通过缓存已处理消息的ID,在消息处理前检查是否已存在,存在则跳过处理,避免重复执行。这适用于消息队列保证至少一次投递,但业务需要确保幂等的场景。

                      +---------------------+|   RabbitMQ Message  ||  (携带唯一messageId)   |+----------+----------+|v
+----------------+       +-------+-------+       +-----------------+
|  消息到达消费者   | ----> | 检查缓存是否存在 | ----> | 存在:直接ACK丢弃消息 |
+----------------+       +-------+-------+       +-----------------+|| 不存在v+-------+-------+       +-----------------+| 执行业务逻辑处理  | ----> | 成功:存入缓存并ACK |+---------------+       +-----------------+

缓存过期时间(1小时)> 消息最大存活时间(30分钟+重试时间)
计算公式:缓存过期时间 = 消息TTL + 最大重试时间 * 重试次数 + 缓冲时间

缓存击穿空值缓存对不存在的key也进行缓存(需设置较短过期时间)
缓存穿透布隆过滤器在缓存前增加过滤层
消费者重启持久化存储配合数据库记录处理状态
网络分区最终一致性依赖对账服务修正状态
组件类型作用说明
processedMsgCacheCaffeine缓存存储已处理消息的唯一标识
messageId字符串消息唯一标识(需生产者保证唯一性)
deliveryTag长整型RabbitMQ消息投递标识
sequenceDiagramparticipant RabbitMQparticipant Consumerparticipant Cacheparticipant DBRabbitMQ->>Consumer: 投递消息(messageId=123)Consumer->>Cache: 查询messageId=123alt 存在缓存Cache-->>Consumer: 返回trueConsumer->>RabbitMQ: 发送ACKelse 无缓存Consumer->>DB: 执行取消操作alt 操作成功Consumer->>Cache: 写入messageId=123Consumer->>RabbitMQ: 发送ACKelse 操作失败Consumer->>RabbitMQ: 发送NACK(requeue=true)endend

RabbitMQ 配置类 RabbitMQConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 订单超时相关配置public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";public static final String ORDER_DELAY_QUEUE = "order.delay.queue";public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";// 死信队列配置public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";// 声明延迟队列(设置死信参数)@Beanpublic Queue orderDelayQueue() {return QueueBuilder.durable(ORDER_DELAY_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY).build();}// 声明延迟交换机@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}// 绑定延迟队列到交换机@Beanpublic Binding delayBinding() {return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}// 声明死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(ORDER_DEAD_LETTER_QUEUE, true);}// 声明死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);}// 绑定死信队列到交换机@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ORDER_DEAD_LETTER_ROUTING_KEY);}// JSON 消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

生产者 OrderTimeoutProducer.java

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class OrderTimeoutProducer {private final RabbitTemplate rabbitTemplate;public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendTimeoutMessage(String orderId) {// 设置消息过期时间为30分钟(单位:毫秒)MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1800000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_DELAY_EXCHANGE,RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,orderId,messagePostProcessor);}
}

消费者 OrderTimeoutConsumer.java

import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
public class OrderTimeoutConsumer {private final OrderService orderService;private final Cache<String, Boolean> processedMsgCache;public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) {this.orderService = orderService;this.processedMsgCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();}@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)public void processMessage(Message message, Channel channel) throws IOException {String orderId = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 幂等性检查if (processedMsgCache.getIfPresent(messageId) != null) {channel.basicAck(deliveryTag, false);return;}boolean success = orderService.safeCancel(orderId);if (success) {processedMsgCache.put(messageId, true);System.out.println("订单超时取消成功: " + orderId);}channel.basicAck(deliveryTag, false);} catch (Exception e) {// 记录错误日志,重新放回队列channel.basicNack(deliveryTag, false, true);System.err.println("处理订单超时取消失败: " + orderId);e.printStackTrace();}}
}

应用配置 application.yml

spring:rabbitmq:host: ${RABBITMQ_HOST:localhost}port: 5672username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /connection-timeout: 5000template:retry:enabled: truemax-attempts: 3initial-interval: 1000mslistener:simple:acknowledge-mode: manual # 手动确认模式prefetch: 10 # 每次预取数量retry:enabled: truemax-attempts: 3initial-interval: 1000ms

pom.xml 依赖

<!-- 移除 RocketMQ 依赖 -->
<!-- 添加 RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实体类 Order.java(不变)

public class Order {// 保持原有实现
}

Mapper 接口 OrderMapper.java(不变)

@Mapper
public interface OrderMapper {// 保持原有SQL操作
}

服务层 OrderService.java(不变)

@Service
public class OrderService {// 保持原有业务逻辑
}

缓存配置 CacheConfig.java(不变)

@Configuration
public class CacheConfig {// 保持原有缓存配置
}

对账服务 ReconciliationTask.java(不变)

@Component
public class ReconciliationTask {// 保持原有定时任务逻辑
}

控制器 OrderController.java(不变)

@RestController
@RequestMapping("/orders")
public class OrderController {// 保持原有API接口
}

关键差异对比

功能点RocketMQ 实现RabbitMQ 实现
延迟机制内置延迟级别TTL+死信队列
消息存储持久化到CommitLog内存+磁盘持久化
消费确认自动ACK手动ACK+重试机制
消息追踪原生支持消息轨迹需要额外实现
集群方案主从复制镜像队列

部署注意事项
队列初始化:确保首次启动时自动创建所需交换机和队列
消息持久化:所有队列声明时设置durable=true
监控配置:需要监控以下指标:
死信队列消息堆积量
消费者处理耗时
消息TTL过期情况
集群配置:建议至少3节点RabbitMQ集群
故障转移:配置镜像队列实现高可用

性能优化建议
批量处理:

@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {// 批量查询缓存Map<String, Boolean> existMap = cache.getAllPresent(messageIds);// 过滤已处理消息// 批量执行数据库操作
}

异步更新:

CompletableFuture.runAsync(() -> {processedMsgCache.put(messageId, true);
});

分级缓存:

Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();

首先看CacheConfig.java,里面有一个orderStatusCache的Bean,使用Caffeine配置,最大大小10000,过期时间10分钟。然后在OrderService.java中,有一个注入的Cache<String, OrderStatus> orderCache。看起来这里应该用的是这个Bean,因为在OrderService里用了@RequiredArgsConstructor,所以orderCache这个字段会被构造器注入,而对应的Bean应该就是CacheConfig里的orderStatusCache。所以虽然用户可能没有显式地用@Autowired,但是因为用了Lombok的@RequiredArgsConstructor,如果orderCache是final的话,Spring会自动注入同类型的Bean。这时候需要检查OrderService中的orderCache的类型是否匹配。
看OrderService的定义:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是这个类型,所以应该会被正确注入。所以这个Bean是有被使用的,可能没注意到OrderService里的注入,导致认为没有用到。
另外,在OrderTimeoutConsumer.java中,还有一个processedMsgCache,这个是用Caffeine直接new的,并没有使用Spring管理的Bean。所以这个缓存是消费者自己创建的,和CacheConfig无关。而orderStatusCache是被OrderService使用的,所以这个配置是必须的,否则OrderService中的orderCache会找不到对应的Bean,导致启动失败。
所以可能在于误解了哪里用到了这个Bean。可能在代码中没有显式地看到@Autowired或者@Resource,但实际上因为Lombok的@RequiredArgsConstructor和Spring的自动注入机制,这个Bean是被OrderService使用的。如果没有CacheConfig中的Bean,Spring启动时会报错,提示找不到符合条件的Cache类型的Bean来注入到OrderService中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/27093.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过 PromptTemplate 生成干净的 SQL 查询语句并执行SQL查询语句

问题描述 在使用 LangChain 和 Llama 模型生成 SQL 查询时&#xff0c;遇到了 sqlite3.OperationalError 错误。错误信息如下&#xff1a; OperationalError: (sqlite3.OperationalError) near "sql SELECT Name FROM MediaType LIMIT 5; ": syntax error [SQL: …

计算机毕业设计SpringBoot+Vue.js企业资产管理系统(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

从零开始:H20服务器上DeepSeek R1 671B大模型部署与压力测试全攻略

前言 最近&#xff0c;我有幸在工作中接触到了DeepSeek R1 671B模型&#xff0c;这是目前中文开源领域参数量最大的高质量模型之一。DeepSeek团队在2024年推出的这款模型&#xff0c;以其惊人的6710亿参数量和出色的推理性能&#xff0c;引起了业界广泛关注。 作为一名AI基础…

Qt 文件操作+多线程+网络

文章目录 1. 文件操作1.1 API1.2 例子1&#xff0c;简单记事本1.3 例子2&#xff0c;输出文件的属性 2. Qt 多线程2.1 常用API2.2 例子1&#xff0c;自定义定时器 3. 线程安全3.1 互斥锁3.2 条件变量 4. 网络编程4.1 UDP Socket4.2 UDP Server4.3 UDP Client4.4 TCP Socket4.5 …

计算机毕业设计SpringBoot+Vue.js公司日常考勤系统(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

基于单片机的智能宿舍管理系统(论文+源码)

2.1总体方案设计 本课题为智能宿舍的设计&#xff0c;整个系统架构如图2.1所示&#xff0c;整个系统在器件上包括了主控制器STM32单片机&#xff0c;LD3320语音识别模块&#xff0c;按键模块&#xff0c;串口通信模块&#xff0c;照明模块&#xff0c;窗帘控制模块家电控制模块…

弱监督语义分割学习计划(2)-使用CoT进行Open Vocabulary Label简单实现类激活图

零: 项目说明 是这样的一个事情&#xff0c;经过与deepseek的一番讨论和交流&#xff0c;DeepSeek为我设计了一个30天高强度学习计划&#xff0c;重点聚焦弱监督/无监督语义分割在野外场景的应用&#xff0c;结合理论与实践&#xff0c;并最终导向可落地的开源项目。目前开始了…

RabbitMQ操作实战

1.RabbitMQ安装 RabbitMQ Windows 安装、配置、使用 - 小白教程-腾讯云开发者社区-腾讯云下载erlang&#xff1a;http://www.erlang.org/downloads/https://cloud.tencent.com/developer/article/2192340 Windows 10安装RabbitMQ及延时消息插件rabbitmq_delayed_message_exch…

力扣27.移除元素(双指针)

题目看起来很乱&#xff0c;实际上意思是&#xff1a;把数组中值不等于val的元素放在下标为0,1,2,3......&#xff0c;并且返回数组中值不等于val的元素的个数 方法一&#xff1a;直接判断覆盖 class Solution { public:int removeElement(vector<int>& nums, int…

【弹性计算】弹性裸金属服务器和神龙虚拟化(二):适用场景

弹性裸金属服务器和神龙虚拟化&#xff08;二&#xff09;&#xff1a;适用场景 1.混合云和第三方虚拟化软件部署2.高隔离容器部署3.高质量计算服务4.高速低时延 RDMA 网络支持场景5.RISC CPU 支持6.GPU 性能无损输出 公共云服务提供商推出 弹性裸金属服务器&#xff0c;很显然…

深入解析 Spring WebFlux:原理与应用

优质博文&#xff1a;IT-BLOG-CN WebFlux 是 Spring Framework 5 引入的一种响应式编程框架&#xff0c;和Spring MVC同级&#xff0c;旨在处理高并发和低延迟的非阻塞应用。这是一个支持反应式编程模型的新Web框架体系。 顺便一提&#xff0c;Spring Cloud Gateway在实现上是…

命名管道的实现与共享内存介绍

1.命名管道实现 comm.hpp文件 1.定义宏 通过宏来简便代码中&#xff0c;判断错误用宏就可以少写代码。 #define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)在宏定义中使用 do { ... …

Window下Redis的安装和部署详细图文教程(Redis的安装和可视化工具的使用)

文章目录 Redis下载地址&#xff1a;一、zip压缩包方式下载安装 1、下载Redis压缩包2、解压到文件夹3、启动Redis服务4、打开Redis客户端进行连接5、使用一些基础操作来测试 二、msi安装包方式下载安装 1、下载Redis安装包2、进行安装3、进行配置4、启动服务5、测试能否正常工…

哔哩哔哩IT私塾python爬虫视频教程中的项目文件

视频链接&#xff1a; Python课程天花板,Python入门Python爬虫Python数据分析5天项目实操/Python基础.Python教程_哔哩哔哩_bilibili 视频教程中要访问的链接&#xff1a; 豆瓣电影 Top 250 httpbin.org seo推广公司网站模板_站长素材 Examples - Apache ECharts WordCloud…

go前后端开源项目go-admin,本地启动

https://github.com/go-admin-team/go-admin 教程 1.拉取项目 git clone https://github.com/go-admin-team/go-admin.git 2.更新整理依赖 go mod tidy会整理依赖&#xff0c;下载缺少的包&#xff0c;移除不用的&#xff0c;并更新go.sum。 # 更新整理依赖 go mod tidy 3.编…

深入理解Spring @Async:异步编程的利器与实战指南

一、为什么需要异步编程&#xff1f; 在现代高并发系统中&#xff0c;同步阻塞式编程会带来两大核心问题&#xff1a; // 同步处理示例 public void processOrder(Order order) {// 1. 保存订单&#xff08;耗时50ms&#xff09;orderRepository.save(order); // 2. 发送短信…

PHP:IDEA开发工具配置XDebug,断点调试

文章目录 一、php.ini配置二、IDEA配置 一、php.ini配置 [xdebug] zend_extension"F:\wamp64\bin\php\php7.4.0\ext\php_xdebug-2.8.0-7.4-vc15-x86_64.dll" xdebug.remote_enable on xdebug.remote_host 127.0.0.1 xdebug.remote_port 9001 xdebug.idekey"…

FPGA开发,使用Deepseek V3还是R1(9):FPGA的全流程(详细版)

以下都是Deepseek生成的答案 FPGA开发&#xff0c;使用Deepseek V3还是R1&#xff08;1&#xff09;&#xff1a;应用场景 FPGA开发&#xff0c;使用Deepseek V3还是R1&#xff08;2&#xff09;&#xff1a;V3和R1的区别 FPGA开发&#xff0c;使用Deepseek V3还是R1&#x…

AtCoder Beginner Contest 001(A - 積雪深差、B - 視程の通報、C - 風力観測、D - 感雨時刻の整理)题目翻译

由于我发现网上很少有人会发很久之前AtCoder Beginner Contes的题&#xff0c;所以我打算从AtCoder Beginner Contest 001开始写。大约两周一更&#xff0c;需要的可以订阅专栏&#xff0c;感谢支持Thanks♪(&#xff65;ω&#xff65;)&#xff89; →题目讲解 A - 積雪深差 …

upload

&#xff08;上传一句话木马&#xff0c;用蚁剑链接验证是否成功/传有回显的&#xff1a;<?php phpinfo();?>&#xff09; 学看代码 #function checkfile(){}&#xff1a;定义了一个名叫checkfile的函数 #var file方法.(获取名为‘upload_file’的元素)[获取哪些&…