一、NIO简介
NIO 中的 N 可以理解为 Non-blocking,不单纯是 New,是解决高并发、I/O高性能的有效方式。
Java NIO 是Java1.4之后推出来的一套IO接口,NIO提供了一种完全不同的操作方式, NIO支持面向缓冲区的、基于通道的IO操作。
新增了许多用于处理输入输出的类,这些类都被放在java.nio包及子包下,并且对原java.io包中的很多类进行改写,新增了满足NIO的功能。
二、NIO VS BIO
2.1 BIO
BIO全称是Blocking IO,同步阻塞式IO,是JDK1.4之前的传统IO模型,就是传统的java.io包下面的代码实现。
Java BIO:服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如下图所示:
虽然此时服务器具备了高并发能力,即能够同时处理多个客户端请求了,但是却带来了一个问题,随着开启的线程数目增多,将会消耗过多的内存资源,导致服务器变慢甚至崩溃,NIO可以一定程度解决这个问题。
2.2 NIO
Java NIO: 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器(selector)上,多路复用器轮询到连接有I/O请求就进行处理。
一个线程中就可以调用多路复用接口(java中是select)阻塞同时监听来自多个客户端的IO请求,一旦有收到IO请求就调用对应函数处理,NIO擅长1个线程管理多条连接,节约系统资源。
三、NIO的核心实现
NIO 包含3个核心的组件:
- Buffer(缓冲区)
- Channel(通道)
- Selector(选择器)
关系图的说明:
- 每个 Channel 对应一个 Buffer。
- Selector 对应一个线程,一个线程对应多个 Channel。
- 该图反应了有三个 Channel 注册到该 Selector。
- 程序切换到那个 Channel 是由事件决定的(Event)。
- Selector 会根据不同的事件,在各个通道上切换。
- Buffer 就是一个内存块,底层是有一个数组。
- 数据的读取和写入是通过 Buffer,但是需要 flip() 切换读写模式,而 BIO 是单向的,要么输入流要么输出流。
3.1 Buffer(缓冲区)
缓冲区 Buffer 是 Java NIO 中一个核心概念,在NIO库中,所有数据都是用缓冲区处理的。
在读取数据时,它是直接读到缓冲区中的,在写入数据时,它也是写入到缓冲区中的,任何时候访问 NIO 中的数据,都是将它放到缓冲区中。
而在面向流I/O系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。
3.1.1 Buffer 数据类型
从类图中可以看到,7 种数据类型对应着 7 种子类,这些名字是 Heap 开头子类,数据是存放在 JVM 堆中的。
3.1.1.1 MappedByteBuffer
而 MappedByteBuffer 则是存放在堆外的直接内存中,可以映射到文件。
通过 java.nio 包和 MappedByteBuffer 允许Java程序直接从内存中读取文件内容,通过将整个或部分文件映射到内存,由操作系统来处理加载请求和写入文件,应用只需要和内存打交道,这使得IO操作非常快。
Mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。
只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝,对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用Mmap的方式其读/写的效率和性能都非常高,大家熟知的 RocketMQ 就使用了该技术。
3.1.2 Buffer数据流程
应用程序可以通过与 I/O 设备建立通道来实现对 I/O 设备的读写操作,操作的数据通过缓冲区 Buffer 来进行交互。
从 I/O 设备读取数据时:
1)应用程序调用通道 Channel 的 read() 方法;
2)通道往缓冲区 Buffer 中填入 I/O 设备中的数据,填充完成之后返回;
3)应用程序从缓冲区 Buffer 中获取数据。
往 I/O 设备写数据时:
1)应用程序往缓冲区 Buffer 中填入要写到 I/O 设备中的数据;
2)调用通道 Channel 的 write() 方法,通道将数据传输至 I/O 设备。
3.1.3 缓冲区核心方法
缓冲区存取数据的两个核心方法:
1)put() : 存入数据到缓冲区
- put(byte b):将给定单个字节写入缓冲区的当前位置
- put(byte[] src):将 src 中的字节写入缓冲区的当前位置
- put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
2)get() : 获取缓冲区的数据
- get() :读取单个字节
- get(byte[] dst):批量读取多个字节到 dst 中
- get(int index):读取指定索引位置的字节(不会移动 position)
3.1.4 缓冲区中的四个核心属性
- capacity : 容量,表示缓冲区中最大存储数据的容量。一旦声明不能更改。
- limit : 界限,表示缓冲区中可以操作数据的大小。(limit 后的数据不能进行读写)
- position : 位置,表示缓冲区中正在操作数据的位置。
- mark : 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置。
【注】:0 <= mark <= position <= limit <= capacity
3.1.5 直接缓冲区与非直接缓冲区
3.1.5.1 非直接缓冲区
通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存之中。
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
对上图的解释:
应用程序和磁盘之间想要传输数据,是没有办法直接进行传输的。操作系统出于安全的考虑,会经过上图几个步骤。例如,我应用程序想从磁盘中读取一个数据,这时候我应用程序向操作系统发起一个读请求,那么首先磁盘中的数据会被读取到内核地址空间中,然后会把内核地址空间中的数据拷贝到用户地址空间中(其实就是 JVM 内存中),最后再把这个数据读取到应用程序中来。
同样,如果我应用程序有数据想要写到磁盘中去,那么它会首先把这个数据写入到用户地址空间中去,然后把数据拷贝到内核地址空间,最后再把这个数据写入到磁盘中去。
3.1.5.2 直接缓冲区
通过 allocateDirect() 方法分配缓冲区,将缓冲区建立在物理内存之中。
对上图的解释:
直接用物理内存作为缓冲区,读写数据直接通过物理内存进行。
public static void test3() {// 分配直接缓冲区ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 判断是直接缓冲区还是非直接缓冲区System.out.println(byteBuffer.isDirect());
}
【注】:字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。
3.1.6 具体代码示例
public class TestBuffer {public static void test1() {String str = "abcde";//分配一个指定大小的缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate(1024);System.out.println("---------allocate-----------");System.out.println(byteBuffer.capacity()); //1024System.out.println(byteBuffer.limit()); //1024System.out.println(byteBuffer.position()); //0//利用 put() 存入数据到缓冲区中byteBuffer.put(str.getBytes());System.out.println("---------put-----------");System.out.println(byteBuffer.capacity()); //1024System.out.println(byteBuffer.limit()); //1024System.out.println(byteBuffer.position()); //5//切换到读数据模式:flip()实际作用为位置翻转,position发生变化byteBuffer.flip();System.out.println("---------flip-----------");System.out.println(byteBuffer.capacity()); //1024System.out.println(byteBuffer.limit()); //5,limit 表示可以操作数据的大小,只有 5 个字节的数据给你读,所以可操作数据大小是 5System.out.println(byteBuffer.position()); //0,读数据要从第 0 个位置开始读//利用 get() 读取缓冲区中的数据byte[] dst = new byte[byteBuffer.limit()];byteBuffer.get(dst);System.out.println(new String(dst,0,dst.length));System.out.println("---------get-----------");System.out.println(byteBuffer.capacity()); //1024System.out.println(byteBuffer.limit()); //5,可以读取数据的大小依然是 5 个System.out.println(byteBuffer.position()); //5,读完之后位置变到了第 5 个//rewind() 可重复读byteBuffer.rewind(); //这个方法调用完后,又变成了读模式System.out.println("---------rewind-----------");System.out.println(byteBuffer.capacity()); //1024System.out.println(byteBuffer.limit()); //5System.out.println(byteBuffer.position()); //0//clear() 清空缓冲区,虽然缓冲区被清空了,但是缓冲区中的数据依然存在,只是出于"被遗忘"状态。意思其实是,缓冲区中的界限、位置等信息都被置为最初的状态了,所以你无法再根据这些信息找到原来的数据了,原来数据就出于"被遗忘"状态byteBuffer.clear();System.out.println("---------clear-----------");System.out.println(byteBuffer.capacity()); //1024System.out.println(byteBuffer.limit()); //1024System.out.println(byteBuffer.position()); //0}public static void test2() {String str = "abcde";ByteBuffer byteBuffer = ByteBuffer.allocate(1024);byteBuffer.put(str.getBytes());byteBuffer.flip();byte[] bytearray = new byte[byteBuffer.limit()];byteBuffer.get(bytearray,0,2);System.out.println(new String(bytearray,0,2)); //结果是 abSystem.out.println(byteBuffer.position()); //结果是 2//标记一下当前 position 的位置byteBuffer.mark();byteBuffer.get(bytearray,2,2);System.out.println(new String(bytearray,2,2)); //结果是 cdSystem.out.println(byteBuffer.position()); //结果是 4//reset() 恢复到 mark 的位置byteBuffer.reset();System.out.println(byteBuffer.position()); //结果是 2//判断缓冲区中是否还有剩余数据if (byteBuffer.hasRemaining()) {//获取缓冲区中可以操作的数量System.out.println(byteBuffer.remaining()); //结果是 3,上面 position 是从 2 开始的}}public static void main(String[] args) {
// test1();test2();}
}
3.2 Channel(通道)
Channel 是 NIO 的核心概念,它表示一个打开的连接,这个连接可以连接到 I/O 设备(例如:磁盘文件,Socket)或者一个支持 I/O 访问的应用程序,Java NIO 使用缓冲区和通道来进行数据传输。
(1)通道的主要实现类
在java.nio.channels.Channel 包,如下:
- FileChannel
- SocketChannel
- ServerSocketChannel
- DatagramChannel
(2)获取通道
- Java 针对支持通道的类提供了 getChannel() 方法
- 本地 IO :FileInputStream/FileOutputStream 、RandomAccessFile
- 网络 IO :Socket 、ServerSocket 、DatagramSocket
-
// 利用通道完成文件的复制(非直接缓冲区) FileInputStream fis = new FileInputStream("a.txt"); FileOutputStream fos = new FileOutputStream("b.txt"); // 获取通道 FileChannel fisChannel = fis.getChannel(); FileChannel foschannel = fos.getChannel();
- 在 JDK1.7 中的 NIO.2 针对各个通道提供了静态方法 open()
-
/*** 使用 open 方法来获取通道* 需要两个参数* 参数1:Path 是 JDK1.7 以后给我们提供的一个类,代表文件路径* 参数2:Option 就是针对这个文件想要做什么样的操作* --StandardOpenOption.READ :读模式* --StandardOpenOption.WRITE :写模式* --StandardOpenOption.CREATE :如果文件不存在就创建,存在就覆盖*/FileChannel inChannel = FileChannel.open(Paths.get("a.txt"), StandardOpenOption.READ);FileChannel outChannel = FileChannel.open(Paths.get("c.txt"), StandardOpenOption.WRITE,StandardOpenOption.READ, StandardOpenOption.CREATE);
-
- 在 JDK1.7 中的 NIO.2 的 Files 工具类的 newByteChannel() 方法
-
ByteChannel byteChannel = Files.newByteChannel(Paths.get(“1.jpg”),StandardOpenOption.READ);
-
(3)通道数据传输和内存映射文件
1)使用通道完成文件的复制(非直接缓冲区)
public static void test1() throws Exception {// 利用通道完成文件的复制(非直接缓冲区)FileInputStream fis = new FileInputStream("a.txt");FileOutputStream fos = new FileOutputStream("b.txt");// 获取通道FileChannel fisChannel = fis.getChannel();FileChannel foschannel = fos.getChannel();// 通道没有办法传输数据,必须依赖缓冲区// 分配指定大小的缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate(1024);// 将通道中的数据存入缓冲区中while (fisChannel.read(byteBuffer) != -1) { // fisChannel 中的数据读到 byteBuffer 缓冲区中byteBuffer.flip(); // 切换成读数据模式// 将缓冲区中的数据写入通道foschannel.write(byteBuffer);byteBuffer.clear(); // 清空缓冲区}foschannel.close();fisChannel.close();fos.close();fis.close();
}
2)使用直接缓冲区完成文件的复制(内存映射文件)
//方式一:
public static void test2() throws Exception {// 使用直接缓冲区完成文件的复制(内存映射文件)/*** 使用 open 方法来获取通道* 需要两个参数* 参数1:Path 是 JDK1.7 以后给我们提供的一个类,代表文件路径* 参数2:Option 就是针对这个文件想要做什么样的操作* --StandardOpenOption.READ :读模式* --StandardOpenOption.WRITE :写模式* --StandardOpenOption.CREATE :如果文件不存在就创建,存在就覆盖*/FileChannel inChannel = FileChannel.open(Paths.get("a.txt"), StandardOpenOption.READ);FileChannel outChannel = FileChannel.open(Paths.get("c.txt"), StandardOpenOption.WRITE,StandardOpenOption.READ, StandardOpenOption.CREATE);/*** 内存映射文件* 这种方式缓冲区是直接建立在物理内存之上的* 所以我们就不需要通道了*/MappedByteBuffer inMapped = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());MappedByteBuffer outMapped = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());// 直接对缓冲区进行数据的读写操作byte[] dst = new byte[inMapped.limit()];inMapped.get(dst); // 把数据读取到 dst 这个字节数组中去outMapped.put(dst); // 把字节数组中的数据写出去inChannel.close();outChannel.close();
}//方式二:
public static void test3() throws Exception {/*** 通道之间的数据传输(直接缓冲区的方式)* transferFrom* transferTo*/FileChannel inChannel = FileChannel.open(Paths.get("a.txt"), StandardOpenOption.READ);FileChannel outChannel = FileChannel.open(Paths.get("d.txt"), StandardOpenOption.READ, StandardOpenOption.WRITE,StandardOpenOption.CREATE);inChannel.transferTo(0, inChannel.size(), outChannel);// 或者可以使用下面这种方式//outChannel.transferFrom(inChannel, 0, inChannel.size());inChannel.close();outChannel.close();
}
3.2.1 FileChannel类
本地文件IO通道,用于读取、写入、映射和操作文件的通道,使用文件通道操作文件的一般流程为:
(1)获取通道
文件通道通过 FileChannel 的静态方法 open() 来获取,获取时需要指定文件路径和文件打开方式。
// 获取文件通道
FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
(2)创建字节缓冲区
文件相关的字节缓冲区有两种,一种是基于堆的 HeapByteBuffer,另一种是基于文件映射,放在堆外内存中的 MappedByteBuffer。
// 分配字节缓存
ByteBuffer buf = ByteBuffer.allocate(10);
(3)读写操作
1)读取数据
一般需要一个循环结构来读取数据,读取数据时需要注意切换 ByteBuffer 的读写模式。
while (channel.read(buf) != -1){ // 读取通道中的数据,并写入到 buf 中buf.flip(); // 缓存区切换到读模式while (buf.position() < buf.limit()){ // 读取 buf 中的数据text.append((char)buf.get());}buf.clear(); // 清空 buffer,缓存区切换到写模式
}
2)写入数据
for (int i = 0; i < text.length(); i++) {buf.put((byte)text.charAt(i)); // 填充缓冲区,需要将 2 字节的 char 强转为 1 自己的 byteif (buf.position() == buf.limit() || i == text.length() - 1) { // 缓存区已满或者已经遍历到最后一个字符buf.flip(); // 将缓冲区由写模式置为读模式channel.write(buf); // 将缓冲区的数据写到通道buf.clear(); // 清空缓存区,将缓冲区置为写模式,下次才能使用}
}
(4)将数据刷出到物理磁盘
FileChannel#force(boolean metaData) 方法可以确保对文件的操作能够更新到磁盘。
channel.force(false);
(5)关闭通道
channel.close();
3.2.2 SocketChannel类
网络套接字IO通道,TCP协议,针对面向流的连接套接字的可选择通道(一般用在客户端)。
TCP 客户端使用 SocketChannel 与服务端进行交互的流程为:
// 1)打开通道,连接到服务端
SocketChannel channel = SocketChannel.open(); // 打开通道,此时还没有打开 TCP 连接
channel.connect(new InetSocketAddress("localhost", 9090)); // 连接到服务端
// 2)分配缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一个 1024 字节的缓冲区
// 3)配置是否为阻塞方式。(默认为阻塞方式)
channel.configureBlocking(false); // 配置通道为非阻塞模式
// 4)与服务端进行数据交互
......
// 5)关闭连接
channel.close(); // 关闭通道
3.2.3 ServerSocketChannel类
网络通信IO操作,TCP协议,针对面向流的监听套接字的可选择通道(一般用于服务端),流程如下:
// 1)打开一个 ServerSocketChannel 通道
ServerSocketChannel server = ServerSocketChannel.open(); // 打开通道
// 2)绑定端口
server.bind(new InetSocketAddress(9090)); // 绑定端口
// 3)阻塞等待连接到来,有新连接时会创建一个 SocketChannel 通道,服务端可以通过这个通道与连接过来的客户端进行通信。
//等待连接到来的代码一般放在一个循环结构中。
SocketChannel client = server.accept(); // 阻塞,直到有连接过来
// 4)通过 SocketChannel 与客户端进行数据交互
......
// 5)关闭 SocketChannel
client.close();
3.3 Selector(选择器)
Selector 类是NIO的核心类,Selector(选择器)选择器提供了选择已经就绪的任务的能力。
Selector会不断的轮询注册在上面的所有channel,如果某个channel为读写等事件做好准备,那么就处于就绪状态,通过Selector可以不断轮询发现出就绪的channel,进行后续的IO操作。
一个Selector能够同时轮询多个channel,这样,一个单独的线程就可以管理多个channel,从而管理多个网络连接,这样就不用为每一个连接都创建一个线程,同时也避免了多线程之间上下文切换导致的开销。
3.3.1 选择器使用
(1) 获取选择器
与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法 open() 来得到的。
Selector selector = Selector.open(); // 获取一个选择器实例
(2)获取可选择通道
能够被选择器监控的通道必须实现了 SelectableChannel 接口,并且需要将通道配置成非阻塞模式,否则后续的注册步骤会抛出 IllegalBlockingModeException。
// 打开 SocketChannel 并连接到本机 9090 端口
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
// 配置通道为非阻塞模式
socketChannel.configureBlocking(false);
(3)将通道注册到选择器
通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。
通道的注册通过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,如果需要关注该通道的多个 I/O 事件,可以传入这些事件类型或运算之后的结果。这些事件必须是通道所支持的,否则抛出 IllegalArgumentException。
与 Selector 一起使用时,Channel 必须处于非阻塞模式下。这意味着不能将 FileChannel 与 Selector 一起使用,因为 FileChannel 不能切换到非阻塞模式,而套接字通道都可以。
// 将套接字通过到注册到选择器,关注 read 和 write 事件
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
一共有以下四种事件:
- SelectionKey.OP_ACCEPT:服务端接收客户端连接事件
- SelectionKey.OP_CONNECT:客户端连接服务端事件
- SelectionKey.OP_READ:读事件
- SelectionKey.OP_WRITE:写事件
SelectionKey有四个方法连判断是否为某个事件,与上面的四种事件相对应:
- selectionKey.isAcceptable();
- selectionKey.isConnectable();
- selectionKey.isReadable();
- selectionKey.isWritable();
(4)轮询 select 就绪事件
通过调用选择器的 Selector.select() 方法可以获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,然后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。
while (selector.select() > 0){ // 轮询,且返回时有就绪事件Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合.......
}
有 3 种方式可以 select 就绪事件:
- select() :阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。
- select(long timeout) :阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。
- selectNode() :不阻塞,如果无就绪事件,则返回 0;如果有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。
(5)处理就绪事件
每次可以 select 出一批就绪的事件,所以需要对这些事件进行迭代。
selector.select(2000); //阻塞监听所有通道(2000ms)
//遍历就绪通道selectionKeys
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for(SelectionKey key : selectionKeys) {if(key.isWritable()) { // 可写事件if("Bye".equals( (line = scanner.nextLine()) )) {socketChannel.shutdownOutput();socketChannel.close();break;}buf.put(line.getBytes());buf.flip();socketChannel.write(buf);buf.compact();}
}
从一个 SelectionKey 对象可以得到:
- 就绪事件的对应的通道;
- 就绪的事件。通过这些信息,就可以很方便地进行 I/O 操作。
四、NIO测试代码案例
服务端代码:
//NIOServer
public static void main(String[] args) throws Exception{//创建ServerSocketChannel,-->> ServerSocketServerSocketChannel serverSocketChannel = ServerSocketChannel.open();InetSocketAddress inetSocketAddress = new InetSocketAddress(5555);serverSocketChannel.socket().bind(inetSocketAddress);serverSocketChannel.configureBlocking(false); //设置成非阻塞//开启selector,并注册accept事件Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select(2000); //阻塞监听所有通道(2000ms)//遍历就绪通道selectionKeysSet<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if(key.isAcceptable()) { //处理连接事件ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel socketChannel = server.accept(); // 获得和客户端连接的通道socketChannel.configureBlocking(false); //设置为非阻塞System.out.println("client:" + socketChannel.getLocalAddress() + " is connect");socketChannel.register(selector, SelectionKey.OP_READ); //注册客户端读取事件到selector} else if (key.isReadable()) { //处理读取事件SocketChannel channel = (SocketChannel) key.channel(); // 服务器可读取消息:得到事件发生的Socket通道ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 创建读取的缓冲区channel.read(byteBuffer);System.out.println("client:" + channel.getLocalAddress() + " send " + new String(byteBuffer.array()));}iterator.remove(); //事件处理完毕,要记得清除}}
}
客户端代码:
//NIOClient
public class NIOClient {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 5555);if(!socketChannel.connect(inetSocketAddress)) {while (!socketChannel.finishConnect()) {System.out.println("客户端正在连接中,请耐心等待");}}ByteBuffer byteBuffer = ByteBuffer.wrap("mikechen的互联网架构".getBytes());socketChannel.write(byteBuffer);socketChannel.close();}
}
五、基于NIO的Reactor单线程模式
优点:
- 模型简单,没有多线程,进程通信,竞争的问题,全部都在一个线程中完成。
缺点:
- 只有一个进程,无法发挥多核 CPU的性能,只能采取部署多个系统来利用多核CPU,但这样会带来运维复杂度;
- Handler 在处理某个连接上的业务时,整个进程无法处理其他连接的事件,很容易导致性能瓶颈。
结构为:
- Server :服务端
- Reactor :分发器
- Acceptor :处理连接请求
- Handler :处理读写请求
- Client :客户端
方案的具体步骤如下:
- Reactor对象通过 selector.select() 监控连接事件,收到事件后通过 dispatch() 进行分发 ;
- 如果是连接建立的事件,则交由 Acceptor 通过 accept() 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理;
- 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler来响应;
- Handler 会完成 :read -> 业务处理 -> send 的完整业务流程。
原理图示:
实现图示:
5.1 Server
package com.reactor1;public class Main {// Reactor就是对NIO封装,按照该功能分为不同类,对于开发者来说,只要按照Main类中的,// 新建TcpReactor对象,传入ip:port,调用run()方法就好了,底层就是一个nio good// Reactor 用于客户端连接和分发(acceptor和read/write)// Acceptor accept接收连接请求// Handler read接收请求 write写回给客户端// 三种模型都是这样// Reactor:把IO事件分配给对应的handler处理// Acceptor:处理客户端连接事件// Handler:处理非阻塞的任务public static void main(String[] args) {Reactor reactor = null;try {reactor = new Reactor(1333);reactor.run(); // 为什么设置里使用run 不是start 不需要实现Runnable接口,造成误解,// run()方法:// 是在主线程中执行方法,和调用普通方法一样;(按顺序执行,同步执行)// start()方法:// 是创建了新的线程,在新的线程中执行;(异步执行)} catch (Exception e) {e.printStackTrace();}}
}
5.2 Reactor
Reactor核心模块:该模块内部包含两个核心方法,select和dispatch,该模块负责监听就绪事件和对事件的分发处理。
package com.reactor1;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;public class Reactor implements Runnable {private final ServerSocketChannel serverSocketChannel; // 类变量// 三要素都要是类变量 selector--channnel--buffer(读写才要buffer,连接不需要buffer)private final Selector selector; // 类变量public Reactor(int port) throws Exception { //Reactor初始化// 初始五句话:初始化selector 初始化服务端channel 服务端channel绑定端口 服务端channel设置为非阻塞 服务端channel注册到selector上面,返回在selector中表示这个服务端channel的key// 最后一句:服务端这个channel去绑定一个服务端acceptor对象,用来接收请求的,参数为服务端channel和服务端selectorselector = Selector.open(); //打开一个Selector// 只有一个serverSocketChannelserverSocketChannel = ServerSocketChannel.open(); // 初始化服务端channel 直接静态方法新建//在ServerSocketChannel绑定监听端口serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 服务端channel绑定端口//设置ServerSocketChannel为非阻塞serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必须是非阻塞的//服务端channel注册到selector上面(最初给一个serverSocketChannel注册上去的事件都是accept),然后返回该通道的key SelectionKey.OP_ACCEPT 16SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //给定key一个附加的Acceptor对象selectionKey.attach(new Acceptor(serverSocketChannel, selector)); // 服务端channel去绑定一个服务端acceptor对象,用来接收请求的,参数为服务端channel和服务端selector}@Overridepublic void run() {while (!Thread.interrupted()) { // 当服务端线程没有被打断的时候System.out.println("Waiting for new event on port:" + serverSocketChannel.socket().getLocalPort() + "..."); // 在服务端端口号等待客户端连接int len=-1;try {// 就绪事件到达之前,阻塞,有客户端连接才通过,没有客户端连接,这里就是0,也可以通过唤醒来处理if ((len = selector.select()) == 0) { continue;}} catch (IOException e) {e.printStackTrace();}//取得所有已就绪事件的key集合(初始化的附带对象为Acceptor,在Acceptor的run()方法中还会附近Handler对象)Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectionKeys.iterator();while (it.hasNext()) {dispatch(it.next()); //根据事件的key进行分发调度it.remove(); // 每次删除一个,和Reactor之前的nio一样,就是对其封装一次}}}//调度方法private void dispatch(SelectionKey key) {//这里很关键,拿到每次selectKey里面附带的处理对象,然后调用其run,这个对象在具体的Handler里会进行创建,初始化的附带对象为Acceptor(看上面构造器)Runnable r = (Runnable) key.attachment(); // 三个绑定 服务端启动的使用绑定Acceptor 客户端连接的时候使用Handler// 客户端连接的时候绑定READ 客户端输入的时候使用// 客户端输入的时候绑定WRITE 输出给客户端的时候调用// 输出给客户端的时候绑定READ 下一次客户端输入的时候使用// 得到传递多来的selector中的key,所attach绑定的Runnable对象,就是Acceptor implements Runnableif (r != null) {r.run(); // 调用Acceptor的run()方法,没有新建子线程}}
}
5.3 Acceptor
这个模块只负责处理连接就绪事件,通过就绪事件可以拿到客户的SocketChannel,就可以继续完成接下来的读写任务了。
package com.reactor1;import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;//接受连接请求线程
public class Acceptor implements Runnable {private final ServerSocketChannel serverSocketChannel; private final Selector selector; public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {this.serverSocketChannel = serverSocketChannel; this.selector = selector;}@Overridepublic void run() {try {// 阻塞等待连接到来,有新连接时会创建一个 SocketChannel 通道,服务端可以通过这个通道与连接过来的客户端进行通信。SocketChannel socketChannel = serverSocketChannel.accept(); // 获得和客户端连接的通道System.out.println(socketChannel.getRemoteAddress().toString() + " is connected."); if (socketChannel != null) { // 不为空//设置成非阻塞socketChannel.configureBlocking(false);//SocketChannel向selector注册一个OP_READ事件,然后返回该通道的key//还是注册到同一个selector上面 等待输入SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ); //注册读 因为服务端是先读后写//使一个阻塞住的selector操作立即返回// selector.wakeup(); //wakeup 唤醒selector中的其中一个,为什么要唤醒,在哪里阻塞,// selector.wakeup主要是为了唤醒阻塞在selector.select上的线程,在selector.select()后线程会阻塞//给定可以一个附加的TCPHandler对象selectionKey.attach(new Handler(selectionKey, socketChannel)); // 这个key attach一个TcpHandler// 传入两个参数 selectionKey是客户端channel注册到selector的时候获取到,客户端channel是连接得到的// Acceptor建立连接,TcpHandler就要处理读写请求了}} catch (Exception e) {e.printStackTrace();}}
}
5.4 Handler
TcpHandler类:也是一个实现 Runnable 接口的多线程类,用来处理客户端发过来的读写请求,包含构造方法、run() 方法、send() 方法、read() 方法、closeChannel() 方法和 process() 方法。
- 构造方法 :初始化selectionKey和客户端连接socketchannel ;
- run()方法 :是子线程具体逻辑,通过判断当前state的值决定服务端是读还是写 ;
- read()方法 :从客户端读就是调用,底层调用socketChannel.read()方法 ;
- send()方法 :写出到客户端就调用,底层调用socketChannel.write()方法 ;剩下两个,
- closeChannel()方法 :封装了selectionKey.cancel() 和 socketChannel.close() ,表示因服务异常引起的连接关闭操作;
- process()方法 :仅调用了线程休眠,模拟服务端处理耗时。
package com.reactor1;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;public class Handler implements Runnable {// 两个类变量,是在Accetpor的run方法中设置的,private final SelectionKey selectionKey; private final SocketChannel socketChannel;int state;// 传入两个参数 selectionKey是客户端channel注册到selector的时候获取到,客户端channel是连接得到的public Handler(SelectionKey selectionKey, SocketChannel socketChannel) {this.selectionKey = selectionKey;this.socketChannel = socketChannel;//初始的状态设定为Readingstate = 0; // state表示read还是send run用来判断 先读取客户端的数据,// 然后三步一体:将标志位修改为state=1,注册写事件,唤醒一个selector,执行run()方法,将数据写到客户端}@Overridepublic void run() { // 这个run()方法 逻辑就是读写try {if (state == 0) {//读取网络数据read(); //read send 向对应} else {//发送网络数据send(); // read send 相对应}} catch (Exception e) {e.printStackTrace();System.out.println("[Warning!] A client has been closed");closeChannel(); // 发生错误,关闭连接}}private void closeChannel() {try {selectionKey.cancel(); socketChannel.close(); // selectionKey是客户端channel注册到selector的时候获取到,客户端channel是连接得到的} catch (IOException e) {e.printStackTrace();}}private void send() throws Exception {String str = "Your message has sent to" + socketChannel.socket().getLocalSocketAddress().toString() + "\r\n"; //这是服务端要发送给客户端的响应//wrap自动把buf的position设为0,所以不需要在flip() good wrap和flip 就是要打印或者writeByteBuffer buf = ByteBuffer.wrap(str.getBytes());while (buf.hasRemaining()) {//回传给client回应字符串,发送buf的position位置到limit位置为止之间的内容socketChannel.write(buf); // 服务端使用channell来写,客户端使用两个stream读写,因为stream是单向的,// 服务端使用一个channel读写,因为channel是双向的,api层面既有channel.read,也有channel.write// 而且每次都是channel.write(buffer) channel.read(buffer) 所以 channel+buffer==stream selector是类变量,服务端全局选择// 服务端的channel和stream如何联系起来 不用联系,直接静态方法新建channel// 为什么分为服务端channel和客户端channel// 服务端channel TcpReactor构造函数中,serverSocketChannel = ServerSocketChannel.open();// 客户端channel Acceptor类中的run()方法,SocketChannel socketChannel = serverSocketChannel.accept(); // 启动服务端后阻塞在这里// 服务端channel是打开来给客户端连接的,客户端channel表示服务端获取到的一个客户端连接,表示服务端获取到的一个客户端连接 理解了 很重要 good// 所以,服务端channel是open 客户端channel是accept() 源码命名优美}// 写完成数据到客户端后,然后三步一体:将标志位修改为state=0,注册读事件,唤醒一个selector,执行run()方法,读取客户端的数据,就是一个循环//改变状态state = 0; // 改变状态为0,继续读取客户端的数据//通过key改变通道注册的事件 SelectionKey.OP_READ 1selectionKey.interestOps(SelectionKey.OP_READ); // 类变量selectionKey的感兴趣的操作设置为read 下一次使用 TCPHandlers selectedkey就是read//使一个阻塞住的selector操作立即返回selectionKey.selector().wakeup(); // 类变量selectionkey 得到这个key关联的selector,只有一个类变量selector,然后唤醒这个selector}private void read() throws Exception {//non-blocking下不可用Readers,因为Readers不支持non-blockingbyte[] arr = new byte[1024];ByteBuffer buffer = ByteBuffer.wrap(arr);//读取字符串int numBytes = socketChannel.read(buffer); // 服务端使用channel来读取,if (numBytes == -1) {System.out.println("[Warning!] A client has been closed.");closeChannel(); // 如果读完了,关闭连接,注意不是客户端发送-1,而是客户端发送exit// 因为客户端发送exit会导致出现client.close,这个时候服务端可以感应到return;}//将读取到的byte内容转为字符串类型String str = new String(arr);if ((str != null) && !str.equals(" ")) {//模拟逻辑处理process(); // 模拟逻辑处理System.out.println(socketChannel.socket().getRemoteSocketAddress().toString() + ">" + str); // 打印客户端ip:port 及其 数据// 然后三步一体:将标志位修改为state=1,注册写事件,唤醒一个selector,执行run()方法,将数据写到客户端//改变状态state = 1; // 已经读取完毕,就要修改状态,将将会结果发送给客户端//通过key改变通道注册的事件 SelectionKey.OP_WRITE 4selectionKey.interestOps(SelectionKey.OP_WRITE); // 类变量selectionKey的感兴趣的操作设置为write 下一次使用 TCPHandlers selectedkey就是write//使一个阻塞住的selector操作立即返回selectionKey.selector().wakeup(); // 类变量selectionkey 得到这个key关联的selector,只有一个类变量selector,然后唤醒这个selector}}void process() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}
}
5.5 Client
package com.reactor1;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;public class Client { // 客户端没什么好看的public static void main(String[] args) {String hostname = "127.0.0.1";int port = 1333; // 服务端ip:porttry {//连接至目的地Socket client = new Socket(hostname, port); // 建立socket连接System.out.println("连接至目的地:" + hostname); // 第一,启动之后这里打印一句PrintWriter out = new PrintWriter(client.getOutputStream()); // 源头时client.getOutputStream() 就是本地socket需要输出到服务端的东西BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); // 源头是client.getInputStream() 就是本地socket从服务端读取到的东西BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in)); // 源头时System.in 键盘输入String input;while ((input = stdIn.readLine()) != null) { //读取输入 输入不为空,循环不解释out.println(input); //发送输入的字符串 输入的东西发送给服务端out.flush();//强制将缓冲区的数据输出if (input.equals("exit")) {break; // 唯一退出条件}System.out.println("server:" + in.readLine()); // 打印从服务端读到的东西}client.close(); // 关闭socket连接 因为客户端发送exit会导致出现client.close,这个时候服务端可以感应到System.out.println("client stop."); // 打印一下关闭} catch (IOException e) {e.printStackTrace();}}
}
六、基于NIO的Reactor多线程模式
优点:
- 能够充分利用多核多 CPU的处理能力。
缺点:
- 多线程数据共享和访问比较复杂;
- Reactor 承担所有事件的监听和响应,只在主线程中运行,瞬间高并发时会成为性能瓶颈。
方案步骤:
- 主线程中,Reactor对象通过 selector.select() 监听连接事件,收到事件后通过 dispatch() 进行分发
- 如果是连接建立的事件,则由Acceptor处理,Acceptor通过 accept() 接受连接,并创建一个 Handler 来处理连接后续的各种事件。
- 如果不是连接建立事件,则Reactor会调用连接对应的Handler来进行响应;
- Handler 只负责响应事件,不进行业务处理,Handler 通过 read() 读取到数据后,会发给 processor() 进行业务处理;
- Processor 会在独立的子线程中完成真正的 业务处理,然后将响应结果发给主进程的 Handler处理,Handler 收到响应后通过 send() 将响应结果返回给 client。
原理图示:
实现图示:
说明:
- 有专门一个Reactor线程用于监听服务器 ServerSocketChannel,接收客户端的TCP连接请求;
- 网络IO的读/写操作等由一个 worker 线程池负责,由线程池中的NIO线程负责监听 SocketChannel事件,进行消息的读取,解码,编码和发送;
- 一个 NIO 线程可以同时处理 N条链路,但是一个链路只注册在一个NIO 线程上处理,防止发生并发操作问题。
七、基于NIO的Reactor主从多线程模式
方案说明:
- 主进程中 mainReactor 对象通过 select() 监控连接建立事件,收到事件后通过 Acceptor 接收,将新的连接分配给某个子进程;
- 子进程中的 subReactor 将 mainReactor 分配的连接加入连接队列进行监听,并创建一个 Handler 用于处理连接的各种事件;
- 当有新的事件发生时,subReactor 会调用连接对应的 Handler 来响应;
- Handler完成: read -> 业务处理 -> send 的完整业务流程。
特点:
- 主进程和子进程的职责非常明确,主进程只负责接收新连接,子进程负责完成后续的业务处理;
- 主进程和子进程的交互很简单,主进程只需要把新的连接传递给子进程,子进程无需返回数据;
- 子进程之间是相互独立的,无需同步共享之类的处理(这里仅限于网络模型相关的 select,read,send等无须同步共享,"业务处理"还是有可能需要同步共享的)。
原理图示:
实现图示: