RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)
服务注册
- 服务注册
- a.添加服务节点和主机节点
- b.抽象注册中心
- c.本地服务列表
服务注册
a.添加服务节点和主机节点
主要完成服务注册和发现的功能,其具体流程如下:
-
1.服务提供方将服务注册到注册中心中
-
2.消费端拉取服务列表。
-
3.消费端简单的选取一个可以服务(后续会进行改造,实现负载均衡)
1.修改framework/common的Constants
类:定义注册中心的路径常量
public class Constant {// 略........// 服务提供方的在注册中心的基础路径public static final String BASE_PROVIDERS_PATH = "/dcyrpc-metadata/providers";// 服务调用方的在注册中心的基础路径public static final String BASE_CONSUMERS_PATH = "/dcyrpc-metadata/consumers";
}
2.在core中引入common的依赖项
3.修改framework/core的DcyRpcBootstrap
类:定义一些相关的基础配置
- 定义相关变量:应用名称, Config, 默认端口
- 定义Zookeeper实例
- 完善方法代码:application() / registry() / protocol() / publish()
- publish()发布服务:将接口与匹配的实现注册到服务中心
// 略......
private static final DcyRpcBootstrap dcyRpcBootstrap = new DcyRpcBootstrap();
// 定义一些相关的基础配置
private String applicationName = "default";
private RegistryConfig registryConfig;
private ProtocolConfig protocolConfig;
private int port = 8088;// 维护一个Zookeeper实例
private ZooKeeper zooKeeper;// 略....../*** 定义当前应用的名字* @param applicationName 应用名称* @return*/
public DcyRpcBootstrap application(String applicationName) {this.applicationName = applicationName;return this;
}/*** 配置一个注册中心* @param registryConfig 注册中心* @return this*/
public DcyRpcBootstrap registry(RegistryConfig registryConfig) {// 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合zooKeeper = ZookeeperUtils.createZookeeper();this.registryConfig = registryConfig;return this;
}/*** 配置当前暴露的服务使用的协议* @param protocolConfig 协议的封装* @return this*/
public DcyRpcBootstrap protocol(ProtocolConfig protocolConfig) {this.protocolConfig = protocolConfig;if (log.isDebugEnabled()) {log.debug("当前工程使用了:{}协议进行序列化", protocolConfig.toString());}return this;
}/*** --------------------------------服务提供方的相关api--------------------------------*//*** 发布服务:将接口与匹配的实现注册到服务中心* @param service 封装需要发布的服务* @return*/
public DcyRpcBootstrap publish(ServiceConfig<?> service) {// 服务名称的节点String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + service.getInterface().getName();// 判断节点是否存在,不存在则创建节点(持久)if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) {ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null);ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 创建本机的临时节点,ip:port// 服务提供方的端口(一般自己设定),还需要获取ip的方法// /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088String node = parentNode + "/" + NetUtils.getIp() + ":" + port;if (!ZookeeperUtils.existNode(zooKeeper, node, null)) {ZookeeperNode zookeeperNode = new ZookeeperNode(node, null);ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);}if (log.isDebugEnabled()) {log.debug("服务{},已经被注册", service.getInterface().getName());}return this;
}// 略....../*** 启动netty服务*/
public void start() {try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}
}// 略......}
4.修改framework/common的utils.zookeeper.ZookeeperUtils
类:添加方法:判断节点是否存在
/*** 判断节点是否存在* @param zooKeeper* @param node* @param watcher* @return true:存在 false:不存在*/
public static boolean existNode(ZooKeeper zooKeeper, String node, Watcher watcher) {try {return zooKeeper.exists(node, watcher) != null;} catch (KeeperException | InterruptedException e) {log.error("判断节点:{} 是否存在时发生异常:", node, e);throw new ZookeeperException(e);}
}
5.修改framework/common的exceptions.ZookeeperException
类:完善内容
public class ZookeeperException extends RuntimeException{public ZookeeperException() {super();}public ZookeeperException(Throwable cause) {super(cause);}
}
6.在framework/common的utils
包下,创建NetUtils
类:Network工具类
/*** Network utils*/
@Slf4j
public class NetUtils {public static String getIp() {try {// 获取所有的网卡信息Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();while (interfaces.hasMoreElements()) {NetworkInterface iface = interfaces.nextElement();// 过滤非回环接口和虚拟接口if (iface.isLoopback() || iface.isVirtual() || !iface.isUp()) {continue;}Enumeration<InetAddress> addresses = iface.getInetAddresses();while (addresses.hasMoreElements()) {InetAddress addr = addresses.nextElement();// 过滤IPv6地址和回环地址if (addr instanceof Inet6Address || addr.isLoopbackAddress()) {continue;}String ipAddress = addr.getHostAddress();System.out.println("局域网IP地址:" + ipAddress);return ipAddress;}}throw new NetworkException();} catch (SocketException e) {log.error("获取局域网IP时发送异常", e);throw new NetworkException(e);}}
}
7.在framework/common的exceptions
包下创建NetworkException
类:编写自定义异常
public class NetworkException extends RuntimeException{public NetworkException() {super();}public NetworkException(String message) {super(message);}public NetworkException(Throwable cause) {super(cause);}
}
b.抽象注册中心
在当前项目中我们的确使用的是zookeeper作为我们项目的注册中心。但是,我们希望在我们的项目是可以扩展使用其他类型的注册中心的,如nacos,redis,甚至是自己独立开发注册中心。为后来的扩展提供可能性,所以在整个工程中我们再也不能单独的面向具体的对象编程,而是面向抽象,我们将抽象出**【注册中心】**整个抽象的概念
1.在core下com.dcyrpc
下创建discovery
包,创建Registry
接口:抽象注册中心接口:注册服务,发现服务,下线服务
/*** 抽象注册中心:注册服务,发现服务,下线服务*/
public interface Registry {/*** 注册服务* @param serviceConfig 服务的配置内容*/public void register(ServiceConfig<?> serviceConfig);
}
2.在core下com.dcyrpc.discovery
下创建AbstractRegistry
抽象类:提炼共享内容,还可以做模板方法 (待开发)
/*** 提炼共享内容,还可以做模板方法* 所有注册中心都有的公共方法*/
public abstract class AbstractRegistry implements Registry{
}
3.在core下com.dcyrpc.discovery
下创建impl
包,创建ZookeeperRegistry
类继承AbstractRegistry
- 把
DcyRpcBootstrap
类里的publish()
方法,提炼到该类中
@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {private ZooKeeper zooKeeper = ZookeeperUtils.createZookeeper();@Overridepublic void register(ServiceConfig<?> service) {// 服务名称的节点String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + service.getInterface().getName();// 判断节点是否存在,不存在则创建节点(持久)if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) {ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null);ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 创建本机的临时节点,ip:port// 服务提供方的端口(一般自己设定),还需要获取ip的方法// /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088// TODO:后续处理端口问题String node = parentNode + "/" + NetUtils.getIp() + ":" + 8088;if (!ZookeeperUtils.existNode(zooKeeper, node, null)) {ZookeeperNode zookeeperNode = new ZookeeperNode(node, null);ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);}if (log.isDebugEnabled()) {log.debug("服务{},已经被注册", service.getInterface().getName());}}
}
4.修改DcyRpcBootstrap
// 略.....
private static final DcyRpcBootstrap dcyRpcBootstrap = new DcyRpcBootstrap();// 定义一些相关的基础配置
private String applicationName = "default";
private RegistryConfig registryConfig;
private ProtocolConfig protocolConfig;
private int port = 8088;// 注册中心
private Registry zookeeperRegistry;// 略...../*** 配置一个注册中心* @param registryConfig 注册中心* @return this*/
public DcyRpcBootstrap registry(RegistryConfig registryConfig) {// 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合// 使用 registryConfig 获取一个注册中心this.zookeeperRegistry = registryConfig.getRegistry();return this;
}// 略...../*** 发布服务:将接口与匹配的实现注册到服务中心* @param service 封装需要发布的服务* @return*/
public DcyRpcBootstrap publish(ServiceConfig<?> service) {// 抽象了注册中心的概念,使用注册中心的一个实现完成注册zookeeperRegistry.register(service);return this;
}/*** 批量发布服务* @param services 封装需要发布的服务集合* @return this*/
public DcyRpcBootstrap publish(List<ServiceConfig<?>> services) {for (ServiceConfig<?> service : services) {this.publish(service);}return this;
}// 略.....}
5.修改RegistryConfig
类,将类放入discovery
包下
public class RegistryConfig {// 定义连接的 urlprivate final String connectString;public RegistryConfig(String connectString) {this.connectString = connectString;}/*** 可以使用简单工厂来完成* @return 具体的注册中心实例*/public Registry getRegistry() {// 1.获取注册中心// 1.获取类型// 2.获取主机地址String registryType = getRegistryType(connectString, true).toLowerCase().trim();if (registryType.equals("zookeeper")) {String host = getRegistryType(connectString, false);return new ZookeeperRegistry(host, Constant.TIME_OUT);}throw new DiscoveryException("未发现合适的注册中心");}private String getRegistryType(String connectString, boolean ifType) {String[] typeAndHost = connectString.split("://");if (typeAndHost.length != 2) {throw new RuntimeException("给定的注册中心连接url不合法");}if (ifType){return typeAndHost[0];}else {return typeAndHost[1];}}
}
6.在framework/common的exceptions
中创建DiscoveryException
类:服务注册与发现异常处理
/*** 服务注册与发现异常处理*/
public class DiscoveryException extends RuntimeException{public DiscoveryException() {super();}public DiscoveryException(String message) {super(message);}public DiscoveryException(Throwable cause) {super(cause);}
}
c.本地服务列表
服务调用方需要通过服务中心发现服务列表
- 1.使用Map进行服务列表的存储
- 2.使用动态代理生成代理对象
- 3.从注册中心,寻找一个可用的服务
1.修改DcyRpcBootstrap
部分代码:使用Map进行服务列表的存储
// 维护已经发布且暴露的服务列表 key:interface的全限定名 value:ServiceConfig
private static final Map<String, ServiceConfig<?>> SERVERS_LIST = new HashMap<>(16);/*** 发布服务:将接口与匹配的实现注册到服务中心* @param service 封装需要发布的服务* @return*/
public DcyRpcBootstrap publish(ServiceConfig<?> service) {// 抽象了注册中心的概念,使用注册中心的一个实现完成注册zookeeperRegistry.register(service);// 1.当服务调用方,通过接口、方法名、具体的方法参数列表 发起调用,提供方怎么知道使用哪一个实现// (1) new 1 个// (2) spring beanFactory.getBean(Class)// (3) 自己维护映射关系SERVERS_LIST.put(service.getInterface().getName(), service);return this;
}
2.修改ReferenceConfig
部分代码
// 略.....
private Registry registry;/*** 代理设计模式,生成一个API接口的代理对象* @return 代理对象*/
public T get() {// 使用动态代理完成工作ClassLoader classLoader = Thread.currentThread().getContextClassLoader();Class[] classes = new Class[]{interfaceRef};// 使用动态代理生成代理对象Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 1.发现服务,从注册中心,寻找一个可用的服务// 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)InetSocketAddress address = registry.lookup(interfaceRef.getName());if (log.isInfoEnabled()){log.debug("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);}// 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果return null;}});return (T) helloProxy;
}public Registry getRegistry() {return registry;
}public void setRegistry(Registry registry) {this.registry = registry;
}
3.修改Registry
接口,添加发现服务的接口
/*** 从注册中心拉取一个可用的服务* @param serviceName 服务名称* @return 服务的ip+端口*/
InetSocketAddress lookup(String serviceName);
4.修改ZookeeperRegistry
部分代码,实现发现服务的业务逻
@Override
public InetSocketAddress lookup(String serviceName) {// 1.找到对应服务的节点String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName;// 2.从zk中获取它的子节点,List<String> children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, null);// 获取所有的可用的服务列表List<InetSocketAddress> inetSocketAddressList = children.stream().map(ipString -> {String[] ipAndPort = ipString.split(":");String ip = ipAndPort[0];int port = Integer.valueOf(ipAndPort[1]);return new InetSocketAddress(ip, port);}).toList();if (inetSocketAddressList.size() == 0){throw new DiscoveryException("未发现任何可用的服务主机");}return inetSocketAddressList.get(0);
}
5.修改ZookeeperUtils
部分代码,添加与实现获取子节点的方法
/*** 查询一个节点的子元素* @param zooKeeper* @param serviceNode 服务节点* @return 子元素列表*/
public static List<String> getChildren(ZooKeeper zooKeeper, String serviceNode, Watcher watcher) {try {return zooKeeper.getChildren(serviceNode, watcher);} catch (KeeperException | InterruptedException e) {log.error("获取节点{}的子元素时发生异常:{}", serviceNode, e);throw new ZookeeperException(e);}
}