消息队列进阶-3.消息队列常见问题解决方案

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 确保消息可靠传递
    • 如何知道消息丢失
    • 确保消息可靠传递
  • 消息幂等:消息不被重复消费
    • 对业务幂等的理解
    • 消息投递的几种语义
    • 用幂等性解决重复消费问题
      • 利用数据库的唯一约束实现幂等
      • 为更新的数据设置前置条件
      • 记录并检查操作
  • 消息积压问题解决方案
    • 问题分析
    • 解决方案
    • 处理经验
    • 实战举例
  • 如何确保消息的顺序消费
    • 顺序消费的难点
    • 在这里插入图片描述
    • MQ对顺序消费的支持
    • 从业务角度保证顺序消费

确保消息可靠传递

如何知道消息丢失

在这里插入图片描述

解决思路:

利用消息队列的有序性来验证是否有消息丢失。 在消息生产端,给每个发出的消息都指定一个附加一个连续底层的版本号,然后在消费端检验序号的连续性。

落地方案:

利用拦截器机制,在Producer发送消息之前的拦截器中将序号注入到消息中,在Consumer收到消息的拦截器中检测序号的连续性。

细节问题:

不能保证在topic是严格顺序的,只能保证Queue/分区的消息是有序的,发消息的必须要指定的分区,在每个分区单独监测消息序号的连续性。

一般服务都是多实例进行部署,不好协调全局的Producer的发送顺序,每个Producer分别生成各自的消息序号,附加Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。

Consumer实例的数量最好和分区数量保持一致。

确保消息可靠传递

在这里插入图片描述

  • 生产阶段:

通过请求、确认机制来保证消息的可靠传递

  • 消息存储阶段:

如果对消息的可靠性要求非常高,通过调整Broker的参数避免因为服务器故障而丢失消息。

在RocketMQ中,可以将刷盘的方式 flushDiskType 配置为 SYCN_FLUSH 同步刷盘。

如果Broker是多个节点组成的集群,至少将消息发送到2个以上的节点,再给客户端发送确认响应。

  • 消费阶段:

客户端从MQ拉取消息后,执行用户的业务逻辑成功之后,再给MQ发送消费确认响应。

消息幂等:消息不被重复消费

应用的幂等是在分布式系统涉及时必须要考虑的一个方面,如果对幂等没有额外的考虑,那么在消息失败重新投递,或者远程服务超时重试时,可能会出现很多诡异的问题。

对业务幂等的理解

体现对于不满足幂等性的业务,在消费重复消费,会出现数据的不一致,导致业务数据错乱。

幂等数学上的概念,对一个函数(方法),使用相同的1参数,执行多次,获得的结果是一致的。

HTTP协议中有四个方法,GET/POST/PUT/DELETE,其中GET 和 DELLETE 是幂等的,而POST方法不是幂等的。

幂等的Update:

update order set status = 1 where id = 1001;

不符合幂等涉设计:

update order set price = price + 1 where id = 1001

消息投递的几种语义

为了进一步规范消息的调用,业界有许多消息队列的应用协议,其中也对消息投递标准做了一些约束。

在这里插入图片描述

  • At most once

消息在传递时,最多会被送达一次。消息可能会丢失,但是永远不会出现重复消息的问题。比如日志指标监控信息。

  • At least once

消息在传递时,至少会被送达一次。消息肯定不会丢,可能会出现重复消费。

绝大多数的应用中,都是使用At Least Once,MQ产品都支持该级别。

  • Exactly once

每条消息肯定会被传输一次且仅传输一次,并且保证送达,因为涉及发送端和生产端的各种协同机制,绝对的Exactly once级别很难实现的,通用的Exactly once方案几乎不可能存在。

用幂等性解决重复消费问题

如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次 和 消费多次对系统的影响时完全一样的。也就可以认为,消费多次等于消费一次。

从对系统的影响结果来说:

At least once + 幂等消费 = Exactly once

利用数据库的唯一约束实现幂等

举个例子来说明一下。在不考虑并发的情况下,将账号X的余额设置为100元,执行一次后对系统的影响时,账户X的余额变成了100元。只要提供的参数是100元不变,那即使再执行多少次,账户X的余额始终都是100元,不会变化,这个操作就是一个幂等的操作。

再举一个例子,将账户X的余额加100元,这个操作它就不是幂等的,每执行一次,账户余额就增加100元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。

可以限定,对于每个转账单每个账户只可以执行一次变更操作。转账流水表:转账单ID、账户ID、变更金额,联合主键(转账单ID、账户ID)

或者使用Redis的SETNS命令。

为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

