Kafka源码分析(四) - Server端-请求处理框架

系列文章目录

Kafka源码分析-目录

一. 总体结构

先给一张概览图:
在这里插入图片描述

服务端请求处理过程涉及到两个模块:kafka.networkkafka.server

1.1 kafka.network

该包是kafka底层模块,提供了服务端NIO通信能力基础。

有4个核心类:SocketServer、Acceptor、Processor、RequestChannel。各自角色如下:

  • SocketServer:服务端的抽象,是服务端通信的入口;

  • Acceptor:Reactor通信模式中处理连接ACCEPT事件的线程/线程池所执行的任务;

  • Processor:Reactor通信模式中处理连接可读/可写事件的线程/线程池所执行的任务;

  • RequestChannel:请求队列,存储已经解析好的请求以等待处理;

对于上层模块而言,该基础模块有两个输入和一个输出

  1. 输入:IP+端口号,该模块会对目标端口实现监听;

  2. 输出:解析好的请求,通过RequestChannel进行输出;

  3. 输入:待发送的Response,通过Processor.responseQueue来完成输入;

1.2 kafka.server

该包在kafka.network的基础上实现各种请求的处理逻辑,主要包含KafkaServer和KafkaApis两个类。其中:

  • KafkaServer:Kafka服务端的抽象,统一维护Kafka服务端的各流程和状态;

  • KakfaApis:维护了各类请求对应的业务逻辑,通过KafkaServer.apis字段组合到KafkaServer之中;

二. Server的端口监听

整体流程如图:
在这里插入图片描述

接下来按调用顺序依次分析各方法

2.1 KafkaServer.startup()

关于端口监听的核心逻辑分4步,代码如下(用注释说明各部分的目的):

def startup() {// 省略无关代码... ...// 1. 创建SocketServersocketServer = new SocketServer(config, metrics, time, credentialProvider)// 2. 启动端口监听// (在这里完成了Acceptor的创建和端口ACCEPT事件的监听)// (startupProcessors = false表示暂不启动Processor处理线程)socketServer.startup(startupProcessors = false)// 3. 启动请求处理过程中的相关依赖// (这也是第2步中不启动Processor处理线程的原因,有依赖项需要处理)... ...// 4. 启动端口可读/可写事件处理线程(即Processor线程)socketServer.startProcessors()// 省略无关代码... ...
}

2.2 SocketServer.startup(Boolean)

代码及说明性注释如下:

