大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…

章节内容

上节我们完成了如下内容:

  • 磁盘存储
  • 零拷贝
  • 磁盘文件传输
  • JavaNIO、mmap、sendfile

在这里插入图片描述

事务场景

  • 如producer发的多条消息组成一个事务,这些消息需要对consumer同时可见又同时不可见。
  • producer可能会给多个Topic,多个Partition发消息,这些消息也需要能放一个事务里面,这就形成了一个典型的分布式事务。
  • Kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consumer-transform-produce过程需要放到一个事务里面,必须在消息处理或者发送过程中失败了,消费偏移量也不能提交。
  • producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务。
  • 在一个原子操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引入的场景,最后一种没用。

这三种情况是:

  • 只有Producer生产消息
  • 消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的consume-transform-produce模式
  • 只有consumer消费消息,这种操作其实没有什么意义,跟使用手动提交一样,而且也不是事务属性引入的目的,所以一般不会使用这种情况。

关键概念和推导

  • 因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinate和之前为了解决脑裂和惊群问题引入Group Coordinate在选举上类似。
  • 事务管理上事务日志是必不可少的,Kafka使用一个内部的topic来保存事务日志,这个设计和之前使用内部topic保存偏移量的设计保持一致。事务日志是TransactionCoordinate管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态,_transaction_state
  • 因为事务存在commit和abort两种操作,而客户端又read commit和read uncommited两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。
  • procuer挂掉重启或者漂移到其他机器需要能关联的之前的未完成的事务,所以需要有一个唯一标识符来进行关联,这个就是Transcational Id,一个producer挂了,另一个相同Transaction Id的producer能够接着处理这个事务未完成的状态。Kafka目前没有引入全局序,所以也没有transaction id,这个Transcation Id是用户提前配置的。
  • TranscationId能关联producer,也需要避免两个使用相同Transaction Id的producer同时存在,所以引入了producer epoch来保证对应一个Transcation Id只有一个活跃的producer。

事务语义

多分区原子写入

事务能够保证KafkaTopic下每个分区的原子写入,事务中所有的消息都将被写入或者丢弃。
首先,我们来考虑一下子原子:读取-处理-写入周期是什么意思。简而言之,这意味着如果某个应用程序在某个Topic0的偏移量X处读取的消息A,并且在对消息A进行了一些处理(如B=F(A))之后将消息B写入Topic tp1,则只有当消息A和B被认为被成功的消费并一起发布,或者完全不发布时,整个读取过程写入操作时原子的。
现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写入一个名为offset topic的内部KafkaTopic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。
由于offset commit只是对KafkaTopic的另外一次,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子 读取-处理-写入 循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。

粉碎“僵尸案例”

我们通过每个事务Producer分配一个称为Transcation Id的唯一标识来解决僵尸实例的问题,在进程重新启动时能够识别相同的Producer实例。
API要求事务性Producer的第一个操作应该是在Kafka集群中显示注册的TranscationId,当注册的时候,KafkaBroker用给定的Transcational Id检查打开的事务并完成处理。
Kafka也增加了一个与Transcational Id相关的epoch,epoch存储每个id内部元数据。
一旦epoch被触发,任何具有相同的Transcation Id和旧的epoch的生产者视为僵尸,Kafka拒绝来自这些生产者的后续事务性写入。

简而言之:Kafka可以保证Consumer最终只能消费非事务性消息或已提交事务性消息,它将保留来自未完成事务的消息,并过滤已终止事务的消息。

事务消息定义

生产者可以显式的发起事务会话,在这些会话中发送(事务)消息,并提交或中止事务。有如下的要求:

  • 原子性:消费者的应用程序不应暴露于未提交的消息中
  • 持久性:Broker不能丢失任何已提交的事务
  • 排序:事务消费者应在每个分区中以原始顺序查看事务消息
  • 交织:每个分区都应该能够接受来自事务性生产者非事务生产者的消息
  • 事务中不应该有重复的消息

如果允许事务性和非事务性消息的交织,则非事务和事务性消息的相对顺序将基于相加(对于非事务性消息)和最终提交(对于事务性消息)的相对顺序。

在这里插入图片描述
在上图中,分区P0和P1接收事务X1和X2的消息,以及非事务性消息。时间线是消息到达Broker的时间,由于首先提交了X2,所以每个分区都将在X1之前公开来自X2的消息,由于非事务性消息在X1和X2的提交之前到达,因此这些消息将在来自任一事务的消息之前公开。

事务配置

消费者

创建消费者代码,需要:
● 将配置中的自动提交属性(auto.commit)进行关闭
● 而且在代码里面也不能使用手动提交commitSync()或者commitAsync()
● 设置Isolation.level:READ_COMMITED或READ_UNCOMMITED

生产者

