RabbitMQ常见问题持续汇总

文章目录

  • 消息分发
    • 不公平分发
    • 限流-basic.qos
      • 主要功能
      • 使用场景
      • 示例代码
  • 消费者默认concurrency数量
    • prefetch和concurrency结合?
  • spring.rabbitmq.template.retry.enabled=true和spring.rabbitmq.listener.simple.retry.enabled=true有什么区别
    • 1. `spring.rabbitmq.template.retry.enabled=true`
    • 2. `spring.rabbitmq.listener.simple.retry.enabled=true`
    • 总结
  • 多机器(集群部署)同时消费某个队列的消息理解
    • 负载均衡
    • 高可用性
    • 和Fanout发布订阅对比理解
  • TODO--持续更新

这篇文章主要是汇总一些杂七杂八的问题,核心内容参考前三章节

RabbitMQ系列文章
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
不止于纸上谈兵,用代码案例分析如何确保RabbitMQ消息可靠性?
不止于方案,用代码示例讲解RabbitMQ顺序消费
RabbitMQ常见问题持续汇总

消息分发

不公平分发

RabbitMQ 默认分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1); 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

限流-basic.qos

通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

basic.qos 是 RabbitMQ 的一个方法,用于设置消息消费的流控(Quality of Service, QoS)。具体来说,它允许你限制在消费者确认(acknowledge)消息之前,RabbitMQ 能够推送给该消费者的消息数量。这样可以防止消费者因为处理不过来而被淹没在大量未处理的消息中。

 com.rabbitmq.client.Channel#basicQos(int, int, boolean)	/*** 请求对此通道应用特定的prefetchCount“服务质量”设置** @param prefetchCount 服务器将交付的最大消息数,如果不限制则为0* @throws java.io.IOException 如果遇到错误* * 注意: 该方法是基本服务质量机制的一部分,用于流量控制* 客户端可以通过设置prefetchCount来避免被服务器压垮*/
void basicQos(int prefetchCount) throws IOException;

主要功能

配置如下

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:concurrency: 1max-concurrency: 3# 消费者预取1条数据到内存,默认为250条# 消费端限流:每个消费者未确认的未处理消息的最大数量prefetch: 1  
  1. 设置预取计数(prefetch count):

    prefetch_countbasic_qos 方法中的一个参数,用于指定消费者可以接收的未确认消息的最大数量。在 RabbitMQ 中,prefetch_count 的值可以是 0 到 65535 之间的任意整数,其中 0 表示无限制 。RabbitMQ 允许为每个消费者独立设置 prefetch_count,而不是像 AMQP 0-9-1 协议中那样在通道级别共享 。

    • basic.qos 允许设置每个消费者在未确认的情况下能接收的最大消息数量。比如,basic.qos(1) 表示每个消费者在没有确认上一条消息之前不会再接收新的消息。这有助于实现公平调度,确保消费者不会被淹没。
  2. 限制消息分发:

    • 当有多个消费者时,basic.qos 通过限制每个消费者处理消息的数量,确保消息分发更均衡。消费者处理完消息并发送确认后,RabbitMQ 才会将新的消息发送给它。
  3. 避免消息堆积:

    • 通过合理设置 prefetch count,可以防止消费者端消息堆积,避免因为过多未处理的消息导致的内存消耗问题。

使用场景

  • 资源密集型任务: 如果你的消费者在处理某些需要消耗较多资源(如CPU、内存)的任务时,需要限制其一次处理的消息数量,以避免资源耗尽。
  • 分布式系统: 在分布式系统中,basic.qos 可以帮助实现更公平的负载均衡,确保每个消费者都能公平地获取消息处理。

示例代码

// 设置每个消费者最多处理3条未确认的消息
channel.basicQos(3);

通过上述设置,RabbitMQ 在每个消费者确认消息前只会推送3条消息,这样就能有效控制消息处理的并发量。

消费者默认concurrency数量

首先说一下concurrency配置,这个配置是设置listener初始化时的线程数,即消费者的数量,即消费者同时消费消息的数量。

