kafka源码阅读-ReplicaStateMachine(副本状态机)解析

概述

Kafka源码包含多个模块,每个模块负责不同的功能。以下是一些核心模块及其功能的概述:

  1. 服务端源码 :实现Kafka Broker的核心功能,包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络架构模型实现等。

  2. Java客户端源码 :实现了Producer和Consumer与Broker的交互机制,以及通用组件支撑代码。

  3. Connect源码 :用来构建异构数据双向流式同步服务。

  4. Stream源码 :用来实现实时流处理相关功能。

  5. Raft源码 :实现了Raft一致性协议。

  6. Admin模块 :Kafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。

  7. Api模块 :负责数据交互,客户端与服务端交互数据的编码与解码。

  8. Client模块 :包含Producer读取Kafka Broker元数据信息的类,如topic和分区,以及leader。

  9. Cluster模块 :包含Broker、Cluster、Partition、Replica等实体类。

  10. Common模块 :包含各种异常类以及错误验证。

  11. Consumer模块 :消费者处理模块,负责客户端消费者数据和逻辑处理。

  12. Controller模块 :负责中央控制器的选举,分区的Leader选举,Replica的分配或重新分配,分区和副本的扩容等。

  13. Coordinator模块 :负责管理部分consumer group和他们的offset。

  14. Javaapi模块 :提供Java语言的Producer和Consumer的API接口。

  15. Log模块 :负责Kafka文件存储,读写所有Topic消息数据。

  16. Message模块 :封装多条数据组成数据集或压缩数据集。

  17. Metrics模块 :负责内部状态监控。

  18. Network模块 :处理客户端连接,网络事件模块。

  19. Producer模块 :生产者细节实现,包括同步和异步消息发送。

  20. Security模块 :负责Kafka的安全验证和管理。

  21. Serializer模块 :序列化和反序列化消息内容。

  22. Server模块 :涉及Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader选举,Admin和Replica管理等。

  23. Tools模块 :包含多种工具,如导出consumer offset值,LogSegments信息,Topic的log位置信息,Zookeeper上的offset值等。

  24. Utils模块 :包含各种工具类,如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类等。

这些模块共同构成了Kafka的整体架构,使其能够提供高吞吐量、高可用性的消息队列服务。

kafka源码分支为1.0.2

分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理;副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。

kafkaController初始化时,会启动replicaStateMachine和partitionStateMachine:

    //在 KafkaController 中//有两个状态机:分区状态机和副本状态机;//一个管理器:Channel 管理器,负责管理所有的 Broker 通信;//相关缓存:Partition 信息、Topic 信息、broker id 信息等;//四种 leader 选举机制:分别是用 leader offline、broker 掉线、partition reassign、最优 leader 选举时触发;//启动副本状态机,初始化所有 Replica 的状态信息,如果 Replica 所在节点是 alive 的,那么状态更新为 OnlineReplica, 否则更新为 ReplicaDeletionIneligible;replicaStateMachine.startup()//启动分区状态机,初始化所有 Partition 的状态信息,如果 leader 所在 broker 是 alive 的,那么状态更新为 OnlinePartition,否则更新为 OfflinePartitionpartitionStateMachine.startup()

