Java-NIO篇章(4)——Reactor反应器模式

前面已经讲过了Java-NIO中的三大核心组件Selector、Channel、Buffer,现在组件我们回了,但是如何实现一个超级高并发的socket网络通信程序呢?假设,我们只有一台内存为32G的Intel-i710八核的机器,如何实现同时2万个客户端高并发非阻塞通信?可能你会说不可能实现,答案是2万的并发可能都低估了,Redis单机通信20万的并发都是可以的,当然达到20万的并发对机器性能以及带宽都需要非常高的要求。那么就不得不引出今天讲解的Reactor反应器模式,它可以说是一种高并发网络编程中的设计模式,不包括在我们常说的23中设计模式之中。Netty网络框架、Nginx服务器、Reids缓存等大名鼎鼎的中间件都是基于Reactor反应器模式设计的,它就能提供超高并发的网络通信,我学过之后一直感叹这些大佬都是奇才,学这些思想精彩万分!下面具体进行介绍:

Reactor是什么?

Reactor就是一种网络编程的设计模式,如果不知道Reactor那么学Netty的时候会非常困难,因为很多概念就是Reactor,因此学会了Reactor在学Netty就非常简单。其次,懂得高并发中间件的网络通信设计的底层原理对提升自己的技术也是非常重要的,所以,学习像Netty这样的“精品中的精品”框架,肯定也是需要先从设计模式入手的。而Netty的整体架构,就是基于这个著名反应器模式。所以,学习和掌握反应器模式,对于开始学习高并发通信(包括Netty框架)的人来说,一定是磨刀不误砍柴工,况且很多中间件都是基于Netty来设计网络通信模块的。

思维风暴开启Reactor之路

好的,我们用一个例子开始讲解Reactor原理,假设你是Doug Lea,Java JUC包的作者, 也是Reactor设计模式的提出者之一。现在面临的一个问题就是现在的软件系统不能够满足日益增长的并发量,很多软件系统一旦人访问数多了要么卡死要么阻塞一段时间才有响应,用户体验非常差,现在公司提出了这个需求需要你解决。请你思考:

单线程阻塞模式

首先TCP网络通信需要先建立连接(三次握手)然后才可以传输数据,于是你写下了第一行解决的代码:

1 while(true){
2     socket = accept(); //阻塞,接收连接
3     handle(socket) ; //读取数据、业务处理、写入结果
4 }5 private void handle(socket){
6     String msg = socket.read();  //阻塞,读取客户端发送过来的数据
7     System.out.println(msg);
8	  .... // 其他处理
9 }

解释一下,上面采用一个循环的方式来解决这个问题,程序占用一个主线程不断执行while循环中的代码,当代码执行到第2行时如果没有客户端发生连接的请求则阻塞,不继续向下执行。直到某个客户端发生连接请求,于是获得了socket对象,这个对象假设包括客户端的ip地址和端口号,并且可以通过socket与客户端接受和发送数据。之后执行到第6行代码,这里也会阻塞直到用户发生了数据。上面的服务器代码如果只有一个客户端与它交互是没有问题的,如果超过一个用户与之交互则会发生阻塞的情况,假设有两个客户A和B,A已经连接好了服务器也就是上面代码执行到了第6行代码进行阻塞,此时服务器希望收到客户发送的数据。就在阻塞的这个时候,如果B想要连接服务器,发送了连接请求,但是服务器代码一直卡在第6行等待获取客户端的发生数据,如果A一直不发送数据则B永远连不上服务器。除非等到A发送了一个数据,于是程序运行到第2行,然后接受B的连接请求,然后又卡在了第6行。很明显,上面的网络编程服务程序很糟糕,非常卡,连得上连不上完全看运气。失败!

这个时候,Doug Lea进行思考,阻塞是因为网络编程就是基于事件触发的,也就是说接受连接的第二行代码和读取数据的第六行代码完全取决于客户端,什么时候触发完全随机,因此很难搞。另外一个最主要的原因是这个是单线程程序,那么使用多线程能不能解决呢?答案是基本上可以解决,而且早期的Tomcat服务器就是这样设计的,这个模式就叫做 Connection Per Thread模式。下面进行详细介绍!

多线程经典Connection Per Thread模式

