YarnClient发送和接收请求源码解析

YarnClient发送和接收请求流程

在这里插入图片描述

Yarn是通过RPC协议通信的,协议类型可以通过查看RpcKind类得知,总共有三种类型:

RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
RPC_PROTOCOL_BUFFER ((short) 3);

其中Hadoop和Yarn组件大部分是通过Protobuf(Protocol Buffers)协议进行通信。

Protobuf 是一种由 Google 开发的二进制序列化格式和相关的技术,它用于高效地序列化和反序列化结构化数据,通常用于网络通信、数据存储等场景。

Protobuf 在许多领域都得到了广泛应用,特别是在分布式系统、RPC(Remote Procedure Call)框架和数据存储中,它提供了一种高效、简洁和可扩展的方式来序列化和交换数据,Protobuf 的主要优点包括:

  • 高效性:Protobuf 序列化后的二进制数据通常比其他序列化格式(比如超级常用的JSON)更小,并且序列化和反序列化的速度更快,这对于性能敏感的应用非常有益。
  • 简洁性:Protobuf 使用一种定义消息格式的语法,它允许定义字段类型、顺序和规则
  • 版本兼容性:Protobuf 支持向前和向后兼容的版本控制,使得在消息格式发生变化时可以更容易地处理不同版本的通信。
  • 语言无关性:Protobuf 定义的消息格式可以在多种编程语言中使用,这有助于跨语言的通信和数据交换
  • 自动生成代码:Protobuf 通常与相应的工具一起使用,可以自动生成代码,包括序列化/反序列化代码和相关的类

发送和接收请求流程

发送请求

Yarn发送的请求协议都继承GeneratedMessage类实现Message接口,它们都是YarnServiceProtos的内部类。

请求头的协议类型可以在RpcHeaderProtos里面查看,它们都是RpcHeaderProtos的内部类,例如RpcRequestHeaderProto。

请求的协议类型可以在ApplicationClientProtocol里查看,它们都继承了ApplicationClientProtocol内部类。

YarnClient请求头使用的是ProtobufRpcEngineProtos的内部类RequestHeaderProto。

请求的连接信息都存储在Client的内部类ConnectionId里面,包含票据、协议类、目标地址、是否需要认证、配置文件、还有连接参数等信息。

调用器ProtobufRpcEngine.Invoker类用于代理客户端发送请求,该实例存储着Client、Client.ConnectionId等信息,它真正是通过Client实例发送请求的,Client每次请求时都会创建一个Client#Call和Client#Connection,代表每次发送的请求和连接。

Call表示调用操作,里面包含重试次数、id、响应、是否完成、请求等进行连接操作需要的信息。

Connection代表连接,它有ConnectionId的所有参数以及Socket客户端、流管道、连接参数等信息。

发送请求流程:

SaslRpcClient#sendSaslMessage
Connection#setupIOstreams
使用调用器代理发送请求
ProtobufRpcEngine.Invoker#invoke
调用器通过方法名、协议类名、协议版本构建RPC请求头
ProtobufRpcEngine.Invoker#constructRpcRequestHeader
获取参数中的Message协议以及使用创建的请求头,创建封装RpcRequestWrapper
客户端发送封装的RPC请求
Client#call
使用请求创建Call
通过ConnectionId创建Connection,并建立连接
建立该Connection的IO通道
Connection#setupIOstreams
不断建立连接并读取socket中的数据
建立连接
Client.Connection#setupConnection
先创建Socket的IO通道
NetUtils.getInputStream
NetUtils.getOutputStream
如果ConnectionId中包含UGI信息,则建立安全连接
Connection#setupSaslConnection
创建SaslRpcClient并开始连接
SaslRpcClient#saslConnect
发送Sasl协议请求RpcSaslProto,该协议初始状态为NEGOTIATE状态
SaslRpcClient#sendSaslMessage
开始循环握手交换信息
读取响应信息头,判断响应都是否有错误标志
读取响应体,解析响应体为RpcSaslProto协议响应
根据响应的RpcSaslProto状态判断认证是否有问题
如果状态为协商状态NEGOTIATE
如果状态为质疑状态CHALLENGE,则需要评估响应的Token,然后创建Sasl回复再次协商
如果状态为成功状态SUCCESS,简单认证则完成认证,否则还需再次评估Token再完成认证
继续发送请求
SaslRpcClient#sendSaslMessage
返回协商一致的认证方法
认证成功后,先创建响应的连接管道,并建立连接环境
Client.Connection#writeConnectionContext
开始接收RPC响应
Client#run && Client#receiveRpcResponse
发送RPC远程过程调用请求
Connection#sendRpcRequest
最后返回响应数据
Client.Call#getRpcResponse
从响应的可认证方式中选择相应的认证方式
SaslRpcClient#selectSaslClient
如果认证方式为SIMPLE认证,则直接完成认证
否则,读取响应的token信息,验证token并生成响应信息,该响应信息将会再次发送给服务器进行协商
SaslClient#evaluateChallenge(如果在身份验证过程中收到来自服务器的质询,则调用此方法来准备提交给服务器的适当的下一个响应)
创建Sasl响应回复,此次回复的SASL状态为INITIATE状态,认证方式为所选择的方式,并附带上面生成的响应信息
SaslClient#evaluateChallenge

