RabbitMQ入门指南(九):消费者可靠性

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、消费者确认机制

二、失败重试机制

三、失败处理策略

四、业务幂等性

1.通过唯一标识符保证操作的幂等性

2.通过业务判断保证操作的幂等性

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。


当RabbitMQ向消费者投递消息后,了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息,可能会出现各种故障:

  • 网络故障:在消息投递过程中,如果RabbitMQ和消费者之间的网络连接出现故障,可能会导致消息无法正确投递给消费者。
  • 消费者宕机:如果消费者在接收消息后突然宕机,那么消息可能无法被正确处理。
  • 消费者处理异常:消费者在接收到消息后,由于处理不当或者出现异常情况,可能会导致消息处理失败。

以上情况都可能导致消息丢失,因此RabbitMQ需要知道消费者的处理状态,以便在消息处理失败时重新投递消息。

一、消费者确认机制

RabbitMQ的消费者确认机制Consumer Acknowledgement)是一种确保消息被成功处理的机制。当消费者处理消息结束后,需要向RabbitMQ发送一个回执,以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要,因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。

回执有三种可选值:

  • ACK(确认):表示消费者成功处理了消息,RabbitMQ会从队列中删除该消息。
  • NACK(否定确认):表示消息处理失败,RabbitMQ需要再次投递该消息。
  • REJECT(拒绝):表示消息处理失败并且被拒绝,RabbitMQ会从队列中删除该消息。

在实际应用中,一般使用ACK或NACK两种方式。REJECT方式的使用相对较少,通常只在消息格式存在问题,即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ACK,处理失败时返回NACK。

在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式 :

spring:rabbitmq:listener:simple:acknowledge-mode: auto

RabbitMQ 支持三种不同的确认模式,这些模式通过acknowledge-mode属性进行配置:

  • none:关闭ACK。消费者接收到消息后不需要发送任何确认给发送者,发送者将继续发送下一条消息。在这种模式下,如果消费者处理消息失败,消息将会丢失,无法保证消息的可靠性。
  • manual:手动ACK。消费者接收到消息后需要手动发送确认给发送者,发送者才会继续发送下一条消息。在这种模式下,如果消费者处理消息失败,可以手动发送NACK给发送者,告诉发送者这条消息处理失败,以便发送者重新发送消息。这种模式可以保证消息的可靠性,但需要消费者手动处理确认和NACK。
  • auto:自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP(面向切面编程)对消息处理逻辑做了环绕增强。当业务正常执行时,Spring AMQP会自动返回ACK。当业务出现异常时,根据异常判断返回不同结果:业务异常,自动返回NACK;消息处理或校验异常,自动返回REJECT。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 如果消费者持续出现异常,消息会不断地在队列中重新排队并重新发送,这可能会导致消息处理延迟和队列持续增长,给系统带来不必要的压力。

为了解决这个问题,Spring框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列 。

在consumer服务的application.yml文件中添加配置:

spring:rabbitmq:listener:simple:retry:enabled: trueinitial-interval: 1000msmultiplier: 1max-attempts: 3stateless: true
enabled: true开启消费者失败重试
initial-interval: 1000ms初始的等待时长
multiplier: 1每次重试的等待时长是上次等待时长的倍数
max-attempts: 3最大重试次数
stateless: truetrue表示重试是无状态的,即每次重试都是独立的,不会考虑之前的重试状态。如果业务中包含事务,需要改为false。

通过这样的配置,当消费者出现异常时,消息会在本地进行重试,而不是无限期地重新排队发送。在达到最大重试次数后,SpringAMQP会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的,并且消息不会被重新排队发送给消费者。

这种失败重试机制可以有效地减少消息处理的延迟和队列的增长,提高系统的稳定性和可用性。当然,在使用失败重试机制时,也需要考虑到业务逻辑和异常处理的合理性,避免因过度重试而导致的问题。

三、失败处理策略

在失败重试机制中,当达到最大重试次数后,消息会被直接丢弃。尽管这在某些场景中可能是可接受的,但对于那些对消息可靠性要求极高的业务来说,这显然是一个潜在的风险点。

Spring AMQP为此提供了强大的支持,允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的,它有三种不同的实现方式:

  • RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝消息,并丢弃该消息。这是默认的处理方式。
  • ImmediateRequeueMessageRecoverer:当重试次数耗尽后,返回一个NACK给生产者,使消息重新入队,以便再次发送。
  • RepublishMessageRecoverer:当重试次数耗尽后,可以将失败的消息投递到一个指定的交换机和队列中,这个交换机和队列专门用来存放异常的消息。

在处理策略中,一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后,它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理,使得开发人员可以更精细地处理和诊断问题。

在consumer服务中定义处理失败消息的交换机和队列:

    @Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}

