Kafka 幂等性与事务

文章目录

  • 幂等性
    • 实现机制
    • 配置使用
    • 局限性
  • 事务
    • 使用场景
    • 配置使用
    • 实现机制
    • 事务过程
      • 事务初始化
      • 事务开始
      • 事务提交
      • 事务取消
      • 事务消费

幂等性

Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。

实现机制

通过引入ProducerID和SequenceNumber来实现Broker对于每条接收的消息都会验证PID,同时会检查SeqNumber是否比Broker维护的SeqNumber值严格+1,只有符合要求的才是合法的,其他情况都会丢弃。

  • ProducerID:Producer初始化时由Broker分配,作为每个Producer会话的唯一标识
  • SequenceNumber:Producer发送的每条消息的标识(更准确地说是每一个消息批次,即ProducerBatch),从0开始单调递增。Broker根据它来判断写入的消息是否可接受。

配置使用

Producer设置

  • enable.idempotence=true:表示使用幂等性生产者。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
  • acks=all

局限性

  • 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态无法同步。

事务

Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。

使用场景

  • 对多个 Topic 、多个 Partition 的原子性的写入
  • Consumer-Transform-Producer模式下,将消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。避免重复消费

配置使用

Producer设置

  • transactional.id:事务id,类型为string,客户端自定义

Consumer设置

  • isolation.level:read_committed。事务隔离级别,默认为空。

实现机制

引入以下组件:

  • Transactional Coordinator‌:负责管理和协调事务。每个Kafka broker上都会运行一个Transactional Coordinator实例。
  • Transaction Log‌:这是一个内部Topic(__transaction_state),用于存储事务的元数据信息,包括事务的状态、参与的分区等。
  • Control Messages:由Transactional Coordinator‌写入topic的一种特殊消息,但对于Consumer来说不可见。是用来让Broker告知consumer拉取的消息是否已被原子性提交。
  • TransactionId:事务ID,类型为String字符串,由Producer客户端自定义。提供稳定不变的ID意义在于可以在异常后重启从断点进行恢复。
  • Epoch:单调递增的事务Id标识,可以保证具有相同TransactionId的Producer,旧的无法写入。
  • ProducerID、SequenceNumber:标记生产者、消息的唯一标识

事务过程

事务初始化

所有的事务操作都需要Transactional Coordinator‌管理和协调
1.获取Transactional Coordinator‌地址
Producer发送携带Transactionid的请求到任意一个Broker,Broker对获取到Transactionid做hashcode后对topic(__transaction_state)默认分区(50)取模,所得分区主副本所在的Broker作为TransactionalCoordinator‌
2.获取ProducerID和Epoch
Producer对TransactionalCoordinator‌发送请求,此时会分配ProducerId及Epoch,并将信息持久化。最后向Producer返回ProducerId+Epoch。之后的每次请求都会携带ProducerId和Epoch。
(__transaction_state中信息格式为key-value,key为Transactionid,value包含ProducerID、Epoch、事务和分区信息等)

事务开始

3.消息写入
Producer开始事务写入,先将本地事务状态更改为IN_TRANSACTION,然后发送消息之前,Producer会将topic-partition相关的信息发送给TransactionalCoordinator‌,由它完成持久化(更新__transaction_state)。之后Producer开始对相关topic-partition发送消息

事务提交

4.Producer触发事务提交
Producer首先发送请求给TransactionalCoordinator‌,由它更新__transaction_state将事务状态更改为PrepareCommit,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages(会持续重试,直到成功)给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteCommit。

事务取消

5.Producer或Coordinator触发事务取消
事物取消可以由Producer发起取消或者TransactionalCoordinator‌检测到事务超时而取消,此时均会更新__transaction_state更改为PrepareAbort,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteAbort。

取消的事务会记录在.txnindex文件中,主要包含以下信息:currentVersion、producerId、firstOffset(当前事务的开始offset)、lastOffset(当前事务的结束offset)、lastStableOffset(存储时的LSO)

事务消费