YarnClient发送请求案例:

YarnClientImpl#getApplications()
HadoopYarnProtoRPC#getProxy
ClientRMProxy#createRMProxy
YarnClientImpl#serviceStart
YarnClient服务初始化
YarnClientImpl#serviceStart
创建ApplicationClientProtocol代理实例
ClientRMProxy#createRMProxy
如果设置了高可用和重试机制,则会先创建RMFailoverProxy
创建RPC代理
RMProxy#getProxy
通过YarnRPC获取代理
HadoopYarnProtoRPC#getProxy
从配置文件中读取并实例化RpcClientFactory的实现类 (yarn.ipc.client.factory.class)
通过RpcClientFactory获取客户端
RpcClientFactoryPBImpl#getClient
从配置文件中读取相应的ApplicationClientProtocol实现类配置,默认为ApplicationClientProtocolPBClientImpl,并创建该实例
ProtobufRpcEngine创建ProtobufRpcEngine.Invoker代理ApplicationClientProtocolPB实例
ProtobufRpcEngine#getProxy
YarnClientImpl发送getApplications请求
YarnClientImpl#getApplications()
创建请求实例GetApplicationsRequest
GetApplicationsRequest#newInstance
ApplicationClientProtocolPBClientImpl代理客户端发送getApplications请求
ApplicationClientProtocolPBClientImpl#getApplications
ApplicationClientProtocolPBClientImpl#getApplications
创建GetApplicationsRequestProto协议,使用调用器代理发送请求
ProtobufRpcEngine.Invoker#invoke
创建并返回GetApplicationsResponsePBImpl响应

接收请求

Yarn使用org.apache.hadoop.ipc.Server作为服务器,因为Yarn有三种RPC类型,所以Server也有三种实现类,ProtobufRpcEngine、RPC和WritableRPCEngine、TestServer(用于测试)

服务器创建时会创建连接监听器、响应器、地址、hadoop配置文件、端口、处理器数量、请求调用队列、安全管理器、 连接管理器,指标服务等。

服务器的连接监听器 (org.apache.hadoop.ipc.Server.Listener) 用于接收连接,它几乎代表着服务器本身,它在创建时会创建一个nio服务器,用于接收和管理所有请求,它将接收的请求放入连接管理器中,然后交给Reader处理。

Reader (org.apache.hadoop.ipc.Server.Listener.Reader) 也将在Listener创建时创建的,Listener会创建多个Reader,每个Reader代表一个数据处理器,它用于读取连接监听器接收的请求信息,通过请求信息创建请求调用 (Call) 实例,并将Call实例放入请求调用队列中,请求调用队列将会由后面的Handler (服务器启动后创建) 处理。

Listener#readAndProcess
Reader创建和启动
创建连接器队列,用于存储连接监听器接收到的请求
创建Selector循环监听并处理连接器队列中的请求
Reader#doRunLoop
Reader#doRead
循环接收并处理请求中的数据
Listener#readAndProcess
先读取连接数据中的一些信息创建连接头,如数据长度、连接头(请求类型、版本、认证信息)等,然后开始处理请求
Server#channelRead
处理请求体数据
Server.Connection#processOneRpc
Server.Connection#processOneRpc
序列化获取RpcRequestHeaderProto
Server.Connection#decodeProtobufFromStream
校验RPC头
Server.Connection#checkRpcHeaders
开始处理RCP请求,反序列化创建RpcRequestWrapper,并创建相应的调用请求实例,然后存放到调用请求队列中
Server.Connection#processRpcRequest

