面试官问:如何解决消息队列的延时以及过期失效问题?

消息的可靠性传输

ActiveMQ

要保证消息的可靠性,除了消息的持久化,还包括两个方面,一是生产者发送的消息可以被ActiveMQ收到,二是消费者收到了ActiveMQ发送的消息。

生产者

非持久化又不在事务中的消息,可能会有消息的丢失。为保证消息可以被ActiveMQ收到,我们应该采用事务消息或持久化消息。

消费者

对消息的确认有4种机制
1、AUTO_ACKNOWLEDGE = 1 自动确认
2、CLIENT_ACKNOWLEDGE = 2 客户端手动确认
3、DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
4、SESSION_TRANSACTED = 0 事务提交并确认
ACK_MODE描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。所以ack_mode描述的不是producer于broker之间的关系,而是customer于broker之间的关系。
对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer与Broker之间建立一种简单的“担保”机制.

AUTO_ACKNOWLEDGE

自动确认
“同步”(receive)方法返回message给消息时会立即确认。
在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),如果onMessage方法正常结束,消息将会正常确认。如果onMessage方法异常,将导致消费者要求ActiveMQ重发消息。

CLIENT_ACKNOWLEDGE

客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。
我们可以在当前消息处理成功之后,立即调用message.acknowledge()方法来"逐个"确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过需要自行权衡。

DUPS_OK_ACKNOWLEDGE

类似于AUTO_ACK确认机制,为自动批量确认而生,而且具有“延迟”确认的特点,ActiveMQ会根据内部算法,在收到一定数量的消息自动进行确认。在此模式下,可能会出现重复消息,什么时候?当consumer故障重启后,那些尚未ACK的消息会重新发送过来。

SESSION_TRANSACTED

当session使用事务时,就是使用此模式。当决定事务中的消息可以确认时,必须调用session.commit()方法,commit方法将会导致当前session的事务中所有消息立即被确认。在事务开始之后的任何时机调用rollback(),意味着当前事务的结束,事务中所有的消息都将被重发。当然在commit之前抛出异常,也会导致事务的rollback。

RabbitMQ

(1)生产者弄丢了数据

生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。此时可以选择用RabbitMQ提供的事务功能,就是生产者发送数据之前开启RabbitMQ事务(channel.txSelect),然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,RabbitMQ事务机制一搞,基本上吞吐量会下来,因为太耗性能。
所以一般来说,如果要确保RabbitMQ的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用confirm机制的。

(2)RabbitMQ弄丢了数据

就是RabbitMQ自己弄丢了数据,这个你必须开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。
设置持久化有两个步骤,第一个是创建queue和交换器的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化相关的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。
而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。
哪怕是你给RabbitMQ开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ中,但是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ挂了,就会导致内存里的一点点数据会丢失。

(3)消费端弄丢了数据

RabbitMQ如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。
这个时候得用RabbitMQ提供的ack机制,简单来说,就是你关闭RabbitMQ自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

Kafka

(1)消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
大家都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。但是此时确实还是会重复消费,比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
生产环境碰到的一个问题,就是说我们的kafka消费者消费到了数据之后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,然后消费者会自动提交offset。
然后此时我们重启了系统,就会导致内存queue里还没来得及处理的数据就丢失了

(2)kafka弄丢了数据

这块比较常见的一个场景,就是kafka某个broker宕机,然后重新选举partiton的leader时。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据啊。
所以此时一般是要求起码设置如下4个参数:
给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本。
在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。
在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。
在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

(3)生产者会不会弄丢数据

如果按照上述的思路设置了ack=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

消息的顺序性

从根本上说,异步消息是不应该有顺序依赖的。在MQ上估计是没法解决。要实现严格的顺序消息,简单且可行的办法就是:保证生产者 - MQServer - 消费者是一对一对一的关系。

ActiveMQ

1、通过高级特性consumer独有消费者(exclusive consumer)
queue = new ActiveMQQueue(“TEST.QUEUE?consumer.exclusive=true”);
consumer = session.createConsumer(queue);
当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者可以接收到消息。
在这里插入图片描述

