使用消息队列怎样防止消息重复?

大家好,我是君哥。

使用消息队列时,我们经常会遇到一个可能对业务产生影响的问题,消息重复。在订单、扣款、对账等对幂等有要求的场景,消息重复的问题必须解决。

那怎样应对重复消息呢?今天来聊一聊这个话题。

1.三个语义

正确使用消息队列,我们会考虑到消息防丢失、防重复,我们介绍 3 个语义:

  • At Least Once:在消息队列中,指消息不丢失,一条消息最少被消费一次,但是可能会有重复消费。

  • Exactly Once:在消息队列中,消息被精准消费一次,不丢失,也不会重复;

  • At Most Once:在消息队列中,消息不会被重复消费,但是可能会有消息丢失

不同的消息场景,需要的语义不同。比如 Exactly Once 最难实现,一般需要引入事务消息。

不同使用场景,对语义的要求也不一样。比如日志收集类的场景,At Most Once 就可以满足,而支付类的场景则要求 Exactly Once。

2.消息重复

什么情况下会导致消息重复呢?

生产者发送消息后,Broker 保存成功,但是没有成功给生产者返回 ACK,生产者以为消息发送失败,重试,再次给 Broker 发送。Broker 保存了重复消息,导致 Consumer 多次消费。

图片

消费者消费消息后,给 Broker 返回 ACK 失败,导致 Broker 没有修改偏移量,同一条消息再次发送给消费者,或者被消费者拉取到。

图片

3.生产者防重

有的消息中间件是支持生产者幂等的。比如 Kafka 从 0.11.0 版本开始引入了幂等 Producer,可以使用下面代码开启幂等 Producer:

Properties props = new Properties();
//省略其他代码
//配置幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
//创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Kafka 实现生产者幂等的原理是在生产者引入了 Producer ID(PID)和 Sequence Number 这两个参数。

  • PID:Producer 拥有的 ID,唯一标识一个 Producer。

  • Sequence Number:自增的数值,唯一标识同一个 Producer 发送到指定分区的消息 ID。

有了这两个参数,Broker 单分区就可以唯一标识一个生产者发送的唯一一条消息<PID,SequenceNumber>。Broker 收到消息时,如果检查到消息的<PID,SequenceNumber>已经存在,就不会再保留这条消息。

但幂等 Producer 只能在单分区下生效,多分区情况下是不生效的。因为多个分区之间并不能相互访问对方的<PID,SequenceNumber>。

图片

4.Broker 防重

Broker 如果可以防重,那对于生产者和消费者来说,节省了大量的工作。下面我们看下 Pulsar 是怎样防重的。

Broker 通过参数 BrokerDeduplicationEnabled 开启防重功能。对于 Producer 发送的重复消息,Broker 返回响应 -1:-1。

Producer 发送消息时,会带一个 sequenceId 字段,Broker 会按照 ProducerName 维度记录当前生产者最大的 sequenceId(highestSequenceId)。Broker 收到消息时,首先会判断消息中的 sequenceId 是否大于自己保存的当前生产者的 highestSequenceId,如果是则保存消息并更新 highestSequenceId,否则丢弃消息,并且给 Producer 返回 -1:-1。

下面是三个极端情况:

  1. Producer 断开连接:这种情况下,跟 Broker 重新建立连接后,本地保存的 sequenceId 还在,只要使用 sequenceId 递增后发送消息即可;

  2. Producer 宕机:Producer 重启后,缓存的 sequenceId 肯定不存在了,这时跟 Broker 重新建立连接后,Broker 会根据 ProducerName 找出 highestSequenceId 发给 Producer,Producer 使用这个 sequenceId 来发送消息;

  3. Producer 和 Broker 都宕机:Broker 重启后,可以从宕机前保存的快照中恢复各 Producer 对应的 highestSequenceId 发送给各 Producer。但这个 highestSequenceId 不一定准确,因为 Broker 宕机瞬间很有可能最新的 sequenceId 没有来得及保存快照。

需要注意的是,跟 Kafka 的幂等 Producer 类似,Pulsar 的 Broker 幂等也只能保证 Topic/Partition 级别。

5.消费者防重

从上面的分析可以看出,靠生产者防重和 Broker 防重,只能在 Topic/Partition 级别生效,这通常并不能满足我们的需求。而为了避免消费者重复消费对业务造成影响,消息防重还是必要的。这就要求我们做最后一道防线,在消费端进行防重或幂等处理。

