大纲
1.RPC的相关概念
2.RPC服务调用端动态代理实现
3.Netty客户端之RPC远程调用过程分析
4.RPC网络通信中的编码解码器
5.Netty服务端之RPC服务提供端的处理
6.RPC服务调用端实现超时功能
5.Netty服务端之RPC服务提供端的处理
(1)RPC服务提供端NettyRpcServer
(2)基于反射调用请求对象的目标方法
(1)RPC服务提供端NettyRpcServer
public class ServiceConfig {private String serviceName;//调用方的服务名称private Class serviceInterfaceClass;//服务接口类型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到这一步为止,server启动了而且监听指定的端口号了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//进入一个阻塞的状态,同步一直等待到你的server端要关闭掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多个服务public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}
(2)基于反射调用请求对象的目标方法
//RpcRequest类的字段需要调整为如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//参数类型private Object[] args;//参数值private String invokerApplicationName;//调用方的服务名称private String invokerIp;//调用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此时我们要实现什么呢?//我们需要根据RpcRequest指定的class,获取到这个class//然后通过反射构建这个class对象实例//接着通过反射获取到这个RpcRequest指定方法和入参类型的method//最后通过反射调用,传入方法,拿到返回值//根据接口名字拿到接口实现类的名字后再获取类ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc调用结果封装到响应里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}
6.RPC服务调用端实现超时功能
public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}}); try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//标记一下请求发起的时间NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除发起请求时间的标记NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}