MQ专题:顺序消息落地方案

一、什么是顺序消息

投递消息的顺序和消费消息的顺序一致。

比如生产者按顺序投递了1/2/3/4/5 这 5 条消息,那么消费的时候也必须按照1到5的顺序消费这些消息。

二、顺序消息如何实现?(2种方案)

  1. 方案1:生产者串行发送+同一个队列+消费者单线程消费
  2. 方案2:生产方消息带上连续递增编号+同一个队列+消费按编号顺序消费消息

2.1 方案1:生产者串行发送+同一个队列+消费者单线程消费

2.1.1 生产者串行发送消息

  • 使用分布式锁,让消息投递串行执行
  • 确保顺序消息到达同一个队列

2.1.2 消费者单线程消费

  • 消费者这边只能有一个线程,拉取一个消费一个,消费完成后再拉取下一个消费

2.2 方案2:生产方消息带上连续递增编号+同一个队列+消费按编号顺序消费消息

2.2.1 生产方消息带上连续递增编号

  • 顺序消息携带连续递增的编号,从1开始,连续递增,比如发送了3条消息,编号分别是1、2、3,后面再投递消息,编号就从4开始了
  • 确保顺序消息到达同一个队列

2.2.2 消费方按照编号顺序消费消息

  • 消费方需要记录消息消费的位置:当前轮到哪个编号的消息了
  • 收到消息后,将消息的编号和当前消费的位置对比下,是不是轮着这条消息消费了,如果是则进行消费,如果不是,则排队等待,等待前一个到达后,且消费成功后,将自己唤醒进行消费

这里举个例子,如下

  • 生产者发送了编号为看1、2、3 的3条消息

  • 到达的消费端顺序刚好相反,3先到,发现没有轮到自己,会进行排队

  • 然后2到了,发现也没有轮到自己,也会排队

  • 然后过了一会1到了,发现轮到自己了,然后1被消费了

  • 1消费后,会唤醒下一个编号的2进行消费

  • 2消费完了,会唤醒下一个编号的3进行消费。

本文我们会落地方案2。

三、代码

