【Rabbitmq篇】RabbitMQ⾼级特性----消息确认

目录

前言:

一.消息确认机制 

• ⾃动确认

• ⼿动确认

 手动确认方法又分为三种:

二. 代码实现(spring环境)

配置相关信息:

1). AcknowledgeMode.NONE 

2 )AcknowledgeMode.AUTO

3)AcknowledgeMode.MANUAL

总结:


前言:

前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下

Rabbitmq官网


一.消息确认机制 

⽣产者发送消息之后,到达消费端之后,可能会有以下情况:
a. 消息处理成功
b. 消息处理异常

RabbitMQ向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失.
那么如何确保消费端已经成功接收了,并正确处理了呢
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(messageacknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种: 

• ⾃动确认

当autoAck等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,⽽不管消费者是否真正地消费到了这些消息.⾃动确认模式适合对于消息可靠性要求不⾼的场景.


• ⼿动确认

当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息.这种模式适合对消息可靠性要求⽐较⾼的场景. 

 手动确认方法又分为三种:

  • 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)                  RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了.
  • 否定确认: Channel.basicReject(long deliveryTag, boolean requeue) 
    消费者客⼾端可以调⽤channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息.
  • 否定批量确认: Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
    Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令.消费者客⼾端可以调⽤channel.basicNack⽅法来实现.

参数说明:

1)deliveryTag :
消息的唯⼀标识,它是⼀个单调递增的64位的⻓整型值. deliveryTag 是每个通道
(Channel)独⽴维护的,所以在每个通道上都是唯⼀的.当消费者确认(ack)⼀条消息时,必须使⽤对应的通道上进⾏确认.

2)multiple 

是否批量确认.在某些情况下,为了减少⽹络流量,可以对⼀系列连续的 deliveryTag 进
⾏批量确认.值为true则会⼀次性ack所有⼩于或等于指定deliveryTag的消息.值为false,则只确认当前指定deliveryTag的消息.

 

 3)requeue

表⽰拒绝后,这条消息如何处理.如果requeue参数设置为true,则RabbitMQ会重新将这条
消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,⽽不会把它发送给新的消费者.


二. 代码实现(spring环境)

1.可以直接使用RabbitMQ Java Client 库

2.使用spring集成的amqp

 主要介绍第二种,在spring环境下实现

Spring-AMQP 对消息确认机制提供了三种策略.

public enum AcknowledgeMode { NONE //确认,MANUAL//手动 ,AUTO //默认;
}

配置相关信息:

基本信息以及确认机制

队列,交换机,以及它们之间的绑定关系 

package com.bite.extensions.config;import com.bite.extensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}@Bean("ackBinding")public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}

生产者:

主要解释消费者在不同确认机制的状态

package com.bite.extensions.controller;import com.bite.extensions.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");return "消息发送成功!";}
}

1)AcknowledgeMode.NONE 

这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会⾃动确认
消息,从RabbitMQ队列中移除消息.如果消费者处理消息失败,消息可能会丢失.

 

1)消费者 正常消费情况下

package com.bite.extensions.listener;import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");System.out.println("业务逻辑完成!");}
}

消费者正确处理,MQ删除相应信息

2)消费者 异常消费情况下

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");int num = 3/0; //异常System.out.println("业务逻辑完成!");}
}

 可以看到,消费者处理失败,但是消息已经从RabbitMQ中移除.

2 )AcknowledgeMode.AUTO

这种模式下,消费者在消息处理成功时会⾃动确认消息,但如果处理过程中抛出了异常,则不会确认消息. 

    listener:simple:acknowledge-mode: auto  #消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");//int num = 3/0;System.out.println("业务逻辑完成!");}
}

 

2)消费者 异常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");int num = 3/0;System.out.println("业务逻辑完成!");}
}
..........
接收到信息: consumer ack mode test..., deliveryTag: 88
业务逻辑处理!
2024-11-17T15:19:11.420+08:00  WARN 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
接收到信息: consumer ack mode test..., deliveryTag: 89
业务逻辑处理!
2024-11-17T15:19:11.477+08:00  INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2024-11-17T15:19:11.477+08:00  INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

消费者处理异常,会一直重试发送,所有仍然保留在mq中

3)AcknowledgeMode.MANUAL

⼿动确认模式下,消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息.如果消
息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可⽤时重新投递该消息,这
种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,⽽是可以被重新处理.

    listener:simple:acknowledge-mode: manual#消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");//int  num = 3/0;System.out.println("业务逻辑完成!");//肯定确认channel.basicAck(deliverTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliverTag,false,true);}}
}

