消息队列—RabbitMQ如何保证消息可靠性?

1. 如何保证消息的可靠性?

先来看看我们的万年老图,从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。

  • 生产者发出后保证到达了MQ。
  • MQ收到消息保证分发到了消息对应的Exchange。
  • Exchange分发消息入队之后保证消息的持久性。
  • 消费者收到消息之后保证消息的正确消费。

经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

2. 生产者发送消息到MQ失败

我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。

为了解决这个问题,RabbitMQ引入了事务机制发送方确认机制(publisher confirm)

  • 方法一:生产者在发送数据之前开启RabbitMQ的事务(采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。)
  • 方法二:开启confirm模式(使用springboot时在application.yml配置文件中做如下配置,实现confirm回调接口,生产者发送消息时设置confirm回调)
  • 小结:事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。

3. MQ接收失败或者路由失败

生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:

  1. 消息找不到对应的Exchange。
  2. 找到了Exchange但是找不到对应的Queue。

这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。

我们既然要做可靠性,当然是设置为返回到客户端。


配置:

spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打开消息确认机制
publisher-confirm-type: correlated
# 打开消息返回
publisher-returns: true
template:
mandatory: true

我们只需要在配置里面打开消息返回即可,template.mandatory: true这一步不要少~

生产者:

public void sendAndReturn() {User user = new User();log.info("Message content : " + user);rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("被退回的消息为:{}", message);log.info("replyCode:{}", replyCode);log.info("replyText:{}", replyText);log.info("exchange:{}", exchange);log.info("routingKey:{}", routingKey);});rabbitTemplate.convertAndSend("fail",user);log.info("消息发送完毕。");
}

这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。

4. 消息入队之后MQ宕机

到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。

消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

@Bean
public DirectExchange directExchange() {// 三个构造参数:name durable autoDeletereturn new DirectExchange("directExchange", false, false);
}@Bean
public Queue erduo() {// 其三个参数:durable exclusive autoDelete// 一般只设置一下持久化即可return new Queue("erduo",true);
}

创建Exchange和队列时只要设置好持久化,发送的消息默认就是持久化消息。

设置持久化时一定要将Exchange和队列都设置上持久化:

单单只设置Exchange持久化,重启之后队列会丢失。单单只设置队列的持久化,重启之后Exchange会消失,既而消息也丢失,所以如果不两个一块设置持久化将毫无意义。

Tip: 这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。

5. 消费者无法正常消费

最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。

spring:rabbitmq:addresses: 127.0.0.1host: 5672username: guestpassword: guestvirtual-host: /# 手动确认消息listener:simple:acknowledge-mode: manual

打开手动消息确认(手动ACK)之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。

当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。

所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。

幂等性相关内容不在本章讨论范围~所以我就不多做阐述了。

6. 消息可靠性案例

这个例子中的消息是先入库的,然后生产者从DB里面拿到数据包装成消息发给MQ,经过消费者消费之后对DB数据的状态进行更改,然后重新入库。

这中间有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时任务不停的去刷库,找到有问题的数据将它重新扔到生产者那里进行重新投递。

这个方案其实和网上的很多方案大同小异,基础的可靠性保证之后,定时任务做一个兜底进行不断的扫描,力图100%可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/282410.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工厂投产、电池装车,广汽能上动力电池行业的“餐桌”吗?

文 | 智能相对论 作者 | 沈浪 “如果你不在餐桌上,你就会出现在菜单上。”在某种程度上,追逐效益的动力电池行业正在上演着布林肯的“餐桌菜单论”。 于是,我们可以看到,尽管整体的动力电池市场被宁德时代、比亚迪、LG新能源、…

AI基础知识(4)--贝叶斯分类器

1.什么是贝叶斯判定准则(Bayes decision rule)?什么是贝叶斯最优分类器(Bayes optimal classifier)? 贝叶斯判定准则:为最小化总体风险,只需在每个样本上选择那个能使条件风险最小的…

【Java常用API】正则表达式练习

🍬 博主介绍👨‍🎓 博主介绍:大家好,我是 hacker-routing ,很高兴认识大家~ ✨主攻领域:【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 🎉点赞➕评论➕收藏 …

c/c++整数和浮点数在内存中存储

了解变量的储存原理是我们灵活运用和防止数据截断改变带来的危害的有效途径。 那么我们从int char和float double两类来阐述内存的储存。 首先我们讲内存单位: 内存单位从小到大分别是bit byte KB MB GB TB PB。 bit是最小的内存单位,它可以存储一…

【嵌入式——QT】QWT应用

【嵌入式——QT】QWT应用 概述步骤一步骤二步骤三代码声明代码实现图示 概述 QWT,全称Qt Widgets for Technical Applications,是一个基于Qt开发的第三方库。它主要为具有技术背景的程序提供GUI组件和一组实用类,用于生成各种统计图。QWT的主…

科技助力高质量发展:新质生产力的崛起与企业数字化转型

