Netty基础—6.Netty实现RPC服务三

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

5.Netty服务端之RPC服务提供端的处理

(1)RPC服务提供端NettyServer

(2)基于反射调用请求对象的目标方法

(1)RPC服务提供端NettyRpcServer

public class ServiceConfig {private String serviceName;//调用方的服务名称private Class serviceInterfaceClass;//服务接口类型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到这一步为止,server启动了而且监听指定的端口号了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//进入一个阻塞的状态,同步一直等待到你的server端要关闭掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多个服务public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}

(2)基于反射调用请求对象的目标方法

//RpcRequest类需要修改字段调整为如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//参数类型private Object[] args;//参数值private String invokerApplicationName;//调用方的服务名称private String invokerIp;//调用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此时我们要实现什么呢?//我们需要根据RpcRequest指定的class,获取到这个class//然后通过反射构建这个class对象实例//接着通过反射获取到这个RpcRequest指定方法和入参类型的method//最后通过反射调用,传入方法,拿到返回值//根据接口名字拿到接口实现类的名字后再获取类ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc调用结果封装到响应里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}

6.RPC服务调用端实现超时功能

public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}});       try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//标记一下请求发起的时间NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除发起请求时间的标记NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/34525.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

「BWAPP靶场通关记录(1)」A1注入漏洞

BWAPP通关秘籍&#xff08;1&#xff09;&#xff1a;A1 injection 1.HTML Injection - Reflected (GET)1.1Low1.2Medium1.3High 2.HTML Injection - Reflected (POST)2.1Low2.2Medium2.3High 3.HTML Injection - Reflected (URL)3.1Low3.2/3.3Medium/HIgh 4.HTML Injection - …

机器学习算法实战——敏感词检测(主页有源码)

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​ ​​​ 1. 引言 随着互联网的快速发展&#xff0c;信息传播的速度和范围达到了前所未有的高度。然而&#xff0c;网络空间中也充斥着大量的…

Ollama+DeepSeek+NatCross内网穿透本地部署外网访问教程

目录 一、Ollama 简介 二、下载Ollama 三、下载并运行 DeepSeek 模型 四、运行 DeepSeek 模型 五、NatCross免费内网穿透 六、配置 ChatBox 连接 Ollama 七、外网使用ChatBox体验 一、Ollama 简介 Ollama 是一个开源的本地大模型部署工具&#xff0c;旨在让用户能够在个…

联想台式电脑启动项没有U盘

开机按F12&#xff0c;进入启动设备菜单&#xff0c;发现这里没有识别到插在主机的U盘&#xff1f; 解决方法 1、选上图的Enter Setup或者开机按F2&#xff0c;进入BIOS设置 选择Startup -> Primary Boot Sequence 2、选中“Excludeed from boot order”中U盘所在的一行 …

开源链动 2+1 模式 AI 智能名片 S2B2C 商城小程序助力社群发展中榜样影响力的提升

摘要&#xff1a;本文深入剖析了社群发展进程中榜样所承载的关键影响力&#xff0c;并对开源链动 21 模式 AI 智能名片 S2B2C 商城小程序在增强这一影响力方面所具备的潜力进行了全面探讨。通过对不同类型社群&#xff0c;如罗辑思维社群和 007 不出局社群中灵魂人物或意见领袖…

《交互式线性代数》

《交互式线性代数》 *Interactive Linear Algebra*由Dan Margalit和Joseph Rabinoff编写&#xff0c;是一本聚焦线性代数的教材。本书旨在教授线性代数的核心概念、方法及其应用&#xff0c;通过代数与几何相结合的方式&#xff0c;帮助读者深入理解线性代数的本质&#xff0c…

CSS -属性值的计算过程

目录 一、抛出两个问题1.如果我们学过优先级关系&#xff0c;那么请思考如下样式为何会生效2.如果我们学习过继承&#xff0c;那么可以知道color是可以被子元素继承使用的&#xff0c;那么请思考下述情景为何不生效 二、属性值计算过程1.确定声明值2.层叠冲突3.使用继承4.使用默…

生活中的可靠性小案例11:窗户把手断裂

窗户把手又断了&#xff0c;之前也断过一次&#xff0c;使用次数并没有特别多。上方的图是正常的把手状态&#xff0c;断的形状如下方图所示。 这种悬臂梁结构&#xff0c;没有一个良好的圆角过渡&#xff0c;导致应力集中。窗户的开关&#xff0c;对应的是把手的推拉&#xff…

怎么解决在Mac上每次打开文件夹都会弹出一个新窗口的问题

