

RPC 必然依赖于 socket 通信,这里使用的是 Java NIO。读者最好对 NIO 有一定的了解,本文不会对相关知识作过多介绍。









public void run() {LOG.info(Thread.currentThread().getName() + ": starting");Server.connectionManager.startIdleScan();while (Server.running) {SelectionKey key = null;try {getSelector().select();Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isValid()) {if (key.isAcceptable())doAccept(key);}} catch (IOException e) {}key = null;}} catch (OutOfMemoryError e) {// we can run out of memory if we have too many threads// log the event and sleep for a minute and give // some thread(s) a chance to finishLOG.warn("Out of Memory in server select", e);closeCurrentConnection(key, e);Server.connectionManager.closeIdle(true);try { Thread.sleep(60000); } catch (Exception ie) {}} catch (Exception e) {closeCurrentConnection(key, e);}}LOG.info("Stopping " + Thread.currentThread().getName());synchronized (this) {try {acceptChannel.close();selector.close();} catch (IOException e) { }selector= null;acceptChannel= null;// close all connectionsServer.connectionManager.stopIdleScan();Server.connectionManager.closeAll();}}void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel channel;while ((channel = server.accept()) != null) {channel.configureBlocking(false);channel.socket().setTcpNoDelay(tcpNoDelay);channel.socket().setKeepAlive(true);Reader reader = getReader();Connection c = Server.connectionManager.register(channel);// If the connectionManager can't take it, close the connection.if (c == null) {if (channel.isOpen()) {IOUtils.cleanup(null, channel);}Server.connectionManager.droppedConnections.getAndIncrement();continue;}key.attach(c);  // so closeCurrentConnection can get the objectreader.addConnection(c);}}

简单来说,就是 accept 一个 channel,把它包装成 Connection,然后交给 Reader 处理。



  1. rpc connection初始7字节的检查。
  2. sasl握手与验证。
  3. IpcConnectionContext读取。
  4. processOneRpc准备工作,包括RequestHeaderProto解析。



@Overridepublic void run() {LOG.info("Starting " + Thread.currentThread().getName());try {doRunLoop();} finally {try {readSelector.close();} catch (IOException ioe) {LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);}}}private synchronized void doRunLoop() {while (Server.running) {SelectionKey key = null;try {// consume as many connections as currently queued to avoid// unbridled acceptance of connections that starves the selectint size = pendingConnections.size();for (int i=size; i>0; i--) {Connection conn = pendingConnections.take();conn.channel.register(readSelector, SelectionKey.OP_READ, conn);}readSelector.select();Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isReadable()) {doRead(key);}} catch (CancelledKeyException cke) {// something else closed the connection, ex. responder or// the listener doing an idle scan.  ignore it and let them// clean up.LOG.info(Thread.currentThread().getName() +": connection aborted from " + key.attachment());}key = null;}} catch (InterruptedException e) {if (Server.running) {                      // unexpected -- log itLOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);}} catch (IOException ex) {LOG.error("Error in Reader", ex);} catch (Throwable re) {LOG.error("Bug in read selector!", re);//ExitUtil.terminate(1, "Bug in read selector!");}}}//from Listener doReadvoid doRead(SelectionKey key) throws InterruptedException {int count;Connection c = (Connection)key.attachment();if (c == null) {return;  }c.setLastContact(Time.now());try {count = c.readAndProcess();} catch (InterruptedException ieo) {LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);throw ieo;} catch (Exception e) {// Any exceptions that reach here are fatal unexpected internal errors// that could not be sent to the client.LOG.info(Thread.currentThread().getName() +": 
readAndProcess from client " + c +" threw exception [" + e + "]", e);count = -1; //so that the (count < 0) block is executed}// setupResponse will signal the connection should be closed when a// fatal response is sent.if (count < 0 || c.shouldClose()) {Server.closeConnection(c);c = null;}else {c.setLastContact(Time.now());}}   





@Overridepublic void run() {LOG.debug(Thread.currentThread().getName() + ": starting");while (Server.running) {try {final Call call = Server.callQueue.take(); // pop the queue; maybe blocked hereif (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": " + call);}CurCall.set(call);/*TODOUserGroupInformation remoteUser = call.getRemoteUser();if (remoteUser != null) {remoteUser.doAs(call);} else {call.run();}*/call.run();} catch (InterruptedException e) {if (Server.running) {                          // unexpected -- log itLOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);}} catch (Exception e) {LOG.info(Thread.currentThread().getName() + " caught an exception", e);} finally {CurCall.set(null);}}LOG.debug(Thread.currentThread().getName() + ": exiting");}




enum RpcKindProto {RPC_BUILTIN          = 0;  // Used for built in calls by testsRPC_WRITABLE         = 1;  // Use WritableRpcEngine RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine



// Lookup table from a ProtoNameVer key to its ProtoClassProtoImpl entry;
// the HashMap is created with an initial capacity of 10.
Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray = new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);


static class ProtoNameVer {final String protocol;final long   version;ProtoNameVer(String protocol, long ver) {this.protocol = protocol;this.version = ver;}@Overridepublic boolean equals(Object o) {if (o == null) return false;if (this == o) return true;if (! (o instanceof ProtoNameVer))return false;ProtoNameVer pv = (ProtoNameVer) o;return ((pv.protocol.equals(this.protocol)) && (pv.version == this.version));     }@Overridepublic int hashCode() {return protocol.hashCode() * 37 + (int) version;    }}


public @interface ProtocolInfo {String protocolName();  // the name of the protocol (i.e. rpc service)long protocolVersion() default -1; // default means not defined use old way


@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, protocolVersion = 1)
/**
 * Protocol that clients use to communicate with the NameNode.
 *
 * Note: This extends the protocol-buffer service based interface to
 * add the annotations required for security.
 */
public interface ClientNamenodeProtocolPB extends ClientNamenodeProtocol.BlockingInterface {


service ClientNamenodeProtocol {rpc getBlockLocations(GetBlockLocationsRequestProto)returns(GetBlockLocationsResponseProto);rpc getServerDefaults(GetServerDefaultsRequestProto)returns(GetServerDefaultsResponseProto);rpc create(CreateRequestProto)returns(CreateResponseProto);rpc append(AppendRequestProto) returns(AppendResponseProto);rpc setReplication(SetReplicationRequestProto)returns(SetReplicationResponseProto);rpc setStoragePolicy(SetStoragePolicyRequestProto)...



// Add protocols: register the ClientNamenodeProtocol service with the server.
// 1. Create the server-side translator instance.
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
// 2. Wrap it in a BlockingService via ClientNamenodeProtocol.newReflectiveBlockingService.
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
// 3. Register that service under the ClientNamenodeProtocolPB interface class.
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);    


message RequestHeaderProto {/** Name of the RPC method */required string methodName = 1;/** * RPCs for a particular interface (ie protocol) are done using a* IPC connection that is setup using rpcProxy.* The rpcProxy's has a declared protocol name that is * sent form client to server at connection time. * * Each Rpc call also sends a protocol name * (called declaringClassprotocolName). This name is usually the same* as the connection protocol name except in some cases. * For example metaProtocols such ProtocolInfoProto which get metainfo* about the protocol reuse the connection but need to indicate that* the actual protocol is different (i.e. the protocol is* ProtocolInfoProto) since they reuse the connection; in this case* the declaringClassProtocolName field is set to the ProtocolInfoProto*/required string declaringClassProtocolName = 2;/** protocol version of class declaring the called method */required uint64 clientProtocolVersion = 3;


 Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;RequestHeaderProto rpcRequest = request.getRequestHeader();String methodName = rpcRequest.getMethodName();/** * RPCs for a particular interface (ie protocol) are done using a* IPC connection that is setup using rpcProxy.* The rpcProxy's has a declared protocol name that is * sent form client to server at connection time. * * Each Rpc call also sends a protocol name * (called declaringClassprotocolName). This name is usually the same* as the connection protocol name except in some cases. * For example metaProtocols such ProtocolInfoProto which get info* about the protocol reuse the connection but need to indicate that* the actual protocol is different (i.e. the protocol is* ProtocolInfoProto) since they reuse the connection; in this case* the declaringClassProtocolName field is set to the ProtocolInfoProto.*/String declaringClassProtoName = rpcRequest.getDeclaringClassProtocolName();long clientVersion = rpcRequest.getClientProtocolVersion();//LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);BlockingService service = (BlockingService) protocolImpl.protocolImpl;MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);if (methodDescriptor == null) {String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";LOG.warn(msg);throw new RpcNoSuchMethodException(msg);}Message prototype = service.getRequestPrototype(methodDescriptor);Message param = request.getValue(prototype);Message result = null;long startTime = Time.now();int qTime = (int) (startTime - receiveTime);Exception exception = null;boolean isDeferred = false;try 
{//server.rpcDetailedMetrics.init(protocolImpl.protocolClass);result = service.callBlockingMethod(methodDescriptor, null, param);// Check if this needs to be a deferred response,// by checking the ThreadLocal callback being set} catch (ServiceException e) {exception = (Exception) e.getCause();throw (Exception) e.getCause();} catch (Exception e) {exception = e;throw e;} finally {int processingTime = (int) (Time.now() - startTime);//if (LOG.isDebugEnabled()) {String msg ="Served: " + methodName + (isDeferred ? ", deferred" : "") +", queueTime= " + qTime +" procesingTime= " + processingTime;if (exception != null) {msg += " exception= " + exception.getClass().getSimpleName();}//LOG.debug(msg);LOG.info(msg);//LOG.info("params:" + param.toString());//LOG.info("result:" + result.toString());//}String detailedMetricsName = (exception == null) ?methodName :exception.getClass().getSimpleName();//server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);}return RpcWritable.wrap(result);}






 private boolean processResponse(LinkedList<RpcCall> responseQueue,boolean inHandler) throws IOException {boolean error = true;boolean done = false;       // there is more data for this channel.int numElements = 0;RpcCall call = null;try {synchronized (responseQueue) {//// If there are no items for this channel, then we are done//numElements = responseQueue.size();if (numElements == 0) {error = false;return true;              // no more data for this channel.}//// Extract the first call//call = responseQueue.removeFirst();SocketChannel channel = call.connection.channel;if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": responding to " + call);}//// Send as much data as we can in the non-blocking fashion//int numBytes = call.connection.channelWrite(channel, call.rpcResponse);if (numBytes < 0) {return true;}if (!call.rpcResponse.hasRemaining()) {//Clear out the response buffer so it can be collectedcall.rpcResponse = null;call.connection.decRpcCount();if (numElements == 1) {    // last call fully processes.done = true;             // no more data for this channel.} else {done = false;            // more calls pending to be sent.}if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": responding to " + call+ " Wrote " + numBytes + " bytes.");}} else {//// If we were unable to write the entire response out, then // insert in Selector queue. //call.connection.responseQueue.addFirst(call);if (inHandler) {// set the serve time when the response has to be sent latercall.timestamp = Time.now();incPending();try {// Wakeup the thread blocked on select, only then can the call // to channel.register() complete.writeSelector.wakeup();channel.register(writeSelector, SelectionKey.OP_WRITE, call);} catch (ClosedChannelException e) {//Its ok. 
channel might be closed else where.done = true;} finally {decPending();}}if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": responding to " + call+ " Wrote partial " + numBytes + " bytes.");}}error = false;              // everything went off well}} finally {if (error && call != null) {LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");done = true;               // error. no more data for this channel.Server.closeConnection(call.connection);}}return done;}





