RabbitMQ实现多线程处理接收消息

前言:在使用@RabbitListener注解来指定消费方法的时候,默认情况是单线程去监听队列,但是这个如果在高并发的场景中会出现很多个任务,但是每次只消费一个消息,就会很缓慢。单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,这个就很伤。

处理办法:可以添加配置类,设置RabbitMQ的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

一、编写配置类

package com.quick.config;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;/*** RabbitMQ配置类*/
@Configuration
/*@ConditionalOnClass(RabbitTemplate.class) //有RabbitTemplate依赖才会生效,否则不生效*/
public class MqConfig {// 定义线程数、最大线程数常量private static final int INITIAL_CONCURRENT_CONSUMERS = 10;private static final int MAX_CONCURRENT_CONSUMERS = 10;/*** 将多线程配置配置注入容器工厂*/@Bean("customContainerFactory")public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConcurrentConsumers(INITIAL_CONCURRENT_CONSUMERS); //设置线程数factory.setMaxConcurrentConsumers(MAX_CONCURRENT_CONSUMERS); //最大线程数configurer.configure(factory, connectionFactory);return factory;}}

  • setConcurrentConsumers(int concurrentConsumers): 这个方法设置了容器应该同时启动的监听器(消费者)线程的数量。这些线程会并发地从RabbitMQ队列中拉取并处理消息。这个值决定了系统初始时能够并行处理消息的能力。