消费端做防重,就不再考虑消息中间件层面的配置(比如 sequenceId),而是从消息体进行下手。

生产者发送消息时,给消息体赋值一个全局唯一的 ID,消费者处理消息时,根据全局唯一 ID 做防重。

比如消费端的逻辑是保存一条订单消息,那把唯一 ID 保存到数据库并且加一个唯一索引,这样根据唯一索引就可以做消息去重。

不过使用唯一索引也有缺点:

  • 如果使用 MySQL 数据库,不能使用 Change Buffer;

  • 非插入的场景(比如更新库存)不能去重。

对于唯一索引的缺点,我们可以引入 Redis 对唯一 ID 做保存,利用 setNx 判断消息是否已经处理过。如下图:

图片

if (jedis.setnx(ID, "1") == 1) {//处理业务,返回 ACK
}else {//直接返回 返回 ACK
}

6.总结

使用消息队列,在一些场景下是需要防重的。主流消息队列提供了一些防重的能力,但并不是完全可靠的。在对重复消息敏感的场景下,最好是在消费端处理消息时,从业务层面进行消息防重。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/24937.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【单片机】MSP430MSP432入门

文章目录 0 前言1 开发方式选择2 CCS和开发相关软件3 Keil开发MSP4324 IAR for 430开发MSP4305 总结 0 前言 最近因为想学DSP&#xff0c;所以把之前卸载的CCS给装回来了&#xff0c;手头也还有之前电赛剩下的MSP430和MSP432的板子&#xff0c;由于年代久远&#xff0c;想着花点…

[记录贴] 火绒奇怪的进程保护

最近一次更新火绒6.0到最新版&#xff0c;发现processhacker的结束进程功能无法杀掉火绒的进程&#xff0c;弹窗提示如下&#xff1a; 可能是打开进程时做了权限过滤&#xff0c;火绒注册了两个回调函数如下&#xff1a; 但奇怪的是&#xff0c;在另外一台机器上面更新到最新版…

跨平台公式兼容性大模型提示词模板(飞书 + CSDN + Microsoft Word)

飞书云文档 CSDN MD编辑器 Microsoft Word 跨平台公式兼容方案&#xff1a; 一、背景痛点与解决方案 在技术文档创作中&#xff0c;数学公式的跨平台渲染一直存在三大痛点&#xff1a; 飞书云文档&#xff1a;原生KaTeX渲染与导出功能存在语法限制微软Word&#xff1a;Math…

【Linux】基于UDP/TCP套接字编程与守护进程

目录 一、网路套接字编程 &#xff08;一&#xff09;基础概念 1、源IP地址与目的IP地址 2、端口号 3、TCP与UDP 4、网络字节序 &#xff08;二&#xff09;套接字编程接口 1、socket 常见API 2、sockaddr结构 &#xff08;三&#xff09;UDP套接字 1、UDP服务器创建…

ue5 3dcesium中从本地配置文件读取路3dtilles的路径