在Mac上每次打开文件夹都会弹出一个新窗口的问题&#xff0c;可以通过以下方法解决‌ ‌调整Finder设置‌&#xff1a; 打开Finder&#xff0c;点击“Finder”菜单&#xff0c;选择“偏好设置”。在偏好设置中&#xff0c;选择“通用”标签。取消勾选“在标签页中打开文件夹”或…

HOT100——栈篇Leetcode739. 每日温度

文章目录 题目&#xff1a;Leetcode160. 相交链表原题链接思路代码 题目&#xff1a;Leetcode160. 相交链表 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温…

C++ 返回值优化(Return Value Optimization)

Intro 返回值优化(Return Value Optimization, RVO)是 C中的一种编译器优化技术, 它允许编译器在某些情况下省略临时对象的创建和复制/移动操作, 从而提高程序性能. RVO 主要应用于函数返回值的场景. 两种形式的 RVO 假定我们有这样一个类: class MyClass {std::string nam…

C++内存管理(复习)

1.动态申请多个某类型的空间并初始化 //动态申请10个int类型的空间并初始化为0到9int* p new int[10]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; delete[] p; //销毁 2.new/delete new:开空间构造函数 delete:析构函数释放空间 new和delete是用户进行动态内存申请和释放的操作符&#…

计算机视觉——深入理解卷积神经网络与使用卷积神经网络创建图像分类算法

引言 卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;简称 CNNs&#xff09;是一种深度学习架构&#xff0c;专门用于处理具有网格结构的数据&#xff0c;如图像、视频等。它们在计算机视觉领域取得了巨大成功&#xff0c;成为图像分类、目标检测、图像分…

Java数据结构第二十三期:Map与Set的高效应用之道(二)

专栏&#xff1a;Java数据结构秘籍 个人主页&#xff1a;手握风云 目录 一、哈希表 1.1. 概念 1.2. 冲突 1.3. 避免冲突 1.4. 解决冲突 1.5. 实现 二、OJ练习 2.1. 只出现一次的数字 2.2. 随机链表的复制 2.3. 宝石与石头 一、哈希表 1.1. 概念 顺序结构以及平衡树中…

OSPF | LSDB 链路状态数据库 / SPF 算法 / 实验

注&#xff1a;本文为 “OSPF | LSDB / SPF ” 相关文章合辑。 LSDB 和 SPF 算法 潇湘浪子的蹋马骨汤 发布 2019-02-15 23:58:46 1. 链路状态数据库 (LSDB) 链路状态协议除了执行洪泛扩散链路状态通告&#xff08;LSA&#xff09;以及发现邻居等任务外&#xff0c;其第三个任…

Android Framework 之了解系统启动流程二

Android Framework 源码阅读系列篇章有&#xff1a; 系统启动流程一之init进程和zygote进程启动分析系统启动流程二之SystemServer进程启动分析 1. SystemServer 进程启动分析 在 系统启动流程一之init进程和zygote进程启动分析 中分析 zygote 进程时&#xff0c;我们知道了…

阿里云企业邮箱出现故障怎么处理?

阿里云企业邮箱出现故障怎么处理&#xff1f; 以下是处理阿里云企业邮箱故障的详细分步指南&#xff0c;帮助您快速定位问题并恢复邮箱正常使用&#xff1a; 一、初步排查&#xff1a;确认故障范围与现象 确定影响范围 全体用户无法使用 → 可能为阿里云服务端故障或网络中断。…

Python----数据分析(Pandas二:一维数组Series,Series的创建,Series的属性,Series中元素的索引与访问)

一、一维数组Series Series&#xff1a;一维数组,与Numpy中的一维array类似。它是一种类似于一维数组的对象&#xff0c;是由一组数据(各种 NumPy 数据类型)以及一组与之相关的数据标签(即索引)组成。 仅由一组数据也可产生简单的 Series 对象&#xff0c;用值列表生成 Series …

小程序配置

注册小程序账号和安装开发工具 参考文档&#xff1a;注册小程序账号和安装开发工具https://blog.csdn.net/aystl_gss/article/details/127878658 HBuilder新建项目 填写项目名称&#xff0c;选择UNI-APP&#xff0c;修改路径&#xff0c;点击创建 manifest.json 配置 需要分别…

前端UI编程基础知识:基础三要素(结构→表现→行为)

以下是重新梳理的前端UI编程基础知识体系&#xff0c;结合最新技术趋势与实战要点&#xff0c;以更适合快速掌握的逻辑结构呈现&#xff1a; 一、基础三要素&#xff08;结构→表现→行为&#xff09; 1. HTML5 核心能力 • 语义化标签&#xff1a;<header>, <nav&g…