多人聊天室 NIO模型实现

NIO编程模型

在这里插入图片描述

  • Selector监听客户端不同的zhuangtai
  • 不同客户端触发不同的状态后,交由相应的handles处理
  • Selector和对应的处理handles都是在同一线程上实现的

I/O多路复用

在Java中,I/O多路复用是一种技术,它允许单个线程处理多个输入/输出(I/O)源,而不需要为每个I/O源创建一个线程。这种技术可以显著提高性能,因为它减少了线程创建和上下文切换的开销。I/O多路复用的核心思想是使用一个机制来监控多个I/O通道,一旦某个通道有数据可读或可写,就通知应用程序进行相应的操作。

NIO模型 + Selector监听通道 == 经典的I/O多路复用

同步式I/O和异步I/O概念及分类

概念:

  • 同步式I/O(Synchronous I/O)
    定义:在同步I/O模型中,当一个线程发起一个I/O请求时,它会阻塞,直到I/O操作完成。也就是说,线程会一直等待直到数据被读取或写入完毕。
    特点:阻塞性:线程在I/O操作完成之前不能执行其他任务。
    资源消耗:每个I/O操作都需要一个线程或进程,可能导致资源消耗较大,特别是在高并发场景下。
  • 异步I/O(Asynchronous I/O)
    定义:在异步I/O模型中,当一个线程发起一个I/O请求后,它不会被阻塞,而是可以继续执行其他任务。I/O操作在后台进行,当操作完成时,系统会通知发起请求的线程。
    特点:非阻塞性:线程不需要等待I/O操作完成,可以继续执行其他任务。
    并发性:可以提高系统的并发处理能力,适用于高并发场景。

分类:

  • BIO(Blocking I/O):
    类型:同步I/O。
    特点:在BIO模型中,当线程执行I/O操作时,如果数据还没有准备好,它会一直等待直到数据准备完成。在这个过程中,线程被阻塞,不能执行其他任务。
  • NIO(Non-blocking I/O):
    类型:非阻塞I/O,可以用于同步或异步操作。
    特点:NIO模型中的I/O操作是非阻塞的,这意味着当数据没有准备好时,线程可以立即返回,去做其他事情。NIO本身提供了非阻塞的能力,但是它既可以用于同步编程(通过在while循环中检查并处理I/O事件),也可以与异步I/O(如Java 7引入的NIO.2,也称为Asynchronous I/O)结合使用。
  • I/O多路复用(I/O Multiplexing):
    类型:同步I/O。
    特点:I/O多路复用模型允许单个线程监控多个I/O通道,但是当线程执行I/O操作时,如果数据没有准备好,线程仍然会被阻塞。最常见的I/O多路复用技术是select/poll系统调用。在Java中,可以通过Selector和Channel实现I/O多路复用。

总结:

  • NIO模型+Selector实现的I/O多路复用是同步式I/O,因为服务器端需要多次调用selector.select()来查看是否有新的事件发生。如果服务器端不通过多次调用selector.select(),也没有其他线程会通知主线程有新的事件发生,主线程就会持续阻塞。
  • AIO异步I/O则是当主线程查看发现没有新事件发生时立刻返回处理其他事件,当有新事件发生时主线程会被通知,并来处理。

ChatServer实现