引言 随着科技的飞速发展,我们正逐渐步入数字化智能时代,这个时代不仅为企业带来了无限的机遇,也让其面对前所未有的挑战。在这个快速变革的时代,企业必须不断调整自己的经营策略,适应数字化转型的浪潮,以…

时间减少90%以上!分布式系统的性能优化实战

1背景 分布式批量系统指的是采用分布式数据库架构,主体功能由批量程序实现的系统。分布式系统批量程序的性能测试,除了和联机交易性能测试一样关注服务器资源使用率是否合理、是否存在性能异常外,在测试执行阶段需要关注是否因数据分布不均衡…

初识二叉树

文章目录 一.什么是树二.什么是二叉树三.二叉树的访问次序四.特殊的二叉树五.求结点个数六.平衡二叉树总结 一.什么是树 树是由一个集合以及在该集合上定义的一种关系构成的。 集合中的元素称为树的节点,所定义的关系称为父子关系。 父子关系在树的节点之间建立了一…

RuoYi-Vue-Plus(基础知识点jackson、mybatisplus、redis)

一、JacksonConfig 全局序列化反序列化配置 1.1yml中配置 #时区 spring.jackson.time-zoneGMT8 #日期格式 spring.jackson.date-formatyyyy-MM-dd HH:mm:ss #默认转json的属性,这里设置为非空才转json spring.jackson.default-property-inclusionnon_null #设置属性…

JetPack之DataBinding基础使用

目录 一、简介二、使用2.1 使用环境2.2 xml文件绑定数据2.3 数据绑定的对象2.3.1 object2.3.2 ObseravbleField2.3.3 ObseravbleCollection 2.4 绑定数据 三、应用场景 一、简介 DataBinding是谷歌15年推出的library,DataBinding支持双向绑定,能大大减少绑定app逻辑…

python灾害应急救援平台flask-django-php-nodejs

灾害应急救援平台的目的是让使用者可以更方便的将人、设备和场景更立体的连接在一起。能让用户以更科幻的方式使用产品,体验高科技时代带给人们的方便,同时也能让用户体会到与以往常规产品不同的体验风格。 与安卓,iOS相比较起来,…

【Selenium(五)】

一、鼠标事件 from selenium import webdriver # 导入ActionChains类进行鼠标悬停操作 from selenium.webdriver.common.action_chains import ActionChains import time# 打开一个浏览器 # 法一、添加环境变量重启电脑 # 法二、填写浏览器驱动的绝对路径 driver webdriver.E…

功率半导体IGBT模块封装工艺

功率器件最近非常火热,作为新型功率半导体器件的主流器件,IGBT应用非常广泛,如家用电器、电动汽车、铁路、充电基础设施、充电桩,光伏、风能,工业制造、电机驱动,以及储能等领域。IGBT模块是新一代的功率半…

undo log

从这篇「执行一条 SQL 查询语句,期间发生了什么? (opens new window)」中,我们知道了一条查询语句经历的过程,这属于「读」一条记录的过程,如下图: 那么,执行一条 update 语句,期间发…

桌面显示器PD芯片:引领桌面显示技术的新篇章

随着科技的飞速发展,桌面显示器作为人们日常工作与生活中不可或缺的重要设备,其性能与品质也在不断提升。其中,PD芯片作为桌面显示器中的核心组件,发挥着至关重要的作用。本文将对桌面显示器PD芯片进行详细介绍,探讨其…

MD5源码(C语言描述)

本文介绍MD5源码(C语言描述)。 MD5(Message-Digest Algorithm 5),即消息摘要算法5,是一种被广泛使用的消息散列算法。散列算法的基础原理是:将数据(如一段文字)经过运算转换为一段固定长度&…

#Linux(VMwareTOOL安装)

(一)发行版:Ubuntu16.04.7 (二)记录: (1) (2)打开虚拟机然后安装,出现灰色可能是已经安装过但是自己没有找到 (3)删除VM…

流畅切换Linux的应用程序

流畅切换Linux的应用程序 流畅切换Linux的应用程序一.Linux启动一个程序在后台执行1. 使用nohup和&:2. 使用ctrlZ:3.使用screen:3.1 创建会话3.2 要重新连接到此会话:3.3 中途退出会话,但程序继续运行:3.4 结束一个…

STM32的简单介绍

STM32是一种基于ARM Cortex-M内核的32位微控制器,由意法半导体公司开发和生产。STM32具有丰富的外设和功能,适用于各种应用场合,如工业控制、消费电子、物联网、人机交互等。STM32的优势包括低功耗、高性能、高可靠性、易于开发等。STM32的系…

上海晋名室外暂存柜助力新技术皮革制品生产行业安全

本周上海晋名又有一台室外危化品暂存柜项目通过验收,此次项目主要用于新技术皮革制品生产行业油桶、化学品等物资的室外暂存安全。 用户单位创立于2004年,是一家从事新技术皮革制品加工、生产的外资企业。 上海晋名作为一家专注工业安全防护领域&#…