手撕RPC——实现简单的RPC调用
- 一、场景设计
- 二、设计思路
- 2.1 客户端的设计
- 2.2 服务端的设计
- 2.3 通信设计
- 三、代码实现
- 3.1 定义用户信息
- 3.2 用户服务接口
- 3.3 用户服务接口实现
- 3.4 定义消息格式
- 3.5 实现动态代理类
- 3.6 封装信息传输类
- 3.7 定义服务端Server接口
- 3.8 实现RpcServer接口
- 3.9 实现WorkThread类
- 3.10 实现本地服务存放器
- 3.11 客户端主程序
- 3.12 服务端主程序
一、场景设计
现在A,B位于不同的服务器上,但现在A想调用B的某个方法,如何实现呢?
服务端B:
有一个用户表
- UserService 里有一个功能:getUserByUserId(Integer id)
- UserServiceImpl 实现了UserService接口和方法
客户端A:
调用getUserByUserId方法, 内部传一个Id给服务端,服务端查询到User对象返回给客户端
如何实现以上调用过程呢?
二、设计思路
主要考虑客户端、服务端、以及双方如何通信才能实现此功能
2.1 客户端的设计
- 调用getUserByUserId方法时,内部将调用信息处理后发送给服务端B,告诉B我要获取User
- 外部调用方法,内部进行其它的处理——这种场景我们可以使用动态代理的方式,改写原本方法的处理逻辑
2.2 服务端的设计
- 监听到A的请求后,接收A的调用信息,并根据信息得到A想调用的服务与方法
- 根据信息找到对应的服务,进行调用后将结果发送回给A
2.3 通信设计
- 使用Java的socket网络编程进行通信
- 为了方便A ,B之间 对接收的消息进行处理,我们需要将请求信息和返回信息封装成统一的消息格式
三、代码实现
在此部分我们将理论转化为代码,分别实现客户端和服务端。
项目目录结构
3.1 定义用户信息
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {// 客户端和服务端共有的private Integer id;private String userName;private Boolean sex;
}
3.2 用户服务接口
public interface UserService {// 客户端通过这个接口调用服务端的实现类User getUserByUserId(Integer id);//新增一个功能Integer insertUserId(User user);
}
3.3 用户服务接口实现
public class UserServiceImpl implements UserService {@Overridepublic User getUserByUserId(Integer id) {System.out.println("客户端查询了"+id+"的用户");// 模拟从数据库中取用户的行为Random random = new Random();User user = User.builder().userName(UUID.randomUUID().toString()).id(id).sex(random.nextBoolean()).build();return user;}@Overridepublic Integer insertUserId(User user) {System.out.println("插入数据成功"+user.getUserName());return user.getId();}
}
3.4 定义消息格式
//定义请求信息格式RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {//服务类名,客户端只知道接口private String interfaceName;//调用的方法名private String methodName;//参数列表private Object[] params;//参数类型private Class<?>[] paramsType;
}
//定义返回信息格式RpcResponse(类似http格式)
@Data
@Builder
public class RpcResponse implements Serializable {//状态码private int code;//状态信息private String message;//具体数据private Object data;//构造成功信息public static RpcResponse sussess(Object data){return RpcResponse.builder().code(200).data(data).build();}//构造失败信息public static RpcResponse fail(){return RpcResponse.builder().code(500).message("服务器发生错误").build();}
}
3.5 实现动态代理类
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {//传入参数service接口的class对象,反射封装成一个requestprivate String host;private int port;//jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//构建requestRpcRequest request=RpcRequest.builder().interfaceName(method.getDeclaringClass().getName()).methodName(method.getName()).params(args).paramsType(method.getParameterTypes()).build();//IOClient.sendRequest 和服务端进行数据传输RpcResponse response= IOClient.sendRequest(host,port,request);return response.getData();}public <T>T getProxy(Class<T> clazz){Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);return (T)o;}
}
3.6 封装信息传输类
public class IOClient {//这里负责底层与服务端的通信,发送request,返回responsepublic static RpcResponse sendRequest(String host, int port, RpcRequest request){try {Socket socket=new Socket(host, port);ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());oos.writeObject(request);oos.flush();RpcResponse response=(RpcResponse) ois.readObject();return response;} catch (IOException | ClassNotFoundException e) {e.printStackTrace();return null;}}
}
3.7 定义服务端Server接口
public interface RpcServer {//开启监听void start(int port);void stop();
}
3.8 实现RpcServer接口
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {private ServiceProvider serviceProvide;@Overridepublic void start(int port) {try {ServerSocket serverSocket=new ServerSocket(port);System.out.println("服务器启动了");while (true) {//如果没有连接,会堵塞在这里Socket socket = serverSocket.accept();//有连接,创建一个新的线程执行处理new Thread(new WorkThread(socket,serviceProvide)).start();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void stop() {}
}
3.9 实现WorkThread类
WorkThread类负责启动线程和客户端进行数据传输,WorkThread类中的getResponse方法负责解析收到的request信息,寻找服务进行调用并返回结果。
@AllArgsConstructor
public class WorkThread implements Runnable{private Socket socket;private ServiceProvider serviceProvide;@Overridepublic void run() {try {ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());//读取客户端传过来的requestRpcRequest rpcRequest = (RpcRequest) ois.readObject();//反射调用服务方法获取返回值RpcResponse rpcResponse=getResponse(rpcRequest);//向客户端写入responseoos.writeObject(rpcResponse);oos.flush();} catch (IOException | ClassNotFoundException e) {e.printStackTrace();}}private RpcResponse getResponse(RpcRequest rpcRequest){//得到服务名String interfaceName=rpcRequest.getInterfaceName();//得到服务端相应服务实现类Object service = serviceProvide.getService(interfaceName);//反射调用方法Method method=null;try {method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());Object invoke=method.invoke(service,rpcRequest.getParams());return RpcResponse.sussess(invoke);} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {e.printStackTrace();System.out.println("方法执行错误");return RpcResponse.fail();}}
}
3.10 实现本地服务存放器
因为一个服务器会有多个服务,所以需要设置一个本地服务存放器serviceProvider存放服务,在接收到服务端的request信息之后,我们在本地服务存放器找到需要的服务,通过反射调用方法,得到结果并返回
//本地服务存放器
public class ServiceProvider {//集合中存放服务的实例private Map<String,Object> interfaceProvider;public ServiceProvider(){this.interfaceProvider=new HashMap<>();}//本地注册服务public void provideServiceInterface(Object service){String serviceName=service.getClass().getName();Class<?>[] interfaceName=service.getClass().getInterfaces();for (Class<?> clazz:interfaceName){interfaceProvider.put(clazz.getName(),service);}}//获取服务实例public Object getService(String interfaceName){return interfaceProvider.get(interfaceName);}
}
3.11 客户端主程序
public class TestClient {public static void main(String[] args) {ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);UserService proxy=clientProxy.getProxy(UserService.class);User user = proxy.getUserByUserId(1);System.out.println("从服务端得到的user="+user.toString());User u=User.builder().id(100).userName("wxx").sex(true).build();Integer id = proxy.insertUserId(u);System.out.println("向服务端插入user的id"+id);}
}
3.12 服务端主程序
public class TestServer {public static void main(String[] args) {UserService userService=new UserServiceImpl();ServiceProvider serviceProvider=new ServiceProvider();serviceProvider.provideServiceInterface(userService);RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);rpcServer.start(9999);}
}