HDFS RPC Communication Framework Explained

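This article walks through the HDFS RPC communication framework, including source-code analysis of the listener, reader, handler, and responder implementation classes. The Hadoop version covered is 3.1.1.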

Before We Start

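RPC is built on socket communication, and here it uses Java NIO. Readers should ideally have some familiarity with NIO, because this article does not cover that background in any depth.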

https://blog.csdn.net/yhl_jxy/article/details/79332092

Note also that most of the code shown in this article has been tidied up by the author, so it differs slightly from the Server source.

RPC Framework Architecture Diagram

[Figure: RPC framework architecture diagram (1871_2.jpeg)]

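As the architecture diagram shows, the data handling for a single socket connection is split across several modules, each of which deals with one specific concern. This keeps calls concurrent and also keeps the code extensible.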

Listener

The listener is the listening thread. What exactly does it listen for? Socket connections, referred to here as connections.

Listener.run and doAccept

public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  Server.connectionManager.startIdleScan();
  while (Server.running) {
    SelectionKey key = null;
    try {
      getSelector().select();
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      Server.connectionManager.closeIdle(true);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  LOG.info("Stopping " + Thread.currentThread().getName());
  synchronized (this) {
    try {
      acceptChannel.close();
      selector.close();
    } catch (IOException e) { }
    selector = null;
    acceptChannel = null;
    // close all connections
    Server.connectionManager.stopIdleScan();
    Server.connectionManager.closeAll();
  }
}

void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);
    Reader reader = getReader();
    Connection c = Server.connectionManager.register(channel);
    // If the connectionManager can't take it, close the connection.
    if (c == null) {
      if (channel.isOpen()) {
        IOUtils.cleanup(null, channel);
      }
      Server.connectionManager.droppedConnections.getAndIncrement();
      continue;
    }
    key.attach(c);  // so closeCurrentConnection can get the object
    reader.addConnection(c);
  }
}

In short: accept the channel, wrap it in a connection, then hand it to a reader.
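The handoff from listener to reader can be sketched without any sockets. The example below is a minimal illustration, not Hadoop's code: `ReaderDispatchSketch`, its string connection ids, and the per-reader queues are hypothetical stand-ins, and the round-robin policy inside `nextReaderIndex` is an assumption about what `getReader()` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the listener-to-reader handoff; not Hadoop's classes.
class ReaderDispatchSketch {
    // One queue per reader, playing the role of each reader's pending connections.
    private final List<BlockingQueue<String>> readerQueues = new ArrayList<>();
    private int currentReader = 0;

    ReaderDispatchSketch(int readerCount) {
        for (int i = 0; i < readerCount; i++) {
            readerQueues.add(new LinkedBlockingQueue<>());
        }
    }

    // Picks the next reader in round-robin order (assumed policy).
    int nextReaderIndex() {
        currentReader = (currentReader + 1) % readerQueues.size();
        return currentReader;
    }

    // Mirrors the tail of doAccept: choose a reader, then queue the connection on it.
    int dispatch(String connectionId) {
        int idx = nextReaderIndex();
        readerQueues.get(idx).add(connectionId);
        return idx;
    }

    int pendingFor(int readerIndex) {
        return readerQueues.get(readerIndex).size();
    }

    public static void main(String[] args) {
        ReaderDispatchSketch listener = new ReaderDispatchSketch(3);
        for (int i = 0; i < 6; i++) {
            int idx = listener.dispatch("conn-" + i);
            System.out.println("conn-" + i + " -> reader " + idx);
        }
    }
}
```

Spreading connections across several readers keeps a slow client on one reader from stalling reads for connections owned by the others.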

Reader

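The Reader plays a pivotal role in the RPC framework. In the companion article "HDFS RPC Protocol Explained" (HDFSRPC协议详解), all of the work that precedes processOneRpc is done by the reader. In summary: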

  1. Checking the initial 7 bytes of the RPC connection.
  2. SASL handshake and authentication.
  3. Reading the IpcConnectionContext.
  4. Preparing for processOneRpc, including parsing the RequestHeaderProto.
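The first step in the list, the 7-byte check, can be sketched as follows. This is an illustration only: the layout (4 magic bytes `hrpc`, 1 version byte, 1 service-class byte, 1 auth-protocol byte) and the constant values (version 9, auth 0 for none, -33 for SASL) reflect my understanding of Hadoop's `RpcConstants` and `AuthProtocol` and should be treated as assumptions. `ConnectionHeaderSketch`, `check`, and `build` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of validating the 7-byte connection preamble.
class ConnectionHeaderSketch {
    static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);
    static final byte CURRENT_VERSION = 9;  // assumed value
    static final byte AUTH_NONE = 0;        // assumed value
    static final byte AUTH_SASL = -33;      // assumed value

    // Returns "ok" or a short reason why the header is rejected.
    static String check(ByteBuffer header) {
        ByteBuffer buf = header.duplicate();  // leave the caller's position untouched
        if (buf.remaining() < 7) {
            return "incomplete";
        }
        for (byte expected : MAGIC) {
            if (buf.get() != expected) {
                return "bad magic";
            }
        }
        byte version = buf.get();
        buf.get();                            // service class: read but not validated here
        byte auth = buf.get();
        if (version != CURRENT_VERSION) {
            return "version mismatch";
        }
        if (auth != AUTH_NONE && auth != AUTH_SASL) {
            return "unknown auth protocol";
        }
        return "ok";
    }

    // Builds a header for experiments; magic must be exactly 4 ASCII characters.
    static ByteBuffer build(String magic, int version, int serviceClass, int auth) {
        ByteBuffer buf = ByteBuffer.allocate(7);
        buf.put(magic.getBytes(StandardCharsets.US_ASCII));
        buf.put((byte) version).put((byte) serviceClass).put((byte) auth);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(check(build("hrpc", 9, 0, 0)));
        System.out.println(check(build("http", 9, 0, 0)));
    }
}
```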

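One more point: a single read pass performs all of this work, rather than spreading it over several passes. As soon as a read has produced a call, the reader moves straight on to reading the next call. Calls are therefore processed concurrently, by the handlers.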

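The reader source is actually quite simple: in essence it loops over connection.readAndProcess(). This article does not go into readAndProcess itself; see "HDFS RPC Protocol Explained" (HDFSRPC协议详解) if you are interested.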

@Override
public void run() {
  LOG.info("Starting " + Thread.currentThread().getName());
  try {
    doRunLoop();
  } finally {
    try {
      readSelector.close();
    } catch (IOException ioe) {
      LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
    }
  }
}

private synchronized void doRunLoop() {
  while (Server.running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i = size; i > 0; i--) {
        Connection conn = pendingConnections.take();
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select();
      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isReadable()) {
            doRead(key);
          }
        } catch (CancelledKeyException cke) {
          // something else closed the connection, ex. responder or
          // the listener doing an idle scan.  ignore it and let them
          // clean up.
          LOG.info(Thread.currentThread().getName() + ": connection aborted from " + key.attachment());
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (Server.running) {                      // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    } catch (Throwable re) {
      LOG.error("Bug in read selector!", re);
      //ExitUtil.terminate(1, "Bug in read selector!");
    }
  }
}

//from Listener doRead
void doRead(SelectionKey key) throws InterruptedException {
  int count;
  Connection c = (Connection) key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(Time.now());
  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
    LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
    throw ieo;
  } catch (Exception e) {
    // Any exceptions that reach here are fatal unexpected internal errors
    // that could not be sent to the client.
    LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + c + " threw exception [" + e + "]", e);
    count = -1; //so that the (count < 0) block is executed
  }
  // setupResponse will signal the connection should be closed when a
  // fatal response is sent.
  if (count < 0 || c.shouldClose()) {
    Server.closeConnection(c);
    c = null;
  } else {
    c.setLastContact(Time.now());
  }
}
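Because the reader works on a non-blocking channel, one read may deliver only part of a request, or several requests at once. The sketch below shows one common way to cope with that, assuming each request is a 4-byte big-endian length followed by that many payload bytes. `FrameDecoderSketch` and `feed` are hypothetical names, and the code does not reproduce what readAndProcess actually does.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for length-prefixed frames arriving in arbitrary chunks.
class FrameDecoderSketch {
    private final ByteBuffer lengthBuf = ByteBuffer.allocate(4);
    private ByteBuffer data;  // null until the 4-byte length prefix is complete

    // Feeds one chunk of bytes and returns every frame completed by it.
    // No sanity check on the length is done here; a real server would bound it.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (data == null) {
                copy(chunk, lengthBuf);
                if (lengthBuf.hasRemaining()) {
                    break;  // length prefix still incomplete, wait for more bytes
                }
                lengthBuf.flip();
                data = ByteBuffer.allocate(lengthBuf.getInt());
                lengthBuf.clear();
            }
            copy(chunk, data);
            if (!data.hasRemaining()) {
                frames.add(data.array());  // frame complete
                data = null;               // next bytes start a new length prefix
            }
        }
        return frames;
    }

    // Copies as many bytes as both buffers allow.
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        while (src.hasRemaining() && dst.hasRemaining()) {
            dst.put(src.get());
        }
    }

    public static void main(String[] args) {
        ByteBuffer all = ByteBuffer.allocate(9);
        all.putInt(5).put("hello".getBytes());
        FrameDecoderSketch decoder = new FrameDecoderSketch();
        System.out.println(decoder.feed(ByteBuffer.wrap(all.array(), 0, 6)).size());
        System.out.println(decoder.feed(ByteBuffer.wrap(all.array(), 6, 3)).size());
    }
}
```

Keeping the partial state (`lengthBuf`, `data`) per connection is what lets a single reader thread serve many connections without blocking on any of them.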

CallQueue

callQueue holds the queue of calls. Because the call queue in HDFS is fairly complex, it will be covered in a separate article later.

Handler

The handler thread is also fairly simple: it effectively just executes call.run().

@Override
public void run() {
  LOG.debug(Thread.currentThread().getName() + ": starting");
  while (Server.running) {
    try {
      final Call call = Server.callQueue.take(); // pop the queue; maybe blocked here
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": " + call);
      }
      CurCall.set(call);
      /*TODO
      UserGroupInformation remoteUser = call.getRemoteUser();
      if (remoteUser != null) {
        remoteUser.doAs(call);
      } else {
        call.run();
      }*/
      call.run();
    } catch (InterruptedException e) {
      if (Server.running) {                          // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (Exception e) {
      LOG.info(Thread.currentThread().getName() + " caught an exception", e);
    } finally {
      CurCall.set(null);
    }
  }
  LOG.debug(Thread.currentThread().getName() + ": exiting");
}
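The handler loop is a classic producer/consumer arrangement: readers put calls on a shared queue and a fixed set of handler threads take them off and run them. A minimal sketch, assuming a plain `LinkedBlockingQueue` in place of the real call queue; `HandlerPoolSketch`, `submit`, `shutdown`, and `runDemo` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler pool: several threads draining one shared call queue.
class HandlerPoolSketch {
    private final BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
    private final Thread[] handlers;
    private volatile boolean running = true;

    HandlerPoolSketch(int handlerCount) {
        handlers = new Thread[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Thread(this::handlerLoop, "handler-" + i);
        }
    }

    void start() {
        for (Thread t : handlers) {
            t.start();
        }
    }

    // Keeps taking calls until shutdown is requested and the queue is drained.
    private void handlerLoop() {
        while (running || !callQueue.isEmpty()) {
            try {
                Runnable call = callQueue.poll(50, TimeUnit.MILLISECONDS);
                if (call != null) {
                    call.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A failing call must not take the handler thread down with it.
            }
        }
    }

    void submit(Runnable call) {
        callQueue.add(call);
    }

    void shutdown() {
        running = false;
        for (Thread t : handlers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Runs `calls` trivial calls on `handlerCount` handlers and reports how many completed.
    static int runDemo(int handlerCount, int calls) {
        AtomicInteger done = new AtomicInteger();
        HandlerPoolSketch pool = new HandlerPoolSketch(handlerCount);
        pool.start();
        for (int i = 0; i < calls; i++) {
            pool.submit(done::incrementAndGet);
        }
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed calls: " + runDemo(3, 20));
    }
}
```

The sketch polls with a timeout so that handlers notice the shutdown flag; the code above instead blocks in `take()` and relies on interruption.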

The main difficulty is how call.run() actually gets executed. To understand call.run you first need to understand protocols.

Protocols

Every server has its own set of protocols, and they are first grouped by rpcKind.

enum RpcKindProto {
  RPC_BUILTIN          = 0;  // Used for built in calls by tests
  RPC_WRITABLE         = 1;  // Use WritableRpcEngine
  RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
}

In 3.x the rpcKind in use is always RPC_PROTOCOL_BUFFER, so that is the example used here.

The protocols for RPC_PROTOCOL_BUFFER are stored in a HashMap.

Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray = new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);

The key is ProtoNameVer; note how its hashCode is implemented.

static class ProtoNameVer {
  final String protocol;
  final long   version;

  ProtoNameVer(String protocol, long ver) {
    this.protocol = protocol;
    this.version = ver;
  }

  @Override
  public boolean equals(Object o) {
    if (o == null) return false;
    if (this == o) return true;
    if (! (o instanceof ProtoNameVer))
      return false;
    ProtoNameVer pv = (ProtoNameVer) o;
    return ((pv.protocol.equals(this.protocol)) && (pv.version == this.version));
  }

  @Override
  public int hashCode() {
    return protocol.hashCode() * 37 + (int) version;
  }
}

So every protocol must carry a protocol name and a version, which is what the ProtocolInfo annotation supplies.

@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
  String protocolName();  // the name of the protocol (i.e. rpc service)
  long protocolVersion() default -1; // default means not defined use old way
}
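To see how a name/version key and a runtime annotation fit together, here is a small self-contained sketch. Everything in it is hypothetical and only mirrors the shape of the code above: `DemoProtocolInfo` stands in for ProtocolInfo, `NameVer` for ProtoNameVer, and `EchoProtocolPB` is an invented protocol interface.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry keyed by (protocol name, version) read from an annotation.
class ProtocolRegistrySketch {
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoProtocolInfo {
        String protocolName();
        long protocolVersion() default -1;
    }

    @DemoProtocolInfo(protocolName = "demo.EchoProtocol", protocolVersion = 1)
    interface EchoProtocolPB {
        String echo(String message);
    }

    // Map key: two keys are equal only if both name and version match.
    static final class NameVer {
        final String protocol;
        final long version;

        NameVer(String protocol, long version) {
            this.protocol = protocol;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof NameVer)) return false;
            NameVer other = (NameVer) o;
            return protocol.equals(other.protocol) && version == other.version;
        }

        @Override
        public int hashCode() {
            return protocol.hashCode() * 37 + (int) version;
        }
    }

    private final Map<NameVer, Object> impls = new HashMap<>();

    // Reads name and version from the annotation and registers the implementation.
    void addProtocol(Class<?> protocolClass, Object impl) {
        DemoProtocolInfo info = protocolClass.getAnnotation(DemoProtocolInfo.class);
        if (info == null) {
            throw new IllegalArgumentException(protocolClass.getName() + " has no protocol annotation");
        }
        impls.put(new NameVer(info.protocolName(), info.protocolVersion()), impl);
    }

    Object lookup(String protocolName, long version) {
        return impls.get(new NameVer(protocolName, version));
    }

    public static void main(String[] args) {
        ProtocolRegistrySketch registry = new ProtocolRegistrySketch();
        EchoProtocolPB impl = message -> message;
        registry.addProtocol(EchoProtocolPB.class, impl);
        System.out.println(registry.lookup("demo.EchoProtocol", 1) == impl);
    }
}
```

Because `equals` and `hashCode` both use name and version, a freshly built key finds the entry stored under an equal key, while the same name with a different version does not.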

A protocol interface looks something like this.

@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, protocolVersion = 1)
/**
 * Protocol that a clients use to communicate with the NameNode.
 *
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
public interface ClientNamenodeProtocolPB extends ClientNamenodeProtocol.BlockingInterface {
}

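So where do the methods that get invoked by reflection come from? ClientNamenodeProtocol.BlockingInterface is in fact generated by the protobuf compiler; see the service definition at the end of ClientNamenodeProtocol.proto.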

service ClientNamenodeProtocol {
  rpc getBlockLocations(GetBlockLocationsRequestProto)
      returns(GetBlockLocationsResponseProto);
  rpc getServerDefaults(GetServerDefaultsRequestProto)
      returns(GetServerDefaultsResponseProto);
  rpc create(CreateRequestProto)returns(CreateResponseProto);
  rpc append(AppendRequestProto) returns(AppendResponseProto);
  rpc setReplication(SetReplicationRequestProto)
      returns(SetReplicationResponseProto);
  rpc setStoragePolicy(SetStoragePolicyRequestProto)...
}

Compiling this yields ClientNamenodeProtocol.BlockingInterface, which contains the list of methods.

Our own implementation class only needs to implement ClientNamenodeProtocolPB. ClientNamenodeProtocolServerSideTranslatorPB is one example.

//add protocols
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);    

Finally, call.run uses the RequestHeaderProto to locate the matching implementation class.

message RequestHeaderProto {
  /** Name of the RPC method */
  required string methodName = 1;

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get metainfo
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto
   */
  required string declaringClassProtocolName = 2;

  /** protocol version of class declaring the called method */
  required uint64 clientProtocolVersion = 3;
}

The method on the implementation class is then invoked through reflection.

Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {
  RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
  RequestHeaderProto rpcRequest = request.getRequestHeader();
  String methodName = rpcRequest.getMethodName();

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get info
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto.
   */
  String declaringClassProtoName = rpcRequest.getDeclaringClassProtocolName();
  long clientVersion = rpcRequest.getClientProtocolVersion();
  //LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);
  ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);
  BlockingService service = (BlockingService) protocolImpl.protocolImpl;
  MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
  if (methodDescriptor == null) {
    String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
    LOG.warn(msg);
    throw new RpcNoSuchMethodException(msg);
  }
  Message prototype = service.getRequestPrototype(methodDescriptor);
  Message param = request.getValue(prototype);
  Message result = null;
  long startTime = Time.now();
  int qTime = (int) (startTime - receiveTime);
  Exception exception = null;
  boolean isDeferred = false;
  try {
    //server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    result = service.callBlockingMethod(methodDescriptor, null, param);
    // Check if this needs to be a deferred response,
    // by checking the ThreadLocal callback being set
  } catch (ServiceException e) {
    exception = (Exception) e.getCause();
    throw (Exception) e.getCause();
  } catch (Exception e) {
    exception = e;
    throw e;
  } finally {
    int processingTime = (int) (Time.now() - startTime);
    //if (LOG.isDebugEnabled()) {
    String msg = "Served: " + methodName + (isDeferred ? ", deferred" : "") +
        ", queueTime= " + qTime + " procesingTime= " + processingTime;
    if (exception != null) {
      msg += " exception= " + exception.getClass().getSimpleName();
    }
    //LOG.debug(msg);
    LOG.info(msg);
    //LOG.info("params:" + param.toString());
    //LOG.info("result:" + result.toString());
    //}
    String detailedMetricsName = (exception == null) ?
        methodName : exception.getClass().getSimpleName();
    //server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);
  }
  return RpcWritable.wrap(result);
}
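The same idea (find a method by the name carried in the request, invoke it on the registered implementation, and unwrap the wrapper exception) can be shown with plain `java.lang.reflect`. This is an analogue only: the code above goes through protobuf's `BlockingService` and `MethodDescriptor`, not `Method.invoke`. `ReflectiveDispatchSketch`, `GreeterProtocol`, `GreeterImpl`, `call`, and `demo` are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical name-based dispatch using core Java reflection.
class ReflectiveDispatchSketch {
    interface GreeterProtocol {
        String greet(String name);
    }

    static class GreeterImpl implements GreeterProtocol {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Looks up a one-argument method by name on the protocol and invokes it on impl.
    static Object call(Object impl, Class<?> protocol, String methodName, Object param)
            throws Exception {
        Method target = null;
        for (Method m : protocol.getMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == 1) {
                target = m;
                break;
            }
        }
        if (target == null) {
            throw new NoSuchMethodException(
                "Unknown method " + methodName + " called on " + protocol.getName() + " protocol.");
        }
        try {
            return target.invoke(impl, param);
        } catch (InvocationTargetException e) {
            // Surface the implementation's own exception rather than the reflection wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    // Convenience wrapper: returns the result, or the error message on failure.
    static String demo(String methodName) {
        try {
            return (String) call(new GreeterImpl(), GreeterProtocol.class, methodName, "hdfs");
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("greet"));
        System.out.println(demo("missing"));
    }
}
```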

Once the call completes, any returned Message is stored in rpccall.rpcResponse, and the call is then placed on the ResponseQueue.

ResponseQueue

The ResponseQueue lives in the connection and holds the rpccalls that have finished processing.

Responder

The Responder thread is mainly responsible for sending call results back.

private boolean processResponse(LinkedList<RpcCall> responseQueue, boolean inHandler) throws IOException {
  boolean error = true;
  boolean done = false;       // there is more data for this channel.
  int numElements = 0;
  RpcCall call = null;
  try {
    synchronized (responseQueue) {
      //
      // If there are no items for this channel, then we are done
      //
      numElements = responseQueue.size();
      if (numElements == 0) {
        error = false;
        return true;              // no more data for this channel.
      }
      //
      // Extract the first call
      //
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
      }
      //
      // Send as much data as we can in the non-blocking fashion
      //
      int numBytes = call.connection.channelWrite(channel, call.rpcResponse);
      if (numBytes < 0) {
        return true;
      }
      if (!call.rpcResponse.hasRemaining()) {
        //Clear out the response buffer so it can be collected
        call.rpcResponse = null;
        call.connection.decRpcCount();
        if (numElements == 1) {    // last call fully processes.
          done = true;             // no more data for this channel.
        } else {
          done = false;            // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // insert in Selector queue.
        //
        call.connection.responseQueue.addFirst(call);
        if (inHandler) {
          // set the serve time when the response has to be sent later
          call.timestamp = Time.now();
          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
          } catch (ClosedChannelException e) {
            //Its ok. channel might be closed else where.
            done = true;
          } finally {
            decPending();
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote partial " + numBytes + " bytes.");
        }
      }
      error = false;              // everything went off well
    }
  } finally {
    if (error && call != null) {
      LOG.warn(Thread.currentThread().getName() + ", call " + call + ": output error");
      done = true;               // error. no more data for this channel.
      Server.closeConnection(call.connection);
    }
  }
  return done;
}
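The core of processResponse is the partial-write rule: write what the channel will take, and if bytes remain, put the response back at the head of the queue and try again later. The sketch below isolates that rule. `ResponderSketch`, `ThrottledChannel` (a test channel that accepts a limited number of bytes per write), and `attemptsToDrain` are hypothetical, and selector registration is left out entirely.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of retrying partially written responses.
class ResponderSketch {
    // Test channel that accepts at most `capacity` bytes per write call.
    static class ThrottledChannel implements WritableByteChannel {
        private final int capacity;
        int totalWritten = 0;

        ThrottledChannel(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(capacity, src.remaining());
            src.position(src.position() + n);  // consume the bytes we "sent"
            totalWritten += n;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    // Returns true when nothing is left to send, false when a response is only partly written.
    static boolean processResponse(Deque<ByteBuffer> responseQueue, WritableByteChannel channel)
            throws IOException {
        ByteBuffer response = responseQueue.pollFirst();
        if (response == null) {
            return true;
        }
        channel.write(response);
        if (response.hasRemaining()) {
            responseQueue.addFirst(response);  // keep ordering, retry when writable again
            return false;
        }
        return responseQueue.isEmpty();
    }

    // Counts how many write attempts one response needs; bytesPerWrite must be positive.
    static int attemptsToDrain(int responseSize, int bytesPerWrite) {
        Deque<ByteBuffer> queue = new ArrayDeque<>();
        queue.add(ByteBuffer.allocate(responseSize));
        ThrottledChannel channel = new ThrottledChannel(bytesPerWrite);
        int attempts = 0;
        try {
            boolean done;
            do {
                done = processResponse(queue, channel);
                attempts++;
            } while (!done);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + attemptsToDrain(10, 4));
    }
}
```

Putting the unfinished response back at the head, not the tail, preserves the order in which responses leave the connection.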
