Springboot RabbitMQ 消费失败消息清洗与重试机制

📌 引言

在分布式系统中,RabbitMQ 是一个非常流行的消息队列中间件,广泛用于解耦系统、异步处理任务、提高系统性能。然而,在实际使用中,消费端可能因为 代码异常、数据库故障、网络问题 等原因导致消息消费失败。

如果不对这些失败消息进行处理,可能会导致数据丢失,影响业务流程。因此,我们需要一个 可靠的失败消息清洗和重试机制 来保证消息最终被成功消费或进行合理的存储处理。


📌 1. 失败消息的常见问题

在 RabbitMQ 消费过程中,可能会遇到以下问题:

  • 消息处理逻辑异常(如空指针异常、数据格式转换错误)。
  • 数据库异常(如主键冲突、唯一索引冲突、数据库连接失败)。
  • 外部依赖不可用(如调用第三方 API 超时)。
  • 消息重复消费(由于网络抖动或 RabbitMQ 配置问题)。

这些异常可能导致:

  • 消息丢失:如果 RabbitMQ 没有进行消息重试,消息可能永远丢失。
  • 死循环消费:如果未配置死信队列(DLX),消息可能会不断重新入队,造成死循环。
  • 系统压力过大:频繁失败的消息可能会影响正常消息的消费,拖垮整个消费端。

因此,我们需要一个 完整的失败消息清洗与重试机制 来保障系统的稳定性。


📌 2. 解决方案

✅ 目标

  1. 记录失败的消息,存入数据库,方便后续排查。
  2. 实现自动重试,允许在一定次数内自动重试消费失败的消息。
  3. 配置延迟队列,避免立即重试导致的瞬时高并发问题。
  4. 超出重试次数后进入死信队列,避免死循环消费,便于后续人工干预。

📝 数据库表设计

我们创建一个 failed_message_log 表来存储失败的消息:

CREATE TABLE failed_message_log (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL,exchange_name VARCHAR(255),routing_key VARCHAR(255),queue_name VARCHAR(255),message TEXT NOT NULL,error_reason TEXT NOT NULL,retry_count INT DEFAULT 0,status ENUM('PENDING', 'FAILED', 'RETRYING', 'SUCCESS') DEFAULT 'PENDING',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

字段说明:

  • message_id:消息的唯一标识符。
  • exchange_name / routing_key / queue_name:记录消息来源,方便追溯。
  • message:存储原始消息内容。
  • error_reason:记录失败的具体原因。
  • retry_count:当前消息的重试次数。
  • status:消息状态(PENDING:待处理,FAILED:失败,RETRYING:正在重试,SUCCESS:成功)。
  • created_at / updated_at:时间戳,方便查询和统计。

📌 3. 消费端实现

📍 监听 RabbitMQ 消息

@Slf4j // 使用 Lombok 提供的日志功能,简化日志记录
@Component // 让 Spring 管理该组件
public class RabbitMqConsumer {private static final int MAX_RETRY_COUNT = 3; // 最大重试次数,超过后将消息放入死信队列@Autowiredprivate FailedMessageLogMapper failedMessageLogMapper; // 失败消息日志的数据库访问对象,用于记录失败消息@Autowiredprivate RabbitTemplate rabbitTemplate; // RabbitMQ 消息发送模板,用于消息重试和死信处理/*** 监听 RabbitMQ 队列,处理消息** @param messageId   消息的唯一标识 ID* @param redelivered 是否是 RabbitMQ 重新投递的消息(用于判断是否是重试消息)* @param message     消息内容(JSON 格式字符串)*/@RabbitListener(queues = "your_queue_name") // 监听指定队列,自动消费消息public void processMessage(@Header("messageId") String messageId,@Header(AmqpHeaders.REDELIVERED) Boolean redelivered,String message) {try {log.info("Processing message: {}", message);JSONObject jsonObject = JSON.parseObject(message); // 解析 JSON 格式的消息handleMessage(jsonObject); // 处理业务逻辑} catch (Exception e) {log.error("Message processing failed: {}", message, e); // 记录错误日志// 查询数据库中是否已存在该失败消息的记录FailedMessageLog failedMessage = failedMessageLogMapper.findByMessageId(messageId);if (failedMessage == null) {// 如果数据库中没有记录,则插入一条新的失败记录failedMessage = new FailedMessageLog(messageId, message, e.getMessage(), 0, "PENDING");failedMessageLogMapper.insert(failedMessage);} else {// 如果已存在记录,则增加重试次数failedMessage.setRetryCount(failedMessage.getRetryCount() + 1);failedMessageLogMapper.update(failedMessage);}// 判断重试次数是否超过最大限制if (failedMessage.getRetryCount() >= MAX_RETRY_COUNT) {log.warn("Message {} reached max retry limit, moving to dead letter queue", messageId);failedMessage.setStatus("FAILED"); // 标记消息为失败failedMessageLogMapper.update(failedMessage); // 更新数据库记录// 将消息发送到死信队列(DLX),用于后续人工干预rabbitTemplate.convertAndSend("dlx_exchange", "dlx_routing_key", message);} else {log.warn("Retrying message: {}, attempt {}", messageId, failedMessage.getRetryCount());failedMessage.setStatus("RETRYING"); // 更新状态为重试中failedMessageLogMapper.update(failedMessage);// 发送到延迟队列,等待 5 秒后再重试,避免频繁失败造成系统压力rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", message, msg -> {msg.getMessageProperties().setDelay(5000); // 设置消息延迟 5 秒return msg;});}}}/*** 业务处理逻辑(模拟成功处理)** @param jsonObject 解析后的消息对象*/private void handleMessage(JSONObject jsonObject) {log.info("业务处理成功: {}", jsonObject);}
}

📌 4. 配置延迟队列 & 死信队列

📍 延迟队列

@Bean
public Queue delayedQueue() {return QueueBuilder.durable("delayed_queue").withArgument("x-dead-letter-exchange", "dlx_exchange")  .withArgument("x-dead-letter-routing-key", "dlx_routing_key").withArgument("x-message-ttl", 10000)  // 10秒延迟.build();
}

📍 死信队列

@Bean
public Queue dlxQueue() {return QueueBuilder.durable("dlx_queue").build();
}

📌 5. 总结

方案作用
数据库存储失败消息记录失败原因,方便排查
延迟队列避免瞬时失败,允许重试
最大重试次数防止死循环消费
死信队列彻底失败后进入死信队列,人工干预

这样,我们就可以 保证 RabbitMQ 消息消费的高可用性,同时避免消息丢失和死循环消费的问题。🚀🚀🚀

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/17739.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】Vector容器

为什么要学习vector? 1. 上一章分享了string,而string实际上是一个管理字符的顺序表。 2. 而除了字符以外,我们经常用到整形数组,所以我们需要针对其他类型数据的顺序表。 3. vector实际上也是一个顺序表,而且主要用来…

国内 ChatGPT Plus/Pro 订阅教程

1. 登录 chat.openai.com 依次点击 Login ,输入邮箱和密码 2. 点击升级 Upgrade 登录自己的 OpenAI 帐户后,点击左下角的 Upgrade to Plus,在弹窗中选择 Upgrade plan。 如果升级入口无法点击,那就访问这个网址,htt…

Winform禁止高分辨下缩放布局成功方法

Windows自动缩放布局会导致窗体上的按钮和文本挤在一起根本看不清楚。 那么该如何解决呢? 具体操作步骤如下: 1、在项目属性上切换到【安全性】菜单,勾选【启用ClickOnce安全设置】,然后立刻取消勾选; 为了生成app.…

数据结构——Makefile、算法、排序(2025.2.13)

目录 一、Makefile 1.功能 2.基本语法和相关操作 (1)创建Makefile文件 (2)编译规则 (3)编译 (4)变量 ①系统变量 ②自定义变量 二、 算法 1.定义 2.算法的设计 &#xff…

Xcode证书密钥导入

证书干嘛用 渠道定期会给xcode证书,用来给ios打包用,证书里面有记录哪些设备可以打包进去。 怎么换证书 先更新密钥 在钥匙串访问中,选择系统。(选登录也行,反正两个都要导入就是了)。 mac中双击所有 .p12 后缀的密钥&#xff…

span标签 鼠标移入提示框 el-tooltip element-ui

<el-tooltip :content"item.value" placement"top"><span>{{ item.valueHidden }}</span></el-tooltip>

[创业之路-300]:进一步理解货币与金钱, 货币与货币政策

目录 一、货币 1.1 概述 1、货币的定义 2、货币的形态演变 3、货币的职能 4、货币的价值衡量 1.2 货币的分层 1、货币分层的目的与意义 2、货币分层的划分标准与层次 3、各国货币分层的实践 4、货币分层的影响与应用 1.3、M0、M1、M2变化对股市的影响 1、M0变化对…

pnpm的使用

pnpm的使用 1.安装和使用2.统一包管理工具下载依赖 1.安装和使用 pnpm:performant npm &#xff0c;意味“高性能的npm”。 pnpm由npm/yarn衍生而来,解决了npm/yarn内部潜在的bug,极大的优化了性能,扩展了使用场景。被誉为“最先进的包管理工具”。 pnpm安装指令: npm i -g p…

vue+springboot+webtrc+websocket实现双人音视频通话会议

前言 最近一些时间我有研究&#xff0c;如何实现一个视频会议功能&#xff0c;但是找了好多资料都不太理想&#xff0c;最终参考了一个文章 WebRTC实现双端音视频聊天&#xff08;Vue3 SpringBoot&#xff09; 只不过&#xff0c;它的实现效果里面只会播放本地的mp4视频文件&…

nginx播放视频(auth_request鉴权)

学习链接 Nginx通过auth_request结合Springboot实现静态文件下载鉴权 nginx搭建直播推流服务&推流拉流鉴权 步骤 1、安装nginx 这里nginx的版本是nginx-1.24.0 ./configure --with-http_ssl_module --with-stream --with-stream_ssl_module --with-http_auth_req…

【论文笔记】ZeroGS:扩展Spann3R+GS+pose估计

spann3r是利用dust3r做了增量式的点云重建&#xff0c;这里zeroGS在前者的基础上&#xff0c;进行了增量式的GS重建以及进行了pose的联合优化&#xff0c;这是一篇dust3r与GS结合的具有启发意义的工作。 abstract NeRF和3DGS是重建和渲染逼真图像的流行技术。然而&#xff0c;…

Webpack相关优化总结

在使用webpack时提供了各种配置&#xff0c;这里结合在业务中常用的配置汇总一下可以进行的一系列的webpack优化 缩小文件搜索范围 其原理是在构建时&#xff0c;会以用户配置的Entry为开始依次递归遍历每个Module&#xff0c;在遍历每个Module时会调用相应合适的Loader对原模…

【操作系统】操作系统结构

内核 什么是内核&#xff1f; 内核作为应用程序连接硬件设备的桥梁&#xff0c;使得应用程序只需关心与内核交互&#xff0c;不用关心硬件细节。 内核有哪些能力呢&#xff1f; 内核是怎么工作的&#xff1f; Linux 的设计 MultiTask SMP ELF ELF 的意思是可执行文件链接格式…

【无线感知会议系列-22 】Vivisecting Mobility Management in 5G Cellular Networks

这篇是发表在SIGCOMM上的一篇paper 研究方向国内一些移动应用APP厂商&#xff1a;比如抖音,腾讯可以借鉴一下&#xff0c;和终端 厂商联合开发&#xff0c;提高其QOE。 摘要 随着5G技术对多种无线电频段和不同部署模式&#xff08;例如独立组网&#xff08;SA&#xff09;与…

【RAG落地利器】Weaviate、Milvus、Qdrant 和 Chroma 向量数据库对比

什么是向量数据库? 向量数据库是一种将数据存储为高维向量的数据库&#xff0c;高维向量是特征或属性的数学表示。每个向量都有一定数量的维度&#xff0c;根据数据的复杂性和粒度&#xff0c;可以从数十到数千不等。 向量通常是通过对原始数据(如文本、图像、音频、视频等)…

算法18(力扣136)只出现一次的数字

1、问题 给你一个 非空 整数数组 nums&#xff0c;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现两次。找出那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法来解决此问题&#xff0c;且该算法只使用常量额外空间。 2、示例 &#xff08;1&…

【鸿蒙开发】第三十章 应用稳定性-检测、分析、优化、运维汇总

目录​​​​​​​ 1 概述 2 使用Asan检测内存错误 2.1 背景 2.2 原理概述 2.3 使用约束 2.4 配置参数 2.4.1 在app.json5中配置环境变量 2.4.2 在Run/Debug Configurations中配置环境变量 2.5 Asan使能 方式一 方式二 运行ASan 2.6 ASan异常检测类型 heap-buf…

20250214在ubuntu20.04下使用obs studio录制外挂的1080p的USB摄像头【下载安装】

20250214在ubuntu20.04下使用obs studio录制外挂的1080p的USB摄像头 2025/2/14 9:10 缘起&#xff1a;笔记本电脑在ubuntu20.04下使用Guvcview录制自带的摄像头&#xff0c;各种问题。 1、降帧率。WIN10/11自带的相机应用可以满速30fps&#xff0c;马上重启到ubuntu20.04&#…

phpipam1.7安装部署

0软件说明 phpipam是一个开源Web IP地址管理应用程序&#xff08;IPAM&#xff09; phpipam官网&#xff1a;https://www.phpipam.net/ 1安装环境 操作系统&#xff1a;Rocky Linux9.5x86_64 phpipam版本&#xff1a;1.7 php版本&#xff1a;8.0.30 数据库版本&#xff1a…

「vue3-element-admin」Vue3 + TypeScript 项目整合 Animate.css 动画效果实战指南

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall ︱vue3-element-admin︱youlai-boot︱vue-uniapp-template &#x1f33a; 仓库主页&#xff1a; GitCode︱ Gitee ︱ Github &#x1f496; 欢迎点赞 &#x1f44d; 收藏 ⭐评论 …