【Netty】nio处理acceptreadwrite事件

       📝个人主页:五敷有你      

 🔥系列专栏:Netty

⛺️稳中求进,晒太阳

1.处理accept

1.1客户端代码

public class Client {public static void main(String[] args) {try (Socket socket = new Socket("localhost", 8080)) {System.out.println(socket);socket.getOutputStream().write("world".getBytes());System.in.read();} catch (IOException e) {e.printStackTrace();}}
}

1.2 服务端代码

@Slf4j
public class ChannelDemo {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();
//                int count = selector.selectNow();log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }// 获取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判断事件类型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();log.debug("{}", sc);}// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

水平触发是什么

参考这篇文章:细说八股 | NIO的水平触发和边缘触发到底有什么区别? - 掘金 (juejin.cn)

2. 处理read事件

2.1 服务端代码

@Slf4j
public class ChannelDemo {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();
//                int count = selector.selectNow();log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }// 获取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判断事件类型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);log.debug("连接已建立: {}", sc);} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);if(read == -1) {key.cancel();sc.close();} else {buffer.flip();debug(buffer);}}// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}

💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey

  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件。

注意:

关闭远程连接会发送一次可读事件,返回值为-1,需要cancel和close和remove

⚠️ 不处理边界的问题

服务端

public class Server {public static void main(String[] args) throws IOException {ServerSocket ss=new ServerSocket(9000);while (true) {Socket s = ss.accept();InputStream in = s.getInputStream();// 这里这么写,有没有问题byte[] arr = new byte[4];while(true) {int read = in.read(arr);// 这里这么写,有没有问题if(read == -1) {break;}System.out.println(new String(arr, 0, read));}}}
}

客户端

public class Client {public static void main(String[] args) throws IOException {Socket max = new Socket("localhost", 9000);OutputStream out = max.getOutputStream();out.write("hello".getBytes());out.write("world".getBytes());out.write("你好".getBytes());max.close();}
}

输出

为什么呢???????

处理消息的边界(3种)

  1. 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

  2. 另一种思路是按分隔符拆分,缺点是效率低

  3. TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

服务端

下述代码:按分隔符拆分,然后动态扩容

private static void split(ByteBuffer source) {source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一条完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把这条完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从 source 读,向 target 写for (int j = 0; j < length; j++) {target.put(source.get());}debugAll(target);}}source.compact(); // 0123456789abcdef  position 16 limit 16
}public static void main(String[] args) throws IOException {// 1. 创建 selector, 管理多个 channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立 selector 和 channel 的联系(注册)// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);// key 只关注 accept 事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("sscKey:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理selector.select();// 4. 处理事件, selectedKeys 内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题iter.remove();log.debug("key: {}", key);// 5. 区分事件类型if (key.isAcceptable()) { // 如果是 acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); // attachment// 将一个 byteBuffer 作为附件关联到 selectionKey 上SelectionKey scKey = sc.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);log.debug("scKey:{}", scKey);} else if (key.isReadable()) { // 如果是 readtry {SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel// 获取 selectionKey 上关联的附件ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 需要扩容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer); // 0123456789abcdef3333\nkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)}}}}
}

客户端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

ByteBuffer 大小分配
  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。

    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

3.处理Write事件

write就是将buffer中的内容通过channel传输过去。

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上

    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册

    • 如果不取消,会每次可写均会触发 write 事件

public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8080));Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);// 1. 向客户端发送内容StringBuilder sb = new StringBuilder();for (int i = 0; i < 3000000; i++) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());int write = sc.write(buffer);// 3. write 表示实际写了多少字节System.out.println("实际写入字节:" + write);// 4. 如果有剩余未读字节,才需要关注写事件if (buffer.hasRemaining()) {// read 1  write 4// 在原有关注事件的基础上,多关注 写事件sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);// 把 buffer 作为附件加入 sckeysckey.attach(buffer);}} else if (key.isWritable()) {ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);System.out.println("实际写入字节:" + write);if (!buffer.hasRemaining()) { // 写完了key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}}}
}

客户端