服务器的响应器 (org.apache.hadoop.ipc.Server.Responder) 用于响应请求 ,返回请求结果或者异常。

服务器启动后,将会创建多个处理器 (org.apache.hadoop.ipc.Server.Handler) 轮询处理调用请求队列中的调用请求。

  • 处理器轮询调用Call实例

理器轮询调用Call实例

Handler#run
判断调用请求中是否包含UGI信息,如果包含ugi,则在调用之前需要使用ugi进行认证
处理调用请求
Server#call
由于大部分Yarn 组件使用通信组件都是 RPC 组件,且调用器通常都是ProtobufRPCEngine,这里就再解析一下ProtobufRPCEngine处理请求原理。
RPC 服务器处理请求
RPC.Server#call
获取 RPC 请求调用器 (RpcInvoker)
RPCInvoker 请求调用器处理请求
RpcInvoker#call
ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call
获取协议的实现类,实现类也是一个阻塞服务(BlockingService)
ProtobufRpcEngine.Server.ProtoBufRpcInvoker#getProtocolImpl
通过BlockingService获取需要调用的方法
Descriptors.ServiceDescriptor#findMethodByName
BlockingService开始调用方法
BlockingService#callBlockingMethod
创建响应实例
Server#setupResponse
响应请求
Responder#doRespond

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/411257.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

比特币的签名和验证(基于ECDSA)

比特币(Bitcoin)和以太坊(Ethereum)等区块链技术使用了加密算法来确保交易的安全性。私钥签名和公钥验证是这些算法的核心部分,主要用于证明交易的发起者拥有交易中使用的资金的控制权,而不需要暴露私钥本身…

浪潮服务器NVME 硬盘通过 Intel VROC 做RAID

INTEL VROC Configuration solution 1.VMD configuration in BIOS Processor > IIO Configuration> Intel(R) VDM Technology> Intel(R) VMD for volume Management Device on Socket 0 “CPU 0”, Intel VMD for volume management device for “PStack0” or “PSta…

【香橙派系列教程】(十七) 视觉垃圾桶-功能完善优化

【十七】视觉垃圾桶-功能完善优化 文章目录 【十七】视觉垃圾桶-功能完善优化一、增加垃圾桶开关盖1.引脚2.PWM 频率的公式3.PWM_APIsoftPwmCreatesoftPwmWrite附加说明softPwmStop 4.代码pwm.cpwm.hmain.c 二、项目代码优化编译运行 三、增加OLED 屏幕显示功能myoled.hmyoled.…

小白之 FastGPT Windows 本地化部署

目录 引言环境步骤1. 安装 docker2. 启动 docker3. 浏览器访问4. One API 配置语言模型、向量模型渠道和令牌5. 创建 FastGPT 知识库6. 创建 FastGPT 应用 官方文档 引言 部署之前可以先看一下 RAG 技术原理,也可以后面回过头来看,对一些概念有些了解&a…

Qt+FFmpeg开发视频播放器笔记(一):环境搭建

一、FFmpeg介绍 FFmpeg是一个开源的跨平台多媒体处理工具集,它可以用于处理音频、视频和其他多媒体数据。FFmpeg提供了一组功能强大的命令行工具,用于音频和视频的编解码、转换、处理、流媒体传输等任务。 FFmpeg支持多种音频和视频格式,包…

【自动化】考试答题自动化完成答案,如何实现100%正确呢

一、科目仿真考试不能自动答题 我的答案是可以的,电脑程序可以模拟人的操作完成所有的答题并提交结束考试 二、分析页面内容 完成一个题目,包括判断题,对与错2选1答案,单选题ABCD4选1答案,多选题大家想一想 F12查看按…

C语言 ——— 将动态版本的通讯录实现为文件存储联系人模式

目录 前言 在退出通讯录之前 在运行通讯录之前 前言 在这篇博客中,实现了动态版本的通讯录,接下来会增加函数,能用文件存储通讯录中的联系人 C语言 ——— 在控制台实现通讯录(增删查改、动态开辟内存空间)-CSDN…

#网络高级 笔记

modbus_tcp协议 modbus_rtu协议和modbus库 http协议和web服务器搭建 服务器原码分析和基于WebServer的工业数据采集项目 第H5,即网页制作,项目完善 一、modbus起源 1.起源 Modbus由Modicon公司于1979年开发,是一种工业现场总线协议标准 Mo…

python将字典数据保存为json文件