创建生产者,代码如下:
● 配置transacational.id属性
● 配置enable.idempotence属性

事务概览

生产者将表示事务开始、结束、中止状态的事务控制消息发送给使用多阶段协议管理事务的高可用事务协调器。生产者将事务控制记录(开始、结束、中止)发送到事务协调器,并将事务的消息直接发送到目标数据分区,消费者需要了解事务并缓冲每个待处理的事务,直到它们到达其相应的结束(提交、中止)记录为止。

  • 事务组
  • 事务组中的生产者
  • 事务组的事务协调器
  • Leader Brokers(事务数据所在的分区的Broker)
  • 事务的消费者

事务组

事务组用于映射到特定的事务协调器(基于日志分区数字的哈希)。该组中的生产者需要配置为该组事务生产者,由于来自这些生产者的所有事务都通过此协调器进行,因此我们可以在这些事务生产者之间实现严格有序。

生产者ID和事务组状态

事务生产者需要两个参数:

  • 生产者ID
  • 生产组
    需要将生产者的输入状态与上一个已提交的事务相关联,这使事务生产者能够重试事务(通过为该事务重新创建输入状态:在我们用例中通过是偏移量的向量)

可以使用消费者偏移管理机制来管理这些状态,消费者偏移量管理器将每个键(consumergroup-topic-partition)与该分区的最后一个检查点偏移量和元数据相关联。在事务生产者中,我们保存消费者的偏移量,该偏移量与事务的提交点关联。此偏移提交记录,(在__consumer_offsets主题中) 应作为事务的一部分写入。
即,存储消费组偏移量的(__consumer_offsets)主题分区将需要参与事务。因此,假定生产者在事务中间失败(事务协调器随后到期)。当生产者恢复时,它可以发出偏移量获取请求,以恢复与最后提交的事务相关联的输入偏移量,并从该点恢复事务处理。

为了支持此功能,我们需要对偏移量管理器和压缩的(__consumer_offsets)主题进行一些增强。
首先,压缩的主题现在还将包含事务控制记录,我们将需要为这些控制记录提出剔除策略。
其次,偏移量管理器需要具有事务意识,特别是,如果组与待处理的事务相关联,则偏移量提取请求应返回错误。

事务协调器

事务协调器 __transaction_state 主题特定分区的Leader分区所在的Broker,它负责初始化、提交以及回滚事务。事务协调器在内存管理如下的状态:

  • 对应正在处理的事务的第一个消息的HW,事务协调器周期性的将HW写到ZK中。
  • 事务控制日志中存储对应于日志HW的所有正在处理的事务
  • 事务消息主题分区的列表:事务的超时时间、与事务关联的Producer ID

需要确保无论是什么样的保留策略(日志分区的删除还是压缩),都不能删除包含事务HW的日志分段。

事务流程

初始阶段

  • Producer:计算哪个Broker作为事务协调器
  • Producer:向事务协调器发送BeginTransaction(producerId,generation、partitions)请求,当然也可以发送另一个包含事务过期时间的,如果生产者需要将消费者状态作为事务的一部分提交事务,则需要在BeginTransaction中包含对应的 __consumer_offsets 主题分区的信息。
  • Broker:生成事务ID
  • Coordinator:向事务协调主题追加BEIGIN(TxId,ProducerId,Generattion、Partitions)消息,然后发送响应给生产者
  • Producer:读取响应(包含了事务ID:TxId)
  • Coordinator(and followers):在内存更新当前事务的待确认事务状态和数据分区信息

发送阶段

  • Producer:发送事务消息给主题Leader分区所在的Broker
  • 每个消息包含TxId和TxCtl字段
  • TxCtl仅用于标记事务的最终状态(提交还是中止),生产者请求也封装了生产者ID,但不是不追加到日志中。

结束阶段

(生产者准备提交事务)

  • Producer:发送OffsetCommitRequest请求提交与事务结束状态关联的输入法状态(如下一个事务输入从哪儿开始)
  • Producer:发送CommitTranscation(TxId,ProducerId,Generation)请求给事务协调器并等待响应(如果响应中没有错误信息,表示将提交事务)。
  • Coordinator:向事务控制主题追加PREPARE_COMMIT(TxId)请求并向生产者发送响应。
  • Coordinator:向事务设计到的每个Leader分区(事务的业务数据的目标主题)的Broker发送一个CommitTranscation(TxId,Partitions…)请求。
  • 事务业务数据的目标主题相关Leader分区Broker:(情况1:)如果不是__consumer_offsets主题的Learder分区,一收到CommitTransaction(TxId,Partition1,Partition2)请求就会向对应的分区Broker发送空(NULL)消息,并给该消息设置TxId和TxCtl(设置为COMMITED)字段,Leader分区的Broker给协调器发送响应。
  • 事务业务数据的目标主题相关Leader分区Broker:(情况2:)如果是__consumer_offsets主题的Leader分区:追加消息,该消息的key是G-LAST-COMMIT,Value就是TxId的值,同时也应该给该消息设置TxId和TxCtl字段。
  • Coordinator:向事务控制主题发送COMMITED(TxId)请求,__transaction_state
  • Coordinator(and followers):尝试更新HW。

