RabbitMQ 发布确认机制

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段

发布确认模式

  • 原理说明
  • 实现方式
    • 开启confirm(确认)模式
    • 阻塞确认
    • 异步确认
  • 总结

原理说明

  生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
  confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
在这里插入图片描述

  如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
  RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存
  事务机制和发布确认机制都存在以下注意点:

  • 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
  • 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
  • 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。

  上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。

实现方式

  RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。

开启confirm(确认)模式

  确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
  RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:

channel.confirmSelect();

阻塞确认

  阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:

    /*** Wait until all messages published since the last call have been* either ack'd or nack'd by the broker.  Note, when called on a* non-Confirm channel, waitForConfirms throws an IllegalStateException.* @return whether all the messages were ack'd (and none were nack'd)* @throws java.lang.IllegalStateException*/boolean waitForConfirms() throws InterruptedException;

  自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
  根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
  阻塞等待确认的方式如下代码所示:

//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

 阻塞批量确认的方式如下代码所示:

        // 存储未应带消息队列List<String> messages = new ArrayList<>();for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);messages.add(msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());// 每发送十条消息进行一次确认if(i > 0 && i % 10 == 0 ){// 如果确认不通过则将消息重新发送if(!channel.waitForConfirms()){for (String e : messages) {channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());}}else{// 如果确认成功则将这些消息从未应答队列中移除messages.clear();}}}

异步确认

  客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。

/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {void handleAck(long deliveryTag, boolean multiple)throws IOException;void handleNack(long deliveryTag, boolean multiple)throws IOException;
}

  异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:

        // 存储未确认消息,其中key为消息序号,value为消息实体HashMap<Long,String> msgMap = new HashMap<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {msgMap.remove(deliveryTag);}/*** 如果消息应带结果为nack则重新发送该消息* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String msg = msgMap.get(deliveryTag);if(msg != null){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}}});for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);// 将消息序号和消息存储map中msgMap.put(channel.getNextPublishSeqNo(),msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}

  上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。

总结

  发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:

比较事务机制发布确认机制
实现方式通过AMQP协议层面实现轻量级实现,采用RabbitMQ应答机制
命令详解Tx.Select
Basic.Publish
Tx.Commit
Commit.OK
Basic.Publish
Basic.Ack
性能同步,性能较慢可异步实现也可同步实现,性能快,AMQP命令交互少
消息到达队列时机事务提交后消息才会进入队列,消息入队存在滞后性消息发送后就进入队列,发布确认模式不影响消息进入队列时机
事务提交成功或消息应答时机消息被交换机处理完成后,或消息不可达同事务
实现复杂度简单相对复杂
适合场景批量发送消息,实现批量消息的原子性和一致性确保消息发送到交换机

  发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:

比较内容阻塞等待批量等待异步确认
性能
实现复杂度
确认范围每条消息批量消息每条消息
是否可以精准确认每条消息

  根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/82692.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring 面试题

一、Spring面试题 专题部分 1.1、什么是spring? Spring是一个轻量级Java开发框架&#xff0c;最早有Rod Johnson创建&#xff0c;目的是为了解决企业级应用开发的业务逻辑层和其他各层的耦合问题。它是一个分层的JavaSE/JavaEE full-stack&#xff08;一站式&#xff09;轻量…

Unity之ShaderGraph 节点介绍 Utility节点

Utility 逻辑All&#xff08;所有分量都不为零&#xff0c;返回 true&#xff09;Any&#xff08;任何分量不为零&#xff0c;返回 true&#xff09;And&#xff08;A 和 B 均为 true&#xff09;Branch&#xff08;动态分支&#xff09;Comparison&#xff08;两个输入值 A 和…

未来C#上位机软件发展趋势

C#上位机软件迎来新的发展机遇。随着工业自动化的快速发展&#xff0c;C#作为一种流行的编程语言在上位机软件领域发挥着重要作用。未来&#xff0c;C#上位机软件可能会朝着以下几个方向发展&#xff1a; 1.智能化&#xff1a;随着人工智能技术的不断演进&#xff0c;C#上位机…

中间件RabbitMQ消息队列介绍

1. MQ的相关概念 1.1 什么是MQ MQ&#xff08;message queue&#xff09;&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递消息…

手机开启应急预警通知 / 地震预警

前言 安卓手机在检测到地震时&#xff0c;将发送地震预警通知&#xff0c;但此设置是默认关闭的&#xff0c;原因是以防引发用户恐慌从而引发安全问题&#xff0c;且开启此设置需要完成指引教程&#xff0c;因此默认关闭此设置。下文介绍如何开启此设置。 开启方法 华为手机开…

nginx简介与安装配置,目录结构和配置文件介绍

一.nginx简介 1.简介 2.特性 二.nginx安装 1.rpm包方式 &#xff08;1&#xff09;下载扩展源 &#xff08;2&#xff09;安装扩展rpm包&#xff0c;nginx -V查看配置参数&#xff0c;后面源码安装时要用到 2.源码方式 &#xff08;1&#xff09;建议提前下好所需要的部…

github pages 用法详解 发布自己的网站

github pages 基础用法 URL 规则 假设你的 github 帐号为 mygithub&#xff0c;需要发布的仓库名为 myrepo&#xff0c;那么 pages 的 URL 为&#xff1a; https://mygithub.github.io/myrepo 添加内容 用任意编辑器写好&#xff08;或者生成&#xff09;标准的网页内容&a…

