本篇博客将进行NIO编程实战,实现一个简单聊天室。
我们来通过 SocketChannel 和 ServerSocketChannel 实现一个 0.1 版的聊天室,先说一下需求,比较简单,服务端启动监听客户端请求,当客户端向服务器端发送信息后,服务器端接收到后把客户端消息回显给客户端,比较呆瓜,但可以先来看一下。
来看服务端代码:
@Slf4j
public class ChatServer {private Selector selector;private ServerSocketChannel serverSocketChannel;private static final int PORT = 8080;public static void main(String[] args) {new ChatServer().start();}public ChatServer(){try {// 创建一个 ServerSocketChannel,并将其绑定到指定端口serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(PORT));// 设置为非阻塞模式serverSocketChannel.configureBlocking(false);// 创建一个Selector,并将ServerSocketChannel注册到它上面// 监听OP_ACCEPT事件(等待客户端连接)selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);log.info("聊天室服务端启动了:"+PORT);} catch (IOException e) {e.printStackTrace();}}public void start(){try {// 无限循环,等待连接while(true){// 等待已注册的通道中有事件发生if(selector.select()>0){Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){// 获取到发生事件的通道的SelectionKeySelectionKey key = iterator.next();iterator.remove();handleKey(key);}}}} catch (IOException e) {e.printStackTrace();}}// 判断SelectionKey的事件类型private void handleKey(SelectionKey key) throws IOException{// 如果是OP_ACCEPT事件,说明有新的客户端连接进来。// 接受新的连接,并将新连接的SocketChannel注册到 Selector上,监听OP_READ事件if(key.isAcceptable()){SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);log.info("客户端连接上了:"+socketChannel.getRemoteAddress());}else if(key.isReadable()){ // 如果是OP_READ事件,说明客户端发送了消息。读取客户端发送的消息,并将其返回给客户端SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int len = socketChannel.read(buffer);if(len>0){buffer.flip();String message = new String(buffer.array(),0,len);log.info("客户端说:"+message);String responseMsg = "服务端回复:"+message;socketChannel.write(ByteBuffer.wrap(responseMsg.getBytes()));}}}
}
接下来是客户端:
@Slf4j
public class ChatClient {private Selector selector;private SocketChannel socketChannel;private static final String HOST = "localhost";private static final int PORT = 8080;public static void main(String[] args) {new ChatClient().start();}public ChatClient(){try {selector = Selector.open();// 创建一个 SocketChannel,并连接到指定的服务器地址和端口socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT));socketChannel.configureBlocking(false); // 设置为非阻塞模式// 将SocketChannel注册到selector上面,监听OP_READ事件(等待接收服务器的消息)socketChannel.register(selector, SelectionKey.OP_READ);log.info("连接到聊天室了");} catch (IOException e) {e.printStackTrace();}}private void start() {// 启动一个新线程用于new Thread(()->{try {while(true){// 等待已注册的通道中有事件发生if(selector.select()>0){// 获取到发生事件的通道的 SelectionKeyfor(SelectionKey key:selector.selectedKeys()){selector.selectedKeys().remove(key);// 如果是 OP_READ 事件,说明服务器发送了消息。读取服务器发送的消息,并在控制台显示。if(key.isReadable()){readMessage();}}}}} catch (Exception e) {e.printStackTrace();}}).start();try(BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))){String input;while((input=reader.readLine())!=null){sendMessage(input);}}catch (IOException e){e.printStackTrace();}}private void sendMessage(String message) throws IOException{if(message!=null && !message.trim().isEmpty()){ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());socketChannel.write(buffer);}}private void readMessage() throws IOException{ByteBuffer buffer = ByteBuffer.allocate(1024);int len = socketChannel.read(buffer);if(len>0){buffer.flip();String message = new String(buffer.array(),0,len);log.info(message);}}
}