【RabbitMQ】消息分发、事务

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制为手动确认prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBind")public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}}
@RestController
@RequestMapping("/qos")
public class QosController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void qosQueue() {for (int i = 0; i < 10; i++) {this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);System.out.println("第" + i + "次发送消息成功!");}}}
@Configuration
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void qosListener(String msg, Channel channel, Message message) throws IOException {System.out.println("接收的消息为:" + msg);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {@Bean("transactionRabbitTemplate")public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 开启事务return rabbitTemplate;}}
@Configuration
public class TransactionConfig {@Bean("transactionQueue")public Queue transactionQueue() {return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();}@Bean("transactionExchange")public Exchange transactionExchange() {return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();}@Bean("transactionQueueBind")public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,@Qualifier("transactionExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();}}
@RestController
@RequestMapping("/transaction")
public class TransactionController {@Resource(name = "transactionRabbitTemplate")private RabbitTemplate rabbitTemplate;@Transactional@RequestMappingpublic void transactionQueue() {System.out.println("发送成功");this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");int i = 1 / 0;this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");}}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/429143.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux网络编程5

24.9.21学习目录 一.TCP1.TCP流程2.TCP相关函数3.三次握手 一.TCP 1.TCP流程 服务器流程&#xff1a; 创建套接字socket&#xff08;&#xff09;将套接字与服务器网络信息结构体绑定bind&#xff08;&#xff09;将套接字设置为监听状态listen&#xff08;&#xff09;阻塞等…

进程间的通信4 共享内存

共享内存 1.共享内存简介 共享内存是将分配的物理空间直接映射到进程的用户虚拟地址空间中&#xff0c;减少数据在内核空间缓存共享内存是一种效率较高的进程间通讯的方式在 Linux 系统中通过 ipcs -m 查看所有的共享内存 共享内存模型图 2.共享内存的创建 1.函数头文件 #…

Java算法专栏

专栏导读 在当今这个技术日新月异的时代&#xff0c;Java算法作为软件开发的核心&#xff0c;对于提升程序性能和解决复杂问题至关重要。本“Java算法”专栏旨在帮助读者深入理解Java编程语言中的算法原理和应用&#xff0c;通过实战案例和深入分析&#xff0c;使读者能够掌握…

Java汽车销售管理

技术架构&#xff1a; springboot mybatis Mysql5.7 vue2 npm node 功能描述&#xff1a; 针对汽车销售提供客户信息、车辆信息、订单信息、销售人员管理、财务报表等功能&#xff0c;提供经理和销售两种角色进行管理 效果图&#xff1a;

Python基础学习(3)

目录 一&#xff0c;函数 1&#xff0c;函数的定义 2&#xff0c;函数的参数 1&#xff0c;默认值 2&#xff0c;传参 3&#xff0c;返回值 4&#xff0c;变量的作用域 5&#xff0c;函数的调用 二&#xff0c;常用数据结构 1&#xff0c;列表 列表的定义 列表的特性…

【Geoserver使用】REST API调用(工作空间部分)

文章目录 前言一、Geoserver REST API(GeoServer Workspace)二、GeoServer Workspace接口使用1.GET请求 /workspaces2.POST请求 /workspaces3.GET请求 /workspaces/{workspaceName}4.PUT /workspaces/{workspaceName}5.DELETE /workspaces/{workspaceName} 总结 前言 根据Geos…

C++ | Leetcode C++题解之第423题从英文中重建数字

题目&#xff1a; 题解&#xff1a; class Solution { public:string originalDigits(string s) {unordered_map<char, int> c;for (char ch: s) {c[ch];}vector<int> cnt(10);cnt[0] c[z];cnt[2] c[w];cnt[4] c[u];cnt[6] c[x];cnt[8] c[g];cnt[3] c[h] - …

YOLOv10 简介

YOLOv10&#xff0c;由清华大学的研究人员基于 Ultralytics Python 包构建&#xff0c;引入了一种全新的实时目标检测方法&#xff0c;该方法解决了以往 YOLO 版本中后处理和模型架构方面的不足。通过消除非极大值抑制&#xff08;NMS&#xff09;并优化各种模型组件&#xff0…

【解决】chrome 谷歌浏览器,鼠标点击任何区域都是 Input 输入框的状态,能看到输入的光标

chrome 谷歌浏览器&#xff0c;鼠标点击任何区域都是 Input 输入框的状态&#xff0c;能看到输入的光标 今天打开电脑的时候&#xff0c;网页中任何文本的地方&#xff0c;只要鼠标点击&#xff0c;就会出现一个输入的光标&#xff0c;无论在哪个站点哪个页面都是如此。 我知道…

十四、运算放大电路

运算放大电路 1、理想运算放大器的概念。运放的输入端虚拟短路、虚拟断路之间的区别; 2、反相输入方式的运放电路的主要用途&#xff0c;以及输入电压与输出电压信号的相位 3、同相输入方式下的增益表达式(输入阻抗、输出阻抗)

Redis-01 入门和十大数据类型

Redis支持两种持久化方式&#xff1a;RDB持久化和AOF持久化。 1.RDB持久化是将Redis的数据以快照的形式保存在磁盘上&#xff0c;可以手动触发或通过配置文件设置定时触发。RDB保存的是Redis在某个时间点上的数据快照&#xff0c;可以通过恢复RDB文件来恢复数据。 2.AOF持久化…

55. QTableWidget的基本使用

1. 说明 在软件界面开发中,基本上离不开数据的展示以供客户查看一些比较关注的信息,比如公司做一个员工个人信息管理系统,需要一个界面能够展示员工个人基本信息,实现这种效果可以采用多种形式,其中比较简单的一种是使用QT提供的QTableWidget控件,这个控件已经封装了一些…

LeetCode 面试经典150题 190.颠倒二进制位

复习知识&#xff1a;正数的原码、反码、补码相同&#xff0c;负数的反码在其原码的基础上, 符号位不变&#xff0c;其余各个位取反&#xff0c;负数的补码是在其原码的基础上, 符号位不变, 其余各位取反, 最后1 (即在反码的基础上1)。 题目&#xff1a;颠倒给定的 32 位无符号…

Springboot3 + MyBatis-Plus + MySql + Uniapp 商品加入购物车功能实现(最新教程附源码)

Springboot3 MyBatis-Plus MySql Uniapp 商品加入购物车功能实现&#xff08;针对上一篇sku&#xff09; 1、效果展示2、后端代码2.1 model2.2 mapper server serverImpl 参照上一篇自动生成2.3 controller 3、前端代码3.1 index.js3.2 shop-info.vue3.3 ShopBottomButton.v…

计算机毕业设计hadoop+spark+hive新能源汽车销售数据分析系统 二手车销量分析 新能源汽车推荐系统 可视化大屏 汽车爬虫 机器学习

《HadoopSparkHive新能源汽车销售数据分析系统》开题报告 一、选题背景与意义 1.1 选题背景 随着全球对环境保护意识的增强和能源结构的转型&#xff0c;新能源汽车市场迅速崛起。新能源汽车的销售数据不仅反映了市场趋势和消费者偏好&#xff0c;还为企业决策、政府监管和政…

【玉米田】

题目 代码 #include <bits/stdc.h> using namespace std; typedef long long LL;const int mod 1e8; const int M 1 << 12; LL f[13][M]; int g[13]; vector<int> state; vector<int> p[M]; int n, m; bool check(int x) {return !(x & x <&…

“一屏显江山”,激光显示重构「屏中世界」

【潮汐商业评论/原创】 2024年国庆期间&#xff0c;曾感动过无数国人的舞蹈诗剧《只此青绿》改编的同名电影即将上映&#xff0c;而这一次观众们不必走进电影院&#xff0c;在家里打开官方合作的海信激光电视也能享受到同等的视听效果&#xff0c;这是激光电视在观影场景领域的…

java 获取集合a比集合b多出来的对象元素

public class OrderListEntity {/*** deprecated 对象集合的处理* param aData 集合a* param bData 集合b* return 返回集合a比集合b多出来的部分, 通过id判断*/public static List<OrderListEntity> AHasMoreThanBData(List<OrderListEntity> aData, List<Ord…

Stable Diffusion 使用详解(11)--- 场景ICON制作

目录 背景 controlNet 整体描述 Canny Lineart Depth 实际使用 AI绘制需求 绘制过程 PS打底 场景模型选择 设置提示词及绘制参数 controlnet 设置 canny 边缘 depth 深度 lineart 线稿 效果 背景 这段时间不知道为啥小伙伴似乎喜欢制作很符合自己场景的ICON。…

鸿蒙开发(HarmonyOS)组件化浅谈

众所周知&#xff0c;现在组件化在移动开发中是很常见的&#xff0c;那么组件化有哪些好处&#xff1a; 1. 提高代码复用性&#xff1a;组件化允许将应用程序的不同功能模块化&#xff0c;使得这些模块可以在不同的项目中重复使用&#xff0c;从而提高开发效率并减少重复工作。…