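This article analyzes the HDFS RPC communication framework, including the source code of the Listener, Reader, Handler, and Responder implementation classes. Note that the Hadoop version is 3.1.1.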
Before you start
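RPC necessarily depends on socket communication, and here it is built on Java NIO. Readers should ideally have some familiarity with NIO; this article does not cover it in detail.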
https://blog.csdn.net/yhl_jxy/article/details/79332092
Also, most of the code in this article has been tidied up by the author, so it differs slightly from the original Server source.
RPC framework architecture diagram
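As the architecture diagram shows, processing the data of a single socket connection is split across several modules (Listener, Reader, CallQueue, Handler, Responder), each handling one specific concern. On the one hand this keeps calls concurrent; on the other, it keeps the code extensible.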
Listener
The Listener is the listening thread. What does it listen for? Socket connections, which the code wraps as Connection objects.
Listener.run, doAccept
```java
public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  Server.connectionManager.startIdleScan();
  while (Server.running) {
    SelectionKey key = null;
    try {
      getSelector().select();
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      Server.connectionManager.closeIdle(true);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  LOG.info("Stopping " + Thread.currentThread().getName());

  synchronized (this) {
    try {
      acceptChannel.close();
      selector.close();
    } catch (IOException e) { }

    selector = null;
    acceptChannel = null;

    // close all connections
    Server.connectionManager.stopIdleScan();
    Server.connectionManager.closeAll();
  }
}

void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);

    Reader reader = getReader();
    Connection c = Server.connectionManager.register(channel);
    // If the connectionManager can't take it, close the connection.
    if (c == null) {
      if (channel.isOpen()) {
        IOUtils.cleanup(null, channel);
      }
      Server.connectionManager.droppedConnections.getAndIncrement();
      continue;
    }
    key.attach(c);  // so closeCurrentConnection can get the object
    reader.addConnection(c);
  }
}
```
In short, the Listener accepts each channel, wraps it in a Connection, and hands it to a Reader.
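To make the hand-off concrete, here is a minimal sketch of how accepted connections can be spread across several Readers. It assumes plain strings stand in for Connection objects, and SketchListener/SketchReader are hypothetical names. The round-robin getReader() follows what Hadoop's Listener does as far as I can tell, but the real pending queue is bounded and addConnection also wakes up readSelector, both of which are left out here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for Reader: only the pending-connection hand-off is modeled.
class SketchReader {
  // Connections accepted by the Listener but not yet registered for OP_READ.
  // Unbounded here for simplicity.
  final BlockingQueue<String> pendingConnections = new LinkedBlockingQueue<>();

  // The real Reader also wakes up readSelector here, so doRunLoop
  // registers the new channel on its next iteration.
  void addConnection(String conn) {
    pendingConnections.add(conn);
  }
}

// Hypothetical stand-in for Listener: distributes accepted connections.
class SketchListener {
  private final SketchReader[] readers;
  private int currentReader = 0;

  SketchListener(int readerCount) {
    readers = new SketchReader[readerCount];
    for (int i = 0; i < readerCount; i++) {
      readers[i] = new SketchReader();
    }
  }

  // Round-robin choice: advance the index first, then use it.
  SketchReader getReader() {
    currentReader = (currentReader + 1) % readers.length;
    return readers[currentReader];
  }

  // Called once per accepted channel (a String stands in for Connection).
  void onAccept(String conn) {
    getReader().addConnection(conn);
  }

  SketchReader reader(int i) {
    return readers[i];
  }
}
```

With three readers and six accepted connections, each reader ends up with two pending connections, and reader 1 receives the first one because the index is advanced before it is used.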
Reader
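The Reader plays a central role in the RPC framework. As described in the article HDFS RPC Protocol Explained, everything that happens before processOneRpc is done by the Reader. In summary: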
- Checking the first 7 bytes of an RPC connection.
- The SASL handshake and authentication.
- Reading the IpcConnectionContext.
- Preparation for processOneRpc, including parsing the RequestHeaderProto.
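The 7-byte check in the first bullet can be sketched as a small parser. This is a hypothetical helper, not Hadoop's code. It assumes the layout is the 4-byte magic "hrpc" followed by one byte each for version, service class and auth protocol; the concrete values 9 (version) and 0/-33 (NONE/SASL) are my assumptions about RpcConstants and AuthProtocol in 3.x and should be checked against the source. I also believe the real Connection answers a wrong version with an error response rather than simply throwing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical parser for the 7-byte connection preamble:
// 4-byte magic "hrpc" + 1-byte version + 1-byte service class + 1-byte auth protocol.
final class ConnectionHeader {
  static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);
  static final int HEADER_LEN = 7;
  static final byte EXPECTED_VERSION = 9;  // assumption: RpcConstants.CURRENT_VERSION in 3.x

  final byte version;
  final byte serviceClass;
  final byte authProtocol;                 // assumption: 0 = NONE, -33 = SASL

  private ConnectionHeader(byte version, byte serviceClass, byte authProtocol) {
    this.version = version;
    this.serviceClass = serviceClass;
    this.authProtocol = authProtocol;
  }

  /**
   * Returns null (consuming nothing) while fewer than 7 bytes are buffered,
   * because a non-blocking read may deliver the preamble in pieces.
   */
  static ConnectionHeader tryParse(ByteBuffer buf) {
    if (buf.remaining() < HEADER_LEN) {
      return null;
    }
    byte[] magic = new byte[MAGIC.length];
    buf.get(magic);
    if (!Arrays.equals(magic, MAGIC)) {
      throw new IllegalArgumentException("Not an RPC connection: bad magic");
    }
    byte version = buf.get();
    if (version != EXPECTED_VERSION) {
      throw new IllegalArgumentException("Unsupported RPC version " + version);
    }
    byte serviceClass = buf.get();
    byte authProtocol = buf.get();
    return new ConnectionHeader(version, serviceClass, authProtocol);
  }
}
```

A client that speaks something else entirely (for example an HTTP request sent to the RPC port) fails the magic check immediately, before any SASL or protobuf parsing starts.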
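Note also that a single Reader does all of this work itself rather than spreading it over several stages. As soon as the Reader has produced a call, it moves on to reading the next one; the calls therefore run concurrently and are executed by the Handlers.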
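The Reader's source is actually simple: at its core it just calls connection.readAndProcess() in a loop. This article does not go into readAndProcess in detail; if you are interested, see HDFS RPC Protocol Explained.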
```java
@Override
public void run() {
  LOG.info("Starting " + Thread.currentThread().getName());
  try {
    doRunLoop();
  } finally {
    try {
      readSelector.close();
    } catch (IOException ioe) {
      LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
    }
  }
}

private synchronized void doRunLoop() {
  while (Server.running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i = size; i > 0; i--) {
        Connection conn = pendingConnections.take();
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select();

      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isReadable()) {
            doRead(key);
          }
        } catch (CancelledKeyException cke) {
          // something else closed the connection, ex. responder or
          // the listener doing an idle scan.  ignore it and let them
          // clean up.
          LOG.info(Thread.currentThread().getName() +
              ": connection aborted from " + key.attachment());
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (Server.running) {  // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    } catch (Throwable re) {
      LOG.error("Bug in read selector!", re);
      //ExitUtil.terminate(1, "Bug in read selector!");
    }
  }
}

// from Listener doRead
void doRead(SelectionKey key) throws InterruptedException {
  int count;
  Connection c = (Connection) key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(Time.now());

  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
    LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
    throw ieo;
  } catch (Exception e) {
    // Any exceptions that reach here are fatal unexpected internal errors
    // that could not be sent to the client.
    LOG.info(Thread.currentThread().getName() +
        ": readAndProcess from client " + c +
        " threw exception [" + e + "]", e);
    count = -1;  //so that the (count < 0) block is executed
  }
  // setupResponse will signal the connection should be closed when a
  // fatal response is sent.
  if (count < 0 || c.shouldClose()) {
    Server.closeConnection(c);
    c = null;
  } else {
    c.setLastContact(Time.now());
  }
}
```
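readAndProcess() has to cope with non-blocking reads that return only part of a request. Below is a minimal sketch of that framing logic, assuming each request is a 4-byte big-endian length followed by the payload. FrameReader and maxDataLength are hypothetical names (Hadoop enforces a comparable limit through ipc.maximum.data.length, if I recall correctly). Because the state survives between calls, the Reader can simply call read() again on the next OP_READ.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical sketch of length-prefixed framing over a non-blocking channel.
final class FrameReader {
  private final ByteBuffer lengthBuf = ByteBuffer.allocate(4);
  private ByteBuffer data;               // null until the 4-byte length is complete
  private final int maxDataLength;

  FrameReader(int maxDataLength) {
    this.maxDataLength = maxDataLength;
  }

  /** Returns one complete frame, or null if more bytes are needed. */
  byte[] read(ReadableByteChannel ch) throws IOException {
    if (data == null) {
      if (ch.read(lengthBuf) < 0) {
        throw new EOFException("connection closed");
      }
      if (lengthBuf.hasRemaining()) {
        return null;                      // length prefix still incomplete
      }
      lengthBuf.flip();
      int len = lengthBuf.getInt();       // big-endian, ByteBuffer's default order
      lengthBuf.clear();
      if (len < 0 || len > maxDataLength) {
        throw new IOException("Invalid frame length " + len);
      }
      data = ByteBuffer.allocate(len);
    }
    if (ch.read(data) < 0) {
      throw new EOFException("connection closed mid-frame");
    }
    if (data.hasRemaining()) {
      return null;                        // body incomplete; resume on next OP_READ
    }
    byte[] frame = data.array();
    data = null;                          // ready for the next length prefix
    return frame;
  }
}
```

Rejecting an oversized length before allocating the buffer is what keeps a malformed or malicious client from making the server allocate gigabytes for one request.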
CallQueue
The callQueue holds the queued calls. Because the callQueue in HDFS is fairly complex, it will get its own article later.
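Until then, the basic contract is easy to show: Readers put calls in, Handlers take them out, and a bounded capacity pushes back on Readers when Handlers fall behind. A minimal sketch assuming a plain LinkedBlockingQueue (SimpleCallQueue is a hypothetical name); sizing the capacity as handler count times a per-handler size is modeled on ipc.server.handler.queue.size, but check CallQueueManager for the exact rule.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical minimal call queue: a bounded FIFO shared by
// Readers (producers) and Handlers (consumers).
final class SimpleCallQueue<E> {
  private final BlockingQueue<E> queue;

  SimpleCallQueue(int handlerCount, int queueSizePerHandler) {
    queue = new LinkedBlockingQueue<>(handlerCount * queueSizePerHandler);
  }

  // Reader side: blocks while the queue is full, so the Reader stops
  // pulling new requests off its connections (back-pressure).
  void put(E call) throws InterruptedException {
    queue.put(call);
  }

  // Non-blocking variant; returns false when the queue is full.
  boolean offer(E call) {
    return queue.offer(call);
  }

  // Handler side: blocks until a call is available (see Handler.run below).
  E take() throws InterruptedException {
    return queue.take();
  }

  int size() {
    return queue.size();
  }
}
```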
Handler
The Handler thread is also fairly simple: essentially it takes a call from the callQueue and executes call.run().
```java
@Override
public void run() {
  LOG.debug(Thread.currentThread().getName() + ": starting");
  while (Server.running) {
    try {
      final Call call = Server.callQueue.take();  // pop the queue; maybe blocked here
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": " + call);
      }
      CurCall.set(call);
      /*TODO
      UserGroupInformation remoteUser = call.getRemoteUser();
      if (remoteUser != null) {
        remoteUser.doAs(call);
      } else {
        call.run();
      }*/
      call.run();
    } catch (InterruptedException e) {
      if (Server.running) {  // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (Exception e) {
      LOG.info(Thread.currentThread().getName() + " caught an exception", e);
    } finally {
      CurCall.set(null);
    }
  }
  LOG.debug(Thread.currentThread().getName() + ": exiting");
}
```
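The loop above can be reduced to a self-contained sketch: a thread that takes calls from a shared queue, publishes the current call in a ThreadLocal the way CurCall does, and always clears it in finally. SketchHandler, CUR_CALL, and the AtomicBoolean standing in for Server.running are hypothetical; Runnable stands in for Call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical Handler: takes calls from a shared queue, exposes the
// current call through a ThreadLocal, and survives failing calls.
final class SketchHandler extends Thread {
  static final ThreadLocal<Runnable> CUR_CALL = new ThreadLocal<>();

  private final BlockingQueue<Runnable> callQueue;
  private final AtomicBoolean running;   // shared flag, like Server.running

  SketchHandler(BlockingQueue<Runnable> callQueue, AtomicBoolean running) {
    this.callQueue = callQueue;
    this.running = running;
    setDaemon(true);
  }

  @Override
  public void run() {
    while (running.get()) {
      try {
        Runnable call = callQueue.take();  // blocks while the queue is empty
        CUR_CALL.set(call);                // code inside call.run() can look it up
        call.run();
      } catch (InterruptedException e) {
        // Expected during shutdown; the loop condition decides whether to exit.
      } catch (RuntimeException e) {
        // A failing call must not kill the handler thread.
      } finally {
        CUR_CALL.set(null);                // never leak a call into the next iteration
      }
    }
  }

  // Clear the flag first, then interrupt to break out of a blocking take().
  void shutdown() {
    running.set(false);
    interrupt();
  }
}
```

Clearing the flag before interrupting matters: take() wakes up with InterruptedException, and the loop exits only because the flag is already false, mirroring the Server.running check in the original catch block.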
The main difficulty is how call.run() is actually executed, and to understand call.run() we first need to understand protocols.
Protocols
Each server has its own protocols, and protocols are first grouped by rpcKind:
```protobuf
enum RpcKindProto {
  RPC_BUILTIN          = 0;  // Used for built in calls by tests
  RPC_WRITABLE         = 1;  // Use WritableRpcEngine
  RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
}
```
In 3.x the rpcKind in use is RPC_PROTOCOL_BUFFER, so we take it as the example.
The protocols of kind RPC_PROTOCOL_BUFFER are stored in a HashMap:
```java
Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray = new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
```
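The key is ProtoNameVer. Pay attention to how equals and hashCode are implemented: both are based on the protocol name and the version.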
```java
static class ProtoNameVer {
  final String protocol;
  final long version;

  ProtoNameVer(String protocol, long ver) {
    this.protocol = protocol;
    this.version = ver;
  }

  @Override
  public boolean equals(Object o) {
    if (o == null)
      return false;
    if (this == o)
      return true;
    if (!(o instanceof ProtoNameVer))
      return false;
    ProtoNameVer pv = (ProtoNameVer) o;
    return ((pv.protocol.equals(this.protocol)) &&
        (pv.version == this.version));
  }

  @Override
  public int hashCode() {
    return protocol.hashCode() * 37 + (int) version;
  }
}
```
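Value-based equals() and hashCode() matter because getProtocolImpl(...) looks the implementation up with a key built from the request header, not with the instance used at registration. A small sketch, copying the class above into a hypothetical ProtoKeyDemo wrapper so it compiles on its own (the protocol name "demo.ClientProtocol" is made up):

```java
import java.util.Map;

// Self-contained copy of ProtoNameVer plus a lookup helper (hypothetical wrapper).
final class ProtoKeyDemo {
  static final class ProtoNameVer {
    final String protocol;
    final long version;

    ProtoNameVer(String protocol, long ver) {
      this.protocol = protocol;
      this.version = ver;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof ProtoNameVer)) return false;  // also covers null
      ProtoNameVer pv = (ProtoNameVer) o;
      return pv.protocol.equals(protocol) && pv.version == version;
    }

    @Override
    public int hashCode() {
      return protocol.hashCode() * 37 + (int) version;
    }
  }

  // Builds a brand-new key per request, as the server does from the header.
  static String lookup(Map<ProtoNameVer, String> impls, String protocol, long version) {
    return impls.get(new ProtoNameVer(protocol, version));
  }
}
```

A client asking for version 2 of a protocol registered only as version 1 gets no match, which is why the version is part of the key.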
Therefore every protocol must have a protocol name and a version, which are declared with the ProtocolInfo annotation:
```java
@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
  String protocolName();  // the name of the protocol (i.e. rpc service)
  long protocolVersion() default -1; // default means not defined use old way
}
```
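Because the annotation is retained at runtime, the name and version can be read back by reflection. A minimal sketch with a local copy of the annotation and a made-up EchoProtocolPB interface. Hadoop's RPC.getProtocolName/getProtocolVersion do something similar, but to my knowledge they fall back to the class name and to a static versionID field respectively; only the class-name fallback is modeled here.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Local copy of the annotation so this sketch compiles without Hadoop.
@Retention(RetentionPolicy.RUNTIME)
@interface ProtocolInfo {
  String protocolName();
  long protocolVersion() default -1;
}

// Hypothetical protocol interface used only for this demo.
@ProtocolInfo(protocolName = "demo.EchoProtocol", protocolVersion = 1)
interface EchoProtocolPB {
}

final class ProtocolInfoReader {
  // Annotation name if present, otherwise the fully qualified class name.
  static String protocolName(Class<?> protocol) {
    ProtocolInfo info = protocol.getAnnotation(ProtocolInfo.class);
    return info != null ? info.protocolName() : protocol.getName();
  }

  // -1 (the annotation default) means "not declared"; this sketch rejects it.
  static long protocolVersion(Class<?> protocol) {
    ProtocolInfo info = protocol.getAnnotation(ProtocolInfo.class);
    if (info == null || info.protocolVersion() == -1) {
      throw new IllegalArgumentException("No protocol version on " + protocol.getName());
    }
    return info.protocolVersion();
  }
}
```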
A protocol interface looks like this:
```java
@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME,
    protocolVersion = 1)
/**
 * Protocol that a clients use to communicate with the NameNode.
 *
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
public interface ClientNamenodeProtocolPB extends
    ClientNamenodeProtocol.BlockingInterface {
}
```
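So where do the methods that get invoked reflectively come from? ClientNamenodeProtocol.BlockingInterface is in fact generated by the protobuf compiler; look at the service definition at the end of ClientNamenodeProtocol.proto: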
```protobuf
service ClientNamenodeProtocol {
  rpc getBlockLocations(GetBlockLocationsRequestProto)
      returns(GetBlockLocationsResponseProto);
  rpc getServerDefaults(GetServerDefaultsRequestProto)
      returns(GetServerDefaultsResponseProto);
  rpc create(CreateRequestProto) returns(CreateResponseProto);
  rpc append(AppendRequestProto) returns(AppendResponseProto);
  rpc setReplication(SetReplicationRequestProto)
      returns(SetReplicationResponseProto);
  rpc setStoragePolicy(SetStoragePolicyRequestProto)...
}
```
Compiling it produces ClientNamenodeProtocol.BlockingInterface, which contains the list of methods.
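Our own implementation class only has to implement ClientNamenodeProtocolPB, as ClientNamenodeProtocolServerSideTranslatorPB does. It is then wrapped into a BlockingService and registered with the server: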
```java
// add protocols
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);
```
Finally, call.run uses the RequestHeaderProto to locate the corresponding implementation class:
```protobuf
message RequestHeaderProto {
  /** Name of the RPC method */
  required string methodName = 1;

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get metainfo
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto
   */
  required string declaringClassProtocolName = 2;

  /** protocol version of class declaring the called method */
  required uint64 clientProtocolVersion = 3;
}
```
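It then invokes the implementation's method reflectively, through the service's method descriptor (findMethodByName, then callBlockingMethod):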
```java
Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {
  RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
  RequestHeaderProto rpcRequest = request.getRequestHeader();
  String methodName = rpcRequest.getMethodName();

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get info
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto.
   */
  String declaringClassProtoName = rpcRequest.getDeclaringClassProtocolName();
  long clientVersion = rpcRequest.getClientProtocolVersion();
  //LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);
  ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);
  BlockingService service = (BlockingService) protocolImpl.protocolImpl;
  MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
  if (methodDescriptor == null) {
    String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
    LOG.warn(msg);
    throw new RpcNoSuchMethodException(msg);
  }
  Message prototype = service.getRequestPrototype(methodDescriptor);
  Message param = request.getValue(prototype);

  Message result = null;
  long startTime = Time.now();
  int qTime = (int) (startTime - receiveTime);
  Exception exception = null;
  boolean isDeferred = false;
  try {
    //server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    result = service.callBlockingMethod(methodDescriptor, null, param);
    // Check if this needs to be a deferred response,
    // by checking the ThreadLocal callback being set
  } catch (ServiceException e) {
    exception = (Exception) e.getCause();
    throw (Exception) e.getCause();
  } catch (Exception e) {
    exception = e;
    throw e;
  } finally {
    int processingTime = (int) (Time.now() - startTime);
    //if (LOG.isDebugEnabled()) {
    String msg = "Served: " + methodName + (isDeferred ? ", deferred" : "") +
        ", queueTime= " + qTime + " procesingTime= " + processingTime;
    if (exception != null) {
      msg += " exception= " + exception.getClass().getSimpleName();
    }
    //LOG.debug(msg);
    LOG.info(msg);
    //LOG.info("params:" + param.toString());
    //LOG.info("result:" + result.toString());
    //}
    String detailedMetricsName = (exception == null) ?
        methodName : exception.getClass().getSimpleName();
    //server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);
  }
  return RpcWritable.wrap(result);
}
```
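Stripped of protobuf, call(...) is a two-step lookup: (protocol name, version) to an implementation, then method name to a method. The sketch below models that with java.lang.reflect as a stand-in. Note that the protobuf-generated BlockingService actually dispatches on the MethodDescriptor (as far as I know the generated callBlockingMethod switches on the method index), and the RpcController argument is omitted. MiniDispatcher and EchoServiceImpl are hypothetical.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, protobuf-free model of call(...): the registry key plays the
// role of ProtoNameVer, and java.lang.reflect stands in for
// findMethodByName / callBlockingMethod.
final class MiniDispatcher {
  private final Map<String, Object> impls = new HashMap<>();

  private static String key(String protocol, long version) {
    return protocol + "#" + version;
  }

  // Counterpart of Server.addProtocol(...)
  void addProtocol(String protocol, long version, Object impl) {
    impls.put(key(protocol, version), impl);
  }

  Object call(String protocol, long version, String methodName, Object param) throws Exception {
    Object impl = impls.get(key(protocol, version));
    if (impl == null) {
      throw new IllegalStateException("Unknown protocol " + protocol + " version " + version);
    }
    Method method = findMethodByName(impl.getClass(), methodName);
    if (method == null) {
      throw new NoSuchMethodException(
          "Unknown method " + methodName + " called on " + protocol + " protocol.");
    }
    try {
      return method.invoke(impl, param);
    } catch (InvocationTargetException e) {
      // Unwrap, like call(...) rethrows ServiceException.getCause().
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      throw e;
    }
  }

  // In this sketch every service method takes a single request object.
  private static Method findMethodByName(Class<?> type, String name) {
    for (Method m : type.getMethods()) {
      if (m.getName().equals(name) && m.getParameterCount() == 1) {
        return m;
      }
    }
    return null;
  }
}

// Hypothetical service implementation used in the usage example.
class EchoServiceImpl {
  public String echo(String msg) {
    return "echo:" + msg;
  }

  public String fail(String msg) {
    throw new IllegalArgumentException(msg);
  }
}
```

Unwrapping the invocation exception is the important design point: the client should see the implementation's own exception type, not a reflection wrapper, which is the same reason call(...) rethrows e.getCause() for ServiceException.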
When the call completes, the returned Message (if any) is stored in rpcCall.rpcResponse, and the call is then placed in the responseQueue.
ResponseQueue
Each Connection has its own responseQueue, which holds the RpcCalls that have finished processing.
Responder
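The Responder thread is responsible for sending call results back to the client. Note the inHandler parameter of processResponse: the Handler first tries to write the response itself, and only when the write is incomplete is the channel registered with the Responder's writeSelector for OP_WRITE, so that the Responder finishes sending it.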
```java
private boolean processResponse(LinkedList<RpcCall> responseQueue,
                                boolean inHandler) throws IOException {
  boolean error = true;
  boolean done = false;       // there is more data for this channel.
  int numElements = 0;
  RpcCall call = null;
  try {
    synchronized (responseQueue) {
      //
      // If there are no items for this channel, then we are done
      //
      numElements = responseQueue.size();
      if (numElements == 0) {
        error = false;
        return true;              // no more data for this channel.
      }
      //
      // Extract the first call
      //
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
      }
      //
      // Send as much data as we can in the non-blocking fashion
      //
      int numBytes = call.connection.channelWrite(channel, call.rpcResponse);
      if (numBytes < 0) {
        return true;
      }
      if (!call.rpcResponse.hasRemaining()) {
        //Clear out the response buffer so it can be collected
        call.rpcResponse = null;
        call.connection.decRpcCount();
        if (numElements == 1) {    // last call fully processes.
          done = true;             // no more data for this channel.
        } else {
          done = false;            // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // insert in Selector queue.
        //
        call.connection.responseQueue.addFirst(call);

        if (inHandler) {
          // set the serve time when the response has to be sent later
          call.timestamp = Time.now();

          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
          } catch (ClosedChannelException e) {
            //Its ok. channel might be closed else where.
            done = true;
          } finally {
            decPending();
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote partial " + numBytes + " bytes.");
        }
      }
      error = false;              // everything went off well
    }
  } finally {
    if (error && call != null) {
      LOG.warn(Thread.currentThread().getName() + ", call " + call + ": output error");
      done = true;               // error. no more data for this channel.
      Server.closeConnection(call.connection);
    }
  }
  return done;
}
```
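The core idea of processResponse is: write as much of the head response as the socket accepts, and if bytes remain, put the call back at the front of the queue so that responses on one connection stay in order. A minimal sketch of just that part, assuming ByteBuffers stand in for RpcCall and a hypothetical ThrottledChannel imitates a socket whose send buffer fills up; the writeSelector registration is only noted in a comment.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;

// Hypothetical model of processResponse without the Hadoop types.
final class ResponseSketch {
  /** Returns true when the connection's queue has been fully drained. */
  static boolean processResponse(LinkedList<ByteBuffer> responseQueue,
                                 WritableByteChannel ch) throws IOException {
    synchronized (responseQueue) {
      if (responseQueue.isEmpty()) {
        return true;                       // nothing left for this channel
      }
      ByteBuffer resp = responseQueue.removeFirst();
      ch.write(resp);                      // non-blocking: may write only part
      if (resp.hasRemaining()) {
        responseQueue.addFirst(resp);      // unfinished response stays first
        // Real code: register the channel with writeSelector for OP_WRITE here.
        return false;
      }
      return responseQueue.isEmpty();      // like "numElements == 1" in the original
    }
  }
}

// Test helper: accepts at most `limit` bytes per write, like a full send buffer.
final class ThrottledChannel implements WritableByteChannel {
  final ByteArrayOutputStream sink = new ByteArrayOutputStream();
  private final int limit;
  private boolean open = true;

  ThrottledChannel(int limit) {
    this.limit = limit;
  }

  @Override
  public int write(ByteBuffer src) {
    int n = Math.min(limit, src.remaining());
    for (int i = 0; i < n; i++) {
      sink.write(src.get());
    }
    return n;
  }

  @Override
  public boolean isOpen() {
    return open;
  }

  @Override
  public void close() {
    open = false;
  }
}
```

With a 3-byte limit, "abcde" needs two calls and stays at the head of the queue in between, so "fg" is never written before it.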