基于netty实现简易版rpc服务-代码实现

1 公共部分

1.1 请求、响应对象

@Data
public class RpcRequest {private String serviceName;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;
}
@Data
public class RpcResponse {private int code;private String msg;private Object data;private String ex;
}

1.2 rpc协议

@Data
public class RpcProtocol {private int length;private byte[] content;}

1.3 简易注册中心,保存服务名和地址的映射

public class ServiceRegister {private Map<String, List<String>> register = new HashMap<>();public ServiceRegister() {register.put(RpcService.class.getName(), new ArrayList<>(List.of("localhost:1733")));}public List<String> findService(String serviceName) {return register.get(serviceName);}}

1.4 rpc上下文,用来获取单例的ServiceRegister

public class RpcContext {public static ServiceRegister register() {return RpcRegisterHodler.REGISTER;}private static class RpcRegisterHodler {private static final ServiceRegister REGISTER = new ServiceRegister();}}

1.7 帧解码器

// 帧解码器,要配置在ChannelPipeline的第一个,这样才能解决入站数据的粘包和半包
public class RpcFrameDecoder extends LengthFieldBasedFrameDecoder {public RpcFrameDecoder() {super(1024, 0, 4);}
}
// rpc协议的编解码器
public class RpcProtocolCodec extends ByteToMessageCodec<RpcProtocol> {// 将rpc协议对象编码成字节流@Overrideprotected void encode(ChannelHandlerContext ctx, RpcProtocol msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getContent());}// 将字节流解码成rpc协议对象@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);RpcProtocol protocol = new RpcProtocol();protocol.setLength(length);protocol.setContent(content);out.add(protocol);}
}
// rpc请求对象的编解码器
public class RpcRequestCodec extends MessageToMessageCodec<RpcProtocol, 
RpcRequest> {// 将请求对象编码成rpc协议对象@Overrideprotected void encode(ChannelHandlerContext ctx, RpcRequest msg, List<Object> out) throws Exception {byte[] content = JSON.toJSONBytes(msg);int length = content.length;RpcProtocol rpcProtocol = new RpcProtocol();rpcProtocol.setLength(length);rpcProtocol.setContent(content);out.add(rpcProtocol);}// 将rpc协议对象解码成请求对象@Overrideprotected void decode(ChannelHandlerContext ctx, RpcProtocol msg, List<Object> out) throws Exception {RpcRequest request = JSON.parseObject(msg.getData(),RpcRequest.class,JSONReader.Feature.SupportClassForName);out.add(request);}
}
// rpc响应对象的编解码器
public class RpcResponseCodec extends MessageToMessageCodec<RpcProtocol, 
RpcResponse> {// 将响应对象编码成rpc协议对象@Overrideprotected void encode(ChannelHandlerContext ctx, RpcResponse msg, List<Object> out) throws Exception {byte[] content = JSON.toJSONBytes(msg);int length = content.length;RpcProtocol rpcProtocol = new RpcProtocol();rpcProtocol.setLength(length);rpcProtocol.setContent(content);out.add(rpcProtocol);}// 将rpc协议对象解码成响应对象@Overrideprotected void decode(ChannelHandlerContext ctx, RpcProtocol msg, List<Object> out) throws Exception {RpcResponse response = JSON.parseObject(msg.getContent(), RpcResponse.class);out.add(response);}
}

1.6 服务接口

public interface RpcService {String hello(String name);}

2 服务端

2.1 接口实现类

@Slf4j
public class RpcServiceImpl implements RpcService {@Overridepublic String hello(String name) {log.info("service received: {} ", name);return "hello " + name;}
}

2.2 接口名和实现类的对象映射,通过接口名查找对应的实现类对象

public class ServiceMapping {private Map<String, RpcService> mappings = new HashMap<>();public ServiceMapping() {mappings.put(RpcService.class.getName(), new RpcServiceImpl());}public void registerMapping(String serviceName, RpcService service) {mappings.put(serviceName, service);}public RpcService findMapping(String serviceName) {return mappings.get(serviceName);}}

2.2 服务端rpc上下文,用来获取单例的ServiceMapping

public class RpcServerContext {public static ServiceMapping mapping() {return RpcMappingrHodler.MAPPING;}private static class RpcMappingrHodler {private static final ServiceMapping MAPPING = new ServiceMapping();}}