Connection Per Thread 即一个连接创建一个线程来处理,首先我们分析一下一台上述的内存32G的机器可以创建多少个线程,Java虚拟机默认一个线程占用1MB的栈内存,在不考虑其他情况下,假设分配给了虚拟机栈20G的空间,那么可以创建20*1024个线程来应对网络连接,也就是可以同时并发20480个客户端的请求。我们先看如何实现,再看它的缺点是什么,实现代码如下:

public class ConnectionPerThread implements Runnable {@Overridepublic void run(){Socket socket = new Socket();while(true){acceptedSocket = socket.accept(); //依旧是阻塞方法,接受客户端的连接请求// 如果有一个连接就立即创建一个线程为这个连接服务,直到连接断开Handler handler = new Handler(socket);new Thread(handler).start(); // 启动新线程执行run方法}}class Handler implements Runnable{Socket socket;public Handler(Socket socket){this.socket = socket;}@Overridepublic void run() {while (true){String msg = socket.read(); //依旧是阻塞方法,接受客户端的发送的数据if("close".equals(msg)){ // 假设客户端主动断开发送`close`字符,NIO中是空字符串表示断开break; // 终止线程}// 也可以执行写操作,如果是发送大数据会明显阻塞,如果小文件可视为非阻塞,本质还是会阻塞socket.write("hello 用户!");}}}
}

以上的Socket使用的是伪代码,实际上需要使用OIO或者NIO的ServerSocket对象,反正能够表达这个意思就行。其实上面的代码还可以使用线程池来维护线程进行优化,但是这里只是为了举例说明多线程也是可以的实现较高并发的网络通信。下面来具体分析:

以上示例代码中,对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的socket连接的输入和输出。当然,服务器的监听线程也是独立的,任何的socket连接的输入和输出处理,不会阻塞到后面新socket连接的监听和建立,这样,服务器的吞吐量就得到了提升。早期版本的Tomcat服务器,就是这样实现的。Connection Per Thread模式(一个线程处理一个连接)的优点是:解决了前面的新连接被严重阻塞的问题,在一定程度上,较大的提高了服务器的吞吐量。Connection Per Thread模式的缺点是:对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统无法承受。而且,线程的反复创建、销毁、线程的切换也需要代价。因此,在高并发的应用场景下,多线程OIO的缺陷是致命的。新的问题来了:如何减少线程数量,比如说让一个线程同时负责处理多个socket连接的输入和输出,行不行呢? 可以的,一个有效途径是:使用Reactor反应器模式。用反应器模式对线程的数量进行控制,做到一个线程处理大量的连接。它是如何做到呢?直接上正餐——多线程的Reactor反应器模式。

多线程Reactor反应器模式

唤醒你的回忆,还记得Selector和IO多路复用不?不记得的话请访问:https://blog.csdn.net/cj151525/article/details/135695467 查看!我们前面讲到,客户端的连接和发送数据等行为是以IO事件的方式触发Selector的查询的,仅仅使用一个线程的Selector模式,就可以应付大量的访问,其主旨就是:如果某个用户阻塞了那本线程就去为别的需要服务的用户服务,而不是傻傻等待你阻塞解除,总而言之就是线程只为通过Selector.select()查询出来的需要执行的事件服务。因此,单线程下效率就非常高,例如Redis的数据处理模块就是单线程的,单线程的优点就是线程安全,CPU不需要频繁上下文切换。这种模式下,并发量上10万都是简简单单的。那么你敢想想如果我们引进多线程将会有多高的并发量吗?线程并不是越多越好,当你的线程数量和你的CPU核心数相同时就不会频繁发生CPU上下文切换,当线程数远远超过CPU核心数才会频繁发生导致执行效率不高,甚至阻塞等问题。好的,目前基础已经讲解完毕,下面正式引入Reactor反应器模式。

引用一下Doug Lea大师在文章《Scalable IO in Java》中对反应器模式的定义,具体如下:Reactor反应器模式由Reactor反应器线程、 Handlers处理器两大角色组成,两大角色的职责分别如下:

(1) Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。

(2) Handlers处理器的职责:非阻塞的执行业务处理逻辑。

每一个单独线程执行的Selector我们就叫做Reactor反应器。一个Reactor反应器包括一个Selector对象,另外还有需要干的活儿,也就是run方法中需要执行的逻辑,这个逻辑叫做Handler处理器。因此,如何理解Reactor反应器,就是单独线程来执行的Selector。明白了这些之后,那么我们将Selector分为Boss和Worker,Boss只有一位负责用户的连接请求与任务分发,Worker可以有很多,负责发送和接受用户的数据以及处理这些数据的中间过程。Boss和每个Worker就是一个Reactor,多线程Reactor反应器模式的模型如下(黄色的是方法,橙色是对象):

在这里插入图片描述

下面是代码实现,注意为了和Netty中EventLoop概念一致,这里Reactor使用EventLoop替代,你只要知道这两的概念是同一个,就是单独线程执行的Selector。代码如下:

package com.cheney.nioBaseTest;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;/*** @version 1.0* @Author Chenjie* @Date 2024-01-21 18:39* @注释*/
public class ReactorTest {public static void main(String[] args) throws IOException {new BossEventLoop().register();}/*** BossReactor,EventLoop和Reactor是同一个概念*/@Slf4jstatic class BossEventLoop implements Runnable {private Selector bossSelector;private WorkerEventLoop[] workers; // 一个boss负责分配任务,worker负责执行任务private volatile boolean start = false; // 对象的方法只能执行一次AtomicInteger index = new AtomicInteger(); // WorkerEventLoop[]数组的下标public void register() throws IOException {if (!start) {// 连接ChannelServerSocketChannel ssc = ServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.configureBlocking(false);bossSelector = Selector.open();// Boss 注册连接事件SelectionKey ssckey = ssc.register(bossSelector, 0, null);ssckey.interestOps(SelectionKey.OP_ACCEPT);// 创建若干个WorkerReactor来读取发送数据workers = initEventLoops();// 本Boss一个线程启动起来先new Thread(this, "boss").start();log.debug("boss start...");start = true;}}/*** 创建若干个WorkerEventLoop* @return*/public WorkerEventLoop[] initEventLoops() {
//        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];for (int i = 0; i < workerEventLoops.length; i++) {workerEventLoops[i] = new WorkerEventLoop(i);}return workerEventLoops;}/*** Boss需要执行连接和任务分发,就是概念中的Handler处理器,图中的AcceptorHandler*/@Overridepublic void run() {while (true) {try {bossSelector.select();Iterator<SelectionKey> iter = bossSelector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// 前面只注册了连接事件,因此只要负责建立连接并将后续的任务分发给Worker就行ServerSocketChannel c = (ServerSocketChannel) key.channel();SocketChannel sc = c.accept();// 建立连接sc.configureBlocking(false);log.debug("{} connected", sc.getRemoteAddress());// 分发给Worker来处理,这里是公平地轮询,即每个Worker公平循环领取任务去执行// 因为每个Worker其实就是一个Selector,而每个Selector可以管理多个Channel(用户交互)workers[index.getAndIncrement() % workers.length].register(sc);}}} catch (IOException e) {e.printStackTrace();}}}}/*** WorkerReactor,主要负责读取用户发来的数据*/@Slf4jstatic class WorkerEventLoop implements Runnable {private Selector workerSelector;private volatile boolean start = false;private int index;// 任务队列,存放可执行的命令,两个线程需要传参的话通过队列来实现执行逻辑解耦private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();public WorkerEventLoop(int index) {this.index = index;}public void register(SocketChannel sc) throws IOException {if (!start) {workerSelector = Selector.open();// 启动一个新线程执行本类的run方法new Thread(this, "worker-" + index).start();start = true;}tasks.add(() -> {// 向任务队列中添加任务(即需要执行的指令)try {SelectionKey sckey = sc.register(workerSelector, 0, null);sckey.interestOps(SelectionKey.OP_READ);workerSelector.selectNow();} catch (IOException e) {e.printStackTrace();}});// 唤醒SelectorworkerSelector.wakeup();}/*** WorkerReactor 的Handler处理器,负责读取用户发过来的数据*/@Overridepublic void run() {while (true) {try {workerSelector.select();// 从任务队列中获取一个任务并执行Runnable task = tasks.poll();if (task != null) {task.run();}Set<SelectionKey> keys = workerSelector.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {// 读取客户端发生过来的数据SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);try {int read = sc.read(buffer);if (read == -1) { // 如果-1则是用户断开连接触发的读事件key.cancel();sc.close();} else {buffer.flip();log.debug("{} message:", sc.getRemoteAddress());}} catch (IOException e) {e.printStackTrace();key.cancel();sc.close();}}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}
}

总结

什么是Reactor?答:一个线程对应一个Selector模式的对象,Reactor模式其中BossReactor负责客户端连接与任务分发给WorkerReactor对象,WorkerReactor负责具体的数据发送与接受等操作。而各自所负责的任务也被叫做Handler(处理器)。相信看完上面的讲解和代码,你已经知道了什么是Reactor模式了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/242899.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL索引优化:深入理解索引下推原理与实践

随着MySQL的不断发展和升级&#xff0c;每个版本都为数据库性能和查询优化带来了新的特性。在MySQL 5.6中&#xff0c;引入了一个重要的优化特性——索引下推&#xff08;Index Condition Pushdown&#xff0c;简称ICP&#xff09;。ICP能够在某些查询场景下显著提高查询性能&a…

PPT 编辑模式滚动页面不居中

PPT 编辑模式滚动页面不居中 目标&#xff1a;编辑模式下适应窗口大小、切换页面居中显示 调整视图大小&#xff0c;编辑模式通过Ctrl 鼠标滚轮 或 在视图菜单中点击适应窗口大小。 2. 翻页异常&#xff0c;调整视图大小后&#xff0c;PPT翻页但内容不居中或滚动&#xff0c…

革新区块链:代理合约与智能合约升级的未来

作者 张群&#xff08;赛联区块链教育首席讲师&#xff0c;工信部赛迪特聘资深专家&#xff0c;CSDN认证业界专家&#xff0c;微软认证专家&#xff0c;多家企业区块链产品顾问&#xff09;关注张群&#xff0c;为您提供一站式区块链技术和方案咨询。 代理合约&#xff08;Prox…

WordPress怎么禁用文章和页面古腾堡块编辑器?如何恢复经典小工具?

现在下载WordPress最新版来搭建网站&#xff0c;默认的文章和页面编辑器&#xff0c;以及小工具都是使用古腾堡编辑器&#xff08;Gutenberg块编辑器&#xff09;。虽然有很多站长说这个编辑器很好用&#xff0c;但是仍然有很多站长用不习惯&#xff0c;觉得操作太难了&#xf…

JAVA之常用集合框架

java中的常用集合是对数据进行存储以及相关操作的api。常用的有ArrayList、LinkedList、Vector、HashSet、TreeSet、TreeMap、HashMap ArrayList 数据结构 ArrayList的本质是一个数组 &#xff0c;那么它就具有数组的所有特性 可以根据下标快速查找值 ArrayList是如何实现动…

路由器初始化配置、功能配置

实验环境 拓扑图 Ip规划表&#xff08;各组使用自己的IP规划表&#xff09; 部门 主机数量 网络地址 子网掩码 网关 可用ip Vlan 市场部 38 192.168.131.0 255.255.255.0 192.168.131.1 2-254 11 研发部 53 192.168.132.0 255.255.255.0 192.168.132.1 2-2…

弹性调度助力企业灵活应对业务变化,高效管理云上资源

作者&#xff1a;吴昆 什么是弹性调度 云计算时代&#xff0c;企业可以通过云平台获得大量计算资源&#xff0c;并根据业务发展和流量需求的实时变化&#xff0c;灵活调整使用的资源类型与资源量。阿里云提供了多种弹性资源&#xff0c;如云服务器 ECS 和弹性容器实例 ECI&am…

SpringBoot解决Slow HTTP慢速攻击漏洞

项目场景&#xff1a; 扫描到的漏洞截图&#xff1a; 攻击原理&#xff1a; Web应用在处理HTTP请求之前都要先接收完所有的HTTP头部&#xff0c;因为HTTP头部中包含了一些Web应用可能用到的重要的信息。攻击者利用这点&#xff0c;发起一个HTTP请求&#xff0c;一直不停的发送…

SpringCloud Aliba-Sentinel【上篇】-从入门到学废【4】

&#x1f3b5;诗词分享&#x1f3b5; 大江东去&#xff0c;浪淘尽&#xff0c;千古风流人物。 ——苏轼《念奴娇赤壁怀古》 目录 &#x1f37f;1.Sentinel是什么 &#x1f9c2;2.特点 &#x1f9c8;3.下载 &#x1f32d;4.sentinel启动 &#x1f953;5.实例演示 1.Senti…

Java代码审计Shiro反序列化CB1链source入口sink执行gadget链

目录 0x00 前言 0x01 CC链&CB链简介 1. Commons Collections链是什么&#xff1f; 2. Commons BeanUtils链是什么&#xff1f; 0x02 测试Commons BeanUtils1链 0x03 Shiro550 - Commons BeanUtils1链 - 跟踪分析&#xff08;无依赖&#xff09; 1. 前置知识 2. Co…

【每日一题】按分隔符拆分字符串

文章目录 Tag题目来源解题思路方法一&#xff1a;遍历方法二&#xff1a;getline 写在最后 Tag 【遍历】【getline】【字符串】【2024-01-20】 题目来源 2788. 按分隔符拆分字符串 解题思路 方法一&#xff1a;遍历 思路 分隔符在字符串开始和结束位置时不需要处理。 分隔…

设计模式设计原则——依赖倒置原则(DIP)

DIP&#xff1a;Dependence Inversion Principle 原始定义&#xff1a;High level modules should not depend upon low level modules. Both should depend upon abstractions. Abstractions should not depend upon details. Details should depend upon abstractions。 官…

【正点原子STM32】搭建开发环境(安装MDK和器件支持包、DAP仿真器和ST LINK仿真器、CH340串口驱动)

一、常用开发工具简介 MDKDAP 二、安装MDK 1、MDK简介2、如何获取MDK3、安装MDK和器件支持包 三、安装仿真器驱动 DAP仿真器免驱ST LINK仿真器驱动安装方法 ST LINK驱动及教程 四、安装CH340 USB虚拟串口驱动 1、安装CH340 USB虚拟串口驱动2、为什么要安装CH340 USB虚拟…

《WebKit 技术内幕》之八(2):硬件加速机制

2 Chromium的硬件加速机制 2.1 GraphicsLayer的支持 GraphicsLayer对象是对一个渲染后端存储中某一层的抽象&#xff0c;同众多其他WebKit所定义的抽象类一样&#xff0c;在WebKit移植中&#xff0c;它还需要具体的实现类来支持该类所要提供的功能。为了完成这一功能&#x…

蓝桥杯官网填空题(奇怪的分式)

题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 上小学的时候&#xff0c;小明经常自己发明新算法。一次&#xff0c;老师出的题目是&#xff1a;1/4乘以8/5 小明居然把分子拼接在一起&#xff0c;分母拼接在一起&…

无法找到mfc100.dll的解决方法分享,如何快速修复mfc100.dll文件

在日常使用电脑时&#xff0c;我们可能会碰到一些系统错误提示&#xff0c;比如“无法找到mfc100.dll”的信息。这种错误通常会阻碍代码的执行或某些应用程序的启动。为了帮助您解决这一问题&#xff0c;本文将深入探讨其成因&#xff0c;并提供几种不同的mfc100.dll解决方案。…

【分享】MathWorks中国汽车年会:“软件定义汽车”

从软件赋能到软件定义&#xff0c;汽车行业不仅需要解决诸如错误发现滞后带来的高昂代价、功能融合所需的跨学科知识、功能安全与实施成本之间的权衡等老问题&#xff0c;也面临着新的挑战&#xff1a;软件复杂度的不断提升、利用数据驱动创造价值、人工智能的引入和实现、数字…

【从0上手cornerstone3D】如何加载nifti格式的文件

在线演示 支持加载的文件格式 .nii .nii.gz 代码实现 npm install cornerstonejs/nifti-volume-loader// ------------- 核心代码 Start------------------- // 注册一个nifti格式的加载器 volumeLoader.registerVolumeLoader("nifti",cornerstoneNiftiImageVolu…

Spark SQL函数定义

目录 窗口函数 SQL函数分类 Spark原生自定义UDF函数 Pandas的UDF函数 Apache Arrow框架基本介绍 基于Arrow完成Pandas DataFrame和Spark DataFrame互转 基于Pandas完成UDF函数 自定义UDF函数 自定义UDAF函数 窗口函数 分析函数 over(partition by xxx order by xxx [as…

如何在ubuntu22.04安装ROS2

ubuntu22.04安装ROS2 教程 选择对应版本进行安装设置编码添加源安装ROS2设置环境变量 运行ROS2 选择对应版本 通过官方网站&#xff0c;查询Ubuntu与ros对应的版本&#xff0c;版本不一致也会出现安装不成功。 https://wiki.ros.org/ROS/Installation 每一个都可以进行点击&a…