netty rpc框架 即时通讯

Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,一般都是基于Netty来构建,比如sock-bolt。总之一句话,Java小伙伴们需要且有必要学会使用Netty并理解其实现原理。

netty旨在为可维护的高性能、高可扩展性协议服务器和客户端的快速开发提供异步事件驱动的网络应用程序框架和工具。换句话说,Netty是一个NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化并简化了TCP和UDP套接字服务器开发等网络编程。

学习netty原理细节,看netty源码是必不可少的,那首先来看下如何编译源码:

  1. 从github下载netty 4.x源码
  2. 如果缺少XxxObjectHashMap类,这些类是在编译时自动生成的,可以执行mvn clean install或者cd common && mvn clean install命令即可。
  3. 打开idea,开启源码阅读之旅 😃

除了看源码,可以结合一些书籍来看,学习效果更好。关于Netty的书籍,笔者这里推荐一本 李林锋 写的《Netty权威指南》,这本书对于Netty的基础概念和NIO部分讲解的还是不错的,不过有点地方感觉有点贴代码凑字数嫌疑,整体来说还算不错。

什么是Netty

Netty是一个事件驱动的高性能Java网络库,是一个隐藏了背后复杂性而提供一个易于使用的API的客户端/服务端框架。Netty以其高性能和可扩展性,使开发者专注于真正感兴趣的地方。它的一个主要目标就是促进“关注点分离”:使业务逻辑从网络基础设施应用程序中分离

不仅仅是Netty框架,其他框架的设计目的也大都是为了使业务程序和底层技术解耦,使程序员更加专注于业务逻辑实现,提高开发质量和效率。Netty为什么性能如此之高,主要是其内部的Reactor模型机制。

Netty核心组件

  • Bootstrap和ServerBootstrap:Netty应用程序通过设置bootstrap引导类来完成,该类提供了一个用于应用程序网络层配置的容器。Bootstrap服务端的是ServerBootstrap,客户端的是Bootstrap。
  • Channel:Netty 中的接口 Channel 定义了与 socket 丰富交互的操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。
  • ChannelHandler:ChannelHandler 支持很多协议,并且提供用于数据处理的容器,ChannelHandler由特定事件触发, 常用的一个接口是ChannelInboundHandler,该类型处理入站读数据(socket读事件)。
  • ChannelPipeline:ChannelPipeline 提供了一个容器给 ChannelHandler 链并提供了一个API 用于管理沿着链入站和出站事件的流动。每个 Channel 都有自己的ChannelPipeline,当 Channel 创建时自动创建的。 下图说明了ChannelHandler和ChannelPipeline二者的关系:

  • EventLoop:EventLoop 用于处理 Channel 的 I/O 操作。一个单一的 EventLoop通常会处理多个 Channel 事件。一个 EventLoopGroup 可以含有多于一个的 EventLoop 和 提供了一种迭代用于检索清单中的下一个。
  • ChannelFuture:Netty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后获取它的结果。出于这个目的,Netty 提供了接口 ChannelFuture,它的 addListener 方法

Netty 是一个非阻塞、事件驱动的网络框架。Netty 实际上是使用 Threads( 多线程) 处理 I/O事件的,对于熟悉多线程编程的读者可能会需要关注同步代码。这样的方式不好,因为同步会影响程序的性能,Netty 的设计保证程序处理事件不会有同步。因为某个Channel事件是被添加到一个EventLoop中的,以后该Channel事件都是由该EventLoop来处理的,而EventLoop是一个线程来处理的,也就是说Netty不需要同步IO操作,EventLoop与EventLoopGroup的关系可以理解为线程与线程池的关系一样。

Buffer(缓冲)

ByteBuf是字节数据的容器,所有的网络通信都是基于底层的字节流传输,ByteBuf 是一个很好的经过优化的数据容器,我们可以将字节数据有效的添加到 ByteBuf 中或从 ByteBuf 中获取数据。为了便于操作,ByteBuf 提供了两个索引:一个用于读,一个用于写。我们可以按顺序读取数据,也可以通过调整读取数据的索引或者直接将读取位置索引作为参数传递给get方法来重复读取数据。

ByteBuf使用模式

堆缓冲区ByteBuf将数据存储在 JVM 的堆空间,这是通过将数据存储在数组的实现。堆缓冲区可以快速分配,当不使用时也可以快速释放。它还提供了直接访问数组的方法,通过 ByteBuf.array() 来获取 byte[]数据。

堆缓冲区ByteBuf使用示例:

ByteBuf heapBuf = ...;
if (heapBuf.hasArray()) {byte[] array = heapBuf.array();int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();int length = heapBuf.readableBytes();handleArray(array, offset, length);
}

直接缓冲区ByteBuf,在 JDK1.4 中被引入 NIO 的ByteBuffer 类允许 JVM 通过本地方法调用分配内存,其目的是通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。DirectBuffer 在-XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响。

Netty示例代码

了解了Netty基础概念之后,一起看下Netty的使用示例,下面以TCP server、TCP client、http server为例,由于示例代码不难,所以不再赘述,直接上代码。