独占消息就是在有多个消费者同时消费一个queue时,可以保证只有一个消费者可以消费消息,这样虽然保证了消息的顺序问题,不过也带来了一个问题,就是这个queue的所有消息将只会在这一个主消费者上消费,其他消费者将闲置,达不到负载均衡分配,而实际业务我们可能更多的是这样的场景,比如一个订单会发出一组顺序消息,我们只要求这一组消息是顺序消费的,而订单与订单之间又是可以并行消费的,不需要顺序,因为顺序也没有任何意义,有没有办法做到呢?可以利用activemq的另一个高级特性之messageGroup
2、利用Activemq的高级特性:messageGroups
Message Groups特性是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:Consumer被关闭。Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1
在这里插入图片描述

bytesMessage.setStringProperty(“JMSXGroupID”, “constact-20100000002”);
bytesMessage.setIntProperty(“JMSXGroupSeq”, -1);
如上图所示,同一个queue中,拥有相同JMSXGroupID的消息将发往同一个消费者,解决顺序问题,不同分组的消息又能被其他消费者并行消费,解决负载均衡的问题。

RabbitMQ

如果有顺序依赖的消息,要保证消息有一个hashKey,类似于数据库表分区的的分区key列。保证对同一个key的消息发送到相同的队列。A用户产生的消息(包括创建消息和删除消息)都按A的hashKey分发到同一个队列。只需要把强相关的两条消息基于相同的路由就行了,也就是说经过m1和m2的在路由表里的路由是一样的,那自然m1会优先于m2去投递。而且一个queue只对应一个consumer。

Kafka

一个topic,一个partition,一个consumer,内部单线程消费

如何解决消息队列的延时以及过期失效问题?

rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间,而又没有设置死信队列机制,就会被rabbitmq给清理掉,这个数据就没了。
ActiveMQ则通过更改配置,支持消息的定时发送。

有几百万消息持续积压几小时怎么解决?

发生了线上故障,几千万条数据在MQ里积压很久。是修复consumer的问题,让他恢复消费速度,然后等待几个小时消费完毕?这是个解决方案。不过有时候我们还会进行临时紧急扩容。
一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条。1000多万条,所以如果积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量。然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。
这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
等快速消费完积压数据之后,再恢复原先部署架构,重新用原先的consumer机器来消费消息。

print"hello world"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/426756.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

黑神话悟空+云技术,游戏新体验!

近期,一款名为黑神话悟空的游戏因其独特的艺术风格和创新的技术实现在玩家中产生了不小的影响。 而云桌面技术作为一种新兴的解决方案,正在改变人们的游戏体验方式,使得高性能游戏可以在更多设备上流畅运行。 那么,黑神话悟空如…

《深入理解 Java 线程池:高效管理线程的利器》

线程池 1. 什么是线程池? ​ 线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或…

滑动窗口(6)_找到字符串中所有字母异位词

个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 滑动窗口(6)_找到字符串中所有字母异位词 收录于专栏【经典算法练习】 本专栏旨在分享学习算法的一点学习笔记,欢迎大家在评论区交流讨论&#x1f4…

STM32 芯片启动过程

目录 一、前言二、STM32 的启动模式三、STM32 启动文件分析1、栈 Stack2、堆 Heap3、中断向量表 Vectors3.1 中断响应流程 4、复位程序 Reset_Handler5、中断服务函数6、用户堆栈初始化 四、STM32 启动流程分析1、初始化 SP、PC 及中断向量表2、设置系统时钟3、初始化堆栈并进入…

【Transformer深入学习】之一:Sinusoidal位置编码的精妙

看苏神的文章提到:Transformer原论文使用Sinusoidal位置编码,作为位置编码的一个显式解,Google 在原论文中对它的描述寥寥无几,只是简单提及了它可以表达相对位置信息,并未提及这个编码的合理性。 看了几篇文章&#x…

云计算实训50——Kubernetes基础命令、常用指令

一、Kubernetes 自动补齐 # 安装自动补齐软件 [rootmaster ~]# yum -y install bash-completion # 临时开启自动补齐功能 [rootmaster ~]# source # 永 久开启自动补齐功能 [rootmaster ~]# echo "source > ~/.bashrc 二、Kubernetes 基础命令 kubectl [command] …

【数据结构】数据结构系列学习笔记——导航篇

一:概述 数据结构是计算机科学中的核心概念之一,是优化算法性能和资源利用率的关键。在软件开发和数据处理中,选择合适的数据结构对于算法的效率至关重要。数据结构的选择通常基于数据的使用模式,包括数据元素之间的关系、数据的存…

【专题】2024中国生物医药出海现状与趋势蓝皮书报告合集PDF分享(附原数据表)

