消息队列实现 Exactly Once,看 Pulsar 是怎样实现的。

大家好 ,我是君哥。

在使用消息队列时,我们希望消息能够精准推送(Exactly Once),不会丢失、也不会重复。Exactly Once 其实是很难实现的,Pulsar 这款消息中间件使用事务消息实现了 Exactly Once,今天就带大家了解一下。

1.一个场景

为什么需要 Exactly Once 呢?下面我们看一个转账场景。

图片

客户从转账 APP 上操作,从 A 账户向 B 账户转账 100 元,但是 B 账户增加金额后,给 Broker 返回 ACK 失败,导致 Broker 再次给账户 B 推送增加金额的消息,导致账户 B 增加了两次,最终导致金额不一致。

当然,账户 B 通过消费者幂等可以避免这个问题,但如果是生产者重复发送导致 Broker 保存了两条消息呢?

2.Pulsar 去重

通过消息去重可以解决上面的消息重复问题吗?我们看一下 Pulsar 的去重机制。

Producer 发送消息时,消息体带一个 sequenceId 字段,这个字段在同一个 Producer 内是严格递增的。Broker 通过<ProducerName, sequenceId> 来记录每一个 Producer 的最大 sequenceId。如果 Broker 收到 Producer 的消息小于等于保存的当前 Producer 的 sequenceId,说明是重复消息,直接返回失败。

消息去重从一定程度上可以避免消息重复,但是只能保证在 Topic-Partition 这个维度进行去重,如果一个 Topic 对应多个 Partition,如下图:

图片

Producer 发送消息后,Broker1 保存成功,但是没有返回 ack,Producer 把消息重新发送到了 Broker2,最终导致 Consumer 收到 2 条消息。

3.事务消息

Pusar 的事务消息不仅可以解决上面的去重问题,还可以解决一些复杂场景。比如下面这个场景:

图片

Consumer 从 Topic1 的两个 Partition 中各消费一条消息后,做加工计算(重复消费会影响加工结果),然后把结果分别发送到 Topic2 的两个 Partition 中。这个复杂的事务,要保证消息既不会重复也不会丢失,仅仅靠去重,就很难实现了。Pulsar 参考了分布式事务的主流实现,支持了消息的分布式事务。

Pulsar 的事务模型能保证生产和消费都能精确一次,即使 Broker 宕机,也不会处理失败。

同时,Pulsar 事务消息支持更复杂的场景,比如:

  • 生产者在一个事务中分别发送一条消息到不同 Partition,要不同时成功,要不同时失败;

  • 消费者从不同 Partition 消费多条消息,要不全部成功,要不全部失败;

  • 上面两个场景的组合,见上面的图。

那 Pulsar 的事务消息是怎么实现的呢?Pulsar 参考了分布式事务的实现方式,我们再回顾一下分布式事务的三个角色:

  • TC: 事务协调器,管理全局事务和分支事务的状态,Pulsar 会选择 Topic 中 Partition 所在的一个 Broker 作为 TC;

  • TM:管理全局事务,包括开启全局事务,提交/回滚全局事务。Pulsar 使用 pulsarClient.newTransaction()开启一个事务,这会向 TC 注册全局事务并且获得全局事务 ID(TCID);

  • RM:管理分支事务。

下图,我们把上面复杂的事务用分布式事务来实现:

图片

说明几点:

  1. Producer1 既是生产者也是 TM;

  2. Broker1 既是 TC 也是 RM;

  3. Producer 和 Consumer 的事务分开来管理。上图中只是画出了生产者的事务提交,消费者类似;

  4. 我们知道,分布式事务的实现模式一般包括 AT、TCC、SAGA 和 XA,那 Pulsar 的实现模式是哪一种呢?对于 Producer 和 Consumer,情况不一样。 

    对于 Producer 的事务消息,更像是 AT 模式,消息直接发送给 Broker 并持久化,不过持久化之前会在 TopicTransactionBuffer 中记录元数据(类似 AT 模式中的回滚日志),全局事务回滚时可以使用这些元数据回滚消息。当然回滚消息并不是删除消息,而是让消息不被消费到,具体做法是在回滚的事务会被打上 Aborted 标签,根据这个标签来决定消息不会推送给 Consumer。 

    对于 Consumer 的事务消息,我个人觉得有点参考 XA 模式,不过这里没有数据源代理,而是用了消息缓存,这里缓存的不是消息本身,而是消费者的 ack 消息。也就是说消费者消费完成后并没有直接发送 ack 给 Broker,而是先发送到 pendingAckSore 做缓存,在提交全局事务时才会真正地提交 ack 消息。

    图片

  5. 全局事务没有提交之前,消息可能会被消费到吗?不会,每个 Topic 都会记录自己的 maxReadPosition 属性,标识消费者可以从 Broker 拉取消息的最大位置,分布式事务提交全局事务之前,maxReadPosition 是不变的,所有未提交全局事务的消息不可能被消费到。但这里也会有一个隐患,那就是阻塞普通消息的消费,在当前事务提交之前,普通消息即使发送成功了,消费者也拉取不到。