TCP Server
public static void main(String[] args) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap boot = new ServerBootstrap();boot.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(8080).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoHandler());}});// startChannelFuture future = boot.bind().sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {// shutdownbossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}public class EchoHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in = (ByteBuf) msg;System.out.println(in.toString(CharsetUtil.UTF_8));ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}
TCP client
public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(new EchoClientHandler());}});// Start the client.ChannelFuture f = b.connect("localhost", 8081).sync();f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}
}public class EchoClientHandler extends ChannelInboundHandlerAdapter {private final ByteBuf message;public EchoClientHandler() {message = Unpooled.buffer(256);message.writeBytes("hello netty".getBytes(CharsetUtil.UTF_8));}@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.writeAndFlush(message);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(((ByteBuf) msg).toString(CharsetUtil.UTF_8));ctx.write(msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

netty client端在什么时候将channel注册到selector上的呢?是在创建channel之后,就注册到selector的,相关代码在initAndRegister方法中:

final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建(netty自定义)Channel实例,并初始化// channel为 NioServerSocketChannel 实例,NioServerSocketChannel的父类AbstractNioChannel保存有nio的ServerSocketChannelchannel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 向Selector注册channelChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully//    added to the event loop's task queue for later execution.//    i.e. It's safe to attempt bind() or connect() now://         because bind() or connect() will be executed *after* the scheduled registration task is executed//         because register(), bind(), and connect() are all bound to the same thread.return regFuture;
}

initAndRegister之后会执行connect动作,注意,真正的channel.connect动作是由NioEventLoop线程来完成的,当连接三次握手完成之后,会触发该channel的ACCEPT事件,也就是NIOEventLoop中处理事件的流程。

Http server
public static void main(String[] args) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap boot = new ServerBootstrap();boot.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(8080).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("decoder", new HttpRequestDecoder()).addLast("encoder", new HttpResponseEncoder()).addLast("aggregator", new HttpObjectAggregator(512 * 1024)).addLast("handler", new HttpHandler());}});// startChannelFuture future = boot.bind().sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {// shutdownbossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,Unpooled.wrappedBuffer("hello netty".getBytes()));HttpHeaders heads = response.headers();heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + "; charset=UTF-8");heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); // 3heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);ctx.writeAndFlush(response);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/289613.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024-3-28 市场情绪强修复

这一轮退潮负反馈都修复了&#xff0c; 艾艾精工 博信股份 安奈尔 永悦科技 大理药业 &#xff0c;高新发展 也补跌了&#xff0c;收尸队也干活了&#xff0c;情绪不修复不接力得最好写照。这轮周期 宁科生物 已经7板&#xff0c;已经追平了 博信股份7板&#xff0c;看明天溢…

18.字面量

文章目录 一、字面量二、区分技巧三、扩展&#xff1a; /t 制表符 一、字面量 在有些资料&#xff0c;会把字面量说成常量、字面值常量&#xff0c;这种叫法都不是很正确&#xff0c;最正确的叫法还是叫做&#xff1a;字面量。 作用&#xff1a;告诉程序员&#xff0c;数据在…

环信IM集成教程---消息转发合并转发的实现

前言 在发送消息体系中&#xff0c;转发消息是一个重要的环节&#xff0c;可以单条转发也可以合并转发。本文教大家在接入环信IM过程中如何实现单条转发&#xff0c;合并转发消息功能&#xff0c;同时举例一些容易踩坑的位置&#xff0c;以便大家尽快顺利的实现转发消息功能。…

高效 CUDA 调试:将 NVIDIA Compute Sanitizer 与 NVIDIA 工具扩展结合使用并创建自定义工具

高效 CUDA 调试&#xff1a;将 NVIDIA Compute Sanitizer 与 NVIDIA 工具扩展结合使用并创建自定义工具 NVIDIA Compute Sanitizer 是一款功能强大的工具&#xff0c;可以节省您的时间和精力&#xff0c;同时提高 CUDA 应用程序的可靠性和性能。 在 CUDA 环境中调试代码既具有挑…

Exception in thread “main“ com.fasterxml.jackson.databind.JsonMappingException:

问题&#xff1a;jaskson反序列化超出最大长度 Caused by: com.fasterxml.jackson.core.exc.StreamConstraintsException: String length (5043456) exceeds the maximum length (5000000) 场景&#xff1a;前端传递过大base64 原因&#xff1a; jaskon默认已经限制了最大长…

20个超实用Python魔法方法

大家好&#xff01;今天我们要一起探索Python世界的神秘角落——那些被称为“魔法方法”的特殊成员方法。它们就像是编程中的魔法咒语&#xff0c;赋予你的类各种神奇特性&#xff0c;让你的代码更加简洁、强大且有趣味&#xff01; __init__&#xff1a;这是每个对象出生时都要…

安卓利用CameraX 拍照获这张照片的exif信息

一、首先导入相关权限 <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE" /><uses-featureandroid:name"android.hardware.camera"android:required"true" /><uses-permission android:name"andro…