如果不进行确认 又会发送什么?

 当我们使用手动确认(manual)的时候,一定要手动添加上肯定确认,不然即使消费者处理成功,也不会进行确认!

 2)消费者 异常消费情况下 

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑long deliverTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);//业务逻辑处理System.out.println("业务逻辑处理!");int  num = 3/0;System.out.println("业务逻辑完成!");//肯定确认channel.basicAck(deliverTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliverTag,false,true);}}
}

 否定确认完,又会进行重新入队,会变成Ready状态

此时修改为false,不让它入队,会发生什么? 

消费者处理异常,会不停的重试 

使用manual,一定要进行手动确认


总结:

模式确认方式可靠性性能使用场景
None无确认低,可能丢失消息不关心消息是否成功消费,丢失消息可容忍的场景
Auto自动确认较低,可能丢失消息较高对丢失消息容忍度较高的场景
Manual手动确认高,消息只有成功处理才会确认较低需要确保每条消息被成功消费的场景
  • None 适用于性能要求高,但对消息丢失不敏感的场景。
  • Auto 适合那些不需要太高消息可靠性的应用,但仍然需要自动化确认机制。
  • Manual 最适合那些对消息处理的可靠性要求较高,尤其是在出现异常时需要精细控制消息是否重新入队或丢弃的场景。

选择哪种模式取决于你的具体需求,尤其是对于消息可靠性的要求以及系统的性能考虑。 


结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/475571.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Pikachu】SSRF(Server-Side Request Forgery)服务器端请求伪造实战

尽人事以听天命 1.Server-Side Request Forgery服务器端请求伪造学习 SSRF(服务器端请求伪造)攻击的详细解析与防范 SSRF(Server-Side Request Forgery,服务器端请求伪造) 是一种安全漏洞,它允许攻击者通…

鸿蒙NEXT自定义组件:太极Loading

【引言】(完整代码在最后面) 本文将介绍如何在鸿蒙NEXT中创建一个自定义的“太极Loading”组件,为你的应用增添独特的视觉效果。 【环境准备】 电脑系统:windows 10 开发工具:DevEco Studio NEXT Beta1 Build Vers…

JSONObject jsonObject = JSON.parseObject(json);