那么如果没有显性设置concurrency时,默认的线程数是多少呢,答案是1。

具体的我们可以在源码org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer看到

在这里插入图片描述

prefetch和concurrency结合?

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 全局并发设置concurrency: 1  max-concurrency: 3# 消费者预取1条数据到内存,默认为250条# 消费端限流:每个消费者未确认的未处理消息的最大数量prefetch: 10  

若一个消费者配置prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个线程到内存中(注意不是两个线程去共享内存中抓取的消息)。

concurrency在注解里面可以配置,配置了以后以注解为准,yml里面相当于是全局的

@Component
public class MyConsumer {//会覆盖配置文件中的参数。@RabbitListener(queues = {"myQueue"},concurrency ="2")public void receiver(Message msg, Channel channel) throws InterruptedException {//...业务处理}}

spring.rabbitmq.template.retry.enabled=true和spring.rabbitmq.listener.simple.retry.enabled=true有什么区别

spring.rabbitmq.template.retry.enabled=truespring.rabbitmq.listener.simple.retry.enabled=true 是 Spring AMQP 中配置 RabbitMQ 消息重试机制的两个不同选项,分别用于不同的场景。

1. spring.rabbitmq.template.retry.enabled=true

  • 作用范围: 这个配置用于通过 RabbitTemplate 进行消息发送时的重试机制。
  • 适用场景: 当你在使用 RabbitTemplate 主动发送消息到 RabbitMQ 时,如果消息发送失败(如网络问题、连接超时等),Spring 会自动进行重试。
  • 默认行为: 如果开启了这个选项,RabbitTemplate 会按照配置的重试策略(如重试次数、重试间隔等)自动重试消息发送操作,直到成功或者达到重试上限。
  • 常见使用: 适用于主动调用 RabbitTemplate.convertAndSend() 或类似方法进行消息发送的场景。

2. spring.rabbitmq.listener.simple.retry.enabled=true

  • 作用范围: 这个配置用于在使用消息监听器(Message Listener)时的重试机制,特别是在使用简单消息监听容器(SimpleMessageListenerContainer)时。
  • 适用场景: 当你使用 @RabbitListener 或其他监听机制从队列中接收和处理消息时,如果消息处理失败(如业务逻辑异常),Spring 会按照配置的重试策略自动进行重试。
  • 默认行为: 如果开启了这个选项,当消费者处理消息时抛出异常,Spring 会自动进行重试,直到处理成功或达到最大重试次数。
  • 常见使用: 适用于使用 @RabbitListener 注解来监听队列消息,并希望在处理失败时自动重试的场景。

总结

  • spring.rabbitmq.template.retry.enabled=true 是用于发送消息失败后的重试机制,针对的是消息的生产者(发送端)。
  • spring.rabbitmq.listener.simple.retry.enabled=true 是用于消费消息时处理失败后的重试机制,针对的是消息的消费者(监听端)。

多机器(集群部署)同时消费某个队列的消息理解

对于这个代码,我们的代码在实际业务中是集群部署的,不同的机器都可能去消费这个队列的消息!

public class MessageConsumer {@RabbitListener(queues = "queue1")public void receiveMessageFromQueue1(String message) {// 处理来自queue1的消息System.out.println("Received from queue1: " + message);}}

负载均衡

通过多个消费者实例共同消费同一个队列的消息,负载可以均匀分布在所有消费者上,提高整体系统的处理能力。当多个消费者监听同一个队列时,RabbitMQ 会按照轮询的方式将消息分发给这些消费者。每条消息只会被一个消费者消费,这样可以有效地分摊负载,提高系统的吞吐量和可靠性。

假设有两个消费者(Consumer A 和 Consumer B),都监听同一个队列 queue1。RabbitMQ 将会按如下方式分发消息:

  • 消息 1 发送给 Consumer A
  • 消息 2 发送给 Consumer B
  • 消息 3 发送给 Consumer A
  • 依此类推…

高可用性

如果某个消费者实例宕机,RabbitMQ 会自动将新的消息发送给其他存活的消费者,确保消息不会丢失,业务处理不中断。

和Fanout发布订阅对比理解

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

在这里插入图片描述

也就是说发送一条消息A,假设有队列1、队列2都订阅了这个消息A,那么这监听这两个队列的消费者都会去消费消息A,同时,在集群部署的情况下,对应监听队列1、2的消费者都会有多个,但是每次MQ都会采用负载均衡策略(默认轮询),这样每个队列对应实际只会有一个消费者去消费!

下面展示了消息A被两个队列订阅,并且每个队列有多个消费者,但在负载均衡策略下,每次只有一个消费者去消费消息A的情况。

在这里插入图片描述

如图所示:

  • 消息A被发送到两个队列:队列1和队列2。
  • 每个队列有多个消费者:队列1有消费者1、消费者2和消费者3,队列2有消费者4、消费者5和消费者6。
  • 在负载均衡策略下,每次只有一个消费者去消费消息A。例如,队列1的消息A可能会被消费者1、消费者2或消费者3中的一个消费,队列2的消息A可能会被消费者4、消费者5或消费者6中的一个消费。

TODO–持续更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/456239.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Appium环境搭建全流程(含软件)

目录 1.node.js的安装 1--中文下载地址 2--node.js设置镜像源 2.安装appium 1--全局安装appium 2--安装驱动 uiautomator2 3--查看插件的命令 4--安装 images插件 5--安装 execute-driver 插件 6--安装 relaxed-caps插件 7--安装 universal-xml 插件 3.python安装 1--python下…

Excel功能区变灰是什么原因造成?怎么解决?

Microsoft Excel是广泛使用的电子表格软件,但有时用户可能会遇到功能区变灰的问题,这可能导致功能无法使用,影响工作效率和用户体验。本文将深入探讨Excel功能区灰色的原因及解决方案,帮助读者快速解决这一问题。 一、Excel功能区…

Windows 使用命令行开启热点(无线网卡连接下)

winr 打开命令提示符框(管理员模式)在cmd中输入 netsh wlan show drivers 检查无线网卡是否已插入继续输入 ssid后面是wifi名称,key后面是密码,可以自行更改 netsh wlan set hostednetwork modeallow ssidWDW keyWDWWDWWDW 设置热…

开源模型应用落地-Qwen2.5-7B-Instruct与vllm实现推理加速的正确姿势-Gradio

一、前言 目前,Qwen模型已经升级到了2.5版本。无论是语言模型还是多模态模型,它们都是在大规模的多语言和多模态数据上进行预训练的,并通过高质量的数据进行后期微调,以更好地符合人类的需求。 Gradio作为一个强大的工具&#xff…

APP专项测试-冷启动-流量-电量-内存

1、响应时间 1.1怎么获取冷启动时间(热启动,就是后台不关后台再次打开) 方法一 1.2怎么获取包名 与 启动页 方法三soloPi:启动时间(用户角度出发,页面差异进行计算时间): 然后默认配置。点击开始录制 1开…

云计算行业应用实训室建设方案

一、引言 云计算作为信息技术领域的重要分支,正在深刻影响着各行各业的发展。随着云计算技术的不断成熟和应用领域的不断拓展,对云计算专业人才的需求日益增长。实训室作为培养学生实践能力和创新能力的重要场所,其建设对于提高教育质量和满…

模拟信号采集显示器+GPS同步信号发生器制作全过程(焊接、问题、代码、电路)

1、制作最小系统板 在制作最小系统板的时候,要用USB转TTL给板子供电,留了一个电源输入的四个接口,同时又用排针引出来VCC和GND用于后续其他外设的电源供应,电源配有电源指示灯和保护电容, 当时在焊接的时候把接口处的…

数学建模与优化算法:从基础理论到实际应用

数学建模和优化算法,它们不仅帮助我们理解和描述复杂系统的行为,还能找到系统性能最优化的解决方案。本文将从基础的数学理论出发,逐步深入到各种优化算法,并探讨它们在实际问题中的应用。 思维导图文件可获取:https:…

51单片机应用开发(进阶)---外部中断(按键+数码管显示0-F)

实现目标 1、巩固数码管、外部中断知识 2、具体实现:按键K4(INT1)每按一次,数码管从0依次递增显示至F,再按则循环显示。 一、共阳数码管 1.1 共阳数码管结构 1.2 共阳数码管码表 共阳不带小数点0-F段码为&#xff…

Python异常检测- DBSCAN

系列文章目录 Python异常检测- Isolation Forest(孤立森林) python异常检测 - 随机离群选择Stochastic Outlier Selection (SOS) python异常检测-局部异常因子(LOF)算法 文章目录 系列文章目录前言一、DBSCAN算法原理二、DBSCAN算…

【小白学机器学习16】 概率论的世界观2: 从正态分布去认识世界

目录 1 从正态分布说起 1.1 正态分布的定义 1.2 正态分布的名字 1.3 正态分布的广泛,和基础性 2 正态分布的公式和图形 2.1 正态分布 2.2 标准正态分布 3 正态分布的认识的3个层次 3.1 第1层次:个体的某个属性的样本值,服从正态分布…

《IDE 巧用法宝:使用技巧全解析与优质插件推荐》

在日常撸代码的时候,相信兄弟们在IDEA 中用到不少插件,利用插件,不仅可以提高工具效率,撸起代码来,也格外的娃哈哈…… 一、IntelliJ IDEA 作为一个资深 Java 程序员,除了 IDEA 中默认的插件,我…

重学SpringBoot3-Reactive-Streams规范

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-Reactive-Streams规范 1. 什么是 Reactive-Streams 规范?2. Reactive-Streams 的核心组件2.1 Publisher(发布者)2.2 …

飞睿智能超宽带UWB音频传输模块,超低延迟数据传输,实时音频声音更纯净

在信息爆炸的时代,音频传输技术正以未有的速度发展,创新我们进入一个全新的听觉世界。今天,我们要探讨的,就是这场技术创新中的一颗璀璨明星——飞睿智能超宽带(UWB)音频传输模块。它以其独特的优势&#x…

光谱指标-预测含水量-多种特征提取方式

目录 1 介绍1.1 变量投影重要性(VIP)分析1.2. 灰色关联度(GRA)分析1.3. 皮尔逊相关性分析1.4 总结 2 GRA灰色关联度3 皮尔逊(Person)相关性4 变量投影重要性(Variable importance in projection,VIP)分析5 机器学习 1 …

webpack 老项目升级记录:从 node-sass 限制的的 node v8 提升至支持 ^node v22

老项目简介 技术框架 vue 2.5.17webpack 4.16.5"webpack-cli": "3.1.0""node-sass": "^4.7.2" 几个阶段 第一步:vue2 升级到最新 第一步:升级 vue2 至最新版本,截止到目前(2024-10-…

戴维南,叠加,稳态笔记

一点点学习笔记,仅做个人复习使用 节点电压分清电流电压源,电流源才能写在右边,容易混淆 叠加定理仅适用于线性电路,且不能用于计算功率,主要是方向,要看源的方向判断等效之后的,受控源不参与除源&#x…

DMVPN协议

DMVPN(Dynamic Multipoint VPN)动态多点VPN 对于分公司和分总公司内网实现通信环境下,分公司是很多的。我们不可能每个分公司和总公司都挨个建立ipsec隧道 ,而且如果是分公司和分公司建立隧道,就会很麻烦。此时我们需…

iPhone当U盘使用的方法 - iTunes共享文件夹无法复制到电脑怎么办 - 如何100%写入读出

效果图 从iPhone复制文件夹到windows电脑 步骤windows 打开iTunes通过USB连接iPhone和电脑手机允许授权iTunes中点击手机图标,进入到点击左边“文件共享”,在右边随便选择一个App(随意...)写入U盘:拖动电脑的文件&am…

随机抽取学号

idea 配置 抽学号 浏览器 提交一个100 以内的整数。,后端接受后,根据提供的整数,产生 100 以内的 随机数,返回给浏览器? 前端:提供 随机数范围 ,病发送请求后端:处理随机数的产生&…