事务中止

当事务生产者发送业务消息的时候如果发生了异常,可以中止该事务,如果事务提交超时,事务协调器也会中止当前事务。

  • Producer:向事务协调器发送AbortTransaction(TxId)请求并等待响应。(一个没有异常的响应表示事务将会中止)
  • Coordinator:向事务控制主题追加PREPARE_ABORT(Txid)消息,然后向生产者发送响应。
  • Coordinator:向事务业务数据的目标主题的每个涉及到的Leader分区Broker发送AbortTranscation(TxId,Partition)请求。

基本事务流程的失败

  • 生产者发送BeginTranscation(TxId)的时候超时或响应中包含异常,生产者使用相同的TxId重试。
  • 生产者发送数据时的Broker错误:生产者中止(然后重做)事务(使用新的TxId)。
  • 如果生产者没有中止事务,则协调器将在事务超时后中止事务。仅在可能已将请求数据附加并复制到Follower的错误的情况下才需要重做事务。例如:生产者请求超时将需要重做,而NotLeaderForPartitionException不需要重做。
  • 生产者发送CommitTranscation(TxId)请求超时或响应中包含异常,生产者使用相同的TxId重试事务,此时需要幂等性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/397247.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DC-5靶机渗透测试

DC-5靶场 文章目录 DC-5靶场信息收集漏洞发现漏洞利用 --- 日志文件包含漏洞利用 --- 文件包含过滤器链的RCEshell反弹权限提升 信息收集 使用--scriptvuln扫描发现了一个thankyou.php界面 感觉会有问题,前往访问网站信息 漏洞发现 来到thankyou.php界面&#xff…

haproxy详解

目录 一、haproxy简介 二、什么是负载均衡 2.1 负载均衡的类型 2.2.1 硬件 2.2.2 四层负载均衡 2.2.3 七层负载均衡 2.2.4 四层和七层的区别 三、haproxy的安装及服务信息 3.1 示例的环境部署: 3.2 haproxy的基本配置信息 3.2.1 global 配置参数介绍 3…

sleuth+zipkin分布式链路追踪

概述 结构图 常见的链路追踪 cat zipkin pinpoint skywalking sleuth Sleuth介绍 Trace Span Annotation 使用Sleuth 添加依赖 <!--sleuth--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starte…

运维工具的衍化对运维工作的新挑战

运维工具的衍化对运维工作产生了深远的影响&#xff0c;这些影响体现在多个方面&#xff0c;包括提升运维效率、优化资源配置、增强故障应对能力、促进团队协作与沟通&#xff0c;以及面临新的挑战如数据安全和隐私保护等。运维工具的衍化对运维工作带来了多方面的新挑战&#…

Yolo-World初步使用

Yolo v8目前已经支持Yolo-World&#xff0c;整理一下初步使用步骤。 使用步骤 1 先下载Yolo-World的pt文件&#xff0c;下载地址&#xff1a;GitHub - AILab-CVC/YOLO-World: [CVPR 2024] Real-Time Open-Vocabulary Object Detection 官网应该是点这里&#xff08;有个笑脸…

【C#】读取与写入txt文件内容

在 C# 中读取和写入文本文件内容是一个常见的任务。以下是使用几种不同方法读取和写入文本文件的示例。 一、读取txt文件内容 1.1 使用 StreamReader using System; using System.IO;class Program {static void Main(){string filePath "C:\path\to\your\file.txt&qu…

QT多语言工具实现支持生成ts文件,ts文件和xlsx文件互转

一. 工具介绍 1.如果你是Qt项目,为多语言发愁的话,看到这篇文件,恭喜你有福啦!工具截图如下:​ 2.在项目开发的过程中,尽量将所有需要翻译的文本放在一个文件中,qml翻译用一个文件,cpp用一个,如下: test.h #pragma once /******************************************…

Java面试篇(线程池相关专题)

文章目录 1. 为什么要使用线程池2. 线程池的核心参数和线程池的执行原理2.1 线程池的核心参数2.2 线程池的执行原理 3. 线程池中常见的阻塞队列3.1 常见的阻塞队列3.2 ArrayBlockingQueue 和 LinkedBlockingQueue 的区别 4. 如何确定线程池的核心线程数4.1 应用程序中任务的类型…

【代码随想录】长度最小的子数组——滑动窗口

本博文为《代码随想录》的学习笔记。原文链接&#xff1a;代码随想录 题目 原题链接&#xff1a;长度最小的子数组 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 子数组 [numsl, numsl1, ..., numsr-1, nums…

