YarnClient发送和接收请求流程
Yarn是通过RPC协议通信的,协议类型可以通过查看RpcKind类得知,总共有三种类型:
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3);
其中Hadoop和Yarn组件大部分是通过Protobuf(Protocol Buffers)协议进行通信。
Protobuf 是一种由 Google 开发的二进制序列化格式和相关的技术,它用于高效地序列化和反序列化结构化数据,通常用于网络通信、数据存储等场景。
Protobuf 在许多领域都得到了广泛应用,特别是在分布式系统、RPC(Remote Procedure Call)框架和数据存储中,它提供了一种高效、简洁和可扩展的方式来序列化和交换数据,Protobuf 的主要优点包括:
高效性:Protobuf 序列化后的二进制数据通常比其他序列化格式(比如超级常用的JSON)更小,并且序列化和反序列化的速度更快,这对于性能敏感的应用非常有益。 简洁性:Protobuf 使用一种定义消息格式的语法,它允许定义字段类型、顺序和规则 版本兼容性:Protobuf 支持向前和向后兼容的版本控制,使得在消息格式发生变化时可以更容易地处理不同版本的通信。 语言无关性:Protobuf 定义的消息格式可以在多种编程语言中使用,这有助于跨语言的通信和数据交换 自动生成代码:Protobuf 通常与相应的工具一起使用,可以自动生成代码,包括序列化/反序列化代码和相关的类
发送和接收请求流程
发送请求
Yarn发送的请求协议都继承GeneratedMessage类实现Message接口,它们都是YarnServiceProtos的内部类。
请求头的协议类型可以在RpcHeaderProtos里面查看,它们都是RpcHeaderProtos的内部类,例如RpcRequestHeaderProto。
请求的协议类型可以在ApplicationClientProtocol里查看,它们都继承了ApplicationClientProtocol内部类。
YarnClient请求头使用的是ProtobufRpcEngineProtos的内部类RequestHeaderProto。
请求的连接信息都存储在Client的内部类ConnectionId里面,包含票据、协议类、目标地址、是否需要认证、配置文件、还有连接参数等信息。
调用器ProtobufRpcEngine.Invoker类用于代理客户端发送请求,该实例存储着Client、Client.ConnectionId等信息,它真正是通过Client实例发送请求的,Client每次请求时都会创建一个Client#Call和Client#Connection,代表每次发送的请求和连接。
Call表示调用操作,里面包含重试次数、id、响应、是否完成、请求等进行连接操作需要的信息。
Connection代表连接,它有ConnectionId的所有参数以及Socket客户端、流管道、连接参数等信息。
发送请求流程:
SaslRpcClient#sendSaslMessage
Connection#setupIOstreams
使用调用器代理发送请求 ProtobufRpcEngine.Invoker#invoke
调用器通过方法名、协议类名、协议版本构建RPC请求头ProtobufRpcEngine.Invoker#constructRpcRequestHeader
获取参数中的Message协议以及使用创建的请求头,创建封装RpcRequestWrapper
客户端发送封装的RPC请求 Client#call
使用请求创建Call
通过ConnectionId创建Connection,并建立连接
建立该Connection的IO通道Connection#setupIOstreams
不断建立连接并读取socket中的数据
建立连接Client.Connection#setupConnection
先创建Socket的IO通道NetUtils.getInputStream
NetUtils.getOutputStream
如果ConnectionId中包含UGI信息,则建立安全连接Connection#setupSaslConnection
创建SaslRpcClient并开始连接SaslRpcClient#saslConnect
发送Sasl协议请求RpcSaslProto,该协议初始状态为NEGOTIATE状态SaslRpcClient#sendSaslMessage
开始循环握手交换信息
读取响应信息头,判断响应都是否有错误标志
读取响应体,解析响应体为RpcSaslProto协议响应
根据响应的RpcSaslProto状态判断认证是否有问题
如果状态为协商状态NEGOTIATE
如果状态为质疑状态CHALLENGE,则需要评估响应的Token,然后创建Sasl回复再次协商
如果状态为成功状态SUCCESS,简单认证则完成认证,否则还需再次评估Token再完成认证
继续发送请求SaslRpcClient#sendSaslMessage
返回协商一致的认证方法
认证成功后,先创建响应的连接管道,并建立连接环境Client.Connection#writeConnectionContext
开始接收RPC响应Client#run && Client#receiveRpcResponse
发送RPC远程过程调用请求Connection#sendRpcRequest
最后返回响应数据Client.Call#getRpcResponse
从响应的可认证方式中选择相应的认证方式SaslRpcClient#selectSaslClient
如果认证方式为SIMPLE认证,则直接完成认证
否则,读取响应的token信息,验证token并生成响应信息,该响应信息将会再次发送给服务器进行协商SaslClient#evaluateChallenge
(如果在身份验证过程中收到来自服务器的质询,则调用此方法来准备提交给服务器的适当的下一个响应)
创建Sasl响应回复,此次回复的SASL状态为INITIATE状态,认证方式为所选择的方式,并附带上面生成的响应信息SaslClient#evaluateChallenge
YarnClient发送请求案例:
YarnClientImpl#getApplications()
HadoopYarnProtoRPC#getProxy
ClientRMProxy#createRMProxy
YarnClientImpl#serviceStart
YarnClient服务初始化YarnClientImpl#serviceStart
创建ApplicationClientProtocol代理实例ClientRMProxy#createRMProxy
如果设置了高可用和重试机制,则会先创建RMFailoverProxy
创建RPC代理RMProxy#getProxy
通过YarnRPC获取代理HadoopYarnProtoRPC#getProxy
从配置文件中读取并实例化RpcClientFactory的实现类 (yarn.ipc.client.factory.class)
通过RpcClientFactory获取客户端RpcClientFactoryPBImpl#getClient
从配置文件中读取相应的ApplicationClientProtocol实现类配置,默认为ApplicationClientProtocolPBClientImpl,并创建该实例
ProtobufRpcEngine创建ProtobufRpcEngine.Invoker代理ApplicationClientProtocolPB实例ProtobufRpcEngine#getProxy
YarnClientImpl发送getApplications请求YarnClientImpl#getApplications()
创建请求实例GetApplicationsRequestGetApplicationsRequest#newInstance
ApplicationClientProtocolPBClientImpl代理客户端发送getApplications请求ApplicationClientProtocolPBClientImpl#getApplications
ApplicationClientProtocolPBClientImpl#getApplications
创建GetApplicationsRequestProto协议,使用调用器代理发送请求ProtobufRpcEngine.Invoker#invoke
创建并返回GetApplicationsResponsePBImpl响应
接收请求
Yarn使用org.apache.hadoop.ipc.Server作为服务器,因为Yarn有三种RPC类型,所以Server也有三种实现类,ProtobufRpcEngine、RPC和WritableRPCEngine、TestServer(用于测试)
服务器创建时会创建连接监听器、响应器、地址、hadoop配置文件、端口、处理器数量、请求调用队列、安全管理器、 连接管理器,指标服务等。
服务器的连接监听器 (org.apache.hadoop.ipc.Server.Listener) 用于接收连接,它几乎代表着服务器本身,它在创建时会创建一个nio服务器,用于接收和管理所有请求,它将接收的请求放入连接管理器中,然后交给Reader处理。
Reader (org.apache.hadoop.ipc.Server.Listener.Reader) 也将在Listener创建时创建的,Listener会创建多个Reader,每个Reader代表一个数据处理器,它用于读取连接监听器接收的请求信息,通过请求信息创建请求调用 (Call) 实例,并将Call实例放入请求调用队列中,请求调用队列将会由后面的Handler (服务器启动后创建) 处理。
Listener#readAndProcess
Reader创建和启动
创建连接器队列,用于存储连接监听器接收到的请求
创建Selector循环监听并处理连接器队列中的请求Reader#doRunLoop
Reader#doRead
循环接收并处理请求中的数据Listener#readAndProcess
先读取连接数据中的一些信息创建连接头,如数据长度、连接头(请求类型、版本、认证信息)等,然后开始处理请求Server#channelRead
处理请求体数据Server.Connection#processOneRpc
Server.Connection#processOneRpc
序列化获取RpcRequestHeaderProtoServer.Connection#decodeProtobufFromStream
校验RPC头Server.Connection#checkRpcHeaders
开始处理RCP请求,反序列化创建RpcRequestWrapper,并创建相应的调用请求实例,然后存放到调用请求队列中Server.Connection#processRpcRequest
服务器的响应器 (org.apache.hadoop.ipc.Server.Responder) 用于响应请求 ,返回请求结果或者异常。
服务器启动后,将会创建多个处理器 (org.apache.hadoop.ipc.Server.Handler) 轮询处理调用请求队列中的调用请求。
理器轮询调用Call实例Handler#run
判断调用请求中是否包含UGI信息,如果包含ugi,则在调用之前需要使用ugi进行认证
处理调用请求Server#call
由于大部分Yarn 组件使用通信组件都是 RPC 组件,且调用器通常都是ProtobufRPCEngine,这里就再解析一下ProtobufRPCEngine处理请求原理。
RPC 服务器处理请求RPC.Server#call
获取 RPC 请求调用器 (RpcInvoker)
RPCInvoker 请求调用器处理请求RpcInvoker#call
ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call
获取协议的实现类,实现类也是一个阻塞服务(BlockingService)ProtobufRpcEngine.Server.ProtoBufRpcInvoker#getProtocolImpl
通过BlockingService获取需要调用的方法Descriptors.ServiceDescriptor#findMethodByName
BlockingService开始调用方法BlockingService#callBlockingMethod
创建响应实例Server#setupResponse
响应请求Responder#doRespond