rabbitmq 使用SAC队列实现顺序消息

rabbitmq 使用SAC队列实现顺序消息

前提

SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

目的

实现消息顺序消费,操作:

  • 创建4个SAC队列,
  • 消息的路由key 取队列个数模,这里是4
  • 发送消息到每个队列,保证每个队列只有一个消费者!!

实现

定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {//消息idprivate String requestNo;//消息中顺序,1,2,3,4private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {public static final String EXCHANGE = "order-ex";public static final String RK_PREFIX = "rk-";public static final String ONE_QUEUE = "one-queue";public static final String TWO_QUEUE = "two-queue";public static final String THREE_QUEUE = "three-queue";public static final String FOUR_QUEUE = "four-queue";@Beanpublic DirectExchange exchange() { // 使用直连的模式return new DirectExchange(EXCHANGE, true, false);}@Beanpublic Binding oneBinding() {return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);}@Beanpublic Binding twoBinding() {return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);}@Beanpublic Binding threeBinding() {return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);}@Beanpublic Binding fourBinding() {return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 4);}@Beanpublic Queue oneQueue() {return createSacQueue(ONE_QUEUE);}@Beanpublic Queue twoQueue() {return createSacQueue(TWO_QUEUE);}@Beanpublic Queue threeQueue() {return createSacQueue(THREE_QUEUE);}@Beanpublic Queue fourQueue() {return createSacQueue(FOUR_QUEUE);}private static Queue createSacQueue(String queueName) {Map<String, Object> arguments = new HashMap<>(2);arguments.put("x-single-active-consumer", true);return new Queue(queueName, true, false, false, arguments);}}

重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

在这里插入图片描述

创建 消费者

为每个队列创建一个监听消费者

@Slf4j
@Component
public class OrderListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))public void onMessage1(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", ONE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))public void onMessage2(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", TWO_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))public void onMessage3(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", THREE_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE,declare = "false"),value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))public void onMessage4(Message message, @Headers Channel channel) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("{} recv: {}", FOUR_QUEUE, messageStr);} catch (Exception e) {log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);}}}
生产者发送消息
@GetMapping("/send/seq/messqge")public String sendSeqMessage() throws JsonProcessingException {int cnt = 100;int mod = 4;int seqSize = 6;for (int i = 0; i < cnt; i++) {for (int j = 0; j < seqSize; j++) {int rk = i % mod + 1;SeqMessage seqMessage = new SeqMessage("seq-" + i, j);String s = objectMapper.writeValueAsString(seqMessage);log.info("routeKey: {}, send msg: {}", rk, s);rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);}}return "success";}

运行结果:

two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}

可以发现,消息消费是顺序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/313881.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Java EE] 多线程(五):单例模式与阻塞队列

1. 单例模式 单例模式是校招中最长考的设计模式之一,首先我们来谈一谈什么是设计模式: 设计模式就好像象棋中的棋谱一样,如果红方走了什么样的局势,黑方就有一定地固定地套路,来应对这样的局势,按照固定地套路来,可以保证在该局势下不会吃亏. 软件开发也是同样的道理,有很多…

BGP的基本配置

l 按照以下步骤配置BGP协议&#xff1a; 第1步&#xff1a;设备基本参数配置&#xff0c;AS内配置IGP确保内部网络连通性&#xff1b; l 配置IGP&#xff08;OSPF协议等&#xff09;路由解决peer对等体的源和目标IP之间连通性&#xff0c;确保peer之间TCP&#xff08;179&a…

【后端】python与django的开发环境搭建指南

安装Git 双击Git 客户端安装文件&#xff0c;在安装页面&#xff0c;单击“Next” 在安装路径选择页面&#xff0c;保持默认&#xff0c;单击“Next” 在功能组件选择页面&#xff0c;保持默认&#xff0c;单击“Next” 在开始菜单文件夹设置页面&#xff0c;保持默认&am…

好看到爆炸的弹窗公告源码

源码介绍 好看到爆炸的弹窗公告源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面&#xff0c; 源码截图 源码下载 好看到爆炸的弹窗公告源码

【Elasticsearch】Elasticsearch 从入门到精通(二):基础使用

《Elasticsearch 从入门到精通》共包含以下 2 2 2 篇文章&#xff1a; Elasticsearch 从入门到精通&#xff08;一&#xff09;&#xff1a;基本介绍Elasticsearch 从入门到精通&#xff08;二&#xff09;&#xff1a;基础使用 &#x1f60a; 如果您觉得这篇文章有用 ✔️ 的…

SpringBoot+vue开发记录(二)

说明&#xff1a;本篇文章的主要内容为SpringBoot开发中后端的创建 项目创建: 1. 新建项目&#xff1a; 如下&#xff0c;这样简单创建就行了&#xff0c;JDK什么的就先17&#xff0c;当然1.8也是可以的&#xff0c;后面可以改。 这样就创建好了&#xff1a; 2. pom.xml…

光伏无人机:巡检无人机解决巡检难题

随着科技的飞速发展&#xff0c;无人机技术已经广泛应用于各个领域&#xff0c;其中光伏无人机在解决光伏电站巡检难题方面发挥了重要作用。光伏无人机以其高效、精准、安全的特点&#xff0c;为光伏电站的巡检工作带来了革命性的变革。 光伏电站通常位于广阔的户外场地&#x…