package server;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;public class ChatServer {private static final int DEFAULT_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private ServerSocketChannel server;private Selector selector;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);//统一编码,解码方法private Charset charset = Charset.forName("UTF-8");//可以自定义服务器端的端口private int port;public ChatServer() {this(DEFAULT_PORT);}public ChatServer(int port) {this.port = port;}private void start() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.socket().bind(new InetSocketAddress(port));selector = Selector.open();//将ServerSocketChannel的Accept事件注册到selector上//一旦ServerSocketChannel接收到了客户端的连接请求,selector就会得知server.register(selector, SelectionKey.OP_ACCEPT);System.out.println("启动服务器, 监听端口:" + port + "...");while (true) {//有事件被触发了select()函数才会有返回selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {// 处理被触发的事件handles(key);}//清空集合,防止重复处理selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// ACCEPT事件 - 和客户端建立了连接if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel client = server.accept();client.configureBlocking(false);// 在selector上注册可能发生的Read事件client.register(selector, SelectionKey.OP_READ);System.out.println(getClientName(client) + "已连接");}// READ事件 - 客户端发送了消息else if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();String fwdMsg = receive(client);if (fwdMsg.isEmpty()) {// 客户端异常// 取消掉key的注册,以后不再响应Read的事件// selector的key注销掉以后通常搭配selector.wakeup(); (是个好习惯)立刻唤醒selector,判断当前发生的事件key.cancel();selector.wakeup();} else {System.out.println(getClientName(client) + ":" + fwdMsg);forwardMessage(client, fwdMsg);// 检查用户是否退出if (readyToQuit(fwdMsg)) {key.cancel();selector.wakeup();System.out.println(getClientName(client) + "已断开");}}}}private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {for (SelectionKey key: selector.keys()) {Channel connectedClient = key.channel();//如果遍历到了服务器的监听Socket,则跳过(不需要将消息转发给服务器)if (connectedClient instanceof ServerSocketChannel) {continue;}// key是否有效(key对应的Channel和selector都在运行没有关闭) && 不是发消息的客户端他自己if (key.isValid() && !client.equals(connectedClient)) {// 写Buffer前先将Buffer清空wBuffer.clear();// 写入Buffer消息wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));// 将Buffer从写状态反转成读状态wBuffer.flip();// 将Buffer中的数据写入通道中while (wBuffer.hasRemaining()) {((SocketChannel)connectedClient).write(wBuffer);}}}}private String receive(SocketChannel client) throws IOException {// 在每次新的读取前先把buffer清空rBuffer.clear();// 将channel中的信息读入rBuffer中,直到读不出文件while(client.read(rBuffer) > 0);// 将rBuffer从写模式从转换成读模式rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}private String getClientName(SocketChannel client) {return "客户端[" + client.socket().getPort() + "]";}private boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) {ChatServer chatServer = new ChatServer(7777);chatServer.start();}
}
  • 将Channel中的事件注册在Selector
  • 用Selector监控事件的发生,实现了在同一线程处理多个客户端输入
  • 极大提高了线程的使用效率,使得服务器端能够处理大量的客户端连接

实现ChatClient

ChatClient