历史库,成本与性能如何兼得?| OceanBase应用实践

随着数据量的迅猛增长&#xff0c;企业和组织在数据库管理方面遭遇的挑战愈发凸显。数据库性能逐渐下滑、存储成本节节攀升&#xff0c;以及数据运维复杂性的增加&#xff0c;这些挑战使得DBA和开发者在数据管理上面临更大的压力。 为了应对这些挑战&#xff0c;对数据生命周期…

uni-app学习笔记

一、下载HBuilder https://www.dcloud.io/hbuilderx.html 上述网址下载对应版本&#xff0c;下载完成后进行解压&#xff0c;不需要安装&#xff0c;解压完成后&#xff0c;点击HBuilder X.exe文件进行运行程序 二、创建uni-app项目 此处我是按照文档创建的uni-ui项目模板…

DWG图纸识别工作

DWG图纸识别工作 目的&#xff1a;完成从DWG图纸中数据的提取&#xff0c;在数据提取之前先要对DWG图纸进行识别。得到某个图层的数据。 最终完成图纸建筑外轮廓线坐标数据的提取。 1、 DWG图纸&#xff0c;通过AutoCAD软件导出 DXF文件 2、 DXF文件上传到服务端&#xff0c;…

Java设计模式(适配器模式)

定义 将一个类的接口转换成客户希望的另一个接口。适配器模式让那些接口不兼容的类可以一起工作。 角色 目标抽象类&#xff08;Target&#xff09;&#xff1a;目标抽象类定义客户所需的接口&#xff08;在类适配器中&#xff0c;目标抽象类只能是接口&#xff09;。 适配器类…

XJTUSE-离散数学-图论

概述 图的定义 几个定义&#xff0c;不赘述 多重图&#xff1a;有平行边存在 简单图&#xff1a;无平行边 无自环 子图 and 补图 完全图的概念 结点的度 入度&#xff0c;出度 奇结点、偶结点 定理&#xff1a;对于无向图&#xff0c;奇结点的个数为偶数 图的同构 必…

Golang 并发编程

Golang 并发编程 Goroutine 什么是协程 创建 Goroutine 主 goroutine &#xff08;main函数&#xff09;退出后&#xff0c;其它的工作 goroutine 也会自动退出 package mainimport ("fmt""time" )func myFunc() {i : 0for {ifmt.Println("func: …

MySQL:表的设计原则和聚合函数

所属专栏&#xff1a;MySQL学习 &#x1f48e;1. 表的设计原则 1. 从需求中找到类&#xff0c;类对应到数据库中的实体&#xff0c;实体在数据库中表现为一张一张的表&#xff0c;类中的属性对应着表中的字段 2. 确定类与类的对应关系 3. 使用SQL去创建具体的表 范式&#xff1…

从“抠图”到“抠视频”,Meta上新AI工具SAM 2。

继2023年4月首次推出SAM&#xff0c;实现对图像的精准分割后&#xff0c;Meta于北京时间2024年7月30日推出了能够分割视频的新模型SAM 2&#xff08;Segment Anything Model 2&#xff09;。SAM 2将图像分割和视频分割功能整合到一个模型中。所谓“分割”&#xff0c;是指区别视…

API 签名认证:AK(Access Key 访问密钥)和 SK(Secret Key 私密密钥)

API签名认证 在当今的互联网时代&#xff0c;API作为服务与服务、应用程序与应用程序之间通信的重要手段&#xff0c;其安全性不容忽视。你是否遇到过需要在HTTP请求中加入访问密钥(ak)和私密密钥(sk)的情况&#xff1f;是不是担心这些敏感信息会被拦截或者泄露&#xff1f;本…

【多线程】乐观/悲观锁、重量级/轻量级锁、挂起等待/自旋锁、公平/非公锁、可重入/不可重入锁、读写锁

文章目录 乐观锁和悲观锁重量级锁和轻量级锁挂起等待锁和自旋锁公平锁和非公平锁可重入锁和不可重入锁读写锁相关面试题 锁&#xff1a;非常广义的概念&#xff0c;不是指某个具体的锁&#xff0c;所有的锁都可以往这些策略中套 synchronized&#xff1a;只是市面上五花八门的锁…

[独家原创]基于分位数回归的Bayes-GRU多变量时序预测【区间预测】 (多输入单输出)Matlab代码

[独家原创]基于分位数回归的Bayes-GRU多变量时序预测【区间预测】 &#xff08;多输入单输出&#xff09;Matlab代码 目录 [独家原创]基于分位数回归的Bayes-GRU多变量时序预测【区间预测】 &#xff08;多输入单输出&#xff09;Matlab代码效果一览基本介绍程序设计参考资料 效…