-- 创建订单表
drop table if exists t_order_lesson034;
create table if not exists t_order_lesson034
(id    varchar(32)    not null primary key comment '订单id',goods varchar(100)   not null comment '商品',price decimal(12, 2) comment '订单金额'
) comment '订单表';-- 创建本地消息表
drop table if exists t_msg_lesson034;
create table if not exists t_msg_lesson034
(id               varchar(32) not null primary key comment '消息id',exchange         varchar(100) comment '交换机',routing_key      varchar(100) comment '路由key',body_json        text        not null comment '消息体,json格式',status           smallint    not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',expect_send_time datetime    not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',actual_send_time datetime comment '消息实际投递时间',create_time      datetime comment '创建时间',fail_msg         text comment 'status=2 时,记录消息投递失败的原因',fail_count       int         not null default 0 comment '已投递失败次数',send_retry       smallint    not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',next_retry_time  datetime comment '投递失败后,下次重试时间',update_time      datetime comment '最近更新时间',key idx_status (status)
) comment '本地消息表';-- 创建消息和消费者关联表
drop table if exists t_msg_consume_lesson034;
create table if not exists t_msg_consume_lesson034
(id              varchar(32)  not null primary key comment '消息id',producer        varchar(100) not null comment '生产者名称',producer_bus_id varchar(100) not null comment '生产者这边消息的唯一标识',consumer_class_name        varchar(300) not null comment '消费者完整类名',queue_name      varchar(100) not null comment '队列名称',body_json       text         not null comment '消息体,json格式',status          smallint     not null default 0 comment '消息状态,0:待消费,1:消费成功,2:消费失败',create_time     datetime comment '创建时间',fail_msg        text comment 'status=2 时,记录消息消费失败的原因',fail_count      int          not null default 0 comment '已投递失败次数',consume_retry   smallint     not null default 1 comment '消费失败后,是否还需要重试?1:是,0:否',next_retry_time datetime comment '投递失败后,下次重试时间',update_time     datetime comment '最近更新时间',key idx_status (status),unique uq_msg (producer, producer_bus_id, consumer_class_name)
) comment '消息和消费者关联表';drop table if exists t_msg_consume_log_lesson034;
create table if not exists t_msg_consume_log_lesson034
(id              varchar(32)  not null primary key comment '消息id',msg_consume_id        varchar(32) not null comment '消息和消费者关联记录',status          smallint     not null default 0 comment '消费状态,1:消费成功,2:消费失败',create_time     datetime comment '创建时间',fail_msg        text comment 'status=2 时,记录消息消费失败的原因',key idx_msg_consume_id (msg_consume_id)
) comment '消息消费日志';-- 幂等辅助表
drop table if exists t_idempotent_lesson034;
create table if not exists t_idempotent_lesson034
(id             varchar(50) primary key comment 'id,主键',idempotent_key varchar(500) not null comment '需要确保幂等的key',unique key uq_idempotent_key (idempotent_key)
) comment '幂等辅助表';-- 顺序消息编号生成器
drop table if exists t_sequential_msg_number_generator_lesson034;
create table if not exists t_sequential_msg_number_generator_lesson034
(id        varchar(50) primary key comment 'id,主键',group_id  varchar(256) not null comment '组id',numbering bigint       not null comment '消息编号',version   bigint       not null default 0 comment '版本号,每次更新+1',unique key uq_group_id (group_id)
) comment '顺序消息排队表';-- 顺序消息消费信息表,(group_id、queue_name)中的消息消费到哪里了?
drop table if exists t_sequential_msg_consume_position_lesson034;
create table if not exists t_sequential_msg_consume_position_lesson034
(id         varchar(50) primary key comment 'id,主键',group_id   varchar(256) not null comment '组id',queue_name varchar(100) not null comment '队列名称',consume_numbering  bigint   default 0   not null comment '当前消费位置的编号',version   bigint       not null default 0 comment '版本号,每次更新+1',unique key uq_group_queue (group_id, queue_name)
) comment '顺序消息消费信息表';-- 顺序消息排队表
drop table if exists t_sequential_msg_queue_lesson034;
create table if not exists t_sequential_msg_queue_lesson034
(id          varchar(50) primary key comment 'id,主键',group_id    varchar(256) not null comment '组id',numbering   bigint       not null comment '消息编号',queue_name  varchar(100) not null comment '队列名称',msg_json    text         not null comment '消息json格式',create_time datetime comment '创建时间',unique key uq_group_number_queue (group_id, numbering, queue_name)
) comment '顺序消息排队表';

3.1 发送顺序消息

如下模拟发送订单相关的5条顺序消息

com.itsoku.lesson034.controller.TestController#sendSequential@PostMapping("/sendSequential")
public Result<Void> sendSequential() {String orderId = IdUtil.fastSimpleUUID();List<String> list = Arrays.asList("订单创建消息","订单支付消息","订单已发货","买家确认收货","订单已完成");for (String type : list) {msgSender.sendSequentialWithBody(orderId,RabbitMQConfiguration.Order.EXCHANGE,RabbitMQConfiguration.Order.ROUTING_KEY,OrderMsg.builder().orderId(orderId).type(type).build());}return ResultUtils.success();
}

在这里插入图片描述

3.2 消息按顺序消费

3.2.1 消费者拉取消息时,先把消息连同顺序的编号保存在表t_sequential_msg_queue里

在这里插入图片描述

3.2.2 从t_sequential_msg_queue表里取出消息进行消费
在这里插入图片描述

有个疑问:

  1. 为什么加锁失败要去触发重试?比如消费者1,2,3,4,5分别拿到了编号为1,2,3,4,5的消息,1,2,3,4,5都已经入库了,这时候只需要一个加锁成功的线程就能消费全部消息了,因为有个循环。
  2. 假如有5个线程,那么每次加锁势必会有至多4个线程获取不到锁,那么消息会重新投递4次,需要这样做吗?
  3. 比如消费者1,2,3,4,5分别拿到了编号为1,2,3,4,5的消息,2,3,4,5都已经入库了,编号为1的消息还没有入库。这时候消费者2获得了锁,1,3,4,5都还没有获得锁,但消息队列表里编号最小的是2,所以比较编号的时候没轮到消费者2,直接break退出循环了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/415176.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【esp32】VScode添加库