定义一个RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

通过这样的配置,当消息在尝试多次重试后仍然失败时,它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息,进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。

四、业务幂等性

在计算机科学和软件开发中,幂等性是一个重要的概念。简单来说,如果一个操作或函数不论执行一次还是多次,其结果都是相同的,那么称这个操作或函数是幂等的。在业务处理中,幂等性尤其关键。它可以保证系统的稳定性,确保在某些异常情况下,多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域,幂等性是确保系统稳定和数据准确的重要前提。

1.通过唯一标识符保证操作的幂等性

为每个操作生成唯一的标识符(如ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能,只要开启这个功能即可。

Jackson的消息转换器示例:

@Bean
public MessageConverter messageConverter(){// 定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:

    @Testvoid testSendMessage2Queue() {String queueName = "demo.queue";String msg = "Idempotent Test";rabbitTemplate.convertAndSend(queueName, msg);}

运行测试用例,查看结果:

2.通过业务判断保证操作的幂等性

业务判断,是一种基于业务逻辑和状态的检查,以确定是否对重复的请求或消息进行处理。在多种业务场景中,这一策略的思路各有不同。

比如在支付订单的案例中,业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”(需要防止重复支付)。因此,在执行这一业务时,可以判断订单的状态是否为“未支付”。若状态不是“未支付”,则说明该订单已经被处理过,无需重复处理。与基于唯一标识符的方案相比,业务判断方案无需对原有数据库进行改造,因此更为推荐。

以支付修改订单的业务为例:

    @Overridepublic void markOrderPaySuccess(Long orderId) {// 查询订单Order order = getById(orderId);// 判断订单状态,订单不存在或者订单状态不是1,放弃处理if (order == null || order.getStatus() != 1) {return;}// 尝试更新订单order.setStatus(2);order.setPayTime(LocalDateTime.now());orderService.updateById(order);}

以上代码示例判断和更新是两步动作 ,极小概率下可能存在线程安全问题,所以可以进行以下修改:

    @Overridepublic void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();}
 

总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容,希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/225348.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TikTok地理标签:通过短视频游走全球景点

TikTok,这个全球短视频平台,不仅是创意的播放场所,更是连接全球用户的数字旅行通道。通过TikTok的地理标签,用户可以在短视频中游走于世界各地的景点,感受异国风情,分享旅行心情。本文将深入探讨TikTok地理…

7.3 uvm_config_db in UVM

uvm_config_db类派生自uvm_resource_db类。它是uvm_resource_db顶部的另一层便利层,简化了用于uvm_component实例的基本接口(资源库的访问方法)。 下面uvm_config_db类的代码段取自uvm源代码。 class uvm_config_db#(type Tint) extends uv…

【GitHub精选项目】短信系统测试工具:SMSBoom 操作指南

前言 本文为大家带来的是 OpenEthan 开发的 SMSBoom 项目 —— 一种用于短信服务测试的工具。这个工具能够发送大量短信,通常用于测试短信服务的稳定性和处理能力。在合法和道德的范畴内,SMSBoom 可以作为一种有效的测试工具,帮助开发者和系统…

【编译原理】基于词法分析器的LL1语法分析器

【编译原理】基于词法分析器的LL1语法分析器 实验要求 设计一个满足以下要求的⽂法: (1)识别只包含变量声明语句和执行语句程序段的语法结构合法性; (2)变量声明中只使用int,char,float 3类基本类型&…

Android studio 花式按键

一、activity_main.xml代码&#xff1a; <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app"http://schemas.a…

【C++】STL 容器 - list 双向链表容器 ① ( 容器特点 | 容器操作时间复杂度 | 构造函数 )

文章目录 一、 list 双向链表容器简介1、容器特点2、容器操作时间复杂度3、遍历访问5、头文件 二、 list 双向链表容器 构造函数1、默认无参构造函数2、创建包含 n 个相同元素的 list 双向链表3、使用初始化列表构造 list 双向链表4、使用另外一个 list 容器 构造 list 双向链表…

【Java基础】Java中异常分类,他们之间的区别?

&#x1f341;Java中异常分哪两类 &#x1f341;Java中异常类&#x1f341;受检异常&#x1f341;非受检异常 &#x1f341;拓展知识仓&#x1f341;什么是Throwable&#x1f341;Error和Exception的区别和联系&#x1f341; 列举几个常用的RuntimeException&#x1f341;Java异…

关于“Python”的核心知识点整理大全41

目录 scoreboard.py game_functions.py game_functions.py 14.3.8 显示等级 game_stats.py scoreboard.py scoreboard.py scoreboard.py game_functions.py game_functions.py alien_invasion.py 14.3.9 显示余下的飞船数 ship.py scoreboard.py 我们将最高得分圆整…

微信这样分类客户,帮你轻松提升业绩!

无论是什么行业&#xff0c;都会遇到各种各样的客户&#xff0c;能不能成交这些客户&#xff0c;关键的一点在于有没有明确的客户分类。 今天就给大家分享几个高效分类客户的方法&#xff0c;帮助大家提高成交率和业绩。 1、分组管理 在微信中创建不同的分组&#xff0c;比如…

MyBatis——MyBatis的缓存

MyBatis的缓存 创建工程&#xff1a; 1缓存介绍 为什么使用缓存&#xff1f; 首次访问时&#xff0c;查询数据库&#xff0c;并将数据存储到内存中&#xff1b;再次访问时直接访问缓存&#xff0c;减少IO、硬盘读写次数、提高效率 Mybatis中的一级缓存和二级缓存&#xff1f;…

IP 地址归属地查询

IP 地址归属地查询 1. IP 地址归属地查询2. IP 地址归属地查询References 1. IP 地址归属地查询 https://tool.lu/ip/index.html 2. IP 地址归属地查询 https://www.ip.cn/ip/.html References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

【JavaScript】Set、Map、WeakSet、WeakMap

✨ 专栏介绍 在现代Web开发中&#xff0c;JavaScript已经成为了不可或缺的一部分。它不仅可以为网页增加交互性和动态性&#xff0c;还可以在后端开发中使用Node.js构建高效的服务器端应用程序。作为一种灵活且易学的脚本语言&#xff0c;JavaScript具有广泛的应用场景&#x…

Gin框架之使用 go-ini 加载.ini 配置文件

首先,联想一个问题,我们在部署服务时,通常为了方便,对于需要迭代更新的代码进行修改,但是比对shell,可以搞一个变量将需要修改的,以及修改起来变动处多的,写在变量内,到时候如果需要变更,可以直接变更变量即可; 那么,golang有没有什么方式可以将需要变的东西保存起…

TCP协议及工作原理(三)客户端的搭建

ui界面的搭建 &#xff1a; QTcpServer是基于TCP的服务器类提供一种方便的方式管理和创建TCP服务器&#xff0c;QTcpSocket处理TCP套接字编程用于建立TCP连接&#xff0c;发送接收数据等功能。 参考前两篇可深入理解&#xff01;&#xff01;&#xff01;&#xff01;&#xff…

php 不加后缀访问

实现不带后缀访问php文件的方法&#xff1a;首先在htaccess文件中加入内容“RewriteRule ^(api/token) token.php [L]”&#xff1b;然后通过根目录下的“token.php”来接受“api/token”&#xff1b;最后修改配置文件。 考虑的做法有&#xff1a; HTTP重写技术&#xff0c;让…

Win10电脑蓝牙默认音量100的设置教程

在Win10电脑操作过程中&#xff0c;用户想设置连接蓝牙后音量默认是100&#xff0c;但不知道具体的设置操作步骤。这时候用户需要打开Win10系统上的注册表&#xff0c;点击修改注册表来完成这一设置&#xff0c;下面就是Win10电脑蓝牙默认音量100的设置教程介绍&#xff0c;帮助…

【Java、Python】获取电脑当前网络IP进行位置获取(附源码)

我相信看到这篇博客的时候心里肯定是想解决自己的一个问题的&#xff0c;而这篇博客我就以简单快速的方式解决这些烦恼&#xff01; 一、获取当前IP 在Java中自带了一些自己的流对象来获取当前的IP地址&#xff0c;不多说我们直接上代码。 //获取当前网络ip地址 ipAddress Ine…

CentOS安装MongoDB

CentOS安装MongoDB 文章目录 CentOS安装MongoDB1. 安装并运行2. 创建用户/密码3. 测试语句4. 允许外网访问 1. 安装并运行 在 CentOS 上安装 MongoDB&#xff0c;你可以按照以下步骤进行&#xff1a; 导入 MongoDB 的 GPG 密钥&#xff1a; sudo rpm --import https://www.mon…

React快速入门之组件

组件 文件&#xff1a;Profile.js export default function Profile({isPacked true&#xff0c;head,stlyeTmp,src,size 80}) {if (isPacked) {head head " ✔";}return (<div><h1>{head}</h1><imgsrc{src}alt"Katherine Johnson&q…

MongoDB ReplicaSet 部署

文章目录 前言1. 环境准备2. 生成密钥3. 配置参数4. 创建 ReplicaSet5. 副本集维护5.1 新增成员5.2 移除节点5.4 主节点降级5.5 阻止选举5.6 允许副本节点读5.7 延迟观测 6. 连接副本集 后记 前言 本篇文章介绍 MongoDB ReplicaSet 如何搭建&#xff0c;及常用的维护方法。 1…