NIO基础
三大组件
Channel and Buffer
常用的只有ByteBuffer
Selector(选择器)
结合服务器的设计演化来理解Selector
多线程版设计
最早在nio设计出现前服务端程序的设计是多线程版设计
,即一个客户端对应一个socket连接,一个连接用一个线程处理,每个线程专管一个连接
由此造成的缺点有
- 内存占用高, 如果一个线程默认1m大小,1000个链接就是1g
- CPU在线程之间的上下文切换成本高
- 只适合连接数少的场景
线程池版设计
socketAPI工作在阻塞模式下,同一时刻内一个线程只能处理一个客户端的socket连接,切必须等待线程处理完成当前socket连接,且必须等断开旧的连接后才能处理新的socket连接,即使旧的连接没有任何读写请求,线程没有得到充分的利用
早期的Tomcat就采用的线程池版设计阻塞式io,比较适合http请求
Selector设计
Channel代表服务器和客户端连接,数据读写的通道,
将客户端连接服务器的各种事件操作通过Channel细致化,Selector是负责监听Channel请求的工具,将监听到的请求交给线程,如果Channel的流量太高,其他Channel会被搁置,所以只适合流量低连接数多的场景
ByteBuffer
基本使用
内存有限,缓存区不能跟文件一样大小增大,所以分多次读取
@Slf4j
public class AppTest {public static void main(String[] args) {// 获取fileChannel的方法 1.输入输出流 2.RandomAccessFiletry (FileChannel channel = new FileInputStream("data.txt").getChannel()) {// 准备缓冲区 获取一块内存,大小由allocate决定ByteBuffer buffer = ByteBuffer.allocate(10);while (true){//从Channel读取数据向buffer写入int len = channel.read(buffer);log.debug("读取到的字节数{}", len);if (len == -1){ //没有内容了break;}//打印buffer的内容buffer.flip();// 切换至读模式while (buffer.hasRemaining()) {// 是否还有未读数据byte b = buffer.get();log.debug("实际字节{}",(char) b);}buffer.clear();// 切换为写模式}} catch (IOException e) {}}
}
ByteBuffer结构
ByteBuffer有以下重要属性
- capacity(容量)
- position(读写指针)
- limit(读写限制)
常见方法
分配空间
获取数据
byte b = buffer.get();
byte b = buffer.get(1);// 不会改变position值
字符串转ByteBuffer方法
后两种会直接切换到读模式
Scattering Reads 分散读集中写
黏包、半包
public class TestByteBufferExam {public static void main(String[] args) {/*网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为Hello,world\nI'm zhangsan\nHow are you?\n变成了下面的两个 byteBuffer (黏包,半包)Hello,world\nI'm zhangsan\nHow are you?\n现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据*/ByteBuffer source = ByteBuffer.allocate(32);source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split2(source);source.put("w are you?\n".getBytes());//追加值split2(source);}private static void split(ByteBuffer source) {source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一条完整消息if (source.get(i) == '\n') {//不改变position值int length = i + 1 - source.position();// 把这条完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从 source 读,向 target 写for (int j = 0; j < length; j++) {target.put(source.get());//改变了position值}debugAll(target);}}source.compact();//未读完部分向前压缩并切换为写模式}
}
文件编程
重点是网络编程所以文件编程了解就行
FileChannel
传输数据
Path
Files
FilesWalkFileTree
更便捷的遍历文件的API
private static void m1() throws IOException {AtomicInteger dirCount = new AtomicInteger();AtomicInteger fileCount = new AtomicInteger();Files.walkFileTree(Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91"), new SimpleFileVisitor<Path>(){//进入目录@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println("进入目录====>"+dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}//操作文件@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return super.visitFile(file, attrs);}//退出目录@Overridepublic FileVisitResult postVisitDirectory(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println("dir count:" +dirCount);System.out.println("file count:" +fileCount);
}
网络编程
阻塞模式
阻塞模式下,服务器每次只能执行一个连接,单线程模式运行,必须等待客户端连接和客户端信息,否则阻塞,线程停止运行等待客户端,资源效率利用低
@Slf4j
public class Server {public static void main(String[] args) throws IOException {//使用nio来理解阻塞模式,单线程// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建的了服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept建立与客户端连接,socketChannel用来与客户端之间通信log.debug("connecting...");SocketChannel sc = ssc.accept(); // 如果客户端没启动,将会阻塞在这里等待客户端启动连接的请求log.debug("connected... {}", sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接受客户端发生的数据log.debug("before read... {}",channel);channel.read(buffer);//如果客户端没有数据返回,将会阻塞等待客户端返回数据buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read... {}",channel);}}}
}-------public class Client {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));System.out.println("waiting...");}
}
非阻塞模式
非阻塞模式下,不管客户端是否有链接或返回数据,线程都不会停止,但会一直轮训尝试获取连接和数据,资源浪费
@Slf4j
public class Server {public static void main(String[] args) throws IOException {//使用nio来理解阻塞模式,单线程// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建的了服务器ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//非阻塞模式// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();//非阻塞模式下 循环一直进行,轮训等待连接,还是有资源浪费while (true) {SocketChannel sc = ssc.accept(); // 非阻塞模式,如果没有客户端连接,sc返回null 线程继续运行if (sc != null){// 4. accept建立与客户端连接,socketChannel用来与客户端之间通信log.debug("connected... {}", sc);channels.add(sc);}for (SocketChannel channel : channels) {// 5. 接受客户端发生的数据log.debug("before read... {}",channel);int read = channel.read(buffer);// 非阻塞,线程继续运行,如果没有返回数据read返回0if (read > 0){buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read... {}",channel);}}}}
}
Selector
Selector底层有两个keys的集合,keys表示注册在selector中的Channel集合,selectedKeys表示上述Channel参数的事件集合,selectedKeys集合中的事件必须及时消费处理,并及时移除,否则selector会认为该事件未处理造成重复处理死循环
accept
serverSocket独有事件,客户端发起连接请求就会触发此事件connect
是客户端连接建立后触发的事件read
数据可读事件write
可写事件
第三步,把Channel注册在selector
中时,在内部注册SelectionKey的set集合,它存储了各个事件key,当事件被处理时应当从SelectionKey集合中移除它,否则有key而没有待处理的事件会报错空指针,所以使用Iterator
迭代器循环,能移除循环到的key
@Slf4j
public class Server {public static void main(String[] args) throws IOException {// 1. 创建 Selector,管理多个 ChannelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立Selector和Channel的联系(注册)// selectionKey 绑定一个事件SelectionKey sscKey = ssc.register(selector, 0, null);// 这个key绑定accept事件,只关注accept事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("register key:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select方法,没有事件发生则线程阻塞,有事件发生则线程恢复运行// select 在事件未处理时不会阻塞,事件发生后要么取消要么处理,不能置之不理selector.select();// 4. 处理事件,selectKeys 内部包含了所有发生的事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();//处理key时,要从selectKeys集合中删除,否则有key而没有事件则会报错空指针iterator.remove();log.debug("key: {}", key);// 5. 区分事件类型if (key.isAcceptable()) {//如果是链接ServerSocketChannel channel = (ServerSocketChannel) key.channel();//若果不处理链接,则第三步 selector 会认为事件未处理,未完成连接的消费,不会阻塞SocketChannel sc = channel.accept();sc.configureBlocking(false);SelectionKey scKey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);log.debug("scKey:{}", scKey);} else if (key.isReadable()) {//如果是 readSocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);channel.read(buffer);buffer.flip();debugRead(buffer);}//key.cancel(); 取消事件}}}
}
处理客户端断开
如果客户端异常断开,在catch块中删除key,因为无论是否有读数据,都会产生read返回值,正常断开的返回值是-1
消息边界问题
当Buffer小于消息长度时,消息接受不完整会造成消息乱码或失效
可能会出现的情况,10字节大小的Buffer在接受13字节大小的消息时,只打印了后3个字节的的消息,因为当Buffer一次读没有接受完,服务器会自动触发两次读事件,将没有读完的内容再次读取,第一次读到Buffer就被覆盖了。
Buffer应该重新设计成,可自动扩容和非局部变量
附件与扩容
在向selector中注册keys时,可以为此key添加att参数,即附件参数,附件参数在双方交互时都可以获取修改
处理内容过多问题
如果发生数据大于Buffer 缓冲区,会触发多次读事件,这时如果有其他的客户端事件是处理不了的,相当于被一个客户端连接阻塞了,一直在处理那个客户端的内容发送,因为发送缓冲区被占满暂时无法发送,这时要充分利用可以选择去读
服务端的写操作会尝试写入尽可能多的字节,但是写入缓冲区满了是无法写的,这时候write返回0,我们优化让他关注可写事件,不要做无谓的尝试,可写的时候再写入。
最终改造
服务器在向客户端第一次写操作没写完时,给当前只关注写操作SelectionKey事件追加加上 读操作事件的值,这样大数据量一次没写完,服务器自动触发的写操作可以被捕获,能写操作再写操作,不会因为缓冲区满产生不必要的写操作造成阻塞
public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8088));while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey scKey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);// 1. 向客户端发送大量数据StringBuilder sb = new StringBuilder();for (int i = 0; i < 5000000; i++) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());// 2. 返回代表实际写入的字节数int write = sc.write(buffer);System.out.println(write);// 3. 判断是否有剩余字节未写完if (buffer.hasRemaining()) {// 4. 关键改造,让客户端关注可写事件,能写入时再写入 在原有关注数值上加上对应可写事件的数值常量scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);// 5. 把未写完的数据挂到scKey的附件上scKey.attach(buffer);}} else if (key.isWritable()) {//关注写事件ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);System.out.println(write);// 6. 清理操作if (!buffer.hasRemaining()) {// 清理Buffer 将附件设为空key.attach(null);// 写完毕 不再需要关注写事件key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);}}}}}
}
总结
多线程优化
boss线程的selector只处理连接事件,worker线程的selector则专门处理读写事件
线程队列优化
boss线程内调用 worker 的 register方法,其还是由boss线程调用,只有run方法中的方法才回另开线程,实现了 boss线程的selector只处理连接事件,worker线程的selector则专门处理读写事件。
其中利用线程队列的方法延迟select的注册,因为队列中的sc注册只消费一次,所以完成注册后select只监听read事件
@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 非阻塞模式Selector bossSelect = Selector.open();SelectionKey bossKey = ssc.register(bossSelect, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8008));// 1. 创建固定数量的workerWorker worker = new Worker("worker-0");while (true) {log.debug("1 bossSelect before ...,{}", bossSelect);bossSelect.select(); // select 方法 没有事件则阻塞log.debug("2 bossSelect after ...,{}", bossSelect);Iterator<SelectionKey> iterator = bossSelect.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("3 connected ...,{}", sc.getRemoteAddress());// 2. 关联selectorlog.debug("4 before ...,{}", sc.getRemoteAddress());worker.register(sc);// 初始 selector 启动 worker 线程log.debug("5 after ...,{}", sc.getRemoteAddress());}}}}static class Worker implements Runnable {private Thread thread;private Selector workerSelect;private String name;private volatile boolean start = false;//还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}public void register(SocketChannel sc) throws IOException {if (!start) {thread = new Thread(this, name);thread.start();workerSelect = Selector.open();start = true;}// 向队列添加任务,但这个任务没有立刻执行 bossqueue.add(()->{log.debug("6 sc register ...,{}", workerSelect);try {sc.register(workerSelect, SelectionKey.OP_READ, null);// 在select阻塞时会注册失败} catch (ClosedChannelException e) {throw new RuntimeException(e);}});workerSelect.wakeup();// wakeup方法是一次性方法,无论在select阻塞前后执行 都会唤醒一次select让select不被阻塞}@Overridepublic void run() {while (true) {try {log.debug("7 workerSelect before ...,{}", workerSelect);workerSelect.select();Runnable task = queue.poll();if (task != null){log.debug("8 queue run ...,{}", workerSelect);task.run();}log.debug("9 workerSelect after...,{}", workerSelect);Iterator<SelectionKey> iterator = workerSelect.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("10 read ...,{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}
还可以去掉队列的操作,只调用select的唤醒
因为wakeup方法是一次性方法,无论在select阻塞前后执行 都会唤醒一次select让select不被阻塞
而客户端连接事件只在boss线程执行一次所以调用了一次register方法,而start方法在唤醒前执行了,此时可能阻塞了,而wakeup方法的特殊性,能保证select完成注册,所以只需一次唤醒一次select完成注册,即可保证客户端被正常注册,消息被正常接受而不会被先运行的workerSelect.select();
方法阻塞
多worker改造
worker开几个线程比较合适,若要充分发挥服务器的多核优势,可以设置至少物理核的数量
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
NIO vs BIO
stream vs channel
IO模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在)、异步非阻塞
零拷贝
切换了三次,数据复制了4次
直接内存第一次优化
DMA是一个独立的硬件。
零拷贝指不在Java JVM内存中进行拷贝动作
AIO
Async异步IO
由主线程处产生的异步线程叫守护线程,如果主线程结束了,那么守护线程也会结束,哪怕守护线程还在工作中
所以我们使主线程阻塞,等待异步返回结果
@Slf4j
public class AioFileChannel {public static void main(String[] args) throws IOException {try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {// 参数1 ByteBuffer// 参数2 读取的起始位置// 参数3 附件// 参数4 回调对象 CompletionHandlerByteBuffer buffer = ByteBuffer.allocate(16);log.debug("read begin...");channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Override // read 成功public void completed(Integer result, ByteBuffer attachment) {log.debug("read completed...{}", result);attachment.flip();debugAll(attachment);}@Override // read 失败public void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});log.debug("read end...");} catch (IOException e) {e.printStackTrace();}System.in.read();//使主线程阻塞不结束}
}