如何理解自然语言处理中的位置编码(Positional Encoding)

在自然语言处理和特别是在使用Transformer模型中,位置编码(Positional Encoding)是一个关键的概念。它们的作用是为模型提供序列中各个元素的位置信息。由于Transformer架构本身并不像循环神经网络(RNN)那样具有处理序列的固有能力,位置编码因此显得尤为重要。 为什么需…

7天入门Android开发之第1天——初识Android

一、Android系统 1.Linux内核层&#xff1a; 这是安卓系统的底层&#xff0c;它提供了基本的系统功能&#xff0c;如内存管理、进程管理、驱动程序模型等。安卓系统构建在Linux内核之上&#xff0c;借助于Linux的稳定性和安全性。 2.系统运行库层&#xff1a; 这一层包括了安卓…

一次违法网站的渗透经历

0x01 前言 在一次攻防演练中&#xff0c;我发现了一个有趣的渗透路径。在信息收集阶段&#xff0c;我注意到目标网站和用户资产网站共享相同的IP网段。这意味着它们可能在同一台服务器上托管&#xff0c;或者至少由同一家互联网服务提供商管理。这种情况为我们的渗透测试提供了…

Linux命令继续学习

which命令&#xff0c;找到各种命令程序所处在的位置 语法&#xff1a;which查找的命令 那么对于我们想查找其他类型文件所在的位置&#xff0c;我们可以用到find命令 find命令 选项为-name&#xff0c;表示按照文件名进行查找 find命令中通配符 find命令和前面rm命令一样&…

学习Rust第14天:HashMaps

今天我们来看看Rust中的hashmaps&#xff0c;在 std::collections crate中可用&#xff0c;是存储键值对的有效数据结构。本文介绍了创建、插入、访问、更新和迭代散列表等基本操作。通过一个计算单词出现次数的实际例子&#xff0c;我们展示了它们在现实世界中的实用性。Hashm…

xgp加速器免费 微软商店xgp用什么加速器

2001年11月14日深夜&#xff0c;比尔盖茨亲自来到时代广场&#xff0c;在午夜时分将第一台Xbox交给了来自新泽西的20岁年轻人爱德华格拉克曼&#xff0c;后者在回忆中说&#xff1a;“比尔盖茨就是上帝。”性能超越顶级PC的Xbox让他们趋之若鹜。2000年3月10日&#xff0c;微软宣…

ScriptableObject数据容器讲解

概述 是Unity提供的一个用于创建可重用的数据容器或逻辑的基类。 ScriptableObject 是继承自 UnityEngine.Object 的一个类&#xff0c;但与普通的 MonoBehaviour 不同&#xff0c;它不能附加到GameObject上作为组件。 相反&#xff0c;ScriptableObject 通常用于存储和管理…

意法半导体STM32F407VET6TR单片机优缺点、参数、应用和引脚封装

ST(意法半导体)的型号STM32F407VET6TR属于32位MCU微控制器&#xff0c;基于高性能的ArmCortex-M4 32位RISC核心&#xff0c;工作频率高达168MHz。单精度浮点单元(FPU)用于Cortex-M4核心&#xff0c;支持所有Arm单精度数据处理指令和数据类型。它还实现了一套完整的DSP指令和一个…

就业班 第三阶段(负载均衡) 2401--4.18 day2 LVS-DR模式

3、LVS/DR 模式 实验说明&#xff1a; 1.网络使用NAT模式 2.DR模式要求Director DIP 和 所有RealServer RIP必须在同一个网段及广播域 3.所有节点网关均指定真实网关 主机名ip系统用途client172.16.147.1mac客户端lvs-server172.16.147.154centos7.5分发器real-server1172.16.…

k8s日常动手实践 ~~ pod访问 pod请求 k8s api ~ 含新版带curl的busybox镜像

前言&#xff1a; 可以使用 Kubernetes API 获取集群信息。使用 Service Account&#xff08;SA&#xff09;进行身份验证&#xff0c;可以以安全的方式访问 Kubernetes API&#xff0c;而无需在 Pod 中使用明文凭据。 以下是一个使用 Service Account 访问 Kubernetes API 获…

Selenium IDE 常见错误笔记

错误1&#xff1a;Failed:Exceeded waiting time for new window to appear 2000ms 这个错误通常出现在第一次运行时&#xff0c;有两个原因&#xff1a; Firefox阻止了弹出式窗口&#xff0c;在浏览器设置里允许这个操作即可。 有些网站设置了反扒机制&#xff0c;脚本运行…

数据库并发控制思维导图+大纲笔记

思维导图 大纲笔记 多用户数据库系统 定义 允许多个用户同时使用的数据库系统特点 在同一时刻并发运行的事务数可达数百上千个多事务执行方式 事务串行执行交叉并发方式 单处理机系统同时并发方式 多处理机系统事务并发执行带来的问题 产生多个事务同时存取同一数据的情况可能…

Group Query Attention (GQA) 机制详解以及手动实现计算

Group Query Attention (GQA) 机制详解 1. GQA的定义 Grouped-Query Attention (GQA) 是对 Multi-Head Attention (MHA) 和 Multi-Query Attention (MQA) 的扩展。通过提供计算效率和模型表达能力之间的灵活权衡&#xff0c;实现了查询头的分组。GQA将查询头分成了G个组&#…