RabbitMQ 实现分组消费满足服务器集群部署

实现思路

  1. 使用扇出交换机(Fanout Exchange):扇出交换机会将消息广播到所有绑定的队列,确保每个消费者组都能接收到相同的消息。
  2. 为每个消费者组创建独立的队列:每个消费者组拥有自己的队列,所有属于该组的消费者都订阅这个队列。
  3. 确保同一组内的消费者竞争消费:RabbitMQ 会将消息推送给组内第一个可用的消费者,确保同一消息不会被同一组中的多个消费者处理。

示例场景

假设我们有两个消费者组:GroupAGroupB。我们希望发送的消息能够同时被 GroupAGroupB 消费,但每个组内的多个消费者只会有一个成员消费该消息。

Spring Boot + RabbitMQ 广播式分组消费示例

1. 引入依赖

首先,在 pom.xml 中添加 RabbitMQ 和 Spring AMQP 的依赖:

<dependencies><!-- Spring Boot Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 其他依赖... -->
</dependencies>
2. 配置 RabbitMQ 交换机和队列

application.yml 中配置 RabbitMQ 的交换机、队列和绑定关系。使用 fanout 交换机,并为每个消费者组创建一个独立的队列。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# RabbitMQ 配置rabbitmq:listener:simple:acknowledge-mode: manual  # 手动确认消息concurrency: 5           # 每个队列的并发消费者数量max-concurrency: 10      # 最大并发消费者数量# 自定义队列和交换机配置rabbitmq:queues:group-a-queue:name: group_a_queuedurable: truegroup-b-queue:name: group_b_queuedurable: trueexchanges:fanout-exchange:name: fanout_exchangetype: fanoutdurable: truebindings:group-a-binding:exchange: fanout_exchangequeue: group_a_queuegroup-b-binding:exchange: fanout_exchangequeue: group_b_queue
3. 创建 RabbitMQ 配置类

创建一个配置类来声明交换机、队列和绑定关系。Spring AMQP 会自动根据配置创建这些资源。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout_exchange", true, false);}// 定义 Group A 队列@Beanpublic Queue groupAQueue() {return new Queue("group_a_queue", true);}// 定义 Group B 队列@Beanpublic Queue groupBQueue() {return new Queue("group_b_queue", true);}// 绑定 Group A 队列到扇出交换机@Beanpublic Binding groupABinding(Queue groupAQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupAQueue).to(fanoutExchange);}// 绑定 Group B 队列到扇出交换机@Beanpublic Binding groupBBinding(Queue groupBQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(groupBQueue).to(fanoutExchange);}
}
4. 发送消息

创建一个服务类来发送消息到扇出交换机。由于 fanout 交换机不使用路由键,它会将消息广播到所有绑定的队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic MessageProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}// 发送消息到扇出交换机public void sendMessage(String message) {System.out.println("Sending message: " + message);rabbitTemplate.convertAndSend("fanout_exchange", "", message);}
}
5. 创建消费者

为每个消费者组创建单独的消费者类。每个消费者组内的多个消费者会竞争性地从队列中消费消息。使用 @RabbitListener 注解来监听队列中的消息。