原文链接:https://tecdat.cn/?p37719 出海已成为中国医药产业实现提速扩容的重要途径。目前,中国医药产业发展态势良好,创新能力不断增强,然而也面临着医保政策改革和带量集采带来的压力。政府积极出台多项政策支持医药企业出海…

Vim编辑器常用命令

目录 一、命令模式快捷键 二、编辑/输入模式快捷键 三、编辑模式切换到命令模式 四、搜索命令 一、命令模式快捷键 二、编辑/输入模式快捷键 三、编辑模式切换到命令模式 四、搜索命令

我的AI工具箱Tauri版-VideoReapeat视频解说复述克隆

本教程基于自研的AI工具箱Tauri版进行VideoReapeat视频解说复述克隆。 视频解说复述克隆样片 《我的AI工具箱Tauri版-VideoReapeat视频解说复述克隆》样片 进入软件后可以直接搜索 VideoReapeat 或者依次点击 Python音频技术/视频tools 进入该模块。 该模块会消耗TTS文本转语…

【四范式】浅谈NLP发展的四个范式

自然语言处理(Natural Language Processing,NLP)是计算机科学,人工智能,语言学关于计算机和人类自然语言之间的相互作用的领域,是计算机科学领域与人工智能领域中的一个重要方向。NLP发展到今天已经进入到了…

kubernetes架构

kubernetes cluster由master和node组成,节点上运行着若干kubernetes服务Master节点: master是kubernetes cluster的大脑,运行着的Daemon服务包括kube-apiserver,kube-scheduler,kube-controller-manager,etcd和Pod网络…

Dify 中的讯飞星火平台工具源码分析

本文主要对 Dify 中的讯飞星火平台工具 spark 进行了源码分析,该工具可根据用户的输入生成图片,由讯飞星火提供图片生成 API。通过本文学习可自行实现将第三方 API 封装为 Dify 中工具的能力。 源码位置:dify-0.6.14\api\core\tools\provide…

出厂非澎湃OS手机解BL锁

脚本作者:酷安mlgmxyysd 脚本项目链接:https://github.com/MlgmXyysd/Xiaomi-HyperOS-BootLoader-Bypass/ 参考 B站作者:蓝空穹 https://www.bilibili.com/read/cv33210124/ 其他参考:云墨清风、水墨青竹、Magisk中文网 决定解BL…

django学习入门系列之第十点《A 案例: 员工管理系统10》

文章目录 12 管理员操作12.4 密码加密12.5 获取对象(防止id错误--编辑界面等)12.6 编辑管理员12.7 重置密码 往期回顾 12 管理员操作 12.4 密码加密 密码不应该以明文的方式直接存储到数据库,应该加密才放进去 定义一个md5的方法&#xff…

js | TypeError: Cannot read properties of null (reading ‘indexOf’) 【解决】

js | TypeError: Cannot read properties of null (reading ‘indexOf’) 【解决】 描述 概述 在前端开发中,遇到TypeError: Cannot read properties of null (reading indexOf)这类错误并不罕见。这个错误通常表明你试图在一个null值上调用indexOf方法&#xff0c…

飞睿智能UWB BLE Tag蓝牙防丢器模块,APP测距定位一键绑定,安全守护每一刻

我们总在不经意间与生活中的小物件擦肩而过——钥匙遗忘在咖啡厅的角落,钱包遗失在拥挤的地铁,甚至孩子的书包在人群中悄然消失……每一次的失而复得都是幸运的眷顾,但更多的是遗憾与不便。今天,就让我带你走进一个智能守护的新世…

Linux驱动开发 ——架构体系

只读存储器(ROM) 1.作用 这是一种非易失性存储器,用于永久存储数据和程序。与随机存取存储器(RAM)不同,ROM中的数据在断电后不会丢失,通常用于存储固件和系统启动程序。它的内容在制造时或通过…

【算法】遗传算法

一、引言 遗传算法(Genetic Algorithm, GA)是一种模拟生物进化过程的启发式搜索算法,它通过模拟自然选择、遗传、交叉和突变等生物学机制来优化问题的解决方案。遗传算法因其通用性、高效性和鲁棒性,在多个领域中得到了广泛应用&a…

esp32核心跑分程序

https://github.com/ochrin/coremark/tree/esp32 最近一直捣腾esp32s3 (Sense) 做微型摄像。过程中发现一款不错的跑分软件,特此记一笔。 其中针对esp32s3各类参数设定(用idf.py menuconfig),做个记录。 CPU Frequency去240MHz&#xff08…