以添加PubSubClient库为例 如图操作&#xff0c;在搜索框输入PubSubClient&#xff0c;点击下载 给你的某一个工程添加该库 编译成功

前端跨域问题详解与解决方案指南

什么是跨域问题 跨域问题通常是由浏览器的同源策略&#xff08;Same-OriginPolicy&#xff0c;SOP&#xff09;引起的访问问题 同源策略是浏览器的一个重要安全机制&#xff0c;它用于限制一个来源的文档或脚本如何能够与另一个来源的资源进行交互 同源策略的定义 同源策略要…

数学建模常用工具总结

数学建模常用工具总结 绘图篇pythonMATLABLIVEGAP CHARTSApache EChartsBioLadderHiplot Pro 生物医学可视化平台Graph EditorRAWGraphs 2.0ExcalidrawPPT绘图 配色篇Color SpaceAdobe Color 素材篇手绘素材插画网iconfont-阿里巴巴矢量图标库下面四个都是实物风格的素材&#…

Android图片缓存工具类LruCache原理和使用介绍

LruCache & DiskLruCache原理。 常用的三级缓存主要有LruCache、DiskLruCache、网络&#xff0c;其中LruCache对应内存缓存、 DiskLruCache对应持久化缓存。Lru表示最近最少使用&#xff0c;意思是当缓存到达限制时候&#xff0c;优先淘汰近 期内最少使用的缓存&#xff0c…

评价决策类——层次分析法+数学建模+实战分析

目录 一、前言 二、历年题型分析 2.1 常用算法归纳 2.1.1 优化类算法 2.1.2 预测类算法 2.1.3 评价决策类 2.1.4 NP-hard类 2.2 评价类模型求解 2.2.1 层次分析法&#xff08;AHP&#xff09; 2.2.2 多指标评价法&#xff08;MCDA&#xff09; 2.2.3 算法区别 三、层…

Golang 小项目(3)

Golang 小项目(3) 前言 本项目适合 Golang 初学者,通过简单的项目实践来加深对 Golang 的基本语法和 Web 开发的理解。 前往 torna.top 免费查阅 项目结构 D:. ├─ go.mod ├─ go.sum │ ├─cmd │ └─main │ main.go │ └─pkg├─config│ app.go│…

C# DLL已定义类或方法,但是编译报错未定义

现有应用程序1个&#xff0c;动态链接库3个分别称为A、B、C。 应用程序输出在目录P1&#xff0c;动态链接库输出在目录P2。 应用程序引用A、B、C动态链接库&#xff0c;动态链接库A引用B&#xff0c;B引用C。 此时修改动态链接库C&#xff0c;在VS中开发应用程序时可以识别到…

用RPC Performance Inspector 优化你的区块链

目录 什么是RPC&#xff1f; RPC Performance Inspector 是做什么的&#xff1f; 为什么需要这个工具&#xff1f; 如何使用它&#xff1f; 适合谁用&#xff1f; 如何使用&#xff1f; 什么是RPC&#xff1f; RPC Performance Inspector 是一个专门用于测试和分析RPC性能…

C语言 动态内存管理 #动态内存函数的介绍 #常见的动态内存错误 #C\C++ 程序的内存开辟 #柔性数组

文章目录 前言 一、为什么存在动态内存分配 二、动态内存函数的介绍 1、malloc 2、free 3、calloc 4、realloc realloc 的工作原理&#xff1a; 三、常见的动态内存错误 1、对NULL指针的解引用操作 2、对动态开辟空间的越界访问 3、对非动态开辟的空间使用 free 来释…

World of Warcraft [CLASSIC][80][Grandel]Sapphire Hive Drone

Sapphire Hive Drone 蓝玉虫巢雄蜂 蓝玉虫巢巨峰 索拉查盆地 实用性不强&#xff0c;好看是好看&#xff0c;模型很大&#xff0c;无奈栏位太少