public class WriteClient {public static void main(String[] args) throws IOException {Selector selector = Selector.open();SocketChannel sc = SocketChannel.open();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);sc.connect(new InetSocketAddress("localhost", 8080));int count = 0;while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isConnectable()) {System.out.println(sc.finishConnect());} else if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);count += sc.read(buffer);buffer.clear();System.out.println(count);}}}}
}
💡 write 为何要取消

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/353714.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Cloudscapes V2】Blender商城10周年免费领取礼物超逼真的Vdb云和爆炸合集烟雾体积云字体符号轨迹火焰粒子

6月19号的限时免费领取插件挺牛的&#xff0c;可以在blender里渲染体积云、爆炸特效、火焰、烟雾等效果&#xff0c;非常逼真。 Blender商城10周年免费领取礼物&#xff1a;https://blendermarket.com/birthday Cloudscapes V2 - 超逼真的 Vdb 云和爆炸合集 CloudScapes 是 …

如何快速在一台新电脑上安装 Python 环境

一、下载miniconda 1.下载 我们可以在清华大学开源软件镜像站下载最新版本的miniconda。如&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-py38_4.9.2-Windows-x86_64.exe 2.安装 双击exe文件安装&#xff0c;如果没有特殊的需求&#x…

C# 索引器与迭代器分部类详情

文章目录 一、迭代器二、查看Foreach执行中间语言三、foreach实现过程总结四、实现迭代器最常用的方法五、分布类概述及其使用六、索引器概述及声明七、索引器在类中的使用八、索引器在接口中的使用九、总结 一、迭代器 1、迭代器&#xff08;iterator&#xff09;解决的是集合…

vue3+ts+vite集成eslint

项目中安装eslint yarn add eslint -Deslint初始化 npx eslint --init按照下方操作即可 安装typescript-eslint/parser yarn add typescript-eslint/parser -D安装vite-plugin-eslint2 yarn add vite-plugin-eslint2 -D配置vite-plugin-eslint2 // vite.config.ts import …

Linux系统编程——进程信号

目录 一&#xff0c;信号预备 1.1 生活中的信号 1.2 技术应用中的信号 1.3 signal函数捕捉信号 1.3 信号的发送与记录 1.4 信号的常见处理方式 二&#xff0c;信号的产生 2.1 核心转储 2.1.1 环境配置 2.1.2 利用core文件进行调试 2.1.3 core dump标志 2.2 通过系统…

所以spring mvc异常处理工作原理是啥

文章目录 spring mvc异常处理&#xff08;源码分析&#xff09;概述原理&#xff08;源码角度&#xff09;模拟debug前期提要分析4个map4个map的初始化为什么需要基于mappedMethods缓存 总结一下 spring mvc异常处理&#xff08;源码分析&#xff09; 概述 spring mvc有下面三…

127.0.0.1与本机IP地址的区别

大家好&#xff0c;今天我们来聊聊一个在网络世界中常常被提及&#xff0c;但可能对于非专业人士来说还有些模糊的概念——127.0.0.1与本机IP地址。这两个地址在网络通信中都扮演着重要的角色&#xff0c;但它们之间又有着怎样的区别呢&#xff1f;让我们一起来探究一下。 一、…

简单Mesh多线程合并,使用什么库性能更高

1&#xff09;简单Mesh多线程合并&#xff0c;使用什么库性能更高 2&#xff09;Unity Semaphore.WaitForSignal耗时高 3&#xff09;VS编辑的C#代码注释的中文部分乱码 4&#xff09;变量IntPtr m_cachePtr切换线程后变空 这是第389篇UWA技术知识分享的推送&#xff0c;精选了…

【GO-OpenCV】go-cv快速配置

最近对golang实现目标检测心血来潮&#xff0c;尝试在没有sudo权限的平台配置go-cv,有所发现&#xff0c;索性多个平台都做尝试 安装Go语言&#xff08;Golang&#xff09; 通过包管理器安装&#xff08;适用于Debian/Ubuntu&#xff09;(有点慢) 更新包列表&#xff1a; sud…

AbMole带你探索颅内压力与肌肉生长的联系:一项突破性研究

在生物医学领域&#xff0c;颅内压力&#xff08;ICP&#xff09;的调控机制一直是研究的热点。最近&#xff0c;一项发表在《PLOS ONE》上的研究为我们揭示了颅内压力与后颅窝肌肉生长之间的潜在联系&#xff0c;为我们理解某些慢性头痛的成因提供了新的视角。 颅内压力的异常…

