Rabbitmq消息积压问题如何解决以及如何进行限流

一、增加处理能力
优化系统架构、增加服务器资源、采用负载均衡等手段,以提高系统的处理能力和并发处理能力。通过增加服务器数量或者优化代码,确保系统能够及时处理所有的消息。

二、异步处理
将消息的处理过程设计为异步执行,即接收到消息立即返回响应,然后将消息放入队列中进行后续处理。这样可以避免同步请求的阻塞,提高系统的吞吐量和响应速度。

三、消息分片
如果消息体较大或者复杂,可以考虑将消息分片处理。将消息拆分为多个小的部分进行处理,减少单个消息的处理时间,从而提高整体处理能力。

四、集群扩展
根据实际情况,可以考虑通过添加更多的节点来扩展消息处理的集群规模,实现分布式部署和负载均衡,以应对大量消息的处理需求。

五、优化数据库操作
如果消息的处理涉及到数据库操作,可以考虑对数据库查询和写入进行性能优化,如建立索引、合理使用缓存等,以减少数据库的压力。

六、监控和报警
建立监控系统,实时监测消息队列的积压情况、处理延迟等指标,并设置相应的报警机制,及时发现和解决潜在问题,确保消息的正常处理。需要根据具体的场景和需求来选择适合的解决方案,综合考虑各种因素,以提高系统的消息处理能力和性能。

七、增加消费者数量
增加消费者可以提高消息处理速度,从而减少消息积压。可以根据消息的类型和优先级分配消费者,使消息得到及时处理。

八、增加队列
可以增加队列数量来缓解消息积压。根据消息的类型和优先级,可以将不同类型的消息存储在不同的队列中,更好的管理消息流量

九、设置消息的过期时间
可以设置消息的过期时间,当消息在队列中等待时间超过指定时间时,会被自动删除,直到消息被正确处理或超过最大重试次数为止。

十、使用限流机制
可以使用限流机制来控制消费者的消费速度,避免消息过多导致消费者无法及时处理。可以使用Qos机制,设置每个消费者同时处理消息的最大数量,从而保证系统的性能和稳定性。

如何进行限流了

电商中秒杀请求,属于瞬间大流量,同一时刻会有大量的请求涌入到系统中,可能导致系统挂掉。应付这种瞬间大流量的其中一种方式,便是利用消息队列

1、利用消息队列先进先出的特性,将请求进行削峰;
2、控制好消费端的消费速度,进行必要的限流。

在消费端,要做到上面提到的第2点,在Spring Boot RabbitMQ中只需要利用@RabbitListener注解,做一些简单配置就可以了。


一个listener对应多个consumer


默认情况一下,一个listener对应一个consumer,如果想对应多个,有两种方式。

方式一:直接在application.yml文件中配置

spring:rabbitmq:listener:simple:concurrency: 5max-concurrency: 10

这个是个全局配置,应用里的任何队列对应的listener都至少有5个consumer,但是千万别这么做,因为一般情况下,一个listener对应一个consumer是够用的。只是针对部分场景,才需要一对多。

方式二:直接在@RabbitListener上配置

@Component
public class SpringBootMsqConsumer {@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10")public void receive(Message message) {System.out.println("receive message:" + new String(message.getBody()));}
}

利用@RabbitListener中的concurrency属性进行指定就行。例如上面的

concurrency = “5-10”

就表示最小5个,最大10个consumer。启动消费端应用后,找到spring-boot-direct-queue这个队列的consumer,会发现有5个。

在这里插入图片描述

这5个消费者都可以从spring-boot-direct-queue这个队列中获取消息,加快队列中消息的消费速度,提高吞吐量。


限流


我们经过压测,来判断consumer的消费能力,如果单位时间内,consumer到达的消息太多,也可能把消费者压垮。
得到压测数据后,可以在@RabbitListener中配置prefetch count

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringBootMsqConsumer {@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10",containerFactory = "mqConsumerlistenerContainer")public void receive(Message message) {System.out.println("receive message:" + new String(message.getBody()));}
}

只需要在@RabbitListener中,用containerFactory指定一个监听器工厂类就行。这里用的是:

