聊聊分布式架构04——RPC通信原理

目录

RPC通信的基本原理

RPC结构

手撸简陋版RPC

知识点梳理

1.Socket套接字通信机制

2.通信过程的序列化与反序列化

3.动态代理

4.反射

思维流程梳理

码起来

服务端时序图

服务端—Api与Provider模块

客户端时序图


RPC通信的基本原理

RPC(Remote Procedure Call)是一种远程过程调用协议,用于在分布式系统中进行远程通信,允许一个计算机程序调用另一个地址空间(通常是在不同的机器上)的函数或过程,就像调用本地函数一样。下面是RPC通信的基本原理:

  1. 客户端调用:远程客户端(调用方)希望调用远程服务器(提供方)上的一个或多个远程过程(函数)。客户端在本地创建一个请求,并指定要调用的远程过程的名称以及传递给该过程的参数。

  2. 参数序列化:在将请求发送到远程服务器之前,客户端需要将参数序列化为字节流或其他适合传输的格式。序列化是将数据转换为可以在网络上传输的形式的过程。

  3. 网络传输:客户端通过网络将请求发送到远程服务器。通常,这涉及到将请求数据打包成网络消息,然后通过网络协议(如HTTP、TCP/IP)将消息发送到服务器的地址。

  4. 服务器接收:远程服务器接收到客户端的请求消息,通常通过网络协议(如HTTP服务器、Socket服务器)监听特定的端口。

  5. 参数反序列化:服务器从请求消息中提取参数数据,并将其反序列化为本地数据结构,以便将其传递给远程过程。

  6. 远程过程调用:服务器调用相应的远程过程,将参数传递给该过程并执行相应的操作。远程过程可以位于服务器的本地代码中或远程服务器上的远程服务中。

  7. 结果序列化:远程过程执行完毕后,服务器将结果序列化为字节流或其他适合传输的格式。

  8. 结果传输:服务器通过网络将结果数据打包成响应消息,并将其发送回客户端。

  9. 客户端接收:客户端接收到服务器的响应消息。

  10. 结果反序列化:客户端从响应消息中提取结果数据,并将其反序列化为本地数据结构。

  11. 客户端处理:客户端可以根据远程过程的执行结果采取相应的行动,可能是继续执行本地代码或返回结果给调用方。

  12. 通信完成:一次RPC调用完成后,客户端和服务器之间的通信过程结束。

RPC结构

  1. 客户端模块代理所有远程方法的调用

  2. 将目标服务、目标方法、调用目标方法的参数等必要信息序列化

  3. 序列化之后的数据包进一步压缩,压缩后的数据包通过网络通信传输到目标服务节点

  4. 服务节点将接受到的数据包进行解压

  5. 解压后的数据包反序列化成目标服务、目标方法、目标方法的调用参数

  6. 通过服务端代理调用目标方法获取结果,结果同样需要序列化、压缩然后回传给客户端

手撸简陋版RPC

总觉得文字描述太干了,有点咽不下去,还是手撸下试试吧。毕竟艾瑞莉娅的奶奶总说:

纸上得来终觉浅,绝知此事要躬行。

知识点梳理
1.Socket套接字通信机制

在Java中,Socket是用于网络通信的基础类之一,它提供了一种机制,通过该机制,计算机程序可以在网络上建立连接、发送数据、接收数据和关闭连接。

  • 服务器套接字(ServerSocket)

    • 服务器套接字用于在服务器端监听并接受客户端的连接请求。

    • 使用以下步骤创建和使用服务器套接字:

      • 创建ServerSocket对象,并绑定到一个特定的端口号。

      • 使用ServerSocket对象的accept()方法来等待客户端的连接请求,并接受连接。

      • 一旦接受连接,可以创建新的Socket对象来处理客户端的通信。

      • 完成通信后,关闭SocketServerSocket连接。

    ServerSocket serverSocket = new ServerSocket(port_number);
    Socket clientSocket = serverSocket.accept(); // 等待连接
    InputStream inputStream = clientSocket.getInputStream();
    OutputStream outputStream = clientSocket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    clientSocket.close();
    serverSocket.close();
    ​
  • 客户端套接字(Socket)

    • 客户端套接字用于连接到远程服务器,发送请求并接收响应。

    • 使用以下步骤创建和使用客户端套接字:

      • 创建Socket对象,指定远程服务器的主机名或IP地址以及端口号。

      • 使用Socket对象的输入流和输出流来进行数据的读取和写入。

      • 完成通信后,关闭Socket连接。

    Socket socket = new Socket("server_hostname", port_number);
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    socket.close();
2.通信过程的序列化与反序列化

在Java中通过 JDK 提供了 Java 对象的序列化方式实现对象序列化传输,主要通过输出流java.io.ObjectOutputStream和输入流java.io.ObjectInputStream来实现;

java.io.ObjectOutputStream:表示对象输出流 , 它的 writeObject(Object obj)方法可以对参数指定的 obj 对象进行序列化,把得到的字节序列写到一个目标输出流中;

java.io.ObjectInputStream:表示对象输入流 ,它的 readObject()方法源输入流中读取字节序列,再把它们反序列化成为一个对象,并将其返回;

3.动态代理

动态代理是在运行时动态生成代理类的方式。Java提供了java.lang.reflect.Proxy类和java.lang.reflect.InvocationHandler接口来实现动态代理。JDK中提供了基于接口动态代理的方法:

        // 创建代理对象MyInterface proxyObject = (MyInterface) Proxy.newProxyInstance(MyInterface.class.getClassLoader(),new Class[]{MyInterface.class},new MyInvocationHandler(realObject));

使用动态代理的目的:

  1. 透明远程调用: 动态代理可以隐藏底层的远程调用细节,使得远程过程调用看起来像本地方法调用一样简单。客户端无需关心网络通信、数据序列化和反序列化等细节,因为这些都由代理对象处理。

  2. 减少重复性代码: 使用动态代理可以减少编写和维护远程调用代码的工作量。代理类负责处理通用的远程调用逻辑,开发者只需关注具体的业务逻辑。

  3. 集中管理远程调用逻辑: 动态代理将远程调用逻辑集中在一个地方,这样可以更容易地管理和维护,例如添加统一的错误处理、日志记录或性能监控等功能。

4.反射

在RPC(Remote Procedure Call,远程过程调用)中使用反射的主要目的是实现透明的远程调用,即使在客户端和服务器之间存在远程分离,也可以像调用本地方法一样调用远程服务。反射在RPC中的具体目的和用途如下:

  1. 动态代理生成代理对象: 反射机制允许在运行时生成代理对象,这些代理对象可以代替实际的远程服务对象执行方法调用。这样,客户端代码不需要提前知道要调用的具体远程方法和对象,而可以动态生成代理并执行方法。

  2. 动态识别方法和参数: 反射允许在运行时识别远程方法和方法参数的名称、类型和数量。这对于将方法调用信息打包成请求并传递给远程服务器非常有用,服务器可以根据这些信息正确地解析请求并调用相应的方法。

  3. 动态序列化和反序列化: 反射可以用于动态地序列化请求和响应数据。在RPC中,请求和响应数据通常需要以某种格式进行序列化和反序列化,以便在网络上进行传输。反射可以帮助动态识别数据类型,将数据转换为适当的格式,并在服务器端进行反序列化。

思维流程梳理

码起来

假设需求是客户端想要访问服务端(ip:prot/helloService/sayHello)上的helloService调用sayHello()。先定义服务端的实现。

服务端时序图

服务端—Api与Provider模块

服务端作为服务提供者,自然需要具备常规的业务模块:Api与Provider模块

Api模块定义简单的HelloService接口,包含两个方法sayHello(String content)和saveUSer(User user)

public interface IHelloService {String sayHello(String content);String saveUSer(User user);
}

定义一个简单对象User类

public class User {private String name;private int age;
​// getter和setterpublic String getName() {return name;}
​public void setName(String name) {this.name = name;}
​public int getAge() {return age;}
​public void setAge(int age) {this.age = age;}
​@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}

Provider模块定义HelloService接口的实现类HelloServiceImpl(rpc-server-provider模块的pom中需要添加rpc-server-api模块的依赖)

public class HelloServiceImpl implements IHelloService {@Overridepublic String sayHello(String content) {System.out.println("request in sayHello:" + content);return "Say hello:" + content;}
​@Overridepublic String saveUSer(User user) {System.out.println("request in saveUser:" + user);return "Save user success";}
}

定义RpcServerProxy通过代理的方式使用Socket对外暴露服务接口

public class RpcServerProxy {private ExecutorService executorService = Executors.newCachedThreadPool();
​public void publisher(Object service, int port) {ServerSocket serverSocket = null;try {serverSocket = new ServerSocket(port);while (true) {Socket socket = serverSocket.accept(); // 使用accept()等待客户端连接,接收请求executorService.execute(new ProcessorHandler(socket, service)); // 每一个socket交给一个processorHandler处理}} catch (IOException e) {e.printStackTrace();} finally {// 关闭资源,jdk1.7后提供了try-with可以自动关闭if (serverSocket != null) {try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}
}

需要定义一个线程ProcessorHandler对每次的socket请求进行处理

public class ProcessorHandler implements Runnable{
​private Socket socket;private Object service;
​public ProcessorHandler(Socket socket, Object service) {this.socket = socket;this.service = service;}
​
​@Overridepublic void run() {// 使用ObjectOutputStream和ObjectInputStream配合socket的输入流输出流进行序列化和反序列化ObjectOutputStream objectOutputStream = null;ObjectInputStream objectInputStream = null;try {objectInputStream = new ObjectInputStream(socket.getInputStream());
​// 客户端传过来的信息:请求哪个类,哪个方法,方法的参数  ——>  封装为RpcRequest类RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); // 拿到客户端通信传递的请求类Object result = invoke(rpcRequest); // 反射调用本地服务
​objectOutputStream = new ObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(result);objectOutputStream.flush(); // 手动将缓冲区中的数据强制刷新到输出流中,以确保数据被立即写入底层的输出流。
​} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} finally {// 关闭关联资源,jdk1.7后提供了try-with可以自动关闭if (objectInputStream != null) {try {objectInputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}if (objectOutputStream != null) {try {objectOutputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
​private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {// 反射调用Object[] args = rpcRequest.getParameters(); // 获取客户端传递的RpcRequest中的参数Class<?>[] types = new Class[args.length]; // 获得每个参数的类型for (int i = 0; i < args.length; i++) {types[i] = args.getClass();}Class clazz = Class.forName(rpcRequest.getClassName()); // 根据请求的类名反射加载类Method method = clazz.getMethod(rpcRequest.getMethodName(), types); // 获取类中的方法Object result = method.invoke(service, args); // 进行反射调用方法return result;}
}

定义一个通用的RpcRequest类参与通信过程中的请求处理,这里是需要序列化的

public class RpcRequest implements Serializable {private String className;private String methodName;private Object[] parameters;// getter and setterpublic String getClassName() {return className;}
​public void setClassName(String className) {this.className = className;}
​public String getMethodName() {return methodName;}
​public void setMethodName(String methodName) {this.methodName = methodName;}
​public Object[] getParameters() {return parameters;}
​public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

Provider的app中发布下服务,运行没有报错,服务端OK

public class App 
{public static void main( String[] args ){IHelloService helloService = new HelloServiceImpl();RpcServerProxy rpcServerProxy = new RpcServerProxy();rpcServerProxy.publisher(helloService, 8080); // 把服务发布出去}
}

客户端时序图

定义动态代理RpcClientProxy,JDK的接口代理方式就是一句话:

public class RpcClientProxy {public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));}
}

定义被代理接口RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {private String host;private int port;
​public RemoteInvocationHandler(String host, int port) {this.host = host;this.port = port;}
​@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 客户端请求会进入这里进行包装RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParameters(args);// 远程通信交给RpcNetTransportRpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);Object result = rpcNetTransport.send(rpcRequest);return result;}
}

定义通信传输类RpcNetTransport

public class RpcNetTransport {private String host;private int port;
​public RpcNetTransport(String host, int port) {this.host = host;this.port = port;}
​public Object send(RpcRequest rpcRequest) {Socket socket = null;Object result = null;ObjectInputStream objectInputStream = null;ObjectOutputStream objectOutputStream = null;try {socket = new Socket(host, port); // 建立连接
​objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); // 客户端通信写入objectOutputStream.writeObject(rpcRequest);objectOutputStream.flush();
​objectInputStream = new ObjectInputStream(socket.getInputStream()); // 客户端通信输出result = objectInputStream.readObject();return result;} catch (UnknownHostException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} finally {// 关闭关联资源if (objectInputStream != null) {try {objectInputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}if (objectOutputStream != null) {try {objectOutputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}}return result;}
}

客户端请求远程服务

public class App 
{public static void main( String[] args ){RpcClientProxy rpcClientProxy = new RpcClientProxy();IHelloService helloService = rpcClientProxy.clientProxy(IHelloService.class, "localhost", 8080);String result = helloService.sayHello("Elaine");System.out.println(result);}
}

请求成功,客户端获得返回结果

服务端打印了请求日志

到这里,一个简陋且粗糙的RPC通信版本就好了。

别灰心,要记住艾瑞莉娅的奶奶总说

路漫漫其修选兮,吾将上下而求索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/152887.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OSPF的7大状态和5大报文详讲

- Down OSPF的初始状态 - Init 初始化——我刚刚给别人发Hello报文 我们可以将OSPF邻居建立的过程理解为&#xff1a;我和你打招呼&#xff0c;你和我打招呼&#xff0c;然后咱俩成了邻居 比如&#xff1a; R1和R2要建立OSPF邻居 R1给R2发送了Hello报文&#xff0c;但是R1此时…

Pytorch之EfficientNetV2图像分类

文章目录 前言一、EfficientNet V21. 网络简介2. EfficientNetV1弊端&#x1f947;训练图像的尺寸很大时&#xff0c;训练速度非常慢&#x1f948;在网络浅层中使用Depthwise convolutions速度会很慢&#x1f949;同等的放大每个stage是次优的 3.NAS Search4. Progressive Lear…

3.3.OpenCV技能树--二值图像处理--图像形态学操作

文章目录 1.图像形态学运算简介2.图像开运算处理2.1.图像开运算处理简介2.2.图像开运算处理代码2.3.图像开运算处理效果 3.图像闭运算处理3.1.图像闭运算处理简介3.2.图像闭运算处理代码3.3.图像闭运算处理效果 4.图像形态学梯度处理4.1.图像形态学梯度处理简介4.2.图像形态学梯…

如何保证 RabbitMQ 的消息可靠性?

项目开发中经常会使用消息队列来完成异步处理、应用解耦、流量控制等功能。虽然消息队列的出现解决了一些场景下的问题&#xff0c;但是同时也引出了一些问题&#xff0c;其中使用消息队列时如何保证消息的可靠性就是一个常见的问题。如果在项目中遇到需要保证消息一定被消费的…

Leetcode---114双周赛

题目列表 2869. 收集元素的最少操作次数 2870. 使数组为空的最少操作次数 2871. 将数组分割成最多数目的子数组 2872. 可以被 K 整除连通块的最大数目 一、收集元素的最小操作次数 直接模拟&#xff0c;倒序遍历即可&#xff0c;代码如下 class Solution { public:int mi…

docker搭建jenkins

1.拉取镜像 docker pull jenkinsci/blueocean 2.启动容器 docker run -d -u root -p 8666:8080 -p 50000:50000 -v /var/jenkins_home:/var/jenkins_home -v /etc/localtime:/etc/localtime --name MyJenkins jenkinsci/blueocean 3.访问ip:port,就能访问了 4.docker logs 容器…

Go 团队发布组织 / 构建 Go module 的官方指南

导读Go 团队发布了一份官方指南&#xff0c;帮助开发者更规范地组织 / 构建 Go module。 刚接触 Go 的开发者常见问题之一是&#xff0c;“就文件和文件夹的组织布局而言&#xff0c;如何组织我的 Go 项目&#xff1f;”。这份指南就是提供建议来帮助回答这个问题。其中包括针对…

基于Softmax回归的多分类任务

Logistic回归可以有效地解决二分类问题&#xff0c;但在分类任务中&#xff0c;还有一类多分类问题&#xff0c;即类别数C大于2 的分类问题。Softmax回归就是Logistic回归在多分类问题上的推广。 使用Softmax回归模型对一个简单的数据集进行多分类实验。 首先给大家看一下需要的…

多路彩灯控制器led流水灯VHDL速度可调仿真图视频、源代码

名称&#xff1a;多路彩灯控制器led流水灯VHDL速度可调 软件&#xff1a;Quartus 语言&#xff1a;VHDL 代码功能&#xff1a; 使用VHDL设计彩灯控制器&#xff0c;共24个led灯&#xff0c;分为5种不同的花样&#xff0c;可以通过按键切换花样的变化速度。 代码下载&#…

VBox启动失败、Genymotion启动失败、Vagrant迁移

VBox启动失败、Genymotion启动失败、Vagrant迁移 2023.10.9 最新版本vbox7.0.10、Genymotion3.5.0 Vbox启动失败 1、查看日志 Error -610 in supR3HardenedMainInitRuntime! (enmWhat4) Failed to locate ‘vcruntime140.dll’ 日志信息查看方法->找到虚拟机所在位置->…

如何开始学习量子机器学习

一、关于量子计算 这是我关于量子机器学习&#xff08;QML&#xff09;的第二篇文章&#xff0c;这是第一篇&#xff0c;关于为什么你应该开始学习QML。 开始研究量子机器学习很困难&#xff0c;因为我不知道我需要了解多少量子力学和计算知识。我在101年上大学时上了量子力学2…

抽象轻松的java-mybatis简单入门

第一步&#xff1a;用IDEA新建一个java包 第二步&#xff1a;在IDEA中添加数据库&#xff08;ps&#xff1a;自己百度&#xff09; 点击数据库 第二步&#xff0c;新建数据库 选择你使用的数据库 用户与密码根据自己的设置进行配置 为了更方便的查看数据库&#xff0c;可以像图…

【kubernetes】带你了解k8s中PV和PVC的由来

文章目录 1 为什么需要卷(Volume)2 卷的挂载2.1 k8s集群中可以直接使用2.2 需要额外的存储组件2.3 公有云 2 PV(Persistent Volume)3 SC(Storage Class) 和 PVC(Persistent Volume Claim)4 总结 1 为什么需要卷(Volume) Pod是由一个或者多个容器组成的&#xff0c;在启动Pod中…

Linux免密登录

目标&#xff1a; 192.168.233.31 ton-pc-003 192.168.233.32 ton-pc-004 在ton-pc-004&#xff08;以下简称004&#xff09;免密登录ton-pc-003&#xff08;以下简称003&#xff09; 具体流程和说明&#xff1a; 1、在004&#xff08;客户机&#xff09;中生成秘钥对 ssh…

【计算机视觉|人脸建模】学习从4D扫描中获取的面部形状和表情的模型

本系列博文为深度学习/计算机视觉论文笔记&#xff0c;转载请注明出处 标题&#xff1a;Learning a model of facial shape and expression from 4D scans 链接&#xff1a;Learning a model of facial shape and expression from 4D scans | ACM Transactions on Graphics Pe…

解决方案:AI赋能工业生产3.0,从工业“制造”到“智造”

视频监控技术是一种既成熟又广泛应用于工业制造领域的先进技术。它可以通过安装各种摄像头和传感器来监测整个生产流程&#xff0c;包括原材料的采购、加工、装配和物流等环节&#xff0c;从而实现对生产过程的实时监控和管理&#xff0c;以及对异常事件的及时预警和响应。 在…

【SV中的多线程fork...join/join_any/join_none】

SV中fork_join/fork_join_any/fork_join_none 1 一目了然1.1 fork...join1.2 fork...join_any1.3 fork...join_none 2 总结 SV中fork_join和fork_join_any和fork_join_none; Note: fork_join在Verilog中也有&#xff0c;只有其他的两个是SV中独有的&#xff1b; 1 一目了然 1.…

FreeRTOS自我救赎3之USB虚拟串口

任何项目的功能都从需求出发&#xff0c;在这里我用的是斥侯蜂的一块STM32F407ZGT6 在开发一个项目的过程中&#xff0c;免不了串口调试&#xff0c;而这块板子板载的mircousb不是直接连的引脚而是一个OTGUSB

SpringCloud Alibaba - Seata 部署 TC 服务,并集成微服务

目录 一、Seata 架构 1.1、Seata 架构重要角色 1.2、部署 TC 服务 1.2.1、前言 1.2.2、下载 seata-server 包&#xff0c;解压 1.2.3、修改配置 1.2.4、在 nacos 中添加配置 1.2.5、创建数据库表 1.2.6、启动 TC 服务 1.3、微服务集成 Seata 1.3.1、引入依赖 1.3.2、…

SpringBoot 实现数据脱敏

SpringBoot 实现数据脱敏 前言Hutool 实现数据脱敏引入依赖脱敏工具类代码实现 使用注解的方式定义枚举自定义序列化类定义注解测试 前言 数据脱敏是指对某些敏感信息通过脱敏规则进行数据的变形&#xff0c;实现敏感隐私数据的可靠保护。 数据脱敏常用规则有替换、重排、加密…