大数据的发展,带动电子商务产业链,促进了社会的进步【电商数据采集API接口推动电商项目的源动力】

最近几年计算机技术在诸多领域得到了有效的应用&#xff0c;同时在多方面深刻影响着我国经济水平的发展。除此之外&#xff0c;人民群众的日常生活水平也受大数据技术的影响。 在这其中电子商务领域也在大数据技术的支持下&#xff0c;得到了明显的进步。虽然电子商务领域的发…

酷开科技将AI与大数据融合,成为OTT大屏营销革新的驱动力

在数字化浪潮的推动下&#xff0c;营销领域正经历着深刻的变革。而在这样一个媒介渠道分散、注意力碎片化的时代&#xff0c;“大屏”是难得能让消费者们“精神集中”高度卷入的内容消费场景&#xff0c;也是能让品牌一对多地高效触达家庭人群的通道&#xff0c;大屏的独特营销…

TVBOX 最新版下载+视频源教程

下载链接 wx 搜索 Geek 前端 发送电视资源进行获取 操作教程

【数据结构与算法】对称矩阵,三角矩阵 详解

给出对称矩阵、三角矩阵的节省内存的存贮结构并写出相应的输入、输出算法。 对称矩阵和三角矩阵可以通过特殊的存储结构来节省内存。这种存储结构只存储矩阵的一部分元素&#xff0c;而不是全部元素。 对称矩阵&#xff1a;对于一个n阶对称矩阵&#xff0c;我们只需要存储主对…

VUE 项目用 Docker+Nginx进行打包部署

一、Docker Docker 是一个容器化平台&#xff0c;允许你将应用程序及其依赖项打包在容器中。使用 Docker&#xff0c;你可以创建一个包含 Vue.js 应用程序的容器镜像&#xff0c;并在任何支持 Docker 的环境中运行该镜像。 二、Nginx Nginx 是一个高性能的 HTTP 服务器和反向…

钡铼技术BL104在环境监测站多协议采集保障数据全面准确

随着工业化和城市化进程的加快&#xff0c;环境污染问题日益严重&#xff0c;环境监测站在保护生态环境、保障公众健康中的作用变得越来越重要。钡铼技术PLC物联网关BL104&#xff0c;为环境监测站提供了一种高效、可靠的多协议数据采集解决方案&#xff0c;保障了监测数据的全…

Hype 4(html5工具) mac版下载-Hype 4 for mac软件最新版下载附加详细安装步骤

用户量向我们证明了矢量形状&#xff0c;矢量是使用矢量工具绘制的形状&#xff0c;包括直线&#xff0c;曲线和复杂形状。有目共睹的是Hype是一款强大的Mac OS平台 HTML5 创作工具&#xff0c;它能够在网页上做出赏心悦目的动画片效果&#xff0c;创建丰富的网页交互动画片&am…

域策略笔记

域策略 导航 文章目录 域策略导航一、设置客户端壁纸二、重定向用户配置文件路径三、部署网络打印机四、部署共享文件夹为网络驱动器五、通过域策略推送软件安装六、通过域策略限制软件的使用通过路径进行限制通过进程限制 七、通过域策略将文件添加白名单八、通过域策略添加可…

大数据集群离线解析经纬度逆编码地址

背景 最近有个需要需求把经纬度解析为地址&#xff0c;那么通常解析地址市面上流行的方案就是调取百度、高德地图的接口进行解析。 难点 但是在用这个方案遇到一个问题就是企业认证的百度地图每天的逆编码解析为300w次&#xff0c;qps为100次/秒&#xff0c;对于日增上千万的…

大数据实训项目(小麦种子)-03、大数据环境Hadoop、Mapreduce、Hive、Hbase、HDFS搭建服务及调试

文章目录 前言一、Linux系统Centos7安装配置JDK8二、Linxu系统Centos7中搭建Hadoop3.1.0服务下载地址服务1&#xff1a;详细步骤&#xff08;初始化与启动dfs服务&#xff09;详细步骤配置环境变量 服务2&#xff1a;Hadoop(YARN)环境搭建 三、Linux系统搭建Hive3.1.2服务前提条…