一、什么是NIO
NIO,全称为New Input/Output,是Java平台中用于替代传统I/O(Blocking I/O)模型的一个功能强大的I/O API。NIO在Java 1.4版本中被引入,其设计目标是提供一种非阻塞的、低延迟的I/O操作方式,以提高应用程序在处理大量并发连接时的效率,特别适合于网络通信和文件操作。
NIO的主要特性如下:
-
非阻塞I/O:
在传统的BIO模型中,读写操作是阻塞的,即当数据不可用时,线程会被挂起,等待数据准备好。
而在NIO中,读写操作是非阻塞的,当数据不可用时,线程不会被挂起,而是可以去做其他工作,等到数据准
备好了再回来处理。 -
缓冲区(Buffer):
NIO引入了缓冲区的概念,数据在读写时先被存放在缓冲区中,提高了数据处理的效率。
缓冲区提供了对数据的结构化访问,可以方便地进行读写操作,并且支持高效的数据预读和后写。 -
通道(Channel):
通道类似于流,但它是双向的,可以用于读和写数据。
通道可以从文件、套接字等源获取数据,也可以向这些源发送数据。 -
选择器(Selector):
选择器允许单个线程检查多个通道,看哪些通道已经准备好进行读写操作,从而实现了I/O的多路复用,
减少了线程的数量,提高了系统的效率。 -
直接内存访问(Direct Buffer):
NIO支持直接在操作系统内存中分配缓冲区,避免了Java堆和系统内存之间的数据复制,提高了性能。
上面的几个特性中,有三个是NIO的核心特性分别是缓冲区、通道、选择器,由选择器决定选择哪一个通道进行数据的读写操作,通道读写的数据必须经过缓冲区。下面笔者分别对这三个核心特性做深入讲解。
二、缓冲区(Buffer)
基本缓冲区类
缓冲区是NIO中的很重要概念,它是一个特殊类型的数组,用于存储数据。Buffer类是所有缓冲区类的抽象基类。以下是截取的源码开头部分:
可以看到Buffer类是一个抽象类,并且这是一个限制继承的抽象类,sealed和permits关键字说明了这一点。这两个关键字是jdk15版本推出来的预览特性,在后续的jdk版本中逐渐成熟。这些关键字用于限制类的继承,以提供更好的封装性和安全性。
-
sealed
一个密封类(Sealed Class)是只能被其指定的子类(或接口)继承的类。这样 可以防止未经授权的类随意扩展它,从而增强了代码的可控性。密封类可以防止意外的子类化,这在构建框架或库时特别有用,因为这样可以确保只有预期的子类能够存在。 -
permits
该关键字用于在密封类声明中列出允许继承它的子类。
Buffer抽象类一共有七个子类继承类,分别对应java中的七个基本数据类型(没有布尔类型)。
这里笔者搜索了相关资料,对于为什么没有布尔类型缓冲区类给出一个解释如下:
布尔类型在Java中通常是作为位(bit)来处理的,而不是作为一个完整的字节或更大数据类型的一部分。布尔值在内存中通常占用一个字节的位,但Java的布尔变量在逻辑上是不可分割的,不像其他基本类型那样可以独立地进行读写操作
-
ByteBuffer:用于存储字节(byte)类型的数据
-
CharBuffer:用于存储字符(char)类型的数据。
-
ShortBuffer:用于存储短整型(short)类型的数据
-
IntBuffer:用于存储整型(int)类型的数据。
-
LongBuffer:用于存储长整型(long)类型的数据。
-
FloatBuffer:用于存储浮点型(float)类型的数据。
-
DoubleBuffer:用于存储双精度浮点型(double)类型的数据。
以上就是基本类型的缓冲区类,当然每种基本缓冲区类都还会有不同场景下的自己的子类型,这里不再介绍。
缓冲区的核心属性
通过源码观察到,缓冲区有以下几个核心属性:
-
mark:标记位置,可以用来记录position的某个值,但不总是存在,默认值为未定义
-
position:下一个要被读或写的元素的位置。初始值为0。
-
limit:在读模式下,表示缓冲区中可读数据的边界;在写模式下,表示可以写入数据的最大位置。初始化时,limit通常与capacity相同。
-
capacity:缓冲区的最大容量,即它可以容纳的数据量。
在缓冲区中,这几个核心属性需要满足以下几个不变量关系:
mark<=position<=limit<=capacity
缓冲区的基本操作
-
allocate():静态方法,用于创建并初始化一个新的缓冲区,大小为指定的容量。
-
flip():切换缓冲区状态从写模式到读模式,将limit设置为当前position,然后重置position为0。
-
clear():清空缓冲区,将position重置为0,limit设置回capacity,但不实际清除数据。
-
rewind():将position重置为0,允许重新读取缓冲区中的所有数据,limit保持不变。
-
compact():将未读数据移动到缓冲区的起始位置,并清空已读部分,用于连续写入更多数据。
-
put() 和 get():分别用于写入数据到缓冲区或从缓冲区中读取数据。
Buffer实现类的常⽤⽅法
以ByteBuffer为例,分类介绍ByteBuffer的常⽤⽅法。
- 创建Buffer
ByteBuffer提供了allocate静态⽅法⽤来创建带有初始化数组的Buffer缓冲区。
ByteBuffer buffer = ByteBuffer.allocate(1024);
-
向Buffer中写数据
将数据写⼊到buffer中有三种⽅式:-
put(数据): 将数据存⼊到buffer,此时position随之变化。
-
wrap(数据):将数据存⼊数据并返回buffer,此时position为0,limit为数据的⻓度
-
channel.read(buffer):将数据读⼊到buffer中。
-
-
从Buffer中读取数据
从Buffer中读取数据有以下⼏种⽅式:- get相关的⽅法:获得当前position或指定position的数据
- array():返回整个数组内容
- channel.write(buffer):使⽤channel获得buffer中的内容并写⼊到指定⽬标
缓冲区分类
- 子缓冲区
可以为Buffer创建⼦缓冲区,在现有缓冲区上分割出⼀块空间作为新的缓冲区。原缓冲区和⼦缓冲区共享同⼀⽚数据空间。通过调⽤slice⽅法创建⼦缓冲区 。
package com.execute.batch.executebatch;import java.nio.ByteBuffer;
import java.util.Arrays;/*** @author hulei* @date 2024/6/6 15:00*/public class BufferChildren {public static void main(String[] args) {// 得到bufferByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});// 设置positionbuffer.position(3);// 设置limitbuffer.limit(7);// 得到子缓冲区 与原缓冲区共享bufferByteBuffer slice = buffer.slice();System.out.println(Arrays.toString(slice.array()));slice.put(0, ((byte) 40));System.out.println(Arrays.toString(buffer.array()));slice.position(3);ByteBuffer slice1 = slice.slice();slice1.put(0, ((byte) 50));System.out.println(Arrays.toString(buffer.array()));}
}
- 只读缓冲区
通过buffer的asReadOnlyBuffer()⽅法获得⼀个新的只读缓冲区,所谓的只读缓冲区就是只能读不能写。只读缓冲区与原缓冲区共享同⼀⽚数据空间,原缓冲区数据发⽣改变,只读缓冲区也能看到变化后的数据,因为它们共享同⼀⽚存储空间。
package com.execute.batch.executebatch;import java.nio.ByteBuffer;/*** @author hulei* @date 2024/6/6 15:46*/public class BufferOnlyRead {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});// 获得只读缓冲区ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();// 修改原bufferbuffer.put(3, (byte) 40);// 查看readOnlyBufferwhile (readOnlyBuffer.hasRemaining()){System.out.println(readOnlyBuffer.get());}}
}
上面的代码首先创建了一个原始缓冲区,接着又通过原始缓冲区获得了一个只读缓冲区。两个缓冲区的数据是共享的,但是修改缓冲区数据只能通过原始缓冲区进行,对于readOnlyBuffer 只读缓冲区,是无法进行修改操作的比如 readOnlyBuffer.put(4, (byte) 50),会有报错提示。当然,通过原始缓冲区进行修改,也会在只读缓冲区中体现出来。
- 直接缓冲区
直接缓冲区其实就是在内存上直接分配空间用作缓冲区。减少了数据在java堆和内存之间的复制,提升了性能。可以通过调用此类的allocateDirect工厂方法来创建直接字节缓冲区。此方法返回的缓冲区通常比非直接缓冲区具有更⾼的分配和释放成本。直接缓冲区的内容可能位于正常垃圾收集堆之外,因此它们对应⽤程序内存占⽤的影响可能不明显。
package com.execute.batch.executebatch;import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;/*** @author hulei* @date 2024/6/6 16:44*/public class BufferDirect {public static void main(String[] args) throws IOException {FileInputStream fis = new FileInputStream("D:/1.txt");FileChannel fisChannel = fis.getChannel();FileOutputStream fos = new FileOutputStream("D:/8.txt");FileChannel fosChannel = fos.getChannel();// 创建直接缓冲区ByteBuffer buffer = ByteBuffer.allocateDirect(1024);while (fisChannel.read(buffer) > 0){// 把buffer的数据写到文件中buffer.flip();fosChannel.write(buffer);}System.out.println("复制完毕");}
}
代码首先创建一个文件读取流,这个文件必须已经存在,获取读取通道。然后再创建一个文件写入流,获取写入通道。接着创建直接缓冲区,开始循环读取数据复制。
- 基于内存映射的Buffer
MappedByteBuffer采⽤direct buffer的⽅式读写⽂件内容,这种⽅式就是内存映射。这种⽅式直接调⽤系统底层的缓存,没有JVM和系统之间的复制操作,所以效率⾮常⾼,主要⽤于操作⼤⽂件,是直接缓冲区的父类。
通过FileChannel的map⽅法得到MappedByteBuffer,MappedByteBuffer把磁盘中⽂件的内容映射到计算机的虚拟内存中,操作MappedByteBuffer直接操作内存中的数据,⽽⽆需每次通过IO来读取物理磁盘中的⽂件,效率很高。
package com.execute.batch.executebatch;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;/*** @author hulei* @date 2024/6/6 17:17*/public class BufferMemory {public static void main(String[] args) throws IOException {File file = new File("D://1.txt");RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");// 获得channelFileChannel fileChannel = randomAccessFile.getChannel();// channel的map来获得内存映射缓冲区MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, file.length());
// while (mappedByteBuffer.hasRemaining()){
// System.out.println((char) mappedByteBuffer.get());
// }Charset utf8 = StandardCharsets.UTF_8;CharBuffer decodedChars = utf8.decode(mappedByteBuffer);while (decodedChars.hasRemaining()) {System.out.print(decodedChars.get());}}
}
注:以上代码注释的部分把字节转换为char类型,打印出来的是乱码,因为文件不是ASCII编码格式。所以我使用了utf8进行转换,显示出来原始的字符。实际开发中如果数据还需要进行传输,则不必要这么转码。
三、通道(Channel)
Channel即通道,表示打开IO设备的连接,⽐如打开到⽂件、Socket套接字的连接。在使⽤NIO时,必须要获取⽤于连接IO设备的通道以及⽤于容纳数据的缓冲区。通过操作缓冲区,实现对数据的处理。也就是说数据是保存在buffer缓冲区中的,需要通过Channel来操作缓冲区中的数据。与传统的IO流(Stream)不同,Channel支持双向数据传输,既可以读也可以写。
Channel的核心特性如下:
-
双向通信: Channel是双向的,意味着数据可以从Channel读取到缓冲区,也可以从缓冲区写入到Channel,这与InputStream和OutputStream等单向流形成了对比。
-
与缓冲区(Buffer)交互: Channel并不直接与数据源或目标进行操作,而是通过缓冲区作为中介。数据总是先读入缓冲区或从缓冲区写出,这样的设计允许更高效的批量操作和减少实际I/O调用次数。
-
操作系统的直接映射: Channel提供了对操作系统底层I/O服务的直接访问,通常对应于操作系统中的文件描述符,如硬件设备、文件、网络套接字等。这意味着Channel能够利用操作系统的优化,提高性能。
-
主要实现类型:
- FileChannel:用于文件I/O操作。
- SocketChannel:用于TCP网络编程中的客户端和服务端通信。
- ServerSocketChannel:用于监听新进来的TCP连接请求,类似于传统的ServerSocket。
- DatagramChannel:用于UDP数据报通信,支持无连接的数据传输。
-
选择器(Selector): Channel可以与Selector一起工作,实现非阻塞I/O操作。通过注册感兴趣的事件(如读、写、连接等),应用程序可以管理多个Channel,而无需为每个Channel分配单独的线程,大大提高了并发处理能力。
-
全双工通信: Channel支持同时进行读写操作,这对于需要同时接收和发送数据的应用场景非常有用,比如网络通信。
-
关闭和生命周期:Channel 有自己的生命周期,可以通过调用 close() 方法关闭。一旦关闭,就不能再进行读写操作。
总的来说,Channel是Java NIO框架中的核心组件,它提供了高效、灵活的方式来处理I/O操作,特别是对于需要处理多个并发连接的服务器应用程序而言,NIO Channel和相关机制提供了强大的性能提升。
四、选择器(Selector)
Selector选择器,也可以称为多路复⽤器。它是Java NIO的核⼼组件之⼀,⽤于检查⼀个或多个Channel的状态是否处于可读、可写、可连接、可接收等。通过⼀个Selector选择器管理多个Channel,可以实现⼀个线程管理多个Channel对应的⽹络连接。使⽤单线程管理多个Channel可以避免多线程的线程上下⽂切换带来的额外开销。
SelectableChannel可选择通道
只有SelectableChannel才能被Selector管理,⽐如所有的Socket通道。⽽FileChannel并没有继承SelectableChannel,因此不能被Selector管理。
上文中的四个Channel中,FileChannel无法被Selector管理,因为它没有继承SelectableChannel,其他三个SocketChannel、ServerSocketChannel、DatagramChannel均直接或者间接的继承了SelectableChannel抽象类,所以它们可以被管理。
注册Channel
Channel通过注册的⽅式关联Selector。⼀个Channel可以注册到多个Selector上,但在某⼀个Selector上只能注册⼀次。注册时需要告知Selector,Selector需要对通道的哪个操作感兴趣。
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException{return register(sel, ops, null);
}
通道的操作类型有下面几种:
-
可读:SelectionKey.OP_READ
-
可写:SelectionKey.OP_WRITE
-
可连接:SelectionKey.OP_CONNECT
-
可接收:SelectionKey.OP_ACCEPT
注册通道的操作类型方法如下:
channel.register(selector, SelectionKey.OP_READ);
同时注册一个通道的多个操作方法如下:
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
选择器会查询每个channel的操作事件,如果是该channel注册的操作已就绪,则进⾏响应。注意,这⾥channel的操作指的是channel完成某个操作的条件,表示该channel对于该操作已处于就绪状态。⽐如ServerSocketChannel已准备好接收新的连接,那么它注册的 SelectionKey.OP_ACCEPT 操作就处于就绪状态。再比如SocketChannel已准备好去连接Server服务器,那么它注册的SelectionKey.OP_CONNECT 操作就处于就绪状态。于是Selector就可以触发之后的动作。
SelectionKey选择键
SelectionKey封装了Channel和注册的操作。
当Selector调⽤select()⽅法时,会轮询所有注册在它身上的Channel,查看是否有处于某个操作(已注册到selector上的)就绪状态的Channel,把这些Channel放⼊到SelectionKey的集合中。
Selector用法
- 创建Selector
Selector selector = Selector.open();
- 非阻塞Channel注册
注意:Channel必须处于非阻塞模式才能注册到Selector上,注册示例代码如下
package com.execute.batch.executebatch;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;/*** @author hulei* @date 2024/6/5 17:58*/public class ChannelRegister {private static final System.Logger logger = System.getLogger(ChannelRegister.class.getName());public static void main(String[] args) throws IOException {// 1.创建SelectorSelector selector = Selector.open();// 2.获得Channeltry (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();){// 3.设置成非阻塞的模式serverSocketChannel.configureBlocking(false);// 4.绑定端口serverSocketChannel.bind(new InetSocketAddress(9001));// 5.注册channel到selector上SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);}catch (Exception e){logger.log(System.Logger.Level.ERROR, "注册channel到selector上失败");}}
}
- Selector轮询就绪状态的Channel
Selector通过调⽤select⽅法轮询已就绪的通道操作。select⽅法是阻塞的,直到⾄少有⼀个通道的注册操作已就绪。当完成select⽅法调⽤后,被选中的已就绪的所有channel通过Selector的selectedKeys()⽅法获得,该⽅法获得的是⼀个SelectionKey集合,其中每⼀个SelectionKey都表示⼀个Channel。于是可以根据SelectionKey的注册操作来做具体的业务处理。下面是一个简单的轮询示例。
package com.execute.batch.executebatch;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;/*** @author hulei* @date 2024/6/6 10:30*/public class SelectorLoop {public static void main(String[] args) throws IOException {// 创建selectorSelector selector = Selector.open();// 创建serverSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 非阻塞serverSocketChannel.configureBlocking(false);// 绑定端口serverSocketChannel.bind(new InetSocketAddress(9001));// 注册serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// selector轮询while (true){// 阻塞等待某个操作就绪状态的channelselector.select();// 获得一个集合,里面包含了这次selector执行select方法获得的发生就绪状态的多个channelSet<SelectionKey> selectionKeys = selector.selectedKeys();// 遍历所有的channelIterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();if (key.isReadable()){System.out.println("readable");}else if (key.isAcceptable()){System.out.println("acceptable");}else if (key.isConnectable()){System.out.println( "connectable");}else if (key.isWritable()){System.out.println( "writable");}// 保证下次channel有就绪状态的操作发生时可以被selector轮询到iterator.remove();}}}
}
Selector通信示例
- 服务端示例代码
package com.execute.batch.executebatch;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** @author hulei* @date 2024/6/6 10:55*/public class NIOServer {public static void main(String[] args) throws IOException {// 获得ChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 设置成非阻塞serverSocketChannel.configureBlocking(false);// 绑定端口号serverSocketChannel.bind(new InetSocketAddress(9002));// 获得SelectorSelector selector = Selector.open();// 把channel注册到selector上面, 监听accept事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 让selector轮询监听while (true){// 阻塞直到有通道就绪selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();// 获取有动作的selectionKey == channelIterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey selectionKey = iterator.next();handle(selectionKey);// 删除key,表示处理完成iterator.remove();}}}private static void handle(SelectionKey selectionKey) throws IOException {if (selectionKey.isAcceptable()){// 当服务端处于接收的就绪状态// 获得selectionKey中的channelServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();// 接收客户端连接,获得socketChannelSocketChannel socketChannel = serverSocketChannel.accept();// 设置成非阻塞状态,否则无法被selector复用socketChannel.configureBlocking(false);// 把socketChannel注册到selector上,让selector对socketChannel的read操作感兴趣socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);}else if (selectionKey.isReadable()){// 当socketChannel处于读数据的就绪状态SocketChannel socketChannel = (SocketChannel) selectionKey.channel();// 读取socketChannel中的数据//设置成非阻塞socketChannel.configureBlocking(false);// 创建BufferByteBuffer buffer = ByteBuffer.allocate(1024);// 读数据int len;while ((len = socketChannel.read(buffer)) > 0){// 翻转buffer.flip();System.out.println(new String(buffer.array(), 0, len));// 清除buffer中的数据buffer.clear();}socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE);}else if (selectionKey.isWritable()){SocketChannel socketChannel = (SocketChannel) selectionKey.channel();socketChannel.write(ByteBuffer.wrap("hello".getBytes()));socketChannel.close();selectionKey.cancel();}else if (selectionKey.isConnectable()){System.out.println("连接就绪");SocketChannel socketChannel = (SocketChannel) selectionKey.channel();if (socketChannel.finishConnect()){socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);}selectionKey.cancel();socketChannel.close();}}}
- 客户端示例代码
package com.execute.batch.executebatch;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;/*** @author hulei* @date 2024/6/6 10:59*/public class NIOClient {public static void main(String[] args) throws IOException {// 创建ChannelSocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9002));// 设置成非阻塞模式socketChannel.configureBlocking(false);// 得到bufferByteBuffer buffer = ByteBuffer.allocate(1024);// 把数据写入到buffer中buffer.put("hello selector".getBytes());// 反转bufferbuffer.flip();// 把buffer中的数据写入到channel中socketChannel.write(buffer);// 关闭socketChannel.close();}
}
执行结果如下,客户端代码点了多次运行: