一、概念
NIO, 即new io,也叫非阻塞io
二、NIO三个核心组件:
- Buffer数据缓冲区
- Channel通道
- Selector选择器
1、Buffer缓冲区
缓冲区本质上是一个可以存放数据的内存块(类似数组),可以在这里进行数据写入和读取。此内存块包含在NIO Buffer对象中,并且有一些列API,可以对内存块中的数据进行操作,相比较直接对数组的操作,Buffer API更加容易操作和管理。
1) Buffer进行数据写入与读取的步骤
- 将数据写入缓冲区
- 切换为读取模式,调用API: buffer.flip(),此时position位置会变为0,即从头开始读
- 读取缓冲区数据
- 调用API: buffer.cler()清除整个缓冲区数据或者buffer.compact()清除已读的数据
2) Buffer工作原理
三个重要属性
- capacity容量: 即内存块的大小
- position位置: 内存块当前数据操作的位置,即读数据的位置或者写数据的位置
- limit 限制: 写入模式,等于capacity。 读取模式,limit等于写入的数据量
3) ByteBuffer内存类型
ByteBuffer为性能关键型代码提供了"直接内存(堆外内存,off-heap memory)" 和 "非直接内存(堆内内存,on-heap memory)"两种实现
// 堆内内存 ByteBuffer buffer = ByteBuffer.allocate(4);// 堆外内存 ByteBuffer buffer = ByteBuffer.allocateDirect(4);
堆外内存好处:
1、进行网络IO或者文件IO时,堆外内存会比堆内内存少一次拷贝。(file/socket --- OS memory --- jvm heap), GC会移动对象内存,在写file或socket的过程中,JVM的实现中,会先把数据复制到堆外,再进行写入。
2、GC范围之外,降低了GC压力,但实现了自动管理。 DirectByteBuffer中有一个Cleaner对象(PhantomReference),Cleaner被GC前会执行clean方法,触发DirectByteBuffer中定义的Deallocator
建议:
1、性能确实可观的时候才去使用,分配给大型、长寿命(网络传输、文件读写场景)
2、通过虚拟机参数MaxDirectMemortSieze限制堆外内存大小,防止耗尽整个机器的内存。
什么是堆外内存?
正常的java非空对象,都是在jvm中的,而且受垃圾收集器的管理,即堆内内存(on-heap memory) 在某种特定的场景下,例如在文件读取写入的时候,如果一份数据写在堆内存的某个位置,假如此时在位置A,而此时jvm垃圾回收器进行了一次垃圾回收,此时的数据在内存中的位置可能会发生变化,变成了位置B,操作系统在读取这个数据的时候,通过A去查找,会导致读取到错误的数据,这种场景的解决方案: 堆外内存(off-heap memory),jvm在会先将写入的数据,复制一份到堆外,操作系统直接从堆外读取。
示例:
// 初始化buffer,此时每个元素会被初始化为0
ByteBuffer buffer = ByteBuffer.allocate(4);
System.out.println(String.format("初始化: capacity容量: %s, limit限制: %s, position位置: %s",buffer.capacity(), buffer.limit(), buffer.position()));
>> 输出: 初始化: capacity容量: 4, limit限制: 4, position位置: 0// 写入3个字节数据
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 2);
System.out.println(String.format("初始化: capacity容量: %s, limit限制: %s, position位置: %s",buffer.capacity(), buffer.limit(), buffer.position()));
>> 输出: 初始化: capacity容量: 4, limit限制: 4, position位置: 3//读数据, 不用buffer.flip()切换,position为4,应该是读到第4个字节的数据,但第4个字节没有,则默认为0,即buffer中有了4个数据, 所以读取模式下,limit也为4
byte b1 = buffer.get();
System.out.println(String.format("写入两个字节数据后,capacity容量: %s,limit限制:%s, position位置:%s",buffer.capacity(), buffer.limit(), buffer.position()));
System.out.println(b1);
// >> 输出 capacity容量: 4,limit限制:4, position位置:4
// >> 0// 调用buffer.flip(), 将position变为0
buffer.flip();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",buffer.capacity(), buffer.limit(), buffer.position()));
// >> 输出 capacity容量: 4,limit限制:4, position位置:0// 从position=1开始读取数据
byte b2 = buffer.get();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",buffer.capacity(), buffer.limit(), buffer.position()));
System.out.println(b2);
// >> 输出 capacity容量: 4,limit限制:4, position位置:1
// >> 1// 清除已经读过的数据, 转为写入模式
buffer.compact();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",buffer.capacity(), buffer.limit(), buffer.position()));// >> 输出 capacity容量: 4,limit限制:4, position位置:3//清除所有数据
buffer.clear();
System.out.println(String.format("capacity容量: %s,limit限制:%s, position位置:%s",buffer.capacity(), buffer.limit(), buffer.position()));// >> 输出 capacity容量: 4,limit限制:4, position位置:0
buffer.compact()
方法: 将buffer的position设置为capicity - buffer中剩余未读取的数据索引, 即: position = capicity - compact之前的position
buffer.put((byte)1);
buffer.put((byte)2);
System.out.println();
System.out.format("capacity: %s, limit: %s, position: %s", buffer.capacity(), buffer.limit(), buffer.position());
// capacity: 4, limit: 4, position: 2buffer.compact();
System.out.println();
System.out.format("capacity: %s, limit: %s, position: %s", buffer.capacity(), buffer.limit(), buffer.position());
// capacity: 4, limit: 4, position: 2byte ss = buffer.get();
System.out.println();
System.out.println(ss);
System.out.format("capacity: %s, limit: %s, position: %s", buffer.capacity(), buffer.limit(), buffer.position());
// ss = 0
// capacity: 4, limit: 4, position: 3
2、Channel通道
buffer缓冲区数据传输的渠道,类似BIO中的sokcet+io流
1)客户端通道: SocketChannel
NIO的客户端通道,SocketChannel用于建立TCP网络连接,类似jav.net.Socket
示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;public class NIOClient {public static void main(String[] args) throws IOException {SocketChannel client = SocketChannel.open();// 设置SocketChannel为非阻塞模式,默认的socket相关都是阻塞的client.configureBlocking(false);client.connect(new InetSocketAddress("127.0.0.1", 8888));// 没连接上则一直等待while (!client.finishConnect()){Thread.yield();}Scanner scanner = new Scanner(System.in);System.out.println("连接服务端成功,请输入需要发送的内容:" );// 发送内容String msg = scanner.nextLine();// 将数据包装为bufferByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());// 是否还有数据剩余,有就继续写while (buffer.hasRemaining()) {client.write(buffer);}// 读取服务端响应数据ByteBuffer responseBuffer = ByteBuffer.allocate(1024);while (client.isOpen() && client.read(responseBuffer) != -1) {// 长连接情况下,需要手动判断数据有没有读取结束// (此处做一个简单的判断: 超过0字节就认为请求结束了)// position > 0, 代表该请求有数据在写入,否则一直阻塞等待if (responseBuffer.position() > 0)break;}responseBuffer.flip();byte[] content = new byte[responseBuffer.limit()];ByteBuffer buffer1 = responseBuffer.get(content);System.out.println("收到服务端响应: " + new String(content));scanner.close();client.close();}
}
2)服务端通道: ServerSocketChannel
NIO的服务端通道,类似java.net.ServerSocket
示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;public class NIOServer {public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8888));System.out.println("服务端启动成功!等待新连接...");while (true) {// 在nio中,accept方法是非阻塞的,如果没有接受到请求,会返回nullSocketChannel accept = serverSocketChannel.accept();// 有新的连接,读取数据if (accept != null) {System.out.println("收到新连接:" + accept.getRemoteAddress());accept.configureBlocking(false);// 读取数据ByteBuffer requestBuffer = ByteBuffer.allocate(1024);while (accept.isOpen() && accept.read(requestBuffer) != -1) {// 长连接情况下,需要手动判断数据有没有读取结束// (此处做一个简单的判断: 超过0字节就认为请求结束了)// position > 0, 代表该请求有数据在写入,否则一直阻塞等待if (requestBuffer.position() > 0)break;}// 没数据了,不再继续后面的操作if (requestBuffer.position() == 0)continue;requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];requestBuffer.get(content);System.out.println("收到数据来自:" + accept.getRemoteAddress());System.out.println(new String(content));// 响应结果String response = "HTTP/1.1 200 OK \r\n" +"Content-Length: 11 \r\n\r\n" +"Hello World";ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {accept.write(buffer);}}}}
}
3、Selector选择器
Selector是一个Java NIO组件,可以监听一个或多个NIO通道,并根据绑定的事件,确定哪些通道已准备好进行相应的操作,如某个通道注册了读取事件,Selector就可以确定该通道什么时候可以进行读取。
实现了单个线程可以管理多个通道,从而管理多个网络连接,节约内存开销。
一个线程使用Selector监听多个channel的不同事件,4个事件分别对应SelectionKey的4个常量:
- Connect连接事件(SelectionKey.OP_CONNECT)
- Accept准备就绪事件(SelectionKey.OP_ACCEPT)
- Read读取事件(SelectionKey.OP_READ)
- Write写入事件(SelectionKey.OP_WRITE)
实现一个线程处理多个通道的核心概念理解: 事件驱动机制 非阻塞的网络通道下,开发者通过Selector注册对于通道感兴趣的事件类型,线程通过监听时间来处理相应的代码执行(拓展:更底层是操作系统的多路复用机制)
示例:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;public class NIOServer2 {private static List<SocketChannel> channelList = new ArrayList<>();public static void main(String[] args) throws IOException {// 1、创建网络服务端ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);// 2、构建Selector一个选择器,将channel注册上去Selector selector = Selector.open();// 将serverSocketChannel注册到selector, 注册感兴趣事件:ACCEPT事件SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);// 3、绑定端口serverSocketChannel.bind(new InetSocketAddress(8888));System.out.println("服务端启动成功!等待新连接...");while (true) {// 不再轮询通道,改用下面轮询事件的方式.select方法有阻塞效果,直到有事件通知才会有返回selector.select();// 获取选择器事件集合Set<SelectionKey> selectionKeys = selector.selectedKeys();// 遍历Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey next = iter.next();iter.remove();// 关注READ和ACCEPT事件if (next.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) next.attachment();// 将客户端注册到Selector上,并关注READ事件SocketChannel clientChannel = server.accept();clientChannel.configureBlocking(false);clientChannel.register(selector, SelectionKey.OP_READ, clientChannel);System.out.println("收到新连接: " + clientChannel.getRemoteAddress());}if (next.isReadable()) {SocketChannel socketChannel = (SocketChannel) next.attachment();ByteBuffer buffer = ByteBuffer.allocate(1024);while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {if (buffer.position() > 0)break;}if (buffer.position() == 0)continue;buffer.flip();byte[] content = new byte[buffer.limit()];buffer.get(content);System.out.println("收到消息,来自:"+ socketChannel.getRemoteAddress());System.out.println(new String(content));String res = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";ByteBuffer byteBuffer = ByteBuffer.wrap(res.getBytes());while (byteBuffer.hasRemaining()) {socketChannel.write(byteBuffer);}// 取消时间订阅next.cancel();}}}}
}