containerFactory = “mqConsumerlistenerContainer”

定义监听器工厂类很简单。

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Bean(name = "mqConsumerlistenerContainer")public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(50);return factory;}
}

上面有一句factory.setPrefetchCount(50);,就是用于设置prefetch count的,启动后,会在spring-boot-direct-queue队列的consumer中体现出来。
在这里插入图片描述
配置成功后,consumer单位时间内接收到消息就是50条。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/113980.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenGL-入门-BMP像素图glReadPixels(2)保存显示的界面

用glReadPixels保存显示的界面 #include <GL/glut.h> #include <iostream> #include <fstream> #include <vector>// Save pixel data as BMP image void saveBMP(const std::string& filename, int width, int height, const std::vector<GLu…

多项式求逆

已知 F F F&#xff0c;求 G G G 考虑倍增 F ( x ) ∗ H ( x ) ≡ 1 ( m o d x n / 2 ) F(x) * H(x) \equiv 1 \pmod{x^{n/2}} F(x)∗H(x)≡1(modxn/2) F ( x ) ∗ G ( x ) ≡ 1 ( m o d x n / 2 ) F(x) * G(x) \equiv 1 \pmod{x^{n/2}} F(x)∗G(x)≡1(modxn/2) 假设 H H…

Shell编程之流程控制

目录 if判断 case语句 for循环 while循环 if判断 语法&#xff1a; if [ 条件判断表达式 ] then 程序 elif [ 条件判断表达式 ] then 程序 else 程序 fi 注意&#xff1a; [ 条件判断表达式 ]&#xff0c;中括号和条件判断表达式之间必须有空格。if&#xff0c;elif…

自然语言处理(NLP)是什么?

NLP(自然语言处理) 和 Phoebe Liu 的简介 您有没有和聊天机器人互动过&#xff1f;或者您是否向虚拟助手&#xff0c;例如 Siri、Alexa 或您车上的车载娱乐系统发出过某些请求&#xff1f;您使用过在线翻译吗&#xff1f;我们大多数人都曾与这些人工智能 (AI) 互动过&#xff…

某人事系统架构搭建设计记录

首发博客地址 https://blog.zysicyj.top/ 先大致列一下基础情况 架构必须是微服务 场景上涉及大量查询操作&#xff0c;分析操作 存在临时大量写入的场景 并发并不高 对高可用要求较高&#xff0c;不能挂掉 对安全要求高 要能过等保测试等三方测试 使用人数并不多&#xff0c;十…

git rebase和merge区别

一、概述 merge和rebase 标题上的两个命令&#xff1a;merge和rebase都是用来合并分支的。 这里不解释rebase命令&#xff0c;以及两个命令的原理&#xff0c;详细解释参考这里。 下面的内容主要说的是两者在实际操作中的区别。 1.1 什么是分支 分支就是便于多人在同一项目…

基于流计算 Oceanus(Flink) CDC 做好数据集成场景

由于第一次做实时&#xff0c;所以踩坑比较多&#xff0c;见谅(测试环境用的flink),小公司没有用到hadoop组件 一、踩坑记录 1:本地代码的flink版本是flink1.15.4&#xff0c;生产环境是flink1.16.1&#xff0c;在使用侧输出流时报错&#xff0c;需要使用以下写法,需要使用Si…

如何为你的公司选择正确的AIGC解决方案?

如何为你的公司选择正确的AIGC解决方案&#xff1f; 摘要引言词汇解释&#xff08;详细版本&#xff09;详细介绍1. 确定需求2. 考虑技术能力3. 评估可行性4. 比较不同供应商 代码快及其注释注意事项知识总结 博主 默语带您 Go to New World. ✍ 个人主页—— 默语 的博客&…

适配ADRC自抗扰控制算法的MFP450-ADRC 套件焕新而来

关注 FMT 开源自驾仪的开发者可能知道&#xff0c;早在 2018 年 7 月 FMT开源自驾仪的早期版本就已经实现了 ADRC 算法。 经过几年的发展&#xff0c;FMT 在自抗扰控制算法的适配上做了进一步的优化&#xff0c;为了方便科研工作者和开发者快速上手&#xff0c;我们针对搭载 F…