蓝桥杯练习题总结(三)线性dp题(摆花、数字三角形加强版)

目录 一、摆花 思路一&#xff1a; 确定状态&#xff1a; 初始化&#xff1a; 思路二&#xff1a; 确定状态&#xff1a; 初始化&#xff1a; 循环遍历&#xff1a; 状态转移方程&#xff1a; 二、数字三角形加强版 一、摆花 题目描述 小明的花店新开张&#xff0c;为了吸…

Uni-app/Vue/Js本地模糊查询,匹配所有字段includes和some方法结合使用e

天梦星服务平台 (tmxkj.top)https://tmxkj.top/#/ 1.第一步 需要一个数组数据 {"week": "全部","hOutName": null,"weekendPrice": null,"channel": "门市价","hOutId": 98,"cTime": "…

【Redis】Redis 内存管理,Redis事务,bigkey和hotkey

目录 Redis 内存管理 缓存数据设置过期时间&#xff1f; Redis 是如何判断数据是否过期的呢&#xff1f; 过期删除策略 内存淘汰机制 主从模式下对过期键的处理&#xff1f; LRU和LFU的区别 Redis事务 定义和原理 Redis 事务的注意点&#xff1f; 为什么不支持回滚&a…

SQLite数据库文件损坏的可能几种情况(一)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;十三&#xff09; 下一篇&#xff1a;SQLite使用的临时文件&#xff08;二&#xff09; 概述 SQLite数据库具有很强的抗损坏能力。如果应用程序崩溃&#xff0c…

指针数组的有趣程序【C语言】

文章目录 指针数组的有趣程序指针数组是什么&#xff1f;指针数组的魅力指针数组的应用示例&#xff1a;命令行计算器有趣的颜色打印 结语 指针数组的有趣程序 在C语言的世界里&#xff0c;指针是一种强大的工具&#xff0c;它不仅能够指向变量&#xff0c;还能指向数组&#…

HBase Shell基本操作

一、进入Hbase Shell客户端 先在Linux Shell命令行终端执行start-dfs.sh脚本启动HDFS&#xff0c;再执行start-hbase.sh脚本启动HBase。如果Linux系统已配置HBase环境变量&#xff0c;可直接在任意目录下执行hbase shell脚本命令&#xff0c;就可进入HBase Shell的命令行终端环…

Unity Mobile Notifications推送问题

1.在部分机型点击通知弹窗进不去游戏 把这里改成自己的Activity 2.推送的时候没有横幅跟icon红点 主要是第一句话 注册的时候选项可以选择 defaultNotificationChannel new AndroidNotificationChannel(“default_channel”, “Default Channel”, “For Generic notifica…

LinkedIn 互联网架构扩展简史

LinkedIn成立于 2003 年&#xff0c;其目标是连接到您的网络以获得更好的工作机会。第一周只有 2,700 名会员。时间快进了很多年&#xff0c;LinkedIn 的产品组合、会员基础和服务器负载都取得了巨大的增长。 如今&#xff0c;LinkedIn 在全球运营&#xff0c;拥有超过 3.5 亿会…

今日AI热点:科技前沿新动态

引言&#xff1a; 人工智能领域日新月异&#xff0c;每天都有令人振奋的新进展。从苹果到谷歌&#xff0c;从OpenAI到Meta&#xff0c;各大科技巨头纷纷推出创新产品和技术&#xff0c;不断推动着人工智能的发展。让我们一起来看看今日AI热点&#xff0c;探索这个充满活力和激情…

C++从入门到精通——命名空间

命名空间 前言一、命名空间引例什么是命名空间 二、命名空间定义正常的命名空间定义嵌套的命名空间多个相同名称的命名空间 三、命名空间使用加命名空间名称及作用域限定符使用using将命名空间中某个成员引入使用using namespace 命名空间名称引用引用命名空间和引用头文件有什…

Mac安装minio

Mac安装minio 本文介绍使用 mac 安装 MinIO。 所有软件安装优先参考官网&#xff1a;MinIO Object Storage for MacOS — MinIO Object Storage for MacOS #使用 brew 安装 minio brew install minio/stable/minio#找到 minio tong ~ $ brew list minio /opt/homebrew/Cella…

【ssh连接】奇奇怪怪报错记录

gitlab配置ssh连接&#xff0c;先跟着教程生成密钥&#xff0c;上传公钥&#xff0c;将服务器信息存入config文件&#xff0c;但是ssh连接超时&#xff0c;很急&#xff0c;想用服务器&#xff0c;各种搜索尝试&#xff0c;搞了两三天别的什么都没干&#xff0c;还是没解决&…

深度学习pytorch——激活函数损失函数(持续更新)

论生物神经元与神经网络中的神经元联系——为什么使用激活函数&#xff1f; 我们将生物体中的神经元与神经网络中的神经元共同分析。从下图可以看出神经网络中的神经元与生物体中的神经元有很多相似之处&#xff0c;由于只有刺激达到一定的程度人体才可以感受到刺激&#xff0c…