2.3 业务处理器handler

@Slf4j
public class RpcServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest request = (RpcRequest) msg;RpcResponse response = invoke(request);ctx.writeAndFlush(response);}private RpcResponse invoke(RpcRequest request) {RpcResponse response = new RpcResponse();try {ServiceMapping register = RpcServerContext.mapping();RpcService rpcService = register.findMapping(request.getServiceName());String methodName = request.getMethodName();Class<?>[] parameterTypes = request.getParameterTypes();Object[] parameters = request.getParameters();// invokeMethod method = RpcService.class.getDeclaredMethod(methodName, parameterTypes);Object result = method.invoke(rpcService, parameters);//response.setCode(200);response.setMsg("ok");response.setData(result);} catch (Exception e) {response.setCode(500);response.setMsg("error");response.setEx(e.getMessage());}return response;}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive :{}", ctx.channel().remoteAddress());ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught :{}", ctx.channel().remoteAddress(), cause);ctx.close();}
}

2.4 启动类

public class RpcServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ChannelFuture channelFuture = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcFrameDecoder());ch.pipeline().addLast(new RpcProtocolCodec());ch.pipeline().addLast(new RpcRequestCodec());ch.pipeline().addLast(new RpcResponseCodec());
//                            ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new RpcServerHandler());}}).bind(1733);channelFuture.sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

3 客户端

3.2 客户端rpc上下文,用来处理channel的响应数据

public class RpcClientContext {private Map<Channel, Promise<Object>> promises = new HashMap<>();public Promise<Object> getPromise(Channel channel) {return promises.remove(channel);}public void setPromise(Channel channel, Promise<Object> promise) {promises.put(channel, promise);}}

3.2 业务处理器handler

