【kafka系列】Kafka事务的实现原理

目录

1. 事务核心组件

1.1 幂等性生产者(Idempotent Producer)

1.2 事务协调器(TransactionCoordinator)

1.3 事务日志(Transaction Log)

2. 事务执行流程

2.1 事务初始化

2.2 发送消息

2.3 事务提交(两阶段提交)

2.4 事务完成

3. 消费者事务隔离

3.1 隔离级别

3.2 实现机制

4. 异常处理与容错

4.1 生产者宕机

4.2 协调器宕机

4.3 Broker宕机

5. 关键源码解析

5.1 事务协调器核心逻辑

5.2 两阶段提交实现

5.3 消费者过滤未提交消息

6. 事务配置与使用

6.1 生产者配置

6.2 消费者配置

7. 事务性能与限制

总结


  1. 幂等生产者:通过ProducerIDSequenceNumber去重,避免消息重复(源码见ProducerStateManager)。
  2. 事务协调器(TransactionCoordinator)
    • 每个事务绑定一个Coordinator,处理BEGIN_TRANSACTIONCOMMIT/ABORT请求。
    • 事务状态存储在内部Topic __transaction_state中(通过TransactionStateManager管理)。
  1. 两阶段提交
    • 阶段1:标记事务为“预提交”,写入所有参与分区的数据。
    • 阶段2:写入COMMIT标记到事务日志,消费者仅可见已提交的事务消息。

Kafka事务机制通过幂等性生产者事务协调器(TransactionCoordinator)两阶段提交(2PC) 实现跨分区的原子性写入,确保消息要么全部提交,要么全部丢弃。以下是核心实现机制:


1. 事务核心组件

1.1 幂等性生产者(Idempotent Producer)
  • 作用:确保单分区内消息不重复。
  • 实现机制
    • PID(Producer ID):每个生产者实例唯一,由Broker分配。
    • Sequence Number:每个消息的单调递增序列号,Broker校验序列号连续性。
    • 源码类ProducerStateManager(管理PID与序列号)。
1.2 事务协调器(TransactionCoordinator)
  • 作用:管理事务生命周期,协调事务提交或中止。
  • 实现机制
    • 每个事务绑定一个协调器(通过事务ID哈希选择Broker)。
    • 维护事务状态机(TransactionState),存储在内部Topic __transaction_state
    • 源码类TransactionCoordinatorTransactionStateManager
1.3 事务日志(Transaction Log)
  • 作用:持久化事务状态,防止协调器宕机后数据丢失。
  • 存储位置:内部Topic __transaction_state,每个分区对应一个协调器。
  • 数据格式:事务ID、PID、状态(PrepareCommitCompleted等)、超时时间。

2. 事务执行流程

2.1 事务初始化
  1. 生产者初始化事务
    • 调用initTransactions(),向协调器注册事务ID,获取PID。
    • 协调器在__transaction_state中记录事务元数据。
2.2 发送消息
  1. 发送事务消息
    • 生产者发送消息时携带PID、序列号、事务ID。
    • Broker将消息写入日志,但标记为未提交(对消费者不可见)。
2.3 事务提交(两阶段提交)
  • 阶段1:Prepare Commit
    生产者向协调器发送EndTxnRequest,协调器将事务状态置为PrepareCommit,并持久化到事务日志。
  • 阶段2:Commit Markers写入
    协调器向所有涉及的分区Leader发送WriteTxnMarkers请求,Leader在日志中写入事务提交标记(Control Batch)。
2.4 事务完成
  • Broker将事务消息标记为已提交,消费者可读取(需配置isolation.level=read_committed)。

3. 消费者事务隔离

3.1 隔离级别
  • read_committed:仅消费已提交的事务消息(跳过未提交的Control Batch)。
  • read_uncommitted:消费所有消息(默认模式,不保证事务原子性)。
3.2 实现机制
  • 消费者在拉取消息时,Broker根据隔离级别过滤未提交的事务消息。
  • 源码逻辑KafkaConsumerfetcher模块解析Control Batch,决定是否跳过消息。

4. 异常处理与容错

4.1 生产者宕机
  • 事务超时(transaction.timeout.ms):协调器自动中止未完成的事务。
  • 新生产者实例需重新初始化事务,旧事务状态由协调器清理。
4.2 协调器宕机
  • 事务日志持久化在__transaction_state,新协调器加载日志恢复状态。
4.3 Broker宕机
  • 副本机制保证事务日志和消息日志的高可用,Leader切换后继续处理事务。

5. 关键源码解析