  • setMaxConcurrentConsumers(int maxConcurrentConsumers): 这个方法设置了容器在需要时可以增加到的最大并发消费者数量。这通常用于处理负载高峰,当队列中的消息积压时,可以动态地增加并发消费者数量以提高处理速度。然而,请注意,这并不意味着系统会立即创建所有最大数量的线程,而是会根据需要逐渐增加到这个上限。

这个容器负责监听 RabbitMQ 的队列,并将接收到的消息分发给相应的处理器(即 @RabbitListener 注解标记的方法)

二、修改监听者

在接收消息方里面的@RabbitListener注解中添加配置

@RabbitListener(queues = {"监听队列名"},containerFactory = "customContainerFactory")


/*** 接收消息*/
@Component
public class StoreListener {@Resourceprivate IStoreService storeService;@Resourceprivate StoreMapper storeMapper;/*** 更新店铺收藏人数,实现收藏人数+1* @param storeId 店铺id*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "store.addFavorite.success.queue", durable = "true"), // 队列 起名规则(服务名+业务名+成功+队列),durable持久化exchange = @Exchange(name = "addFavorite.direct"), // 交换机名称,交换机默认类型就行direct,所以不用配置directkey = "addFavorite.success" // 绑定的key),// 在@RabbitListener注解中指定容器工厂containerFactory = "customContainerFactory")public void listenAddFavoriteCountsSuccess(Long storeId){storeService.updateStoreFavoriteUsersCountAdd1(storeId);}/*** 根据传过来的店铺实体类修改店铺信息* @param store 店铺实体类*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "store.updateStore.success.queue", durable = "true"),exchange = @Exchange(name = "updateStore.direct"),key = "updateStore.success"),// 在@RabbitListener注解中指定容器工厂containerFactory = "customContainerFactory")public void updateStoreByEntity(Store store){storeMapper.updateById(store);}}

多线程的好处:

1、提高吞吐量:通过并行处理消息,系统可以更快地处理大量消息,从而提高整体吞吐量。

2、更好的资源利用率:在多核处理器上,多线程可以更好地利用硬件资源,减少处理延迟。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/403318.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习(YOLO、DETR) 十折交叉验证

二:交叉验证 在 K 折验证之前最常用的验证方法就是交叉验证,即把数据划分为训练集、验证集和测试集。一般的划分比例为 7:1:2。但如何合理的抽取样本就成为了使用交叉验证的难点,不同的抽取方法会导致截然不同的训练性…

c语言学习,malloc()函数分析

1:malloc() 函数说明: 申请配置size大小内存空间 2:函数原型: void *malloc(size_t size) 3:函数参数: 参数size,为申请内存大小 4:返回值: 配置成功则返回指针&#…

Nginx实验

编译安装 Nginx 准备rhel9环境 下载安装包nginx-1.24.0(xftp)/复制下载链接 (nginx.org——>download) 解压 [rootnginx nginx-1.24.0]# tar zxf nginx-1.24.0.tar.gz [rootnginx nginx-1.24.0]#tar zxf nginx-1.24.0.tar.…

[掘金社区]自动签到脚本

直接上脚本 脚本临时写的,今天是运行的第一天,虽然报错编码,但是签到、抽奖都成功了。 下面是修改了之后的版本。 # -*- coding: utf-8 -*- import requests import logginglogging.basicConfig(levellogging.INFO)def check_sign_in_status(base_url,h…

Clobbering DOM attributes to bypass HTML filters

目录 寻找注入点 代码分析 payload构造 注入结果 寻找注入点 DOM破坏肯定是出现在js文件中,我们首先来看源码 /resources/labheader/js/labHeader.js这个源码没什么问题我们重点关注在下面两个源码上 /resources/js/loadCommentsWithHtmlJanitor.js这个源码中重…

STM32cubeMX配置Systick的bug

STM32cubeMX版本:6.11.0 现象 STM32cubeMX配置Systick的时钟,不管选择不分频 还是8分频。 生成的代码都是一样的,代码都是不分频。 即不管选择不分频还是8分频,Systick都是使用的系统时钟 函数调用 HAL_Init() → HAL_Init…

HarmonyOS开发案例:列表场景实例-TaskPool

介绍 本实例通过列表场景实例讲解,介绍在TaskPool线程中操作关系型数据库的方法,涵盖单条插入、批量插入、删除和查询操作。 效果图预览 使用说明 进入页面有insert(单条数据插入)、batch insert(批量数据插入)、query(查询操作)三个按钮,…

【安卓】播放多媒体文件

文章目录 播放音频播放视频 播放音频 在Android中播放音频文件一般是使用MediaPlayer类实现的,它对多种格式的音频文件提供了非常全面的控制方法,从而使播放音乐的工作变得十分简单。 MediaPlayer类中常用的控制方法。 常用方法名描述setDataSource()设…

使用html+css+js实现完整的登录注册页面

在这篇博客中,我们将讨论如何使用简单的 HTML 和 CSS 构建一个登录与注册页面。这个页面包含两个主要部分:登录界面和注册界面。我们还会展示如何通过 JavaScript 切换这两个部分的显示状态。 页面结构 我们将创建一个页面,其中包含两个主要…

CSS3-新特性

1.新增选择器 1.属性选择器 2.结构伪类选择器 3.伪元素选择器(重点) 4.CSS3 盒子模型 2.CSS3滤镜filter 3.CSS3 calc 函数 4.CSS3 过渡(重点)

95后医疗行业女性转型记:如何成功踏入人工智能项目管理领域

分享目录 一、自我介绍,给大家分享一下拿到offer的心情吧 二、在整个求职转型陪跑营里,你收获最大的三个点是什么? 三、求职转行过程中,你遇到了哪些困难?七芊老师和强哥是怎么帮助你的?你是怎么走过来的…

seata的使用(SpringBoot项目整合seata)

文章目录 1、解压 seata-server-1.7.1.zip2、启动 双击 seata-server.bat3、启动 seata 控制台用户界面4、所有分布式事务相关数据库要有undo-log5、项目引入seata依赖6、项目添加seata配置7、代码实现: 1、解压 seata-server-1.7.1.zip 2、启动 双击 seata-server.…

Polars简明基础教程十二:可视化(二)

设置绘图后端 我们可以使用 hv.extension 更改绘图后端。但是,我们不在此处运行此单元格,因为它会导致下面的 Matplotlib/Seaborn 图表无法渲染。 注释: hvPlot 利用 HoloViews 库来构建图表,并且可以使用多个后端进行渲染&…

微信小程序骨架屏

骨架屏是常用的一种优化方案,针对于页面还未加载完时给用户的一种反馈方式。如果自己要写骨架屏有点复杂因为页面的元素过多且不稳定,这边直接使用微信开发工具生成骨架屏。也不只有微信开发工具有像常用的抖音开发工具,字节开发工具都有对应…

使用MicroApp重构旧项目

前言 随着技术的飞速发展,我们公司内部一个基于“上古神器” jQuery PHP 构建的十年历史老项目已显力不从心,技术非常老旧且维护成本高昂,其实已经无数次想要重构,但是苦于历史遗留原因以及业务的稳定性而一直难以下手&#xff0…

Tomcat的核心文件讲解

参考视频&#xff1a;对应视频 server.xml中的以下部分可修改&#xff1a; 1.connector标签里的port可以修改。--修改端口号 Tomcat默认端口号&#xff1a;8080 <Connector connectionTimeout"20000" maxParameterCount"1000"port"8080" prot…

调研-音视频

音视频 基础概念主要内容音频基础概念音频量化过程音频压缩技术视频基础概念视频bug视频编码H264视频像素格式YUVRGB参考文献基础概念 ● 实时音视频应用环节 ○ 采集、编码、前后处理、传输、解码、缓冲、渲染等很多环节。 主要内容 音频 基础概念 三要素:音调(音频)、…

算法的学习笔记—链表中倒数第 K 个结点(牛客JZ22)

&#x1f600;前言 在编程过程中&#xff0c;链表是一种常见的数据结构&#xff0c;它能够高效地进行插入和删除操作。然而&#xff0c;遍历链表并找到特定节点是一个典型的挑战&#xff0c;尤其是当我们需要找到链表中倒数第 K 个节点时。本文将详细介绍如何使用双指针技术来解…

8.16 day bug

bug1 题目没看仔细 额外知识 在 Bash shell 中&#xff0c;! 符号用于历史扩展功能。当你在命令行中输入 ! 后跟一些文本时&#xff0c;Bash 会尝试从你的命令历史中查找与该文本相匹配的命令。这是一种快速重用之前执行过的命令的方法。 如何使用历史扩展 基本用法: !strin…

利用亚马逊云科技Bedrock和LangChain开发AI驱动数据分析平台

项目简介&#xff1a; 小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。 本次介绍的是如何在亚马逊云科技上SageMak…