@Slf4j
public class RpcClientHandler extends ChannelInboundHandlerAdapter {private final RpcClientContext rpcClientContext;public RpcClientHandler(RpcClientContext rpcClientContext) {this.rpcClientContext = rpcClientContext;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("rpc invoke response: {}", msg);RpcResponse response = (RpcResponse) msg;//Promise<Object> promise = rpcClientContext.getPromise(ctx.channel());//if (response.getEx() != null)promise.setFailure(new RuntimeException(response.getEx()));elsepromise.setSuccess(response.getData());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive :{}", ctx.channel().remoteAddress());ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught :{}", ctx.channel().remoteAddress(), cause);ctx.close();}
}

3.3 启动类

@Slf4j
public class RpcClient {private final Map<String, NioSocketChannel> nioSocketChannels = new HashMap<>();private final RpcClientContext rpcClientContext = new RpcClientContext();public RpcService rpcService() {String serviceName = RpcService.class.getName();List<String> services = RpcContext.register().findService(serviceName);String url = services.get(0);if (!nioSocketChannels.containsKey(url)) {NioSocketChannel nioSocketChannel = createNioSocketChannel(url);nioSocketChannels.put(url, nioSocketChannel);log.info("create a new channel: {}", nioSocketChannel);}final NioSocketChannel nioSocketChannel = nioSocketChannels.get(url);return (RpcService) Proxy.newProxyInstance(RpcClient.class.getClassLoader(), new Class[]{RpcService.class},(proxy, method, args) -> {RpcRequest request = new RpcRequest();request.setServiceName(RpcService.class.getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);nioSocketChannel.writeAndFlush(request);// wait responseDefaultPromise<Object> promise = new DefaultPromise<>(nioSocketChannel.eventLoop());rpcClientContext.setPromise(nioSocketChannel, promise);promise.await();if (!promise.isSuccess())throw new RuntimeException(promise.cause());return promise.getNow();});}private NioSocketChannel createNioSocketChannel(String url) {//String host = url.substring(0, url.indexOf(":"));int port = Integer.parseInt(url.substring(url.indexOf(":") + 1));//EventLoopGroup group = new NioEventLoopGroup();try {ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcFrameDecoder());ch.pipeline().addLast(new RpcProtocolCodec());ch.pipeline().addLast(new RpcResponseCodec());ch.pipeline().addLast(new RpcRequestCodec());
//                            ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new RpcClientHandler(rpcClientContext));}}).connect(host, port);channelFuture.sync();channelFuture.channel().closeFuture().addListener(future -> {nioSocketChannels.remove(RpcService.class.getName());group.shutdownGracefully();});//return (NioSocketChannel) channelFuture.channel();} catch (InterruptedException e) {throw new RuntimeException(e);}}private void close() {nioSocketChannels.values().forEach(NioSocketChannel::close);}public static void main(String[] args) {RpcClient rpcClient = new RpcClient();RpcService rpcService = rpcClient.rpcService();String netty = rpcService.hello("netty");System.out.println(netty);String world = rpcService.hello("world");System.out.println(world);String java = rpcService.hello("java");System.out.println(java);rpcClient.close();}}

4 总结

这样就实现了简单的rpc服务,通过公共部分的接口、注册中心、编解码器、服务端的服务映射,客户端就能进行远程过程调用了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/451881.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Thread类的介绍

线程是操作系统中的概念&#xff0c;操作系统中的内核实现了线程这种机制&#xff0c;同时&#xff0c;操作系统也提供了一些关于线程的API让程序员来创建和使用线程。 在JAVA中&#xff0c;Thread类就可以被视为是对操作系统中提供一些关于线程的API的的进一步的封装。 多线…

PHP(一)从入门到放弃

参考文献&#xff1a;https://www.php.net/manual/zh/introduction.php PHP 是什么&#xff1f; PHP&#xff08;“PHP: Hypertext Preprocessor”&#xff0c;超文本预处理器的字母缩写&#xff09;是一种被广泛应用的开放源代码的多用途脚本语言&#xff0c;它可嵌入到 HTML…

从新手到高手:Spring AOP的进阶指南

目录 一、AOP简介 1.1 AOP入门案例 1.2 AOP 优点 二、核心概念 2.1 切面(Aspect) 2.2 切点(PointCut) 2.3 通知(Advice) 2.4 织入(Weaving) 三、AOP 原理 3.1 CGLIB 与 JDK动态代理对比 3.2 切面优先级 四、总结 一、AOP简介 AOP(Aspect-Oriented Programming) 面向切面编…

在各大媒体报纸上刊登自己的文章用什么投稿方法发表快?

在职场中,信息宣传是每个单位的重要工作,而每个月的考核投稿任务更是让我深感压力。作为一名普通员工,我常常面临着如何在各大媒体上顺利发表文章的问题。起初,我选择了传统的邮箱投稿方式,然而这条路却让我陷入了无尽的焦虑和挫败之中。 刚开始投稿时,我满怀激情,认真撰写每一…

[论文笔记]HERMES 3 TECHNICAL REPORT

引言 今天带来论文HERMES 3 TECHNICAL REPORT&#xff0c;这篇论文提出了一个强大的工具调用模型&#xff0c;包含了训练方案介绍。同时提出了一个函数调用标准。 为了简单&#xff0c;下文中以翻译的口吻记录&#xff0c;比如替换"作者"为"我们"。 聊天模…

数据库事务

为了保证一致性 1.ACID 事务具有四个基本特性&#xff0c;也就是通常所说的 ACID 特性&#xff0c;即原子性&#xff08;Atomicity&#xff09;、一致性&#xff08;Consistency&#xff09;、隔离性&#xff08;Isolation&#xff09;和持久性&#xff08;Durability&#x…

算法: 模拟题目练习

文章目录 模拟替换所有的问号提莫攻击Z 字形变换外观数列数青蛙 总结 模拟 替换所有的问号 按照题目的要求写代码即可~ public String modifyString(String ss) {int n ss.length();if (n 1) {return "a";}char[] s ss.toCharArray();for (int i 0; i < n; i…

使用Python和Proxy302代理IP高效采集Bing图片

目录 项目背景一、项目准备环境配置 二、爬虫设计与实现爬虫设计思路目标网站分析数据获取流程 代码实现1. 初始化爬虫类&#xff08;BingImageSpider&#xff09;2. 创建存储文件夹3. 获取图像链接4. 下载图片5. 使用Proxy302代理IP6. 主运行函数 运行截图 三、总结 项目背景 …

SpringMVC一个拦截器和文件上传下载的完整程序代码示例以及IDEA2024部署报错 找不到此 Web 模块的 out\artifacts\..问题

一、SpringMVC一个拦截器和文件上传下载的完整程序代码示例 本文章是一个 SpringMVC拦 截器和文件上传下载的完整程序代码示例&#xff0c;使用的开发工具是 IntelliJ IDEA 2024.1.6 (Ultimate Edition)&#xff0c; 开发环境是 OpenJDK-21 java version 21.0.2。Tomcatt版本为…

【C++篇】类与对象的秘密(上)

目录 引言 一、类的定义 1.1类定义的基本格式 1.2 成员命名规范 1.3 class与struct的区别 1.4 访问限定符 1.5 类的作用域 二、实例化 2.1 类的实例化 2.2 对象的大小与内存对齐 三、this 指针 3.1 this指针的基本用法 3.2 为什么需要this指针&#xff1f; 3.3 t…

基于SSM+微信小程序的房屋租赁管理系统(房屋2)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 基于SSM微信小程序的房屋租赁管理系统实现了有管理员、中介和用户。 1、管理员功能有&#xff0c;个人中心&#xff0c;用户管理&#xff0c;中介管理&#xff0c;房屋信息管理&#xff…

Java基础-IO基础

IO是指input/output&#xff0c;即输入和输出。输入和输出是以内存为中心的&#xff1a; input 从外部往内存输入数据&#xff0c;比如硬盘中的数据写入内存等。 output 从内存往外输出数据&#xff0c;比如内存数据写入硬盘等。 File File类表示一个文件或者一个目录。使用F…

【服务器虚拟化是什么?】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

父子元素中只有子元素设置margin-bottom的问题

问题代码如下所示 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>.div1 {background-color: red;width: 80px;height: 80px;border: 1px solid orange;}.div2 {bac…

【飞腾加固服务器】全国产化解决方案:飞腾FT2000+/64核,赋能关键任务保驾护航

在信息安全和自主可控的时代背景下&#xff0c;国产化设备的需求与日俱增&#xff0c;尤其是在国防、航空航天、能源和其他关键行业。高可靠性和极端环境设计的国产加固服务器&#xff0c;搭载强大的飞腾FT2000/64核处理器&#xff0c;全面满足国产自主可控的严苛要求。 性能强…

光伏电站设计之辐照度效果(threejs实现)

类似 solaredge里面的日照度效果 1、由经纬度和屋顶朝向获取&#xff08;参考pvlib&#xff09;当前地区的辐照度值&#xff0c; 2、根据辐照度值插值获取对应辐照度的颜色。 3、计算片段着色器里面计算每个顶点的遮挡率和紫色混合 4、计算鼠标移动中的投射屋顶位置辐照度&…

Ansible自动化运维管理工具

一、Ansible 1.1、自动化运维管理工具有哪些&#xff1f; 工具架构语言使用情况Ansible无clientpython 协议用ssh95%puppetC/Sruby 协议用http基本不用chefC/Sruby 协议用http基本不用saltstackC/Spython 协议用ssh5% 1.2、Ansible简介 Ansible是一个基于Py…

网易翻译工具解析!这几大翻译器值得一试!

翻译工具的出现&#xff0c;使得跨语言沟通变得更加便捷。本文将为您推荐几款优秀的翻译工具&#xff0c;包括福昕在线翻译、福昕翻译客户端、海鲸AI翻译和网易有道翻译&#xff0c;帮助您在学习、工作和生活中轻松应对语言挑战。 福昕在线翻译 直达链接&#xff08;复制到浏…

c4d渲染和3d渲染有什么区别?c4d和3dmax哪个容易学?

在现代设计和创意产业中&#xff0c;3D渲染技术是不可或缺的一部分。它能够帮助设计师和艺术家将他们的创意转化为逼真的视觉效果&#xff0c;从而更好地展示和传达他们的想法。在众多3D渲染软件中&#xff0c;C4D渲染和3D Max是两款备受关注的软件。 本文将探讨C4D渲染和3D渲…

深度学习领域,你心目中 idea 最惊艳的论文是哪篇?

深度学习发展至今&#xff0c;共经历了三次浪潮&#xff0c;20 世纪40年代到60年代深度学习的雏形出现在控制论(cybernetics)中&#xff0c;20 世纪 80 年代 到 90 年代深度学习表现为 联结主义(connectionism)&#xff0c;直到 2006 年&#xff0c;才真正以深度学习之名复兴。…