4.总结

Pulsar 使用事务消息实现了 Exactly Once 这个消息投递的最高要求。从上面的讲解看,事务消息的实现还是比较复杂的,不过从 Producer 和 Consumer 端分开实现这个角度看 ,更容易理解一些。

最后,一起思考一个极端场景,如果分布式事务中有两个消费者,一个消费者消费成功并且发送 ack,另一个消费者因为代码问题消费失败并且没有回复 ack,最终全局事务因为超时而做回滚,那第一个消费者已经消费,这还能保证全局一致吗?当然不能,除非消费者消费逻辑也加入这个全局事务。

消息队列的分布式事务一直是一个复杂的话题,分布式事务的设计思想也非常值得我们借鉴学习。但无论使用哪个中间件,消费端幂等是保障业务正确性的底线,最靠谱的方式还是从业务代码层面来保证幂等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/33445.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Audacity的安装和使用

安装 下载地址&#xff1a;官方网站&#xff1a;Audacity 软件开源免费&#xff0c;但部分功能可能需要额外插件。 一.介绍 Audacity 是一款免费、开源的音频编辑软件&#xff0c;适用于Windows、macOS、Linux等操作系统。它支持多轨编辑、录音、音频效果处理、格式转换等功…

C++:类和对象(从底层编译开始)详解[前篇]

目录 一.inline内联的详细介绍 &#xff08;1&#xff09;为什么在调用内联函数时不需要建立栈帧&#xff1a; &#xff08;2&#xff09;为什么inline声明和定义分离到两个文件会产生链接错误&#xff0c;链接是什么&#xff0c;为什么没有函数地址&#xff1a; 二.类&…

【蓝桥】-动态规划-倒水

目录 一、问题描述​ 二、解题思路 三、完整代码 二维dp 使用滚动数组 一、问题描述 二、解题思路 一个变种的01背包问题&#xff1a; 不选该物品&#xff1a;获得固定收益 e 选择方案1&#xff1a;消耗体积 a&#xff0c;获得价值 b 选择方案2&#xff1a;消耗体积 c&…

【软考网工-实践篇】DHCP 动态主机配置协议

一、DHCP简介 DHCP&#xff0c;Dynamic Host Configuration Protocol&#xff0c;动态主机配置协议。 位置&#xff1a;DHCP常见运行于路由器上&#xff0c;作为DHCP服务器功能&#xff1a;用于自动分配IP地址及其他网络参数给网络中的设备作用&#xff1a;简化网络管理&…

使用 Arduino 和 ThingSpeak 通过互联网进行实时温度和湿度监测

使用 ThingSpeak 和 Arduino 通过 Internet 进行温度和湿度监控 湿度和温度是许多地方(如农场、温室、医疗、工业家庭和办公室)非常常见的测量参数。我们已经介绍了使用 Arduino 进行湿度和温度测量,并在 LCD 上显示数据。 在这个物联网项目中,我们将使用ThingSpeak在互联…

电子电子架构 --- 车载ECU信息安全

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活,除了生存温饱问题之外,没有什么过多的欲望,表面看起来很高冷,内心热情,如果你身…

有关Spring 简介和第一个Spring案例:基于XML配置的IoC容器

1.Spirng是什么? Spring 是一个分层的 轻量级开源框架&#xff0c;专为简化企业级Java应用开发而设计。 它由Rod Johnson于2003年提出&#xff0c;核心目标是解决企业应用开发的复杂性&#xff0c;通过 控制反转&#xff08;IoC&#xff09; 和 面向切面编程&#xff08;AOP&…

警惕!Ollama大模型工具的安全风险及应对策略

文章目录 **Ollama的安全隐患&#xff1a;不容忽视的风险****未授权访问&#xff1a;门户洞开的风险****数据泄露&#xff1a;敏感信息的外泄****漏洞利用&#xff1a;历史遗留的隐患** **安全加固&#xff1a;守护数据与服务的防线****限制监听范围&#xff1a;内网隔离的保护…

Qt从入门到入土(十) -数据库操作--SQLITE