将账户 X 的余额增加为 100 元,增加一个前置条件:如果账户X的余额是250,才执行将余额增加100操作。在消息中带上余额,如果余额和数据库中一致,才执行。

通用解决方案呢:给数据增加一个版本号属性,通常表现为在表中添加一个版本号的列,每次更新之前,比较当前数据的版本号和消息中的版本号是否一致,如果一致则更新,如果不一致就拒绝更新数据,更新数据的同时需要将版本号+1,实现幂等设计。

记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

  • 检查消费状态
  • 更新数据
  • 设置消费状态

如上的三个操作要保证原子性,才能实现幂等性。

如果不能保证原子性的话,可能会出现下面的问题。

消息的全局ID为250,操作:给ID为 38 的账号增加100元:

t0:消费者A收到消息,检查执行状态,发现消费未处理,开始执行 “ 账户增加100 ” 操作。

t1:消费者B收到消息,检查执行状态,发现消费未处理。

执行两次!

消息积压问题解决方案

问题分析

如果出现了积压,那一定是性能问题,想要解决消息从生产到消费上的性能问题,就首先要知道那些环节可能出现消息积压,然后再考虑如何解决。

  • 跟消息生产者没有关系
  • 跟消息队列本身没有关系
  • 消息消费者的消费能力不足引起的

解决方案

如果是突发问题,临时扩容,增加消费者的数量。通过扩容和降级承担流量,应急问题的处理。

其次,才是排查解决异常问题。监控、日志分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。

最后,如果消费端处理不足,水平扩容提升消费端并发处理能力。在扩容消费者实例的同事,必须要同步扩容Topic分区的数量,确保消费者的实例数和分区数是相同的,分区是单线程消费的。

在涉及系统的时候,一定要保障消费端的消费的性能要高于生产端生产的性能。

处理经验

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。

如何排查消息积压的原因?

如果赶上大促场景,扩容消费实例,如果服务器资源不足,系统降级:关闭一些不重要的业务,减少发送方的数据量,最低限度的去运行。

实战举例

在高并发的场景中,消息积压问题,可以说如影随形,真的没办法从根本上解决。表面上看,已经解决了,但后面不知道什么时候,就会冒出一次。

参考 《苏三说技术》所举的实际情况

有天下午,产品过来说:有几个商户投诉过来了,他们说菜品有延迟,快查一下原因。

这次问题出现得有点奇怪。

为什么这么说?

首先这个时间点就有点奇怪,平常出问题,不都是中午或者晚上用餐高峰期吗?怎么这次问题出现在下午?

根据以往积累的经验,我直接看了kafka的topic的数据,果然上面消息有积压,但这次每个partition都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。

我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午发生了什么事情没?他们说下午有个促销活动,跑了一个JOB批量更新过有些商户的订单信息。

这时,我一下子如梦初醒,是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢?实在太坑了。

虽说知道问题的原因了,倒是眼前积压的这十几万的消息该如何处理呢?

此时,如果直接调大partition数量是不行的,历史消息已经存储到4个固定的partition,只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。

直接加服务节点也不行,因为kafka允许同组的多个partition被一个consumer消费,但不允许一个partition被同组的多个consumer消费,可能会造成资源浪费。

因此,为了保证Kafka系统的稳定性和性能,不建议将同一个分区分配给多个消费者组。在实际应用中,可以根据实际需求和消费者组的数量,合理调整分区数量,以提高系统的并发处理能力和负载能力。

看来只有用多线程处理了。

为了紧急解决问题,我改成了用线程池处理消息,核心线程和最大线程数都配置成了50。

大致用法如下:

  • 先定义一个线程池:
@Configuration
public class ThreadPoolConfig {@Value("${thread.pool.corePoolSize:5}")private int corePoolSize;@Value("${thread.pool.maxPoolSize:10}")private int maxPoolSize;@Value("${thread.pool.queueCapacity:200}")private int queueCapacity;@Value("${thread.pool.keepAliveSeconds:30}")private int keepAliveSeconds;@Value("${thread.pool.threadNamePrefix:ASYNC_}")private String threadNamePrefix;@Bean("messageExecutor")public Executor messageExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);executor.setThreadNamePrefix(threadNamePrefix);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}
  • 再定义一个消息的consumer:
@Service
public class MyConsumerService {@Autowiredprivate Executor messageExecutor;@KafkaListener(id="test",topics={"topic-test"})public void listen(String message){System.out.println("收到消息:" + message);messageExecutor.submit(new MyWork(message);}
}
  • 在定义的Runable实现类中处理业务逻辑:
public class MyWork implements Runnable {private String message;public MyWork(String message) {this.message = message;}@Overridepublic void run() {System.out.println(message);}
}

果然,调整之后消息积压数量确实下降的非常快,大约半小时后,积压的消息就非常顺利的处理完了。

而对于RocketMQ来说:允许多个消费者同时消费一个队列。这种消费模式通常被称为“共享模式消费”或“广播模式消费”。

在共享模式下,多个消费者可以同时从同一个队列中接收消息。这对于需要水平扩展消费能力的场景非常有用,因为可以简单地增加消费者实例来提高整体的消费能力。每个消费者都会接收到队列中的所有消息的副本,但是每条消息只会被其中一个消费者处理。

在广播模式下,多个消费者也可以同时消费同一个队列,不同的是每个消费者都会独立地接收队列中的所有消息。这种模式适用于需要多个消费者独立处理同一份消息的场景,比如日志分析系统等。

如何确保消息的顺序消费

消息投递的顺序!

顺序消费的难点

在这里插入图片描述

MQ对顺序消费的支持

在这里插入图片描述

Kafka:在同一个分区中天然有序,如果是多分区可以通过定制的分发策略,将同一类消息分发到同一个分区中。比如订单场景,写入Kafka时通过订单ID进行分发,保证同一个订单ID的消息发送到同一个分区中。同一个订单下的消息1和消息2,如果1失败了,重发的时候会出现在消息2的后面。max.in.flight.request.per.connection 该参数可以控制客户端在等待响应之前可以发送的未确认请求的数量。

Rocket:在同一个Queue中保证有序性,如果把对应一个业务主键的消息都路由到同一个Queue中,可以实现消息的有序传输。

从业务角度保证顺序消费