package client;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;public class ChatClient {private static final String DEFAULT_SERVER_HOST = "127.0.0.1";private static final int DEFAULT_SERVER_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private String host;private int port;private SocketChannel client;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);private Selector selector;private Charset charset = Charset.forName("UTF-8");public ChatClient() {this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);}public ChatClient(String host, int port) {this.host = host;this.port = port;}public boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}private void start() {try {client = SocketChannel.open();client.configureBlocking(false);selector = Selector.open();client.register(selector, SelectionKey.OP_CONNECT);client.connect(new InetSocketAddress(host, port));while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {handles(key);}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException e) {// 用户正常退出} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// CONNECT事件 - 连接就绪事件if (key.isConnectable()) {SocketChannel client = (SocketChannel) key.channel();// 判断连接是否建立完全if (client.isConnectionPending()) {// 调用finishConnect方法正式建立连接client.finishConnect();// 创建一个新的线程来处理用户的输入new Thread(new UserInputHandler(this)).start();}// 将Read事件注册在selector上面client.register(selector, SelectionKey.OP_READ);}// READ事件 -  服务器转发消息else if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();String msg = receive(client);if (msg.isEmpty()) {// 服务器异常close(selector);} else {System.out.println(msg);}}}public void send(String msg) throws IOException {if (msg.isEmpty()) {return;}wBuffer.clear();wBuffer.put(charset.encode(msg));wBuffer.flip();while (wBuffer.hasRemaining()) {client.write(wBuffer);}// 检查用户是否准备退出if (readyToQuit(msg)) {close(selector);}}private String receive(SocketChannel client) throws IOException {rBuffer.clear();while (client.read(rBuffer) > 0);rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}public static void main(String[] args) {ChatClient client = new ChatClient("127.0.0.1", 7777);client.start();}
}
  • ChatClient仍然需要通过创建新的线程来处理用户输入

UserInputHanlder

package client;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class UserInputHandler implements Runnable {private ChatClient chatClient;public UserInputHandler(ChatClient chatClient) {this.chatClient = chatClient;}@Overridepublic void run() {try {// 等待用户输入消息BufferedReader consoleReader =new BufferedReader(new InputStreamReader(System.in));while (true) {String input = consoleReader.readLine();// 向服务器发送消息chatClient.send(input);// 检查用户是否准备退出if (chatClient.readyToQuit(input)) {break;}}} catch (IOException e) {e.printStackTrace();}}
}
  • UserInputHandler仍然需要阻塞式的等待用户的输入
  • 用户的输入延迟应非常小,所以线程必须时刻等待着用户的输入以便第一时间处理

总结

与用BIO模型实现的多人聊天室有什么区别

  • 使用Channel代替Stream
  • 使用Selector监控多条Channel
  • 可以在一个线程里处理多个Channel I/O

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/484703.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电商产品自动化测试实战—解锁高效测试新技能

在这个数字化时代&#xff0c;电子商务行业的竞争愈发激烈&#xff0c;产品品质和稳定性成为了企业赢得市场的关键。而高质量的测试工作&#xff0c;正是确保产品品质和稳定性的重要保障。为此&#xff0c;我们特别推出了一场电商产品自动化测试实战公开课&#xff0c;旨在帮助…

【JavaWeb后端学习笔记】Mybatis基础操作以及动态SQL(增、删、改、查)

Mybatis 0、环境准备0.1 准备数据库表emp&#xff1b;0.2 准备SpringBoot工程0.3 配置文件中引入数据库连接信息0.4 创建对应的实体类0.5 准备Mapper接口 1、MyBatis基础操作1.1 删除1.2 新增&#xff08;主键返回&#xff09;1.3 更新1.4 查询&#xff08;解决字段名与类属性名…

SpringBoot该怎么使用Neo4j - 优化篇

文章目录 前言实体工具使用 前言 上一篇中&#xff0c;我们的Cypher都用的是字符串&#xff0c;字符串拼接简单&#xff0c;但存在写错的风险&#xff0c;对于一些比较懒的开发者&#xff0c;甚至觉得之间写字符串还更自在快速&#xff0c;也确实&#xff0c;但如果在后期需要…

如何用AI生成胶片风格的场景图 - 实用教程

如何用AI生成胶片风格的场景图 - 实用教程 在这个教程中,我们将介绍如何使用Recraft AI生成复古胶片风格的场景图。通过简单的步骤,你就能创建出独特的复古风格图片。 成功案例展示 小红书爆火作品 11月22日,小红书博主"四月崔aprilchui"发布胶片风格的场景图…

在M3上面搭建一套lnmp环境

下载docker-desktop 官网下载docker-desktop 切换镜像源 {"builder": {"gc": {"defaultKeepStorage": "20GB","enabled": true}},"experimental": false,"registry-mirrors": ["https://docke…

思特奇亮相2024数字科技生态大会,以“智”谋新共赢AI新时代

12月3-5日,2024数字科技生态大会在广州琶洲广交会展馆D区盛大举行。大会以“AI赋能 共筑数字新生态”为主题,汇聚行业领军企业、创新型科技公司以及众多专家学者,共探数字经济时代未来发展新机遇。 作为中国电信长期重要的生态伙伴,思特奇受邀参会并亮相18.2号馆天翼AI展区,重点…

【全网最新】若依管理系统基于SpringBoot的前后端分离版本开发环境配置

目录 提前准备&#xff1a; 下载源代码 设置依赖 设置后台连接信息 运行后台 运行前端 安装npm依赖 启动前端 登录网页客户端 提前准备&#xff1a; 1、安装mysql 5以上就可以。 2、安装redis. 3、安装npm npm下载地址&#xff1a;https://nodejs.org/dist/v22.12…

远程游戏新体验!

在这个数字化的时代&#xff0c;游戏已经不仅限于家里的电视或书房的电脑了。远程游戏&#xff0c;也就是通过远程控制软件在不同地点操作游戏设备&#xff0c;给玩家带来了前所未有的自由和灵活性。RayLink远程控制软件&#xff0c;凭借其出色的性能和专为游戏设计的功能&…

【人工智能】Transformers之Pipeline(二十八):视觉问答(visual-question-answering)

​​​​​​​ 目录 一、引言 二、视觉问答&#xff08;visual-question-answering&#xff09; 2.1 概述 2.2 dandelin/ViLT 2.3 pipeline参数 2.3.1 pipeline对象实例化参数 2.3.2 pipeline对象使用参数 2.3.3 pipeline对象返回参数 2.4 pipeline实战 2.5 模型…

qt QPrinter详解

1、概述 QPrinter类是Qt框架中用于打印输出的绘图设备。它表示打印出来的一系列页面&#xff0c;并提供了一组附加功能来管理特定于设备的特性&#xff0c;比如方向和分辨率。QPrinter可以生成PDF文档&#xff0c;也可以将内容发送到打印机进行实际打印。它继承自QPagedPaintD…

AI开发: 知识图谱的初识,学会制作知识图谱- Python 机器学习

一、知识图谱的概念 知识图谱是一个通过图结构来表示和组织知识的工具&#xff0c;它将事物、概念和它们之间的关系以图的形式呈现出来&#xff0c;图中的节点代表实体&#xff08;比如人物、地点、事件等&#xff09;&#xff0c;而边代表这些实体之间的各种关系&#xff08;…

移动端登录注册界面样式,简洁切换

非常简洁的登录、注册界面模板&#xff0c;使用uni-app编写&#xff0c;直接复制粘贴即可&#xff0c;无任何引用&#xff0c;全部公开。 废话不多说&#xff0c;代码如下&#xff1a; login.vue文件 <template><view class"content"><view class&quo…

RTMP如何实现毫秒级延迟体验?

技术背景 在我们大多数音视频行业从业者的认知里&#xff0c;RTMP播放器的延迟通常可以做到2到3秒。实际上&#xff0c;在较为理想的网络环境和优化良好的系统设置下&#xff0c;RTMP播放器一样可以做到几百毫秒的延迟水平。今天就影响RTMP播放延迟的一些因素&#xff0c;做个…

Oracle数据库 用户管理模式下的冷备份与热备份

1. 用户管理模式下的冷备份 1.1. 通过数据库相关视图查询 查实例 select instance_name,version,status,archiver,database_status from v$instance; 查数据库 select dbid,name,log_mode from v$database; 查数据文件状态 select file_name,tablespace_name,status,o…

【k8s 深入学习之 event 聚合】event count累记聚合(采用 Patch),Message 聚合形成聚合 event(采用Create)

参考 15.深入k8s:Event事件处理及其源码分析 - luozhiyun - 博客园event 模块总览 EventRecorder:是事件生成者,k8s组件通过调用它的方法来生成事件;EventBroadcaster:事件广播器,负责消费EventRecorder产生的事件,然后分发给broadcasterWatcher;broadcasterWatcher:用…

浙江工业大学《2024年828自动控制原理真题》 (完整版)

本文内容&#xff0c;全部选自自动化考研联盟的&#xff1a;《浙江工业大学828自控考研资料》的真题篇。后续会持续更新更多学校&#xff0c;更多年份的真题&#xff0c;记得关注哦~ 目录 2024年真题 Part1&#xff1a;2024年完整版真题 2024年真题

AI开发:用模型来识别手写数字的完整教程含源码 - Python 机器学习

今天一起来学习scikit-learn 。 scikit-learn 是一个强大的 Python 机器学习库&#xff0c;提供多种分类、回归、聚类算法&#xff0c;适用于从数据预处理到模型评估的全流程。它支持简单一致的 API&#xff0c;适合快速构建和测试模型。 官方地址在这里&#xff0c;记得Mark…

【Docker】创建Docker并部署Web站点

要在服务器上创建Docker容器&#xff0c;并在其中部署站点&#xff0c;你可以按照以下步骤操作。我们将以Flask应用为例来说明如何完成这一过程。 1. 准备工作 确保你的服务器已经安装了Docker。如果没有&#xff0c;请根据官方文档安装&#xff1a; Docker 安装指南 2. 创…

cgo内存泄漏排查

示例程序&#xff1a; package main/* #include <stdlib.h> #include <string.h> #include <stdio.h> char* cMalloc() {char *mem (char*)malloc(1024 * 1024 * 16);return mem; } void cMemset(char* mem) {memset(mem, -, 1024 * 1024 * 16); } int arr…

在做题中学习(76):颜色分类

解法&#xff1a;三指针 思路&#xff1a;用三个指针&#xff0c;把数组划分为三个区域&#xff1a; for循环遍历数组&#xff0c;i遍历数组&#xff0c;left是0区间的末尾&#xff0c;right是2区间的开头&#xff0c;0 1 2区间成功被划分 而上面的图画是最终实现的图样&…