时序优化的常见

本期求职笔试题目来源大疆硬件逻辑岗&#xff0c;共2道题&#xff0c;涉及知识点包含&#xff1a;时序约束中异步时钟的设置、典型时序优化方法。 33、根据约束关系set_clock_groups -async -group {CLK1CLK3}{CLK2}&#xff0c;下图哪些路径会进行时序检查( )&#xff08;多选…

MySQL 基础命令

目录 一、MySQL简介 1.MySQL 的主要特点包括 2.MySQL 的主要用途包括&#xff1a; 二、MySQL 基础命令 1. 基本操作 1.1 进入 1.2 选择数据库 1.3 修改密码 1.4 所有命令后面都要加 “;” 2. 创建 2.1 创建数据库 2.2 创建数据表 2.3 常见字段 3. 修改/更新 3.1…

PPT制作加速器:3款工具插件的演示文稿制作更高效

IvyhTools英豪插件 IvyhTools是一款功能强大的PPT插件&#xff0c;主要用于辅助用户进行各种PPT编辑和处理操作。该插件具备以下主要功能&#xff1a; 字体编辑&#xff1a;用户可以对PPT中的字体进行编辑和调整。 动图录制&#xff1a;支持录制动态图像&#xff0c;方便用户在…

Codeforces Round 970 (Div. 3) (个人题解)(未补完)

前言&#xff1a; 昨天晚上的比赛&#xff0c;可惜E题太笨了没想到如何解决&#xff0c;不过好在看到F过的多直接跳过去写F了&#xff0c;能过个5个也还不错了&#xff0c;而且一个罚时也没吃。之后的题我还是会再能补的时候补完的噢&#xff01; 正文&#xff1a; 链接&…

PLUTO: 推动基于模仿学习的自动驾驶规划的极限

PLUTO: Pushing the Limit of Imitation Learning-based Planning for Autonomous Driving PLUTO: 推动基于模仿学习的自动驾驶规划的极限 https://arxiv.org/abs/2404.14327 Abstract We present PLUTO, a powerful framework that Pushes the Limit of imitation learn…

AJAX基础与进阶

一、express基本使用 1. 在最外层启动终端&#xff0c;添加文件 2. 创建 express 框架 // 1. 引入express const express require(express);// 2. 创建应用对象 const app express();// 3. 创建路由规则 //request 是对请求报文的封装 //response 是对响应报文的封装 app.g…

Java Excel转PDF(免费)

目前市面上 Excel 转 PDF 的组件较多&#xff1a; 收费&#xff1a;aspose、GcExcel、spire开源&#xff1a;jacob、itextpdf 其中收费的组件封装得比较好&#xff0c;代码简洁&#xff0c;转换的效果也很好&#xff0c;但收费也高得离谱&#xff1a; 为了成本考虑&#xff…

1、Django Admin学习模型

此专栏应用环境和模型基于此文 开发环境 系统&#xff1a;windows11 开发工具&#xff1a;vscode 开发语言&#xff1a;python 3.8 开发框架&#xff1a;django 3.2 数据库&#xff1a;mysql8.4.1 项目目录 settings 注册两个应用 INSTALLED_APPS [django.contrib.ad…

关于STC-ISP软件选项“下次下载用户程序时擦除用户EEPROM区”的质疑

1.以前&#xff0c;在用STC-ISP软件下载代码时&#xff0c;该选项一般都默认勾选&#xff01;见图1&#xff1b;因没用到该功能无视&#xff1b; 2.近日&#xff0c;首次下载需写入一些用户核心数据&#xff0c;以后谁升级代码下载都不能查看和更改这些数据&#xff01; 3.于…

eureka一

Eureka 什么是eureka eureka服务调用流程 springcloud技术栈应用 分布式理论 CAP CAP理想运行情况 CAP不理想运行情况 CAP取舍 BASE BASE原理 搭建单机注册中心 服务提供者 服务消费者 集群服务注册中心 eureka功能详解 核心功能演示 Eureka源码解析 lifecycle的start