5.1 事务协调器核心逻辑
//事务状态管理(TransactionStateManager)
public class TransactionStateManager {// 持久化事务状态到__transaction_statedef appendTransactionToLog(transactionState: TransactionState): Unit = {val records = List(new SimpleRecord(transactionState.toBytes))transactionLog.append(records)}
}
5.2 两阶段提交实现
// 协调器发送提交标记(TransactionCoordinator)
private def sendTxnMarkers(transactionState: TransactionState): Unit = {
// 向所有分区Leader发送WriteTxnMarkersRequest
transactionState.partitions.foreach { partition =>val request = new WriteTxnMarkersRequest.Builder(partition, Commit)sendRequestToLeader(request)}
}
5.3 消费者过滤未提交消息
// 消费者拉取消息过滤(ConsumerFetcherThread)
private def filterCommittedMessages(records: ConsumerRecords): ConsumerRecords = {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {records.filter(_.controlBatchType != ControlBatchType.ABORT)
} else {records
}
}

6. 事务配置与使用

6.1 生产者配置
props.put("enable.idempotence", "true");  // 开启幂等性
props.put("transactional.id", "my-tx-id"); // 必须设置事务ID
props.put("transaction.timeout.ms", "60000"); // 事务超时时间
6.2 消费者配置
props.put("isolation.level", "read_committed"); // 仅消费已提交消息

7. 事务性能与限制

  • 性能开销:事务引入两阶段提交和日志持久化,吞吐量下降约20%-30%。
  • 限制
    • 事务仅支持单会话(一个生产者实例)。
    • 事务消息的消费者必须使用Kafka Consumer API(不支持旧版基于ZooKeeper的消费者)。

总结

Kafka事务通过以下机制实现跨分区的原子性:

  1. 幂等性生产者:避免单分区消息重复。
  2. 事务协调器与两阶段提交:确保所有分区要么全部提交,要么全部回滚。
  3. 事务日志持久化:保障协调器故障恢复后状态一致。
  4. 消费者隔离级别:控制事务消息的可见性。

正确配置后,Kafka事务可支持金融级场景的精确一次(Exactly-Once)语义

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/21434.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用C++ Qt实现安卓电池充电动效 | 打造工业级电量控件

一、为什么需要自定义电池控件? 在工业控制、车机系统、智能硬件等领域的UI开发中,电池状态显示是高频出现的UI组件。通过实现一个支持颜色渐变、动态充电动画、警戒阈值提示的电池控件,开发者可以系统掌握以下核心能力: Qt绘图…

一批起飞猪名单配图

好久没有使用风口猪选股指标了,今天去玩了一把,发现起飞猪指标显示了好多一批猪票 华曙高科 汉威科技 双林股份 曼恩斯特 长盈精密 江苏雷利 双飞集团 奥飞数据 硅宝科技 水晶光电 长盈精密

机器学习笔记——常用损失函数

大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本笔记介绍机器学习中常见的损失函数和代价函数,各函数的使用场景。 热门专栏 机器学习 机器学习笔记合集 深度学习 深度学习笔记合集 文章目录 热门…

Ubuntu 服务器Llama Factory 搭建DeepSeek-R1微调训练环境

1.首先了解一下什么是LLM微调 LLM 微调指的是在已经预训练好的大型语言模型基础上,使用特定的任务数据或领域数据,通过进一步的训练来调整模型的参数,使其在特定任务或领域上能够表现得更好。简单来说,就是对一个已经具备了丰富语…

环境变量与本地变量

目录 本地变量的创建 环境变量VS本地变量 认识完了环境变量我们来认识一下本地变量。 本地变量的创建 我们如果直接env是看不到本地变量的,因为本地变量和环境变量都具有独立性,环境变量是系统提供的具有全局属性的变量,都存在bash进程的…

智慧农业新生态 | 农业数字化服务平台——让土地生金,让服务无忧

一部手机管农事,从播种到丰收,全链路数字化赋能! 面向农户、农机手、农服商、农资商打造的一站式农业产业互联网平台,打通农资交易、农机调度、农服管理、技术指导全场景闭环,助力乡村振兴提效增收。 三大核心场景&am…

【运维自动化-作业平台】如何创建执行方案和作业模板

蓝鲸智云作业平台,以下简称作业平台或JOB平台作业模板和执行方案:将运维操作场景中涉及到的多个脚本执行或文件分发步骤组合成一个作业模板,这个作业模板尽可能把场景相关的共性逻辑都包含进去,然后再根据实际使用场景衍生出相应的…

广度优先搜索--之重生之我是蒟蒻,从入坟到入坑式讲解

广度优先搜索 1.什么是广度优先搜索? 广度优先搜索(Breadth-First Search,简称BFS)是一种遍历或搜索树和图的算法,也称为宽度优先搜索,BFS算法从图的某个节点开始,依次对其所有相邻节点进行探索和遍历&am…

