BIO(同步阻塞)
利用网络连接传输数据为例:
服务端单线程
服务端只有一个主线程处理客户端的连接和读写处理,此时如果有第二个客户端欲连接并发送消息服务端是接收不到的。
因为读写和等待accept连接都是阻塞的。
sever端代码:
ServerSocket serverSocket = new ServerSocket(9000);while (true) {System.out.println("等待连接。。");//阻塞方法Socket clientSocket = serverSocket.accept();System.out.println("有客户端连接了。。");handler(clientSocket);}
client端代码:
Socket socket = new Socket("127.0.0.1", 9000);//向服务端发送数据socket.getOutputStream().write("HelloServer".getBytes());socket.getOutputStream().flush();System.out.println("向服务端发送数据结束");byte[] bytes = new byte[1024];//接收服务端回传的数据socket.getInputStream().read(bytes);System.out.println("接收到服务端的数据:" + new String(bytes));socket.close();
服务端多线程
服务端使用主线程接受客户端的accept连接并建立连接关系,利用其他线程处理读写操作,实现连接和处理任务分离。使得主线程可以在while中不断处理客户端的连接请求,而利用多个子线程处理多个客户端的读写处理。
改写服务端代码:
ServerSocket serverSocket = new ServerSocket(9000);while (true) {System.out.println("等待连接。。");//阻塞方法Socket clientSocket = serverSocket.accept();System.out.println("有客户端连接了。。");new Thread(new Runnable() {@Overridepublic void run() {try {handler(clientSocket);} catch (IOException e) {e.printStackTrace();}}}).start();}
缺点:高并发场景下每一个客户端都对应生成一个线程一对一处理,并且对于只连接没有读写操作的客户端会持续阻塞线程资源,比如C10K问题,太耗费资源
NIO(非阻塞)
没有selector的笨蛋式轮询server
服务端的channel等待客户端的accept非阻塞,读read也是非阻塞。采用两个while循环单线程不断地去看看有没有客户端连接请求,将这些socketChannel加入一个集合中,不断地从该集合遍历socketChannel是否有读写操作,read也是非阻塞。所以没有读的需求就继续遍历下一个socketChannel。如果中途有新的client加入或者有read需求,那就下一次轮询的时候会处理。
// 保存客户端连接static List<SocketChannel> channelList = new ArrayList<>();public static void main(String[] args) throws IOException {// 创建NIO ServerSocketChannel,与BIO的serverSocket类似ServerSocketChannel serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(9000));// 设置ServerSocketChannel为非阻塞serverSocket.configureBlocking(false);System.out.println("服务启动成功");while (true) {// 非阻塞模式accept方法不会阻塞,否则会阻塞// NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数SocketChannel socketChannel = serverSocket.accept();if (socketChannel != null) { // 如果有客户端进行连接System.out.println("连接成功");// 设置SocketChannel为非阻塞socketChannel.configureBlocking(false);// 保存客户端连接在List中channelList.add(socketChannel);}// 遍历连接进行数据读取Iterator<SocketChannel> iterator = channelList.iterator();while (iterator.hasNext()) {SocketChannel sc = iterator.next();ByteBuffer byteBuffer = ByteBuffer.allocate(128);// 非阻塞模式read方法不会阻塞,否则会阻塞int len = sc.read(byteBuffer);// 如果有数据,把数据打印出来if (len > 0) {System.out.println("接收到消息:" + new String(byteBuffer.array()));} else if (len == -1) { // 如果客户端断开,把socket从集合中去掉iterator.remove();System.out.println("客户端断开连接");}}}}
缺点:此时是对所有的client无差别进行轮询,只要是连接了的client,就会对其进行内层while遍历看是否能read。和BIO的多线程问题有点相似之处。
就是对那些不经常发消息的客户端 给了不应该有的平等的对待~hhh。想想100万个client里其实99万都是僵尸,只有1万会说话,那我的服务端不应该每次轮询都要轮询这100万个client!
正确的操作应该是实现事件驱动的轮询处理,对那些不会动的僵尸们,采用冷漠忽视的态度!
那应该如何做呢?
NIO其实已经实现了!那就是IO多路复用器,也就是selector。
事件驱动的server
由于有了selector,我们只需要将serverSocket注册到selector上面,对于server Socket身上的收发事件(包括了accept),selector就能监听到。
server端创建serverSocket代码和上面的笨蛋式代码一致:
// 创建NIO ServerSocketChannelServerSocketChannel serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(9000));// 设置ServerSocketChannel为非阻塞serverSocket.configureBlocking(false);
但是紧接着创建一个selector,并注册server Socket,并关注其身上的accept事件。一旦channel注册成功,会返回一个绑定的key(下面代码中的selectionKey)并将其存进selector中的selectedKeys集合中:
// 打开Selector处理Channel,即创建epollSelector selector = Selector.open();// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
同样是通过while不断地使用selector监听各个channel,selector.select会阻塞!直到有事件触发才会继续往下走。
while (true) {// 阻塞等待需要处理的事件发生selector.select();
当有一个client试图连接server时,server的serverSocketChannel会触发事件响应,此时selector.select会监听到事件响应,程序继续进行,从selector的selectorKeys中遍历key:
while (true) {// 阻塞等待需要处理的事件发生selector.select();// 获取selector中注册的全部事件的 SelectionKey 实例Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();// 遍历SelectionKey对事件进行处理while (iterator.hasNext()) {巴拉巴拉。。。。。}
此时触发的是accept类型事件,进入该处理分支,并通过key得到绑定的channel,即socketServerChannel,并accept客户端的连接,建立起该客户端的socketChannel,并将其注册到selector关注读事件:
while (iterator.hasNext()) {SelectionKey key = iterator.next();// 如果是OP_ACCEPT事件,则进行连接获取和事件注册if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel socketChannel = server.accept();socketChannel.configureBlocking(false);// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("客户端连接成功");
然后程序遍历结束,回到最外层while,再次阻塞在selector.select,等待下一次连接或者读写事件。此时连接的client发送消息给server,打破阻塞,进入内层selectedKeys遍历,这次走的是read事件响应,从socketChannel读出消息数据。
// 遍历SelectionKey对事件进行处理while (iterator.hasNext()) {SelectionKey key = iterator.next();// 如果是OP_ACCEPT事件,则进行连接获取和事件注册if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel socketChannel = server.accept();socketChannel.configureBlocking(false);// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("客户端连接成功");} else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);int len = socketChannel.read(byteBuffer);// 如果有数据,把数据打印出来if (len > 0) {System.out.println("接收到消息:" + new String(byteBuffer.array()));} else if (len == -1) { // 如果客户端断开连接,关闭SocketSystem.out.println("客户端断开连接");socketChannel.close();}}//从事件集合里删除本次处理的key,防止下次select重复处理iterator.remove();}
selector继续阻塞直到下一次事件触发…
流程如下:
底层实现原理和源码
JDK1.4之前都是通过Linux内核函数select或者poll去轮询所有channel查看哪些有事件,JDK1.4之后引入selector,底层实现采用了Linux的epoll函数,实现了将有事件的channel主动放入就绪事件列表。
selector底层使用epoll_create函数创建了一个epoll对象:
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd;
}
对应的将channel注册到selector中底层是将channel的文件描述符添加进epoll的一个包装数组pollWrapper(底层并没有真正和epoll fd绑定):
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
protected void implRegister(SelectionKeyImpl ski) {if (closed)throw new ClosedSelectorException();SelChImpl ch = ski.channel;int fd = Integer.valueOf(ch.getFDVal());fdToKey.put(fd, ski);pollWrapper.add(fd);keys.add(ski);}
然后selector.select对应底层源码为:
protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();processDeregisterQueue();try {begin();pollWrapper.poll(timeout);} finally {end();}processDeregisterQueue();int numKeysUpdated = updateSelectedKeys();if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated;}
其中pollWrapper包装数组会进行轮询poll:
int poll(long timeout) throws IOException {updateRegistrations();updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;}
其中updateRegistrations函数会使用epoll_ctl对每一个channel fd绑定给epoll fd并监听事件:
epollCtl(epfd, opcode, fd, events);
如果有事件响应,其实是操作系统的硬中断感知并把channel fd放进ready list。
上述的epollWait底层是epoll_wait函数,会从ready list(操作系统维护)里面去看有没有就绪事件如果有就放入selectedKeys去。
总结:
epoll_create创建epoll对象,epoll_ctl实现真正注册,epoll_wait实现监听并将硬中断事件的channel fd放入ready list
对比select,poll和epoll
redis线程模型
基于epoll的NIO线程模型实现。
netty
简化NIO,进一步对其封装为异步非阻塞。不在AIO上封装是因为Linux底层还是用epoll模型实现AIO但是异步没有优化好。
AIO(异步)
NIO2.0(对NIO进行封装,不需要轮询ready list处理事件,而是响应式编程采用回调函数直接主动处理),NIO的select,accept和read等等都是主线程自己做的,AIO不是,AIO的accept和read等都是采用了回调函数,并且是不同的线程处理
final AsynchronousServerSocketChannel serverChannel =AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {@Overridepublic void completed(AsynchronousSocketChannel socketChannel, Object attachment) {try {System.out.println("2--"+Thread.currentThread().getName());// 再此接收客户端连接,如果不写这行代码后面的客户端连接连不上服务端serverChannel.accept(attachment, this);System.out.println(socketChannel.getRemoteAddress());ByteBuffer buffer = ByteBuffer.allocate(1024);socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {System.out.println("3--"+Thread.currentThread().getName());buffer.flip();System.out.println(new String(buffer.array(), 0, result));socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));}@Overridepublic void failed(Throwable exc, ByteBuffer buffer) {exc.printStackTrace();}});} catch (IOException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});System.out.println("1--"+Thread.currentThread().getName());Thread.sleep(Integer.MAX_VALUE);
有三个线程异步非阻塞处理。