def startup(startupProcessors: Boolean = true) {this.synchronized {// 省略无关代码... ...// 1. 创建Accetpor和Processor的实例,// 同时页完成了Acceptor对端口ACCEPT事件的监听createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)// 2. [可选]启动各Acceptor对应的Processor线程if (startupProcessors) {startProcessors()}}
}

2.3 ScocketServer.createAcceptorAndProcessor()

直接上注释版的代码,流程分3步:

// 入参解释
// processorsPerListener: 对于每个IP:Port, 指定Reactor模式子线程池大小, 
//                        即处理端口可读/可写事件的线程数(Processor线程);
// endpoints: 接收请求的IP:Port列表;
def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {// 省略无关代码... ...endpoints.foreach { endpoint =>// 省略无关代码... ...// 1. 创建Acceptor对象// 在此步骤中调用Acceptor.openServerSocket, 完成了对端口ACCEPT事件的监听val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)// 2. 创建了与acceptor对应的Processor对象列表// (这里并未真正启动Processor线程)addProcessors(acceptor, endpoint, processorsPerListener)// 3. 启动Acceptor线程KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()// 省略无关代码... ...}}

2.4 Acceptor.openServerSocket()

该方法中没什么特殊点,就是java NIO的标准流程:

def openServerSocket(host: String, port: Int): ServerSocketChannel = {// 1. 构建InetSocketAddress对象val socketAddress =if (host == null || host.trim.isEmpty)new InetSocketAddress(port)elsenew InetSocketAddress(host, port)// 2. 构建ServerSocketChannel对象, 并设置必要参数值val serverChannel = ServerSocketChannel.open()serverChannel.configureBlocking(false)if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)serverChannel.socket().setReceiveBufferSize(recvBufferSize)// 3. 端口绑定, 实现事件监听try {serverChannel.socket.bind(socketAddress)info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))} catch {case e: SocketException =>throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)}// 4. 返回ServerSocketChannel对象, 用于后续register到Selector中serverChannel
}

2.5 SocketServer.startProcessor()

从这步开始,仅剩的工作就是启动Processor线程,代码都非常简单。比如本方法只是遍历Acceptor列表,并调用Acceptor.startProcessors()

def startProcessors(): Unit = synchronized {acceptors.values.asScala.foreach { _.startProcessors() }info(s"Started processors for ${acceptors.size} acceptors")
}

2.6 Acceptor.startProcessors()

该方法很简明,直接上代码

def startProcessors(): Unit = synchronized {if (!processorsStarted.getAndSet(true)) {startProcessors(processors)}
}def startProcessors(processors: Seq[Processor]): Unit = synchronized {processors.foreach { processor =>KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",processor).start()}
}

三. 请求/响应的格式

3.1 格式概述

在这里插入图片描述
请求和响应都由两部分组成:Header和Body。RequestHeader中包含ApiKey、ApiVersion、CorrelationId、ClientId;ResponseHeader中只包含CorrelationId字段。接下来逐个讲解这些字段。

  • ApiKey

    2字节整型,指明请求的类型;比如0代表Produce请求,1代表Fetch请求;具体id和请求类型之间的映射关系可在 org.apache.kafka.common.protocol.ApiKeys 中找到;

  • ApiVersion

    随着API的升级迭代,各类型请求的请求体格式可能有变更;这个2字节的整型指明了请求体结构的版本;

  • CorrelationId

    4字节整型,在Response中传回,Kafka Server端不处理,用于客户端内部关联业务数据;

  • ClientId

    可变长字符串,标识客户端;

3.2 请求体/响应体的具体格式

各业务操作(比如Produce、Fetch等)对应的请求体和响应体格式都维护在 org.apache.kafka.common.protocol.ApiKeys 中。接下来以Produce为例讲解ApiKeys是如何表达数据格式的。

ApiKeys是个枚举类,其核心属性如下:

public enum ApiKeys {// 省略部分代码... ...// 上文提到的请求类型对应的idpublic final short id;// 业务操作名称public final String name;// 各版本请求体格式public final Schema[] requestSchemas;// 各版本响应体格式public final Schema[] responseSchemas;// 省略部分代码... ...
}

其中PRODUCE枚举项的定义如下

PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions())

可以看到各版本的请求格式维护在 ProduceRequest.schemaVersions(),代码如下

public static Schema[] schemaVersions() {return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
}

这里只是简单返回了一个Schema数组。一个Schema对象代表了一种数据格式。请求头中的ApiVersion指明了请求体的格式对应数组的第几项(从0开始)。

接下来我们看看Schema是如何表达数据格式的。其结构如下
在这里插入图片描述
Schema有两个字段:fields和fieldsByName。其中fields是体现数据格式的关键,它指明了字段的排序和各字段类型;而fieldsByName只是按字段名重新组织的Map,用于根据名称查找对应字段。

BoundField只是Field的简单封装。Field有两个核心字段:name和type。其中name表示字段名称,type表示字段类型。常见的Type如下:

Type.BOOLEAN;
Type.INT8;
Type.INT16;
Type.INT32;// 可通过org.apache.kafka.common.protocol.types.Type查看全部类型
... ...

回到PRODUCE API,通过查看Schema的定义,能看到其V0版本的请求体和响应体的结构如下:
在这里插入图片描述

四. 请求的处理流程

在这里插入图片描述

  1. Acceptor监听到ACCEPT事件(TCP创建连接"第一次握手"的SYN);

  2. Acceptor将将连接注册到Processor列表内的其中一个,由该Processor监听这个连接的后续可读可写事件;

  3. Processor接收到完整请求后,会将Request追加到RequestChannel中进行排队,等待后续处理;

  4. KafkaServer中有个requestHandlerPool的字段,KafkaRequestHandlerPool类型,代表请求处理线程池;KafkaRequestHandler就是其中的线程,会从RequestChannel拉请求进行处理;

  5. KafkaRequestHandler将拉到的Request传入KafkaApis.handle(Request)方法进行处理;

  6. KafkaApis根据不同的ApiKey调用不同的方法进行处理,处理完毕后会将Response最终写入对应的Processor的ResponseQueue中等待发送;KafkaApis.handle(Request)的方法结构如下:

    def handle(request: RequestChannel.Request) {try {// 省略部分代码... ...request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request)case ApiKeys.FETCH => handleFetchRequest(request)case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)case ApiKeys.METADATA => handleTopicMetadataRequest(request)case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)// 省略部分代码... ...}} catch {case e: FatalExitError => throw ecase e: Throwable => handleError(request, e)} finally {request.apiLocalCompleteTimeNanos = time.nanoseconds}
    }
    
  7. Processor从自己的ResponseQueue中拉取待发送的Respnose;

  8. Processor将Response发给客户端;

五. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)
感谢阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/315465.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PotatoPie 4.0 实验教程(24) —— FPGA实现摄像头图像中心差分变换

为什么要对图像进行中心差分变换? 对图像进行中心差分变换的主要目的是计算图像中每个像素点的梯度。梯度在图像处理中是一个非常重要的概念,它可以用来描述图像中灰度变化的快慢和方向,常用于边缘检测、特征提取和图像增强等任务中。 具体…

【GitHub】2FA认证(双重身份验证)

GitHub 2FA认证(双重身份验证) 写在最前面一、使用 TOTP 应用程序配置双2FA(双因素身份验证)1. 介绍2. github3. 认证 官网介绍小结 & 补充 :权限不足or验证码错误问题 🌈你好呀!我是 是Yu欸…

CCS项目持续集成

​ 因工作需要,用户提出希望可以做ccs项目的持续集成,及代码提交后能够自动编译并提交到svn。调研过jenkins之后发现重新手写更有性价比,所以肝了几晚终于搞出来了,现在分享出来。 ​ 先交代背景: 1. 代码分两部分&am…

C++设计模式:适配器模式(十四)

1、定义与动机 定义:将一个类的接口转换成客户希望的另外一个接口。Adapter模式使得原本由于接口不兼容而不能一起工作的哪些类可以一起工作。 动机: 在软件系统中,由于应用环境的变化,常常需要将“一些现存的对象”放在新的环境…

【前端缓存】localStorage是同步还是异步的?为什么?

写在开头 点赞 收藏 学会 首先明确一点,localStorage是同步的 一、首先为什么会有这样的问题 localStorage 是 Web Storage API 的一部分,它提供了一种存储键值对的机制。localStorage 的数据是持久存储在用户的硬盘上的,而不是内存。这意…

Professional CUDA C Programming

2023/4/28 1.使用nvfrof时,报错 解决方法: 将路径 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.4\extras\CUPTI\lib64 下的文件cupti64_2020.2.0.dll复制到路径 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v11.1\bin下即可。 2…

岚图汽车与东软睿驰签署战略合作协议

4月26日,东软睿驰与岚图汽车正式签署战略合作协议,双方将结合在各自领域拥有的产业资源、技术研发和资本运作等优势,聚焦智能化产品和应用,建立长期共赢的战略合作伙伴关系,通过不断探索未来新技术、新产业、新业态和新模式,围绕用户需求共同打造极致的智能出行体验。 图为岚图…

Java设计模式 _创建型模式_工厂模式(普通工厂和抽象工厂)

一、工厂模式 属于Java设计模式创建者模式的一种。在创建对象时不会对客户端暴露创建逻辑,并且是通过使用一个共同的接口来指向新创建的对象。 二、代码示例 场景:花店有不同的花,通过工厂模式来获取花。 1、普通工厂模式 逻辑步骤&#…

华为matebook 14安装ubuntu双系统

一、准备u盘 首先格式化u盘(选择FAT32) 二、确认电脑类型 键盘按下win+r(win:开始键/也就是Windows的标志那个键),在输入框内输入msinfo32后,回车确认 确定自己电脑 硬盘 的类型: 在显示屏下方的搜索框内搜索“计算机管理” 点击进入后,再点击左边列表内的“磁…

java多功能手机

随着科技的发展,手机的使用已经普及到每个家庭甚至个人,手机的属性越来越强大,功能也越来越多,因此人们在生活中越来越依赖于手机。 任务要求,使用所学知识编写一个手机属性及功能分析程序设计,测试各个手机…

时间序列生成数据,TransformerGAN

简介:这个代码可以用于时间序列修复和生成。使用transformer提取单变量或者多变时间窗口的趋势分布情况。然后使用GAN生成分布类似的时间序列。 此外,还实现了基于prompt的数据生成,比如指定生成某个月份的数据、某半个月的数据、某一个星期的…

WEB逆向—X-Bogus逆向分析(纯算+补环境)

声明 本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除! 前言 此平台 本人 仅限…

深度学习之视觉特征提取器——VGG系列

VGG 提出论文:1409.1556.pdf (arxiv.org) 引入 距离VGG网络的提出已经约十年,很难想象在深度学习高速发展的今天,一个模型能够历经十年而不衰。虽然如今已经有VGG的大量替代品,但是笔者研究的一些领域仍然有大量工作选择使用VG…

Web前端安全问题分类综合以及XSS、CSRF、SQL注入、DoS/DDoS攻击、会话劫持、点击劫持等详解,增强生产安全意识

前端安全问题是指发生在浏览器、单页面应用、Web页面等前端环境中的各类安全隐患。Web前端作为与用户直接交互的界面,其安全性问题直接关系到用户体验和数据安全。近年来,随着前端技术的快速发展,Web前端安全问题也日益凸显。因此&#xff0c…

SQLite导出数据库至sql文件

SQLite是一款实现了自包含、无服务器、零配置、事务性SQL数据库引擎的软件库。SQLite是世界上部署最广泛的SQL数据库引擎。 SQLite 是非常小的,是轻量级的,完全配置时小于 400KiB,省略可选功能配置时小于250KiB。 SQLite 源代码不受版权限制。…

Visual Studio Code基础:打开一个编辑器(文件)时,覆盖了原编辑器

相关阅读 VS codehttps://blog.csdn.net/weixin_45791458/category_12658212.html?spm1001.2014.3001.5482 在使用vscode时,偶尔会出现这样的问题:打开了某个编辑器(文件,下面统称文件)后,再打开其他文件…

Python AI库 Pandas的常见操作的扩展知识

Python AI库 Pandas的常见操作的扩展知识 本文默认读者具备以下技能: 熟悉python基础知识,vscode或其它编辑工具 熟悉表格文件的基本操作 具备自主扩展学习能力 前文中对Pandas的数据结构以及基础操作做了介绍,本文中会在前文的基础上,对常见的操作进…

MacOS通过命令行开启关闭向日葵远程控制的后台服务

categories: [Tips] tags: MacOS Tips 写在前面 经常有小伙伴问我电脑相关的问题, 而解决问题的一个重要途径就是远程了. 关于免费的远程工具我试过向日葵和 todesk, 并且主要使用向日葵, 虽然 MacOS 下要设置很多权限, 但是也不影响其丝滑的控制. 虽然用着舒服, 但是向日葵…

arm架构,django4.2.7适配达梦8数据库

【Python相关包版本信息】 Django 4.2.7 django-dmPython 3.1.7 dmPython 2.5.5 【达梦数据库版本】 DM Database Server 64 V8 DB Version: 0x7000c 适配过程中发现的问题如下: 错误一:d…

[图解]软件开发中的糊涂用语-04-为什么要追究糊涂用语

0 00:00:00,030 --> 00:00:05,620 今天呢,我们来说一个为什么要追究糊涂用语的问题 1 00:00:06,310 --> 00:00:06,548 2 00:00:06,548 --> 00:00:11,077 大家知道我们前些天都发了好几个视频 3 00:00:11,077 --> 00:00:13,461 追究这个糊涂用语 4 00…