CSDN文章质量分查询系统【赠python爬虫、提分攻略】

CSDN文章质量分查询系统 https://www.csdn.net/qc 点击链接-----> CSDN文章质量分查询系统 <------点击链接 点击链接-----> https://www.csdn.net/qc <------点击链接 点击链接-----> CSDN文章质量分查询系统 <------点击链接 点击链…

为AI聊天工具添加一个知识系统 之113 详细设计之54 Chance:偶然和适配 之2

本文要点 要点 祖传代码中的”槽“ &#xff08;占位符变量&#xff09; 和 它在实操中的三种槽&#xff08;占据槽&#xff0c;请求槽和填充槽&#xff0c; 实时数据库&#xff08;source&#xff09;中数据(流入 ETL的一个正序流程 行列并发 靶向整形 绑定变量 &#xff09…

微信小程序实现拉卡拉支付

功能需求&#xff1a;拉卡拉支付&#xff08;通过跳转拉卡拉平台进行支付&#xff09;&#xff0c;他人支付&#xff08;通过链接进行平台跳转支付&#xff09; 1.支付操作 //支付 const onCanStartPay async (obj) > {uni.showLoading({mask: true})// 支付接口获取需要传…

Spring框架基本使用(Maven详解)

前言&#xff1a; 当我们创建项目的时候&#xff0c;第一步少不了搭建环境的相关准备工作。 那么如果想让我们的项目做起来方便快捷&#xff0c;应该引入更多的管理工具&#xff0c;帮我们管理。 Maven的出现帮我们大大解决了管理的难题&#xff01;&#xff01; Maven&#xf…

unity学习46:反向动力学IK

目录 1 正向动力学和反向动力学 1.1 正向动力学 1.2 反向动力学 1.3 实现目标 2 实现反向动力 2.1 先定义一个目标 2.2 动画层layer&#xff0c;需要加 IK pass 2.3 增加头部朝向代码 2.3.1 专门的IK方法 OnAnimatorIK(int layerIndex){} 2.3.2 增加朝向代码 2.4 …

力扣hot100——螺旋矩阵 超简单易懂的模拟搜索方法

给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 解法思路&#xff1a; // 模拟螺旋搜索设定四个边界// left right// top |————————————————|// | |// |…

格瑞普推出革命性半固态电池,为行业无人机续航注入未来动力

引言&#xff1a;行业痛点与解决方案 在行业无人机快速发展的今天&#xff0c;续航时间短、安全性不足以及效率低下等问题始终是行业难题。无论是物流运输、电力巡检&#xff0c;还是农业植保&#xff0c;用户对更持久、更安全、更高效的电池技术充满期待。 今天&#xff0c;…

C++【多态】

通俗来说&#xff0c;多态就是指同一个操作或者行为在不同的对象上可以有不同的表现形式或实现方式。举个例子&#xff1a;以 “吃” 这个行为为例&#xff0c;不同的动物有不同的 “吃” 的方式和内容。比如&#xff0c;猫吃鱼、狗吃肉、兔子吃草&#xff0c;虽然都是 “吃” …

《道德经的启示:人际关系交往的智慧》

第二章:人际关系交往的智慧 🤝 引言:现代人际关系的困境 🌟 时代背景:超连接时代的人际迷思 🌐 在这个前所未有的超连接时代,我们似乎比任何时候都更"在线"、更"联系",但真正的人际连接却越发稀缺。你是否也有过这样的困扰: 🏢 职场上愈是…

一个前端,如何同时联调多个后端

文章目录 场景解决方案思路实现步骤创建项目目标前端配置安装cross-env配置vue.config.js配置package.json 测试 场景 一个前端&#xff0c;需要同时和N个后端联调 一个需求里有若干个模块&#xff0c;分别给不同的后端开发&#xff0c;前端需要和N个后端联调 本地开启一个端…

HTML5+CSS多层级ol标签序号样式问题

在CSS中&#xff0c;ol标签用于创建有序列表&#xff0c;而多层级的ol标签可以通过CSS实现不同的序号样式。以下是一些常见的问题和解决方案&#xff1a; 1. 多层级ol的序号格式问题 默认情况下&#xff0c;多层级的ol标签会自动继承父级的序号格式&#xff0c;但有时我们可能…

DeepSeek全栈技术体系解密:从算法源码到企业级智能体开发实战

在AGI技术加速演进的时代背景下&#xff0c;DeepSeek作为行业级大模型的代表&#xff0c;正在重塑智能系统的开发范式。本课程体系首次系统性披露DeepSeek技术栈的完整实现细节&#xff0c;涵盖从底层算法创新、工程架构设计到企业级落地的全链条知识体系。 课程核心价值矩阵 …