正常消费时
读隔离级别为 read-committed, 在内部会使用存储在topic-partition中的Control messgae,来过滤掉没有提交的消息。(回滚的消息也没有删除,只是在读数据时过滤该数据)

对于Consumer-Transform-Producer下,会通过groupId算出__consumer_offsets topic中对应的partition,然后加该partition的信息也加入到Transaction Log‌中,最终在统一取消或提交。同样也会将Control message写入__consumer_offsets对应的分区。

  • 需要将enable.auto.commit设置为false
  • 使用producer.sendOffsetsToTransaction()来提交offset

在这里插入图片描述

参考
https://z.itpub.net/article/detail/F86DD78AECAC4DEC92468DEFFEB4ED0D
https://www.cnblogs.com/hongdada/p/16945086.html
学习笔记之Kafka幂等和事务_transaction.state.log.replication.factor-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/499126.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ActiveMQ支持哪些传输协议

ActiveMQ 支持多种传输协议,以满足不同场景下的需求。这些协议包括但不限于以下几种: 1. OpenWire: • 这是 ActiveMQ 的默认和专有协议。 • 提供了高效、可靠的消息传递功能。 • 支持多种消息传递模式,如点对点和发布/订阅。 2…

MySQL数据库——常见慢查询优化方式

本文详细介绍MySQL的慢查询相关概念,分析步骤及其优化方案等。 文章目录 什么是慢查询日志?慢查询日志的相关参数如何启用慢查询日志?方式一:修改配置文件方式二:通过命令动态启用 分析慢查询日志方式一:直…

Qt天气预报系统设计界面布局第四部分左边

Qt天气预报系统设计 1、第四部分左边的第一部分1.1添加控件1.2修改控件名字 2、第四部分左边的第二部分2.1添加控件2.2修改控件名字 3、第四部分左边的第三部分3.1添加控件3.2修改控件名字 4、对整个widget04l调整 1、第四部分左边的第一部分 1.1添加控件 拖入一个widget&…

【嵌入式硬件】嵌入式显示屏接口

数字显示串行接口(Digital Display Serial Interface) SPI 不过多赘述。 I2C-bus interface 不过多赘述 MIPI DSI MIPI (Mobile Industry Processor Interface) Alliance, DSI (Display Serial Interface) 一般用于移动设备,下面是接口…

一个在ios当中采用ObjectC和opencv来显示图片的实例

前言 在ios中采用ObjectC编程利用opencv来显示一张图片,并简单绘图。听上去似乎不难,但是实际操作下来,却不是非常的容易的。本文较为详细的描述了这个过程,供后续参考。 一、创建ios工程 1.1、选择ios工程类型 1.2、选择接口模…

el-input输入框需要支持多输入,最后传输给后台的字段值以逗号分割

需求&#xff1a;一个输入框字段需要支持多次输入&#xff0c;最后传输给后台的字段值以逗号分割 解决方案&#xff1a;结合了el-tag组件的动态编辑标签 那块的代码 //子组件 <template><div class"input-multiple-box" idinputMultipleBox><div>…

[2474].第04节:Activiti官方画流程图方式

我的后端学习大纲 Activiti大纲 1.安装位置&#xff1a; 2.启动&#xff1a;

用再生龙备份和还原操作系统(三)

续上篇《用再生龙备份和还原操作系统&#xff08;二&#xff09;》 三&#xff0c;用再生龙将镜像文件还原到硬盘 将再生龙工具盘、待还原系统的硬盘&#xff08;与源盘一样大或更大&#xff09;、镜像文件所在磁盘&#xff08;如果是U盘&#xff0c;也可以后插&#xff09;安…

2025:OpenAI的“七十二变”?

朋友们&#xff0c;准备好迎接AI的狂欢了吗&#xff1f;&#x1f680; 是不是跟我一样&#xff0c;每天醒来的第一件事就是看看AI领域又有什么新动向&#xff1f; 尤其是那个名字如雷贯耳的 OpenAI&#xff0c;简直就是AI界的弄潮儿&#xff0c;一举一动都牵动着我们这些“AI发…

谷粒商城项目125-spring整合high-level-client