Group A 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupAConsumer {@RabbitListener(queues = "group_a_queue")public void consumeMessage(String message) {System.out.println("Group A consumer received: " + message);// 处理 Group A 的逻辑}
}
Group B 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class GroupBConsumer {@RabbitListener(queues = "group_b_queue")public void consumeMessage(String message) {System.out.println("Group B consumer received: " + message);// 处理 Group B 的逻辑}
}
6. 启动应用程序并测试

启动 Spring Boot 应用程序后,GroupAConsumerGroupBConsumer 会分别监听 group_a_queuegroup_b_queue。通过以下方式测试消息的发送和消费:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class TestRunner implements CommandLineRunner {private final MessageProducer messageProducer;@Autowiredpublic TestRunner(MessageProducer messageProducer) {this.messageProducer = messageProducer;}@Overridepublic void run(String... args) throws Exception {// 发送一条消息messageProducer.sendMessage("This is a broadcast message");}
}

7. 运行结果
当你启动应用程序时,TestRunner 会发送一条消息到 fanout_exchange。由于 fanout 交换机会将消息广播到所有绑定的队列,因此 group_a_queue 和 group_b_queue 都会接收到这条消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/487024.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件项目标书参考,合同拟制,开发合同制定,开发协议,标书整体技术方案,实施方案,通用套用方案,业务流程,技术架构,数据库架构全资料下载(原件)

1、终止合同协议书 2、项目合作协议 3、合同交底纪要 4、合同管理台账 软件资料清单列表部分文档清单&#xff1a;工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明书&#xff0c;需求调研计划&#xff0c;用户需求调查单&…

PADS系列:绘制RTL8306原理图的过程

大家好&#xff0c;我是山羊君Goat。 在所有相关的元件都被创建到了原理图库之后&#xff0c;就可以正式开始原理图的绘制了。不过绘制过程中也是会按照一定的顺序来进行的&#xff0c;这样可以达到事半功倍的效果。 首先就是主芯片的放置&#xff0c;这里有三个主芯片&#x…

c++:多态性

c三大特性&#xff1a;封装、继承、多态&#xff0c;其实这三者是相互联系的&#xff0c;相互包含的 1.概念 1.1编译时多态性&#xff08;静态多态性&#xff09; 主要通过函数重载和运算符重载来实现在编译时&#xff0c;编译器根据函数调用的参数类型或运算符的操作数类型来…

(ICML-2024)DoRA:权重分解低秩自适应

DoRA&#xff1a;权重分解低秩自适应 Paper是英伟达发表在ICML 2024的工作 Paper Title&#xff1a;DoRA: Weight-Decomposed Low-Rank Adaptation Code&#xff1a; 地址 Abstract 在广泛使用的参数高效微调 (PEFT) 方法中&#xff0c;LoRA 及其变体因避免了额外的推理成本而…

微信小程序从后端获取的图片,展示的时候上下没有完全拼接,有缝隙【已解决】

文章目录 1、index.wxml2、index.js3、detail.detail为什么 .rich-text-style 样式可以生效&#xff1f;1. <rich-text> 组件的特殊性2. 类选择器的作用范围3. 样式优先级4. line-height: 0 的作用5. 为什么直接使用 rich-text 选择器无效&#xff1f; 总结 上下两张图片…

基于单片机的空调温度控制器设计

摘 要 随着国民经济的发展和人民生活水平的提高&#xff0c;空调已被广泛应用于社会的各种场合。空调因具有节能、低噪、恒温控制、全天候运转、启动低频补偿、快速达到设定温度等性能&#xff0c;大大提高了其舒适性&#xff0c;得到越来越多的人们的喜爱。单片机和数字温度传…

ElasticSearch常见的索引_集群的备份与恢复方案

方案一&#xff1a;使用Elasticsearch的快照和恢复功能进行备份和恢复。该方案适用于集群整体备份与迁移&#xff0c;包括全量、增量备份和恢复。 方案二&#xff1a;通过reindex操作在集群内或跨集群同步数据。该方案适用于相同集群但不同索引层面的迁移&#xff0c;或者跨集…

【计算机网络】实验12:网际控制报文协议ICMP的应用

实验12 网际控制报文协议ICMP的应用 一、实验目的 验证ping命令和tracert命令的工作原理。 二、实验环境 Cisco Packet Tracer模拟器 三、实验过程 1.构建网络拓扑并进行信息标注&#xff0c;将所需要配置的IP地址写在对应的主机或者路由器旁边&#xff0c;如图1所示。 图…

Redis安装和Python练习(Windows11 + Python3.X + Pycharm社区版)

环境 Windows11 Python3.X Pycharm社区版 思路 1 github下载redis压缩包 &#xff0c;安装并启动redis服务&#xff0c;在客户端连接redis服务。 2 在pycharm中运行python程序&#xff0c;连接redis服务&#xff0c;熟悉redis的使用和巩固python语言。 3 python开发环境…

8.解决跨域问题的三种方案

开启域名&#xff0c;单点登录后&#xff0c;就使用最上面的接口了

【机器学习】基础知识:拟合度(Goodness of Fit)

拟合度概念及意义 拟合度&#xff08;Goodness of Fit&#xff09;是衡量统计模型对数据解释能力的指标&#xff0c;用于评价模型对观测数据的拟合效果。在回归分析、分类模型或其他预测模型中&#xff0c;拟合度是模型性能的重要衡量标准。 1. 拟合度的作用 拟合度的主要作用…

【Elasticsearch】实现用户行为分析

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

初识Linux · 线程同步

目录 前言&#xff1a; 认识条件变量 认识接口 快速使用接口 生产消费模型 前言&#xff1a; 前文我们介绍了线程互斥&#xff0c;线程互斥是为了防止多个线程对临界资源访问的时候出现了对一个变量同时操作的情况&#xff0c;对于线程互斥来说&#xff0c;我们使用到了锁…

使用 LlamaFactory 结合开源大语言模型实现文本分类:从数据集构建到 LoRA 微调与推理评估

文章目录 背景介绍文本分类数据集Lora 微调模型部署与推理期待模型的输出结果 文本分类评估代码 背景介绍 本文将一步一步地&#xff0c;介绍如何使用llamafactory框架利用开源大语言模型完成文本分类的实验&#xff0c;以 LoRA微调 qwen/Qwen2.5-7B-Instruct 为例。 文本分类…

【已解决】MacOS上VMware Fusion虚拟机打不开的解决方法

在使用VMware Fusion时&#xff0c;不少用户可能会遇到虚拟机无法打开的问题。本文将为大家提供一个简单有效的解决方法&#xff0c;只需删除一个文件&#xff0c;即可轻松解决这一问题。 一、问题现象 在MacOS系统上&#xff0c;使用VMware Fusion运行虚拟机时&#xff0c;有…

【教程】创建NVIDIA Docker共享使用主机的GPU

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 这套是我跑完整理的。直接上干货&#xff0c;复制粘贴即可&#xff01; # 先安装toolkit sudo apt-get update sudo apt-get install -y ca-certifica…

设备CTA进网许可认证有哪些值得注意的测试内容?

设备进网许可认证的测试项目与测试内容有哪些?在CTA进网认证过程中是否存在需要注意的地方?本篇是英利检测针对这两点给大家进行的资料整理&#xff0c;帮助大家更进一步了解项目难点所在。 一、电磁兼容测试(EMC测试) 电磁兼容测试旨在评估设备在电磁环境中的表现&#xff0…

flex布局容易忽略的角色作用

目录 清除浮动 作用于行内元素 flex-basis宽度 案例一&#xff1a; 案例二&#xff1a; 案例三&#xff1a; flex-grow设置权重 案例一&#xff1a; 案例二&#xff1a; 简写flex-grow:1 0 auto; 目录 清除浮动 作用于行内元素 flex-basis宽度 案例一&#xff1a…

vue自定义弹窗点击除了自己区域外关闭弹窗

这里使用到vue的自定义指令 <div class"item" v-clickoutside"clickoutside1"><div click"opencity" class"text":style"{ color: popup.iscitypop || okcitylist.length ! 0 ? #FF9500 : #000000 }">选择地区…

旧衣物回收小程序搭建,便捷回收,绿色生活!

随着人们生活水平的提高&#xff0c;各种衣物的更新速度逐渐加快&#xff0c;为了减少衣物的浪费&#xff0c;旧衣物回收市场受到了人们的关注。 如今&#xff0c;旧衣物回收行业的技术正在不断创新&#xff0c;利用科技的发展&#xff0c;结合了互联网的模式&#xff0c;提高…