关卡蓝图中获得3dtiles的引用 拉出设置url 设置路径 至于设置的路径从哪里来 可以使用varest读取文件里的接送字符串 path中配置地址 path变量的值为: Data/VillageStartMapConfig.json此地址代表content的地下的data文件夹里的config.json文件 {"FilePath": &quo…

渗透测试(WAF过滤information_schema库的绕过,sqllib-46关,海洋cms9版本的注入)

1.sqlin-lib 46关 打开网站配置文件发现 此网站的对ID进行了排序&#xff0c;我们可以知道&#xff0c;order by接不了union &#xff0c;那我们可以通过测试sort&#xff0c;rond等函数&#xff0c;观察网页的反馈来判断我们的盲注是否正确 我们发现 当参数有sort来排序时&…

CSS笔记一

一、语法 选择器{属性&#xff1a;属性值&#xff1b;属性&#xff1a;属性值} 二、书写分类 行内样式&#xff1a;直接通过style属性写在标签上 <p style"font-size80px">123456</p> 页内样式&#xff1a;在html页面创建style标签 外链样式&…

Flutter: TextEditingValue的实现

文章目录 TextEditingValue一、fromJSON二、text、selection、composing、empty三、isComposingRangeValid四、replaced TextEditingValue /// The current text, selection, and composing state for editing a run of text. immutable class TextEditingValue {const TextEd…

Web开发:ORM框架之使用Freesql的导航属性

一、什么时候用导航属性 看数据库表的对应关系&#xff0c;一对多的时候用比较好&#xff0c;不用多写一个联表实体&#xff0c;而且查询高效 二、为实体配置导航属性 1.给关系是一的父表实体加上&#xff1a; [FreeSql.DataAnnotations.Navigate(nameof(子表.子表关联字段))]…

RAG(检索增强生成)原理、实现与评测方法探讨

RAG是什么&#xff1f; 看一下RAG的英文全称&#xff1a;Retrieval-Augmented Generation&#xff0c;建索、增强、生成&#xff1b;一句话串起来就是通过检索增强模型的生成&#xff0c;是的&#xff0c;这就是RAG。 RAG怎么做&#xff1f; 目前比较通用的套路是这样的&#x…

【嵌入式Linux应用开发基础】网络编程(4):UDP协议

目录 一、UDP 协议概述 二、UDP 协议特点 三、UDP协议的字段格式 四、UDP协议的数据传输过程 五、嵌入式UDP编程核心API 六、UDP 在嵌入式 Linux 中的编程实现 6.1 UDP 服务器代码示例 6.2 UDP 客户端代码示例 七、UDP 协议的应用场景 八、UDP 协议的优缺点 8.1 优点…

视频字幕识别和翻译

下载的视频很多不是汉语的&#xff0c;我们需要用剪映将语音识别出来作为字幕压制到视频中去。 剪映6.0以后语音识别需要收费&#xff0c;但是低版本还是没有问题。 如果想要非汉语字幕转成中文&#xff0c;剪映低版本不提供这样功能。但是&#xff0c;用剪映导出识别字幕&am…

小迪安全-24天-文件管理,显示上传,黑白名单,访问控制

上节课回顾&#xff0c;token问题 没有更新token值&#xff0c;造成了复用 加上这段代码就好了&#xff0c;就不会复用了 文件管理-文件上传 upload.html文件&#xff0c;找ai生成就行 uoload.php接受文件上传的信息 这里在写个临时文件存储换个地方 因为上面临时文件存在c盘…

单入单出队列性能优化(Lock-Free)

摘要&#xff1a;文中首先介绍了有锁线程安全循环队列的基本实现&#xff0c;然后探讨了使用原子变量实现 Lock-Free 队列的优势&#xff0c;能够减少线程之间的数据竞争。接着&#xff0c;介绍了数据对齐的策略&#xff0c;以降低伪共享的概率&#xff0c;随后引入了索引缓存来…

java项目之网络游戏交易系统源码(ssm+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的网络游戏交易系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 本网络游戏交易系统分为管理员…

PyTorch 源码学习:GPU 内存管理之深入分析 CUDACachingAllocator

因引入 expandable_segments 机制&#xff0c;PyTorch 2.1.0 版本发生了较大变化。本文关注的是 PyTorch 原生的 GPU 内存管理机制&#xff0c;故研究的 PyTorch 版本为 2.0.0。代码地址&#xff1a; c10/cuda/CUDACachingAllocator.hc10/cuda/CUDACachingAllocator.cpp 更多内…

【PromptCoder】使用 package.json 生成 cursorrules

【PromptCoder】使用 package.json 生成 cursorrules 在当今快节奏的开发世界中&#xff0c;效率和准确性至关重要。开发者们不断寻找能够优化工作流程、帮助他们更快编写高质量代码的工具。Cursor 作为一款 AI 驱动的代码编辑器&#xff0c;正在彻底改变我们的编程方式。但如…

学习路程五 向量数据库Milvus操作

前序 前面安装好了docker且成功拉取Milvus镜像&#xff0c;启动。通过python成功连接上了数据。接下来就继续更多Milvus的操作 在开始之前&#xff0c;先来简单了解一下向量数据库内一些东西的基本概念 概念描述数据库&#xff08;Database&#xff09;类似与MySQL的database…

SpringBoot 热部署

1、添加 DevTools 依赖 <!-- 热部署依赖 --> <dependency> <groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId> </dependency>2、在IDEA的菜单栏中依次选择“File”→“Settings”&#x…

SOC-ATF 安全启动BL1流程分析(1)

一、ATF 源码下载链接 1. ARM Trusted Firmware (ATF) 官方 GitHub 仓库 GitHub 地址: https://github.com/ARM-software/arm-trusted-firmware 这是 ATF 的官方源码仓库&#xff0c;包含最新的代码、文档和示例。 下载方式&#xff1a; 使用 Git 克隆仓库&#xff1a; git…