认识 数据库是用于存储、管理和检索数据的系统化集合。它是一种按照特定结构组织数据的存储方式&#xff0c;通过软件&#xff08;数据库管理系统&#xff0c;DBMS&#xff09;来实现数据的高效存储、查询、更新和管理。通过文件存储数据适用于少量的数据&#xff0c;而当拥有…

嵌入式2-按键

一、按键 1.原理图&#xff1a; P14按下低电平&#xff0c;不按则高电平。 if((t&(1<<5))!0)& 优先级 8 ! 优先级 7 二、STC89Cxx中文参考手册 1.ram(随机访问存储器&#xff09;易失性 1.1sram&#xff08;512字节&#xff09;静态存储器 2.rom(只读存储…

论文分享 | HE-Nav: 一种适用于复杂环境中空地机器人的高性能高效导航系统

阿木实验室始终致力于通过开源项目和智能无人机产品&#xff0c;为全球无人机开发者提供强有力的技术支持&#xff0c;并推出了开源项目校园赞助活动&#xff0c;助力高校学子在学术研究与技术创新中取得更大突破。近日&#xff0c;香港大学王俊铭同学&#xff0c;基于阿木实验…

平安养老险广西分公司2025年“3∙15”金融消费者权益教育宣传活动暨南湖公园健步行活动

2025年3月11日&#xff0c;由国家金融监督管理总局广西监管局、中国人民银行广西壮族自治区分行指导&#xff0c;平安养老保险股份有限公司&#xff08;以下简称“平安养老险”&#xff09;广西分公司联合平安银行南宁分行、平安人寿广西分公司、平安产险广西分公司、平安证券广…

学习threejs,使用MeshFaceMaterial面材质容器

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.MeshFaceMaterial 二…

电脑内存不足怎么办?

常规解决方法盘点 关闭后台程序&#xff1a;按下【Ctrl Shift Esc】组合键打开任务管理器&#xff0c;在 “进程” 选项卡里&#xff0c;把当前不用的程序统统 “结束任务” &#xff0c;像那些自动更新的软件、常驻后台的播放器&#xff0c;关了能释放不少内存。比如音乐软…

Excel中国式排名,3种方法!

大家好&#xff0c;我是小鱼。 什么是中国式排名呢&#xff1f; 举个例子比如说公司一共有10名员工进行成绩考核&#xff0c;如果9个人考核成绩都是90分&#xff0c;你是89分&#xff0c;按照国际惯用的排名法则&#xff1a;9 个人考核成绩并列第一&#xff0c;你第10名&…

deepseek+kimi做ppt教程记录

1.首先注册deepseek和kimi deepseek官网&#xff1a;https://chat.deepseek.com/ kimi官网&#xff1a;https://kimi.moonshot.cn/ 以下以一篇工作总结报告为例 2.使用deepseek生成ppt大纲 让deepseek生成kimi生成ppt所需要的内容时&#xff0c;需要注意提示词内容&#xff0c;…

前端无限滚动内容自动回收技术详解:原理、实现与优化

文章目录 一、核心需求与技术挑战1.1 无限滚动的问题症结1.2 自动回收的三大目标 二、技术实现原理2.1 虚拟滚动核心机制2.2 关键技术指标 三、完整实现方案3.1 基础HTML结构3.2 CSS关键样式3.3 JavaScript核心逻辑3.3.1 滚动控制器3.3.2 动态尺寸处理 四、性能优化策略4.1 内存…

【训练细节解读】文本智能混合分块(Mixtures of Text Chunking,MoC)引领RAG进入多粒度感知智能分块阶段

RAG系统在处理复杂上下文时,传统和语义分块方法的局限性,文本分块的质量限制了检索到的内容,从而影响生成答案的准确性。尽管其他算法组件有所进步,但分块策略中的增量缺陷仍可能在一定程度上降低整体系统性能。如何直接量化分块质量?如何有效利用大型语言模型(LLMs)进行…

Jmeter下载及环境配置

Jmeter下载及环境配置 java环境变量配置配置jdk环境变量检查是否配置成功JMeter下载 java环境变量配置 访问地址&#xff1a; https://www.oracle.com/cn/java/technologies/downloads/ 注意&#xff1a;需要自己注册账号 下载完成&#xff0c;解压后的目录为&#xff1a; …

coze ai assistant Task 2

创建一个智能体&#xff1a;夸夸机器人 https://www.coze.cn/store/agent/7480939060010713138?bot_idtrue 改为豆包系列-豆包角色扮演 添加bingWebSearch搜索 添加前&#xff1a; 添加后&#xff1a; 改为工具调用&#xff1a; 添加知识库 使用长期记忆 结合自己的需求&…