ReplicaStateMachine类相关方法:

  /*** Invoked on successful controller election. First registers a broker change listener since that triggers all* state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.* Then triggers the OnlineReplica state change for all replicas.*/def startup() {// //初始化所有副本的状态信息initializeReplicaState()//将online的replica状态转变为OnlineReplicahandleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)info("Started replica state machine with initial state -> " + replicaState.toString())}/*** Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions* in zookeeper*///初始化所有副本的状态信息// 这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState 中,并没有真正进行状态转移的操作。private def initializeReplicaState() {for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {val topic = topicPartition.topicval partition = topicPartition.partitionassignedReplicas.foreach { replicaId =>val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)//如果 Replica 所在机器是 alive 的,那么将其状态设置为 OnlineReplica//replicaId即brokerIdif (controllerContext.isReplicaOnline(replicaId, topicPartition))replicaState.put(partitionAndReplica, OnlineReplica)else {// mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.// This is required during controller failover since during controller failover a broker can go down,// so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.//否则设置为 ReplicaDeletionIneligible 状态replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)}}}}/*** This API is invoked by the broker change controller callbacks and the startup API of the state machine* @param replicas     The list of replicas (brokers) that need to be transitioned to the target state* @param targetState  The state that the replicas should be moved to* The controller's allLeaders cache should have been updated before this*///用于处理 Replica 状态的变化def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,callbacks: Callbacks = (new CallbackBuilder).build) {if (replicas.nonEmpty) {info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))try {brokerRequestBatch.newBatch()//状态转变replicas.foreach(r => handleStateChange(r, targetState, callbacks))//向 broker 发送相应请求brokerRequestBatch.sendRequestsToBrokers(controller.epoch)} catch {case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)}}}/*** This API exercises the replica's state machine. It ensures that every state transition happens from a legal* previous state to the target state. Valid state transitions are:* NonExistentReplica --> NewReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the*   partition to every live broker** NewReplica -> OnlineReplica* --add the new replica to the assigned replica list if needed** OnlineReplica,OfflineReplica -> OnlineReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the*   partition to every live broker** NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica* --send StopReplicaRequest to the replica (w/o deletion)* --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and*   UpdateMetadata request for the partition to every live broker.** OfflineReplica -> ReplicaDeletionStarted* --send StopReplicaRequest to the replica (with deletion)** ReplicaDeletionStarted -> ReplicaDeletionSuccessful* -- mark the state of the replica in the state machine** ReplicaDeletionStarted -> ReplicaDeletionIneligible* -- mark the state of the replica in the state machine** ReplicaDeletionSuccessful -> NonExistentReplica* -- remove the replica from the in memory partition replica assignment cache* @param partitionAndReplica The replica for which the state transition is invoked* @param targetState The end state that the replica should be moved to*/def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,callbacks: Callbacks) {val topic = partitionAndReplica.topicval partition = partitionAndReplica.partitionval replicaId = partitionAndReplica.replicaval topicAndPartition = TopicAndPartition(topic, partition)// Replica 不存在的话,状态初始化为 NonExistentReplicaval currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)try {def logStateChange(): Unit =stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +s"$currState to $targetState")val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)//校验状态转变是否符合要求assertValidTransition(partitionAndReplica, targetState)targetState match {case NewReplica => 其前置状态只能为 NonExistentReplica// start replica as a follower to the current leader for its partition//从 zk 获取 Partition 的 leaderAndIsr 信息val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)leaderIsrAndControllerEpochOpt match {case Some(leaderIsrAndControllerEpoch) =>//若是leader的replica状态不能变为NewReplicaif(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +s"be moved to NewReplica state as it is being requested to become leader")//向该 replicaId 发送 LeaderAndIsr 请求,这个方法同时也会向所有的 broker 发送 updateMeta 请求brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),topic, partition, leaderIsrAndControllerEpoch,replicaAssignment, isNew = true)//对于新建的 Partition,处于这个状态时,该 Partition 是没有相应的 LeaderAndIsr 信息的case None => // new leader request will be sent to this replica when one gets elected}//将该 Replica 的状态转移成 NewReplica,然后结束流程。replicaState.put(partitionAndReplica, NewReplica)logStateChange()case ReplicaDeletionStarted => //其前置状态只能为 OfflineReplica//更新向该 Replica 的状态为 ReplicaDeletionStarted;replicaState.put(partitionAndReplica, ReplicaDeletionStarted)// send stop replica command//发送 StopReplica 请求给该副本,并设置 deletePartition=true//broker收到这请求后,会从物理存储上删除这个 Replica 的数据内容brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,callbacks.stopReplicaResponseCallback)logStateChange()case ReplicaDeletionIneligible => //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionIneligible)logStateChange()case ReplicaDeletionSuccessful => //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)logStateChange()case NonExistentReplica => //其前置状态只能为 ReplicaDeletionSuccessful。// NonExistentReplica 是副本完全删除、不存在这个副本的状态// remove this replica from the assigned replicas list for its partition//在 controller 的 partitionReplicaAssignment 删除这个 Partition 对应的 replica 信息;val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))//将这个 Topic 从缓存中删除。replicaState.remove(partitionAndReplica)logStateChange()case OnlineReplica =>//其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible//副本正常工作时的状态,此时的 Replica 既可以作为 leader 也可以作为 followerreplicaState(partitionAndReplica) match {case NewReplica => //其前置状态如果为 NewReplica// add this replica to the assigned replicas list for its partition//从 Controller 的 partitionReplicaAssignment 中获取这个 Partition 的 AR;val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)//如果 Replica 不在 AR 中的话,那么就将其添加到 Partition 的 AR 中;if(!currentAssignedReplicas.contains(replicaId))controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)logStateChange()case _ => //其前置状态如果为:OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// check if the leader for this partition ever existed//如果该 Partition 的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的状态,并发送相应的请求//否则不做任何处理;controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(leaderIsrAndControllerEpoch) =>brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,replicaAssignment)replicaState.put(partitionAndReplica, OnlineReplica)logStateChange()case None => // that means the partition was never in OnlinePartition state, this means the broker never// started a log for that partition and does not have a high watermark value for this partition}}//最后将 Replica 的状态设置为 OnlineReplica 状态。replicaState.put(partitionAndReplica, OnlineReplica)case OfflineReplica => //其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// send stop replica command to the replica so that it stops fetching from the leader//发送 StopReplica 请求给该副本,先停止副本同步 (deletePartition = false)brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)// As an optimization, the controller removes dead replicas from the ISRval leaderAndIsrIsEmpty: Boolean =controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(_) =>//将该 replica 从 Partition 的 isr 移除这个 replica(前提 isr 中还有其他有效副本)controller.removeReplicaFromIsr(topic, partition, replicaId) match {case Some(updatedLeaderIsrAndControllerEpoch) =>// send the shrunk ISR state change request to all the remaining alive replicas of the partition.val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {// 发送 LeaderAndIsr 请求给剩余的其他副本,因为 ISR 变动了brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)}//更新这个 Replica 的状态为 OfflineReplicareplicaState.put(partitionAndReplica, OfflineReplica)logStateChange()falsecase None =>true}case None =>true}if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))throw new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +s"and isr path in zookeeper is empty")}}catch {case t: Throwable =>stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +s"$currState to $targetState failed", t)}}

上面 Replica 各种转移的触发的条件:
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/385279.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

乐鑫ESP32-H2设备联网芯片,集成多种安全功能方案,启明云端乐鑫代理商

在数字化浪潮的推动下&#xff0c;物联网正以前所未有的速度融入我们的日常生活。然而&#xff0c;随着设备的激增&#xff0c;安全问题也日益成为公众关注的焦点。 乐鑫ESP32-H2致力于为所有开发者提供高性价比的安全解决方案&#xff0c;这款芯片经过专门设计以集成多种安全…

基于微信小程序的校园二手交易平台/Java的二手交易网站/基于Javaweb校园二手商品交易系统(附源码)

摘 要 使用校园二手交易平台管理校园二手物品交易&#xff0c;不仅实现了智能化管理&#xff0c;还提高了管理员的管理效率&#xff0c;用户查询的功能也需要校园二手交易平台来提供。 设计校园二手交易平台是毕设的目标&#xff0c;校园二手交易平台是一个不断创新的系统&…

React搭建Vite项目及各种项目配置

1. 创建Vite项目 在操作系统的命令终端&#xff0c;输入以下命令&#xff1a; yarn create vite 输入完成以后输入项目名称、选择开发框架&#xff0c;选择开发语言&#xff0c;如下图所示&#xff0c;即可完成项目创建。 注意事项&#xff1a; 1. Node版本必须符合要求&…

在VMware创建Ubuntu24

目录 一、创建虚拟机 1. 自定义创建虚拟机 2. 设置虚拟机兼容 3. 选择镜像 4. 命名虚拟机&#xff0c;选择存放位置 5. 处理器配置 6. 内存配置 7. 网络类型配置 8. I/O控制器类型 9. 磁盘配置 10. 完成虚拟机创建 二、Ubuntu安装 1. 进入虚拟机中进行ubuntu的安…

20240724----安装git和配置git的环境变量/如何用命令git项目到本地idea

备注参考博客&#xff1a; 1&#xff09;可以参考博客&#xff0c;用git把项目git到本地 2&#xff09;可以参考博客vcs没有git 3)git版本更新&#xff0c;覆盖安装 &#xff08;一&#xff09;安装git &#xff08;1&#xff09;官网下载的链接 https://git-scm.com/downlo…

go-kratos 学习笔记(7) 服务发现服务间通信grpc调用

服务发现 Registry 接口分为两个&#xff0c;Registrar 为实例注册和反注册&#xff0c;Discovery 为服务实例列表获取 创建一个 Discoverer 服务间的通信使用的grpc&#xff0c;放到data层&#xff0c;实现的是从uses服务调用orders服务 app/users/internal/data.go 加入 New…

数据结构(2)

文章目录 1. 线性表的顺序表示2. 线性表的链式表示 1. 线性表的顺序表示 1. 线性表是具有相同数据类型的 n n n 个数据元素的有限序列&#xff0c;其中 n n n 为表长。 2. 线性表的顺序存储又称顺序表。它是用一组地址连续的存储单元依次存储线性表中的数据元素&#xff0c;从…

如何改桥接模式

桥接模式主要是解决 路由功能的 因为NAT多层 主要是网络连接数太多时 然后路由器要好 不然光猫 比差路由要强的 光猫 请注意&#xff0c;对光猫的任何设置进行修改前&#xff0c;请一定要备份光猫的配置文件&#xff0c;并在每次修改前都截图保存原始设置信息&#xff01;不要…

【建议收藏】CTF网络安全夺旗赛刷题指南(非常详细)零基础入门到精通,收藏这一篇就够了

在数字化浪潮汹涌澎湃的今天&#xff0c;网络安全已成为国家、企业和个人无法忽视的重要议题。为了挖掘和培养网络安全人才&#xff0c;一场场紧张刺激、充满智慧的CTF&#xff08;Capture The Flag&#xff09;安全竞赛应运而生。 一、CTF安全竞赛简介 CTF安全竞赛&#xff0c…

【初阶数据结构篇】单链表的实现(赋源码)

文章目录 单链表的实现代码位置概念与结构概念&#xff1a;结构&#xff1a; 链表的性质链表的分类单链表的实现单链表的创建和打印及销毁单链表的创建单链表的打印单链表的销毁 单链表的插入单链表头插单链表尾插单链表在指定位置之前插入数据单链表在指定位置之后插入数据 单…

RK3568 Linux 平台开发系列讲解(内核入门篇):如何高效地阅读 Linux 内核设备驱动

在嵌入式 Linux 开发中,设备驱动是实现操作系统与硬件之间交互的关键。对于 RK3568 这样的平台,理解和阅读 Linux 内核中的设备驱动程序至关重要。 1. 理解内核架构 在阅读设备驱动之前,首先要了解 Linux 内核的基本架构。内核主要由以下几个部分组成: 内核核心:处理系…

【Django】在vscode中运行调试Django项目(命令及图形方式)

文章目录 命令方式图形方式默认8000端口设置自定义端口 命令方式 python manage.py runserver图形方式 默认8000端口 设置自定义端口

Python学习笔记44:游戏篇之外星人入侵(五)

前言 上一篇文章中&#xff0c;我们成功的设置好了游戏窗口的背景颜色&#xff0c;并且在窗口底部中间位置将飞船加载出来了。 今天&#xff0c;我们将通过代码让飞船移动。 移动飞船 想要移动飞船&#xff0c;先要明白飞船位置变化的本质是什么。 通过上一篇文章&#xff0…

Live555源码阅读笔记:哈希表的实现(C++)

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

数据倾斜优化思路实践

数据倾斜&#xff0c;顾名思义&#xff0c;就是在计算过程中数据分散度不够&#xff0c;导致某个节点数据过于集中&#xff0c;从而导致任务执行效率大大降低。参照对比下MR的整体流程和ODPS&#xff0c;整体结合理解数据倾斜发生的几个生命周期的节点&#xff0c;如下图&#…

Stable Diffusion绘画 | 新手必备知识点(一)

模型 模型分为 大模型 Checkpoint 、 Lora模型 以及 其他模型。 大模型是生成图片的基石&#xff0c;选择怎样的大模型&#xff0c;会直接影响图片最终生成的风格。 大模型 大模型又分为 普通模型 以及 SDXL模型&#xff0c;包括&#xff1a; 真实风二次元2.5D 普通模型&…

[C#]调用本地摄像头录制视频并保存

AForge.NET是一个基于C#框架设计的开源计算机视觉和人工智能库&#xff0c;专为开发者和研究者设计。它提供了丰富的图像处理和视频处理算法、机器学习和神经网络模型&#xff0c;具有高效、易用、稳定等特点。AForge库由多个组件模块组成&#xff0c;包括AForge.Imaging&#…

Datawhale AI 夏令营——AI+逻辑推理——Task1

# Datawhale AI 夏令营 夏令营手册&#xff1a;从零入门 AI 逻辑推理 比赛&#xff1a;第二届世界科学智能大赛逻辑推理赛道&#xff1a;复杂推理能力评估 代码运行平台&#xff1a;魔搭社区 比赛任务 本次比赛提供基于自然语言的逻辑推理问题&#xff0c;涉及多样的场景&…

大数据——Hive原理

摘要 Apache Hive 是一个基于 Hadoop 分布式文件系统 (HDFS) 的数据仓库软件项目&#xff0c;专为存储和处理大规模数据集而设计。它提供类似 SQL 的查询语言 HiveQL&#xff0c;使用户能够轻松编写复杂的查询和分析任务&#xff0c;而无需深入了解 Hadoop 的底层实现。 Hive…

Android TabLayout的简单用法

TabLayout 注意这里添加tab&#xff0c;使用binding.tabLayout.newTab()进行创建 private fun initTabs() {val tab binding.tabLayout.newTab()tab.text "模板库"binding.tabLayout.addTab(tab)binding.tabLayout.addOnTabSelectedListener(object : TabLayout.On…