是用于将一个 JSON 格式的字符串解析为一个 JSONObject 对象的语句。具体来说: JSON.parseObject(json): 作用: JSON 是 FastJSON 库提供的一个工具类。parseObject 方法可以将 JSON 格式的字符串(例如:{"key1&qu…

python成绩分级 2024年6月python二级真题 青少年编程电子学会编程等级考试python二级真题解析

目录 python成绩分级 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序代码 四、程序说明 五、运行结果 六、考点分析 七、 推荐资料 1、蓝桥杯比赛 2、考级资料 3、其它资料 python成绩分级 2024年6月 python编程等级考试二级编程题 一、题目要求 …

【面试题】接口怎么测试?如何定位前后端的Bug?

接口怎么测试? 接口测试用来验证不同软件组件之间的交互是否正常。包括验证数据传输,参数传递,我在多个项目中有过测试接口的经验。(… 当进行接口测试时,会使用Postman和Python的Requests库。首先根据接口文档设计测…

.net6.0(.net Core)读取 appsettings.json 配置文件

① 新项目中创建名为 appsettings.json 的 json文件,内容为: {//数据库连接字符串:"ConnectionString": {"DBconn": "server127.0.0.1;databasedb;uidsa;pwd123456;Timeout600;EncryptTrue;TrustServerCertificateTrue;"…

应用于各种小家电的快充协议芯片

前言 随着快充技术的广泛应用,以往小家电的慢充模式已经满足不了人们对充电速度的要求,因此商家纷纷对小家电应用了诱骗取电快充协议芯片 例如(XSP16H),有了快充的支持小家电的充电速度有了很大的提升,节省了很多的充电…

ftrack 24.10全面升级:Autodesk Flame集成与多项新功能性能改进将发布

管理复杂项目绝非易事,但ftrack Studio的最新更新旨在简化这一过程。我们设计了这些增强功能,以优化大家的工作流、提高可用性,并让你们有更多时间专注于创意工作。 让我们来看看都有什么新内容吧! ​增强功能来优化工作流 轻松…

网络工程师教程第6版(2024年最新版)

网络工程师教程(第6版)由清华大学出版社出版,由工业和信息化部教育与考试中心组编,张永刚、王涛、高振江任主编,具体介绍如下。 相关信息: 出版社: 清华大学出版社 ISBN:9787302669197 内容简介: 本书是工业和信息化部教育与考试中心组织编写的考试用书。本书 根据…

Leetcode739.每日温度(HOT100)

链接 第一次暴力提交错误&#xff0c;超时了&#xff1a; class Solution { public:vector<int> dailyTemperatures(vector<int>& temperatures) {int n temperatures.size();vector<int> res(n,0);for(int i 0;i<n;i){int j i1;while(j<n){i…

java八股-SpringCloud-服务雪崩,服务降级,服务熔断

文章目录 服务雪崩服务降级服务熔断本章小结 服务雪崩 服务降级 服务熔断 本章小结 服务降级针对的是接口不可用&#xff0c;服务熔断针对的是整个服务不可用&#xff01;

cesium for unity的使用

先聊聊导入 看到这里的因该能够知道&#xff0c;官网以及网上绝大多数的方法都导入不进来&#xff0c;那么解决方法如下: 两个链接&#xff1a;按照顺序依次下载这两个tgz和zip&#xff0c;其中tgz为主要部分&#xff0c;zip为示例工程项目 如果您要查看示例工程项目的话&am…

06 —— Webpack优化—压缩过程

css代码提取后想要压缩 —— 使用css-minimizer-webpack-plugin插件 下载 css-minimizer-webpack-plugin 本地软件包 npm install css-minimizer-webpack-plugin --save-dev 配置 webpack.config.js 让webpack拥有该功能 const CssMinimizerPlugin require(css-minimizer-…

Win11 24H2新BUG或影响30%CPU性能,修复方法在这里

原文转载修改自&#xff08;更多互联网新闻/搞机小知识&#xff09;&#xff1a; 一招提升Win11 24H2 CPU 30%性能&#xff0c;小BUG大影响 就在刚刚&#xff0c;小江在网上冲浪的时候突然发现了这么一则帖子&#xff0c;标题如下&#xff1a;基准测试&#xff08;特别是 Time…

华为openEuler考试真题演练(附答案)

【单选题】 以下关于互联网的描述&#xff0c;哪个选项是正确的? A:Nginx 在万维网中可以作为 ftp 服务器的反向代理&#xff0c;并与ftp服务器的数量--对应 B:Nginx 在互联网中可以作为 web服务器端&#xff0c;成为万维网的一个节点 C:互联网上的的资源需使用 Nginx进行七层…

Hello-Go

Hello-Go 环境变量 GOPATH 和 GOROOT &#xff1a;不同于其他语言&#xff0c;go中没有项目的说法&#xff0c;只有包&#xff0c;其中有两个重要的路径&#xff0c;GOROOT 和 GOPATH Go开发相关的环境变量如下&#xff1a; GOROOT&#xff1a;GOROOT就是Go的安装目录&…

C++【nlohmann/json】库序列化与反序列化

1.nlohmann/json官方网站 GitHub - nlohmann/json: JSON for Modern C Overvew - JSON for Modern C 上述是点击就进入&#xff0c;下面的是要自己粘 https://github.com/nlohmann/json https://json.nlohmann.me/api/basic_json/ 2.使用过的nlohmann/json官方中的某版本代码…

Python 绘图工具详解:使用 Matplotlib、Seaborn 和 Pyecharts 绘制散点图

目录 数据可视化1.使用 matplotlib 库matplotlib 库 2 .使用 seaborn 库seaborn 库 3 .使用 pyecharts库pyecharts库 注意1. 确保安装了所有必要的库2. 检查Jupyter Notebook的版本3. 使用render()方法保存为HTML文件4. 使用IFrame在Notebook中显示HTML文件5. 检查是否有其他输…

小程序20-样式:自适应尺寸单位 rpx

手机设备的宽度逐渐多元化&#xff0c;也就需要开发者开发过程中&#xff0c;去适配不同屏幕宽度的手机&#xff0c;为了解决屏幕适配问题&#xff0c;微信小程序推出了 rpx 单位 rpx&#xff1a;小程序新增的自适应单位&#xff0c;可以根据不同设备的屏幕宽度进行自适应缩放 …

TR3:Pytorch复现Transformer

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、实验目的 从整体上把握Transformer模型&#xff0c;明白它是个什么东西&#xff0c;可以干嘛读懂Transformer的复现代码 二、实验环境 语言环境&#xff1…