文章目录
- 死信队列
- RabbitMQ 配置类 RabbitMQConfig.java
- 生产者 OrderTimeoutProducer.java
- 消费者 OrderTimeoutConsumer.java
- 应用配置 application.yml
- pom.xml 依赖
- 实体类 Order.java(不变)
- Mapper 接口 OrderMapper.java(不变)
- 服务层 OrderService.java(不变)
- 缓存配置 CacheConfig.java(不变)
- 对账服务 ReconciliationTask.java(不变)
- 控制器 OrderController.java(不变)
死信队列
在代码中,processedMsgCache的类型是Cache<String, Boolean>,键是消息ID,值是Boolean。当处理成功时,会执行processedMsgCache.put(msgId, true),将消息ID存入缓存,并设置有效期1小时,最大容量10000条。这样,在1小时内,如果同样的消息再次被处理,缓存中已经有记录,就会直接ACK并跳过处理。
不过,这里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默认可能不是唯一的,生产者发送消息时可以设置messageId。如果生产者没有明确设置,可能无法保证唯一性,这会导致幂等性检查失效。因此,需要确保生产者发送消息时设置了唯一的messageId,或者在消费者端使用其他唯一标识,比如消息内容中的orderId结合其他属性。
另外,缓存的过期时间设置为1小时,这可能与业务场景有关。如果消息的存活时间超过1小时,可能会有重复处理的风险。需要根据实际消息的存活时间来调整缓存的过期时间,确保覆盖消息可能被重新投递的时间窗口。
还有一个问题是,缓存是本地缓存,如果消费者有多个实例,每个实例的缓存是独立的。这可能导致不同的实例处理同一条消息,因为一个实例处理过,但另一个实例的缓存中没有记录。这种情况下,本地缓存的幂等性检查可能不够,需要考虑分布式缓存,比如Redis,来保证全局唯一性。但根据当前代码,在单实例或允许短暂重复的场景下使用本地缓存。
总结来说,幂等性检查的逻辑是通过缓存已处理消息的ID,在消息处理前检查是否已存在,存在则跳过处理,避免重复执行。这适用于消息队列保证至少一次投递,但业务需要确保幂等的场景。
+---------------------+| RabbitMQ Message || (携带唯一messageId) |+----------+----------+|v
+----------------+ +-------+-------+ +-----------------+
| 消息到达消费者 | ----> | 检查缓存是否存在 | ----> | 存在:直接ACK丢弃消息 |
+----------------+ +-------+-------+ +-----------------+|| 不存在v+-------+-------+ +-----------------+| 执行业务逻辑处理 | ----> | 成功:存入缓存并ACK |+---------------+ +-----------------+
缓存过期时间(1小时)> 消息最大存活时间(30分钟+重试时间)
计算公式:缓存过期时间 = 消息TTL + 最大重试时间 * 重试次数 + 缓冲时间
缓存击穿 | 空值缓存 | 对不存在的key也进行缓存(需设置较短过期时间) |
---|---|---|
缓存穿透 | 布隆过滤器 | 在缓存前增加过滤层 |
消费者重启 | 持久化存储 | 配合数据库记录处理状态 |
网络分区 | 最终一致性 | 依赖对账服务修正状态 |
组件 | 类型 | 作用说明 |
---|---|---|
processedMsgCache | Caffeine缓存 | 存储已处理消息的唯一标识 |
messageId | 字符串 | 消息唯一标识(需生产者保证唯一性) |
deliveryTag | 长整型 | RabbitMQ消息投递标识 |
sequenceDiagramparticipant RabbitMQparticipant Consumerparticipant Cacheparticipant DBRabbitMQ->>Consumer: 投递消息(messageId=123)Consumer->>Cache: 查询messageId=123alt 存在缓存Cache-->>Consumer: 返回trueConsumer->>RabbitMQ: 发送ACKelse 无缓存Consumer->>DB: 执行取消操作alt 操作成功Consumer->>Cache: 写入messageId=123Consumer->>RabbitMQ: 发送ACKelse 操作失败Consumer->>RabbitMQ: 发送NACK(requeue=true)endend
RabbitMQ 配置类 RabbitMQConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 订单超时相关配置public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";public static final String ORDER_DELAY_QUEUE = "order.delay.queue";public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";// 死信队列配置public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";// 声明延迟队列(设置死信参数)@Beanpublic Queue orderDelayQueue() {return QueueBuilder.durable(ORDER_DELAY_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY).build();}// 声明延迟交换机@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}// 绑定延迟队列到交换机@Beanpublic Binding delayBinding() {return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}// 声明死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(ORDER_DEAD_LETTER_QUEUE, true);}// 声明死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);}// 绑定死信队列到交换机@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ORDER_DEAD_LETTER_ROUTING_KEY);}// JSON 消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}
生产者 OrderTimeoutProducer.java
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class OrderTimeoutProducer {private final RabbitTemplate rabbitTemplate;public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendTimeoutMessage(String orderId) {// 设置消息过期时间为30分钟(单位:毫秒)MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1800000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_DELAY_EXCHANGE,RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,orderId,messagePostProcessor);}
}
消费者 OrderTimeoutConsumer.java
import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
public class OrderTimeoutConsumer {private final OrderService orderService;private final Cache<String, Boolean> processedMsgCache;public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) {this.orderService = orderService;this.processedMsgCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();}@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)public void processMessage(Message message, Channel channel) throws IOException {String orderId = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 幂等性检查if (processedMsgCache.getIfPresent(messageId) != null) {channel.basicAck(deliveryTag, false);return;}boolean success = orderService.safeCancel(orderId);if (success) {processedMsgCache.put(messageId, true);System.out.println("订单超时取消成功: " + orderId);}channel.basicAck(deliveryTag, false);} catch (Exception e) {// 记录错误日志,重新放回队列channel.basicNack(deliveryTag, false, true);System.err.println("处理订单超时取消失败: " + orderId);e.printStackTrace();}}
}
应用配置 application.yml
spring:rabbitmq:host: ${RABBITMQ_HOST:localhost}port: 5672username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /connection-timeout: 5000template:retry:enabled: truemax-attempts: 3initial-interval: 1000mslistener:simple:acknowledge-mode: manual # 手动确认模式prefetch: 10 # 每次预取数量retry:enabled: truemax-attempts: 3initial-interval: 1000ms
pom.xml 依赖
<!-- 移除 RocketMQ 依赖 -->
<!-- 添加 RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
实体类 Order.java(不变)
public class Order {// 保持原有实现
}
Mapper 接口 OrderMapper.java(不变)
@Mapper
public interface OrderMapper {// 保持原有SQL操作
}
服务层 OrderService.java(不变)
@Service
public class OrderService {// 保持原有业务逻辑
}
缓存配置 CacheConfig.java(不变)
@Configuration
public class CacheConfig {// 保持原有缓存配置
}
对账服务 ReconciliationTask.java(不变)
@Component
public class ReconciliationTask {// 保持原有定时任务逻辑
}
控制器 OrderController.java(不变)
@RestController
@RequestMapping("/orders")
public class OrderController {// 保持原有API接口
}
关键差异对比
功能点 | RocketMQ 实现 | RabbitMQ 实现 |
---|---|---|
延迟机制 | 内置延迟级别 | TTL+死信队列 |
消息存储 | 持久化到CommitLog | 内存+磁盘持久化 |
消费确认 | 自动ACK | 手动ACK+重试机制 |
消息追踪 | 原生支持消息轨迹 | 需要额外实现 |
集群方案 | 主从复制 | 镜像队列 |
部署注意事项
队列初始化:确保首次启动时自动创建所需交换机和队列
消息持久化:所有队列声明时设置durable=true
监控配置:需要监控以下指标:
死信队列消息堆积量
消费者处理耗时
消息TTL过期情况
集群配置:建议至少3节点RabbitMQ集群
故障转移:配置镜像队列实现高可用
性能优化建议
批量处理:
@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {// 批量查询缓存Map<String, Boolean> existMap = cache.getAllPresent(messageIds);// 过滤已处理消息// 批量执行数据库操作
}
异步更新:
CompletableFuture.runAsync(() -> {processedMsgCache.put(messageId, true);
});
分级缓存:
Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();
首先看CacheConfig.java,里面有一个orderStatusCache的Bean,使用Caffeine配置,最大大小10000,过期时间10分钟。然后在OrderService.java中,有一个注入的Cache<String, OrderStatus> orderCache。看起来这里应该用的是这个Bean,因为在OrderService里用了@RequiredArgsConstructor,所以orderCache这个字段会被构造器注入,而对应的Bean应该就是CacheConfig里的orderStatusCache。所以虽然用户可能没有显式地用@Autowired,但是因为用了Lombok的@RequiredArgsConstructor,如果orderCache是final的话,Spring会自动注入同类型的Bean。这时候需要检查OrderService中的orderCache的类型是否匹配。
看OrderService的定义:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是这个类型,所以应该会被正确注入。所以这个Bean是有被使用的,可能没注意到OrderService里的注入,导致认为没有用到。
另外,在OrderTimeoutConsumer.java中,还有一个processedMsgCache,这个是用Caffeine直接new的,并没有使用Spring管理的Bean。所以这个缓存是消费者自己创建的,和CacheConfig无关。而orderStatusCache是被OrderService使用的,所以这个配置是必须的,否则OrderService中的orderCache会找不到对应的Bean,导致启动失败。
所以可能在于误解了哪里用到了这个Bean。可能在代码中没有显式地看到@Autowired或者@Resource,但实际上因为Lombok的@RequiredArgsConstructor和Spring的自动注入机制,这个Bean是被OrderService使用的。如果没有CacheConfig中的Bean,Spring启动时会报错,提示找不到符合条件的Cache类型的Bean来注入到OrderService中。