NIO通信架构图
1.Client
NioClient
package nio;import constant.Constant;import java.io.IOException;
import java.util.Scanner;public class NioClient {private static NioClientHandle nioClientHandle;public static void start() {nioClientHandle = new NioClientHandle(Constant.DEFAULT_SERVER_IP, Constant.DEFAULT_PORT);new Thread(nioClientHandle, "Client").start();}// 向服务器发送消息public static boolean sendMsg(String msg) throws IOException {nioClientHandle.sendMsg(msg);return true;}public static void main(String[] args) throws IOException {start();Scanner scanner = new Scanner(System.in);while (NioClient.sendMsg(scanner.next()));}
}
NioClientHandle
package nio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class NioClientHandle implements Runnable {private String host;private int port;private volatile boolean started;private Selector selector;private SocketChannel socketChannel;public NioClientHandle(String ip, int port) {this.host = ip;this.port = port;try {// 创建选择器的实例selector = Selector.open();// 创建ServerSocketChannel的实例socketChannel = SocketChannel.open();// 设置通道为非阻塞模式socketChannel.configureBlocking(false);started = true;} catch (Exception e) {e.printStackTrace();}}public void stop() {started = false;}@Overridepublic void run() {try {doConnect();} catch (Exception e) {e.printStackTrace();System.exit(1);}// 循环遍历selectorwhile (started) {try {// 无论是否有读写事件发生,selector每隔1s被唤醒一次selector.select(1000);// 获取当前有哪些事件可以使用Set<SelectionKey> keys = selector.selectedKeys();// 转换为迭代器Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();// 我们必须受限将处理过的SelectionKey从选定的集合中删除// 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键// 出现,这会导致我们尝试再次处理它it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null) {key.channel().close();}}}}} catch (Exception e) {e.printStackTrace();System.exit(1);}}// selector关闭后会自动释放里面管理的资源if (selector != null) {try {selector.close();} catch (Exception e) {e.printStackTrace();}}}/*** 具体的事件处理方法* @param key*/private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 获得关心当前事件的channelSocketChannel sc = (SocketChannel)key.channel();// 连接事件if (key.isConnectable()) {if (sc.finishConnect()) {socketChannel.register(selector, SelectionKey.OP_READ);} else {System.exit(1);}}// 有数据可读事件if (key.isReadable()) {// 创建ByteBuffer,并开启一个1M的缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);// 读取数据码流 返回读取到的字节数int readBytes = sc.read(buffer);// 读取到字节 对字节进行编解码if (readBytes > 0) {//将缓冲区当前的limit设置为position, position = 0;// 用于后续对缓冲区的读取操作buffer.flip();// 根据缓冲区可读字节数创建字节数组byte[] bytes = new byte[buffer.remaining()];// 将缓冲区可读字节数组复制到新建的数组中buffer.get(bytes);String result = new String(bytes, "UTF-8");System.out.println("客户端收到消息:" + result);} else if (readBytes < 0) {key.cancel();sc.close();}}}}public void sendMsg(String msg) throws IOException {doWrite(socketChannel, msg);}private void doWrite(SocketChannel socketChannel, String request) throws IOException {// 将消息编码为字节数组byte[] bytes = request.getBytes();// 根据数组容量创建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);// 将字节数组复制到缓冲区writeBuffer.put(bytes);// flip操作writeBuffer.flip();// 发送缓冲区的字节数组// 关心事件和读写网络不冲突socketChannel.write(writeBuffer);}private void doConnect() throws IOException {// 非阻塞的连接if (socketChannel.connect(new InetSocketAddress(host, port))) {socketChannel.register(selector, SelectionKey.OP_READ);} else {socketChannel.register(selector, SelectionKey.OP_CONNECT);}}
}
2.Server
NioServer
package nio;import constant.Constant;public class NioServer {private static NioServerHandle nioServerHandle;public static void main(String[] args) {nioServerHandle = new NioServerHandle(Constant.DEFAULT_PORT);new Thread(nioServerHandle, "Server").start();}
}
NioServerHandle
package nio;import constant.Constant;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class NioServerHandle implements Runnable {private volatile boolean started;private ServerSocketChannel serverSocketChannel;private Selector selector;public NioServerHandle(int port) {try {// 创建选择器实例selector = Selector.open();// 创建ServerSocketChannel的实例serverSocketChannel = ServerSocketChannel.open();// 设置通道为非阻塞模式serverSocketChannel.configureBlocking(false);// 绑定端口serverSocketChannel.socket().bind(new InetSocketAddress(port));// 注册事件,表示关心客户端连接serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// started = true;System.out.println("服务器已启动, 端口号为:" + port);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void run() {while (started) {try {// 获取当前有哪些事件selector.select(1000);// 获取事件的集合Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 我们必须首先将处理过的SelectionKey 从选定的键集合中删除// 如果我们没有删除处理过的键,那么它仍然会在主集合中以// 一个激活的键出现,这回导致我们尝试再次处理它iterator.remove();handleInput(key);}} catch (Exception e) {}}}/*** 处理事件的发生* @param key*/private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 处理新接入的客户端的请求if (key.isAcceptable()) {// 获取关心当前事件的ChannelServerSocketChannel ssc = (ServerSocketChannel)key.channel();// 接受连接SocketChannel sc = ssc.accept();System.out.println("=========建立连接=========");sc.configureBlocking(false);// 关注读事件sc.register(selector, SelectionKey.OP_READ);}// 处理对端的发送的数据if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();// 创建ByteBuffer, 开辟一个缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);// 从通道里读取数据,然后写入bufferint readBytes = sc.read(buffer);if (readBytes > 0) {// 将缓冲区当前的limit设置为position, position = 0// 用于后续对缓冲区的读取操作buffer.flip();// 根据缓冲区可读字节数创建字节数组byte[] bytes = new byte[buffer.remaining()];// 将缓冲区可读字节数组复制到新建的数组中buffer.get(bytes);String message = new String(bytes, "UTF-8");System.out.println("服务器收到消息:" + message);// 处理数据String result = Constant.response(message);// 发送应答消息doWrite(sc, result);}}}}private void doWrite(SocketChannel sc, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer buffer = ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();sc.write(buffer);}
}
3.代码运行实例
先启动server再启动client