新年快乐! 致2025年还在努力学习的你! 你已经很努力了&#xff0c;今晚就让自己好好休息一晚吧! 在后端中选用哪种elasticsearch客户端&#xff1f; elasticsearch可以通过9200或者9300端口进行操作 1&#xff09;9300&#xff1a;TCP spring-data-elasticsearch:transport-…

9.若依-自定义表单构建

表单构建工具&#xff0c;开发者通过图形界面和拖拽等操作&#xff0c;可以快速构建复杂的表单。 需求&#xff1a;做一些复杂的功能提交&#xff0c;涉及到多张标的数据提交。 自定义一个特殊表单页面 1.拖拉一个布局组件中行容器&#xff0c;然后在组件属性中设置表单删格在…

Matlab Hessian矩阵计算

文章目录 一、简介二、实现代码三、实现效果一、简介 图像的Hessian矩阵用于描述图像灰度值的二阶导数,可以用来分析图像的局部曲率和变化。例如,在图像边缘检测、特征点检测等任务中,Hessian矩阵能帮助我们识别图像的结构。 Hessian矩阵定义 对于二维图像,Hessian矩阵是由…

java重装小结

一、Java安装 安装路径 https://www.oracle.com/java/technologies/javase/javas e8-archive-downloads.html 具体类型可参考&#xff1a; Java安装配置-CSDN博客 我在这一步主要碰到的问题就是访问官网报404错误&#xff0c;可参考&#xff1a; 在oracle官网下载资源显示…

4G报警器WT2003H-16S低功耗语音芯片方案开发-实时音频上传

一、引言 在当今社会&#xff0c;安全问题始终是人们关注的重中之重。无论是家庭、企业还是公共场所&#xff0c;都需要一套可靠的安全防护系统来保障人员和财产的安全。随着科技的飞速发展&#xff0c;4G 报警器应运而生&#xff0c;为安全防范领域带来了全新的解决方案。…

工业5G路由器让无人机数据传输 “飞” 起来

无人机上搭载5G通信模块&#xff0c;该模块与工业5G路由器通过5G网络建立连接。无人机的飞控系统、传感器以及摄像头等设备采集到的数据&#xff0c;如飞行姿态、高度、速度、环境图像、温度湿度等&#xff0c;经过编码、加密、调制等处理后转换为适合5G网络传输的信号形式。 …

Spring Certified Professional 2024 (2V0-72.22)

关于认证 Spring Certified Professional (2V0-72.22) 认证可证明您在 Spring Framework 方面的专业知识&#xff0c;Spring Framework 是构建企业级 Java 应用程序的领先平台。此认证在全球范围内得到认可&#xff0c;并证明您在 Spring 的各个方面都具有熟练程度&#xff0c;…

深信服云桌面系统的终端安全准入设置

深信服的云桌面系统在默认状态下没有终端的安全准入设置&#xff0c;这也意味着同样的虚拟机&#xff0c;使用云桌面终端或者桌面套件都可以登录&#xff0c;但这也给系统带来了一些安全隐患&#xff0c;所以&#xff0c;一般情况下需要设置终端的安全准入策略&#xff0c;防止…

通过 4 种方法将数据从 OnePlus 传输到Android

概括 由于它们是不同的品牌&#xff0c;因此将数据从 OnePlus 传输到Android是否很困难&#xff1f;也许您可以从这篇介绍 OnePlus 到Coolmuster Android数据传输的 4 个实用解决方案的文章中获得帮助。学习完它们后&#xff0c;您将有一个顺利的转移过程&#xff0c;所以为什…

springboot534售楼管理系统(论文+源码)_kaic

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本售楼管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&am…

自学记录:鸿蒙5使用ArkTS和ArkUI实现Live View功能

这次&#xff0c;我决定挑战一个全新的模块——Live View Kit&#xff0c;它提供了实况窗的创建、更新和管理功能。作为API 13的全新特性&#xff0c;我想用它开发一个智能餐厅的点餐和取餐提醒功能。 这篇文章不仅是我的学习记录&#xff0c;也是我探索HarmonyOS Next API 13的…