目录 一、json库介绍 二、字典生成json文件 1、导入 json 模块 2、将字典数据保存为 json 文件 (1) 创建一个python字典 (2) 指定要保存的 json 文件路径 (3) 将字典数据存为 json 文件 3、读取 json文件,并打印 一、json库介绍 方法作用json.dumps()将py…

对数据处理过程中,缺失值和异常值应该怎么处理?

创作不易,您的关注、点赞、收藏和转发是我坚持下去的动力! 大家有技术交流指导、论文及技术文档写作指导、项目开发合作的需求可以私信联系我。 在数据处理过程中,缺失值和异常值的处理是非常重要的步骤,它们可能会对模型的性能…

Datawhale AI夏令营第五期学习!

学习日志 日期: 2024年8月27日 今日学习内容: 今天,我学习了如何在深度学习任务中使用卷积神经网络(CNN)进行图像分类的基本流程,并成功地在JupyterLab中运行了一个完整的项目。以下是我今天的学习和操作…

【扩散模型(六)】IP-Adapter 是如何训练的?2 源码篇(IP-Adapter Plus)

系列文章目录 【扩散模型(二)】IP-Adapter 从条件分支的视角,快速理解相关的可控生成研究【扩散模型(三)】IP-Adapter 源码详解1-训练输入 介绍了训练代码中的 image prompt 的输入部分,即 img projection…

【Verilog 数字系统设计教程】Verilog 基础:硬件描述语言入门指南

目录 摘要 1. 引言 2. Verilog 历史与发展 3. Verilog 基本语法 4. Verilog 模块与端口 5. 组合逻辑与时序逻辑 6. 时钟域与同步设计 7. 测试与仿真 8. Verilog 高级特性 任务(Tasks) 函数(Functions) 多维数组 结构体…

【二叉树】OJ题目

🌟个人主页:落叶 目录 单值⼆叉树 【单值二叉树】代码 相同的树 【相同二叉树】代码 对称⼆叉树 【对称二叉树】代码 另一颗树的子树 【另一颗树的子树】代码 二叉树的前序遍历 【二叉树前序遍历】代码 二叉树的中序遍历 【二叉树中序遍历】…

【大模型】llama系列模型基础

前言:llama基于transformer架构,与GPT相似,只用了transformer的解码器部分。本文主要是关于llama,llama2和llama3的结构解读。 目录 1. llama1.1 整体结构1.2 RoPE1.3 SwiGLU 激活函数 2. llama22.2 GQA架构2.3 RLHF3. llama3 参考…

CAD中命令和系统变量

屏幕去除菜单全屏显示: ThisDrawing.SendCommand ("CLEANSCREENON ") 恢复原始:ThisDrawing.SendCommand ("CLEANSCREENOFF ") CAD中系统变量决定图形的基本设置。 第一个系统变量:uscicon vba代码如下: …

【Linux】——Rocky Linux配置静态IP

Rocky Linux配置静态IP Rocky Linux Rocky Linux 进入官网进行下载,下载版本自定义 官网link 获取ip地址 ip addr 获取服务器ip地址 进入网络配置文件目录: cd /etc/NetworkManager/system-connections/vi打开ens33.nmconnection 在IPv4下输入配置信…

Ubuntu美化为类Windows风格

博主的系统为 Ubuntu22.04 参考文献:How to Make Ubuntu Look Like Windows 11 | 22.04 GNOME 43 / 42 | Linux AF Tech 可能遇到的bug的解决方法:如何在 Linux 中安装和更改 GNOME 主题 先来一下视频演示: 下面正式开始安装。在主文件夹下打…

DWF 支持的 TON 链 Telegram 免费宠物游戏 Gatto_game,推出 “Paws Up! 世界锦标赛”

TON 链在这轮牛市里无疑是一匹脱缰的黑马,创造了一个又一个爆款,为持有者带来了不菲的收益。 Gatto_game 是一款 TON链 Tamagotchi 电子宠物风格的 P2E web3 游戏。可以通过喂养升级,参加比赛赚取 $TON 或者 $GTON ,或许就是下一个…

python解释器[源代码层面]

1 PyDictObject 在c中STL中的map是基于 RB-tree平衡二元树实现,搜索的时间复杂度为O(log2n) Python中PyDictObject是基于散列表(散列函数)实现,搜索时间最优为O(1) 1.1 散列列表 问题:散列冲突:多个元素计算得到相同的哈希值 …