计算机毕设 基于机器学习的餐厅销量预测 -大数据 python

文章目录 0 前言餐厅销量预测模型简介2.ARIMA模型介绍2.1自回归模型AR2.2移动平均模型MA2.3自回归移动平均模型ARMA 三、模型识别四、模型检验4.1半稳性检验(1)用途(1)什么是平稳序列?(2)检验平稳性 ◆白噪声检验(纯随机性检验)(1)用途(1)什么是纯随机序列?(2)检验纯随机性 五…

9. 解谜游戏

目录 题目 Description Input Notes 思路 暴力方法 递归法 注意事项 C代码&#xff08;递归法&#xff09; 关于DFS 题目 Description 小张是一个密室逃脱爱好者&#xff0c;在密室逃脱的游戏中&#xff0c;你需要解开一系列谜题最终拿到出门的密码。现在小张需要打…

海面漂浮物垃圾识别检测算法

海面漂浮物垃圾识别检测算法通过yolo系列网络框架模型算法&#xff0c;海面漂浮物垃圾识别检测算法一旦识别到海面的漂浮物垃圾&#xff0c;海面漂浮物垃圾识别检测算法立即发出预警信号。目标检测架构分为两种&#xff0c;一种是two-stage&#xff0c;一种是one-stage&#xf…

IDEA全局统一设置Maven

原来每次打开新建的项目都需要经过 File-> Settings 重新配置maven&#xff0c;这样很不爽 然而经过 File-> New Projects Setup -> Settings for New Projects 后&#xff0c;再如上图配置后就全局设置好了

C语言控制语句——循环语句

什么是循环 重复执行代码 为什么需要循环 循环的实现方式 whiledo…whilefo while语句 语法格式&#xff1a; while (条件) {循环体…… }需求&#xff1a;跑步5圈 示例代码&#xff1a; #include <stdio.h>int main() {// 需求跑步5圈// 1. 条件变量的定义int i 1;…

头歌MYSQL——课后作业6 函数

第1关&#xff1a;数值函数 任务描述 本关任务&#xff1a;对表达式取整 相关知识 四舍五入的函数 ROUND(X,D) 返回X&#xff0c;其值保留到小数点后D位&#xff0c;而第D位的保留方式为四舍五入。 若D的值为0,则对小数部分四舍五入。 若将D设为负值&#xff0c;保留X值小数…

研磨设计模式day14模板方法模式

目录 场景 原有逻辑 有何问题 解决方案 解决思路 代码实现 重写示例 模板方法的优缺点 模板方法的本质 何时选用 场景 现在模拟一个场景&#xff0c;两个人要登录一个系统&#xff0c;一个是管理员一个是用户&#xff0c;这两个不同身份的登录是由后端对应的两个接…

lnmp架构-PHP

08 PHP源码编译 09 php初始化配置 nginx 的并发能力强 phpinfo函数 就是 显示php信息 10 php的功能模块 编译memcache模块 php的动态模块方式 mamcache 就是内存 直接从内存中命中 所以性能非常好 但是 这还不是最好的方式 工作流程 关键看后端的 php 什么时候处理完 mamcac…

五、多表查询-3.4连接查询-联合查询union

一、概述 二、演示 【例】将薪资低于5000的员工&#xff0c;和 年龄大于50岁的 员工全部查询出来 1、查询薪资低于5000的员工 2、查询年龄大于50岁的员工 3、将薪资低于5000的员工&#xff0c;和 年龄大于50岁的 员工全部查询出来&#xff08;把上面两部分的结果集直接合并起…

Unity碰撞检测(3D和2D)

Unity碰撞检测3D和2D 前言准备材料3D2D 代码3D使用OnCollisionEnter()进行碰撞Collider状态代码 使用OnTriggerEnter()进行碰撞Collider状态代码 2D使用OnCollisionEnter2D()进行碰撞Collider2D状态代码 使用OnTriggerEnter2D()进行碰撞Collider2D状态代码 区别3D代码OnCollisi…

力扣141. 环形链表

141. 环形链表 简单 2K 相关企业 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链…