Windows安装Redis

自己电脑做个测试&#xff0c;需要用到Redis&#xff0c;把安装过程记录下&#xff0c;方便有需要的人 1、找到下载地址&#xff1a;Releases microsoftarchive/redis GitHub Windows的Redis需要到GitHub上下载&#xff1a; 2、下载完后设置密码&#xff0c;打开文件夹&…

Linux下TCP网络服务器与客户端通信程序入门

文章目录 目标服务器与客户端通信流程TCP服务器代码TCP客户端代码 目标 实现客户端连接服务器&#xff0c;通过终端窗口发送信息给服务器端&#xff0c;服务器接收到信息后对信息数据进行回传&#xff0c;客户端读取回传信息并返回。 服务器与客户端通信流程 TCP服务器代码 …

车云一体化系统基础理论

车云一体化系统基础理论 介绍目标正文 参考文档 介绍 最近在调研车云链路一体化的整套解决方案&#xff0c;涉及分布式消息队列&#xff08;RocketMQ&#xff09;、分布式存储&#xff08;Doris&#xff09;、离线数据处理&#xff08;Spark&#xff09;、用户行为日志分析&am…

性能测试浅谈

早期的性能测试更关注后端服务的处理能力。 一个用户去访问一个页面的请求过程&#xff0c;如上图。 数据传输时间 当你从浏览器输入网址&#xff0c;敲下回车&#xff0c;开始... 真实的用户场景请不要忽视数据传输时间&#xff0c;想想你给远方的朋友写信&#xff0c;信件…

【雕爷学编程】Arduino动手做(184)---快餐盒盖,极低成本搭建机器人实验平台

吃完快餐粥&#xff0c;除了粥的味道不错之外&#xff0c;我对个快餐盒的圆盖子产生了兴趣&#xff0c;能否做个极低成本的简易机器人呢&#xff1f;也许只需要二十元左右 知识点&#xff1a;轮子&#xff08;wheel&#xff09; 中国词语。是用不同材料制成的圆形滚动物体。简…

深度优先搜索与动态规划|543, 124, 687

深度优先搜索与动态规划|543. 二叉树的直径&#xff0c;124. 二叉树中的最大路径和&#xff0c;687. 最长同值路径 二叉树的直径二叉树中的最大路径和最长同值路径 二叉树的直径 好久没写二叉树了&#xff0c;主要还是看遍历的顺序是什么样的。 # Definition for a binary tr…

桥接模式-java实现

桥接模式 桥接模式的本质&#xff0c;是解决一个基类&#xff0c;存在多个扩展维度的的问题。 比如一个图形基类&#xff0c;从颜色方面扩展和从形状上扩展&#xff0c;我们都需要这两个维度进行扩展&#xff0c;这就意味着&#xff0c;我们需要创建一个图形子类的同时&#x…

linux内网穿透应用场景有哪些?快解析有什么用处?

随着网络技术的不断发展&#xff0c;无论是工作上还是在生活中人们对网络的依赖和需求越来越高。Linux内网穿透作为一种创新的解决方案&#xff0c;为我们提供了无限可能。 首先我们了解一下Linux操作系统。Linux是一套免费使用和自由传播的类Unix操作系统&#xff0c;是一个基…

oracle sql developer批量删除某个用户

随着navicate收费&#xff0c;还得破解&#xff0c;pl/sql developer配置麻烦&#xff0c;最近使用oracle sql developer来试试oracle的操作如何&#xff1b; 用着还行&#xff0c;没有卡顿现象&#xff0c; 最近要oracle sql developer批量删除某个用户下所有的表&#xff0…

基于 CentOS 7 构建 LVS-DR 群集

LVS-DR模式工作原理 首先&#xff0c;来自客户端计算机CIP的请求被发送到Director的VIP。 然后Director使用相同的VIP目的IP地址将 请求发送到集群节点或真实服务器。 然后&#xff0c;集群某个节点将回复该数据包&#xff0c;并将该数据包直接发送到客户端计算机&#xff08;…

如何测试Linux磁盘的读写速度

在Linux系统中也有很多命令可以测试硬盘的读写速度指标。以下是几个常用命令&#xff08;注意&#xff1a;在执行测试命令之前&#xff0c;请务必备份数据以避免数据丢失&#xff01; 1、dd 命令 首先挂载磁盘 mount /dev/sdb /testdd 命令可用于进行硬盘读写速度测试。 例…

Linux6.36 Kubernetes Pod进阶

文章目录 计算机系统5G云计算第三章 LINUX Kubernetes Pod进阶一、资源限制1.CPU 资源单位2.内存 资源单位3.重启策略&#xff08;restartPolicy&#xff09;4.健康检查&#xff1a;又称为探针&#xff08;Probe&#xff09;5.启动、退出动作 计算机系统 5G云计算 第三章 LIN…

任务14、无缝衔接,MidJourney瓷砖(Tile)参数制作精良贴图

14.1 任务概述 在这个实验任务中,我们将深入探索《Midjourney Ai绘画》中的Tile技术和其在艺术创作中的具有挑战性的应用。此任务将通过理论学习与实践操作相结合的方式,让参与者更好地理解Tile的核心概念,熟练掌握如何在Midjourney平台上使用Tile参数,并实际运用到AI绘画…