  • 根据不同的业务场景,以发送端或者消费端时间戳为准
  • 每次消息发送时生成唯一递增的ID
  • 通过缓存时间戳的方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/206005.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2、用命令行编译Qt程序生成可执行文件exe

一、创建源文件 1、新建一个文件夹&#xff0c;并创建一个txt文件 2、重命名为main.cpp 3、在main.cpp中添加如下代码 #include <QApplication> #include <QDialog> #include <QLabel> int main(int argc, char *argv[]) { QApplication a(argc, argv); QDi…

老泮识趣:难忘何家桥

回忆何家桥往事&#xff0c;写了篇《消失的何家桥》&#xff0c;没想到点击率如此高&#xff0c;出乎意料。网友的共鸣可见&#xff0c;城市发展的今天&#xff0c;乡情是个美好的存在&#xff0c;清贫、朴实&#xff0c;丝毫不影响美感。由于大家的鼓励&#xff0c;触动了我再…

Android flutter项目 启动优化实战(一)使用benchmark分析项目

背景描述 启动时间是用户对应用的第一印象&#xff0c;较慢的加载会对用户的留存和互动造成负面影响 在刚上线的B端项目中&#xff1a; 1.提高启动速度能提高整体流程的效率 2.提高首次运行速度能提高应用推广的初体验效果 问题描述 项目刚上线没多久、目前存在冷启动过程存在…

4.6-容器的端口映射

首先&#xff0c;我们来拉取Nginx的image镜像。 docker pull nginx 接下来我们创建一个Nginx的容器。 docker run --name nginx -d nginx 但是&#xff0c;这样启动nginx容器的话我们没法访问。这个时候怎么办呢&#xff1f;就需要将Nginx这个服务暴露给外面的世界。 这时可以使…

linux磁盘已满,查看哪个文件占用多

使用df -h查看磁盘空间占用情况 使用sudo du -s -h /* | sort -nr命令查看那个目录占用空间大 然后那个目录占用多 再通过sudo du -s -h /var/* | sort -nr 一层层排查&#xff0c;找到占用文件多的地方 如果通过以上方法没有找到问题所在&#xff0c;那么可以使用 lsof |…

飞书全新版本搭载AI智能伙伴,支持用户自选底层大模型!

原创 | 文 BFT机器人 近日&#xff0c;字节跳动旗下飞书正式发布“飞书智能伙伴”系列AI产品。此次新产品有专属、易协作、有知识、有记忆、更主动等特点。除此之外&#xff0c;“飞书智能伙伴”作为一个开放的AI服务框架&#xff0c;各企业可根据业务场景自主选择适合的底层大…

UI自动化测试的正确姿势 —— Airtest设备连接API详解第一篇

一、背景 Airtest作为一款优秀的自动化测试工具&#xff0c;有着强大的API功能&#xff0c;处理日常自动化测试过程中需要的各类操作。今天就给大家逐一介绍关于设备连接和常用API部分&#xff0c;结合自动化测试中的各类需求&#xff0c;看看如何通过使用Airtest来快速实现。…

excel 计算断面水质等级

在工作中遇到根据水质监测结果要判断断面等级。写了下面的公式&#xff1a; 因子标准值 limits {COD: [15,15, 20, 15,20],氨氮: [0.15, 0.5, 1, 1.5, 2.0],总磷: [0.02, 0.1, 0.2, 0.3, 0.4] } excel公式&#xff1a; IFS(MAX(IF(M2>20,1,0), IF(N2>2,1,0), IF(O2&g…

CSS伪类伪元素?:hover,::before,::after使用(举例)

文章目录 什么是CSS伪类&#xff1f;什么是伪元素&#xff1f;怎么用伪元素&#xff1f;可以做些什么&#xff1f;::before&#xff0c;在标签选择器之前添加内容&#xff0c;::after正好与之相反::before&#xff0c;在类选择器之前添加内容&#xff08;:制作一个悬浮提示窗 参…

webpack如何处理css

一、准备工作 新建目录 添加样式 .word {color: red; } index.js添加dom元素&#xff0c;添加一个css word import ./css/index.css;const div document.createElement("div"); div.innerText "hello word!!!"; div.className "word"; do…

使用影刀指令+python实现简单的长文本乱序加密

本文意在利用影刀指令python代码&#xff0c;实现一种较为简单的长文本加密和解密&#xff0c;流程结构分为两步&#xff1a; 加密原理–是把字符转为列表&#xff0c;利用列表random模块中的shuffle函数做随机乱序。解密原理–是利用了列表的索引追踪&#xff0c;先前创建字典…

Android中使用Google Map

在app的使用过程中&#xff0c;我们经常会跟地图进行交互&#xff0c;如果是海外的应用&#xff0c;那选择使用Google Map 是最合适的选择。 在Android中如何使用Google Map&#xff0c;这里做一个简要的说明。 Google API_KEY的申请 Google Map 的使用并不是免费的&#xf…

解决plot画图中文乱码问题(macbook上 family ‘sans-serif‘ not found)

一、matplotlib画图中文乱码问题 使用matplotlib.pyplot画图&#xff0c;有中文字体会显示乱码问题&#xff0c;这时需要添加如下代码&#xff1a; import matplotlib.pyplot as pltplt.rcParams["font.sans-serif"] ["SimHei"]二、macbook没有SimHei的…

基于Java SSM框架实现KTV点歌系统项目【项目源码+论文说明】

基于java的SSM框架实现KTV点歌系统演示 摘要 本论文主要论述了如何使用JAVA语言开发一个KTV点歌系统&#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述KTV点歌系…

Mysql DDL语句建表及空字符串查询出0问题

DDL语句建表 语法&#xff1a; create table 指定要建立库的库名.新建表名 &#xff08;... 新建表的字段以及类型等 ...&#xff09;comment 表的作用注释 charset 表编译格式 row_format DYNAMIC create table dev_dxtiot.sys_url_permission (id integer …

阿里云崩溃了,为什么你没有收到补偿?【补偿领取方式放文末】

事情经过 北京时间11月27日&#xff0c;阿里云部分地域云数据库控制台访问出现异常。据悉&#xff0c;从当日09:16起&#xff0c;阿里云监控发现北京、上海、杭州、深圳、青岛、香港以及美东、美西地域的数据库产品(RDS、PolarDB、Redis等)的控制台和OpenAPI访问出现异常&…

【Hadoop】集群资源管理器 YARN

一、yarn 简介 Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.x 引入的分布式资源管理系统。主要用于解决 hadoop 1.x 架构中集群资源管理和数据计算耦合在一起&#xff0c;导致维护成本越来越高的问题。 yarn主要负责管理集群中的CPU和内存 用户可以将各种服…

用bat制作图片马——一句话木马

效果图 代码 ECHO OFF TITLE PtoR MODE con COLS55 LINES25 color 0A:main cls echo.当前时间&#xff1a;%date% %time% echo.欢迎使用图片马制作工具 echo.请确保图片和php在同一路径下 echo.echo 请将图像文件拖放到此窗口并按 Enter&#xff1a; set /p "imagefile&q…

C语言常见算法

算法&#xff08;Algorithm&#xff09;&#xff1a;计算机解题的基本思想方法和步骤。算法的描述&#xff1a;是对要解决一个问题或要完成一项任务所采取的方法和步骤的描述&#xff0c;包括需要什么数据&#xff08;输入什么数据、输出什么结果&#xff09;、采用什么结构、使…

飞翔的鸟小游戏

第一步是创建项目 项目名自拟 第二步创建个包名 来规范class 再创建一个包 来存储照片 如下 package game; import java.awt.*; import javax.swing.*; import javax.imageio.ImageIO;public class Bird {Image image;int x,y;int width,height;int size;double g;double t;…