Netty应用(四) 之 Reactor模型 零拷贝

目录

6.Reactor模型

6.1 单线程Reactor

6.2 主从多线程Reactor (主--->Boss | 从--->Worker | 一主多从机制)

7.扩展与补充

8.Reactor模型的实现

8.1 多线程Reactor模型的实现(一个Boss线程,一个Worker线程)

8.2 多线程Reactor模型的实现(一个Boss线程,多个Worker线程)

9.零拷贝

10.硬件驱动程序与软件驱动程序(数据库驱动程序)的关系

11.linux内核与centos,redhat等这些产品有啥区别和关系?

6.Reactor模型

6.1 单线程Reactor

6.2 主从多线程Reactor (主--->Boss | 从--->Worker | 一主多从机制)

但是单线程版的Reactor,一定存在性能瓶颈的!你想想一个线程做了所有的连接,读,写的操作,肯定存在极大的性能瓶颈。

所以后续引入了主从多线程Reactor模式:

1.Boss主线程负责监听并且处理客户端的连接事件。它具有一个独立的Selector监控器,所有的客户端都会注册存储到该Selector上,这样才能完成监听连接事件的操作。

2.当Boss主线程监听并处理完客户端的连接事件后,它会把这个客户端交给Worker从线程进行处理,从线程肯定会先把该客户端像主线程一样先注册存储到当前从线程所对应的Selector上。从线程会进行监听与处理该客户端后续可能会进行的读或写的事件。从线程可能会有多个,每一个从线程都具有一个独立的Selector。

补充:

1.Boss主线程与Worker从线程们之间一定是建立了联系的,这样主线程才能把客户端交给从线程。每一个线程都具有一个独立的Selector

2.分配多少个线程给Boss或Worker?分配一个线程给Boss,剩余(Cpu核数-1)个线程分配给Worker从线程。

先说为什么一共分配CPU核数的线程?为了避免频繁的线程上下文切换,为了实现真正的并行。后续肯定还可以引入线程池的优化

为什么Worker线程分配的多?因为客户端连接就一次,后续Boss把客户端甩手给Worker后,客户端的读写事件就不再让Boss去处理了,读写事件可是远比连接事件多的!

如图所示:

7.扩展与补充

扩展:

主从架构与主备架构的区别:

主从架构:

主节点也干活,从节点也干活。只是二者干的活不一样,主节点一般做写操作(主要的工作),从节点一般做次要的工作 如 读操作。缺点:主节点一旦宕机,就废掉了。

主备架构:

主节点会同步数据到另外一共备用主节点,当主节点宕机时,会切换到另外一台时刻同步数据的备用主节点。解决了主从架构的缺点。

主备架构:MySQL的双主架构,Redis的哨兵集群

如何实现主备架构中,主节点同步数据到另外一台备用主节点呢?

使用zookeeper集群这一CP原理架构,遵循CP的是强一致性的架构,即:当zookeeper集群节点出现宕机时,需要重新选举节点作为主节点时,拒绝对外提供服务,此时就保证了强一致性。但是降低了性能,这就是CAP原则,不可以同时满足C A P。

当然,zookeeper可以替换为consul,etcd

反向代理与正向代理:

反向代理时针对于服务器端:

1.做负载均衡

2.URL重写 :面对写入的URL重写

3.动静分离

新的问题:当客户端连接这个服务器集群时,正在连接的服务器宕机了,你怎么做到客户端无感的?客户端socket连接服务器时,需要通过服务器的ip连接,如果当前这台服务器节点宕机,你不是要更新此时连接的ip?这怎么可能做到无感啊,但通过以下技术就是可以做到无感。。。

方法1:VIP虚拟IP

其实都不需要更新ip,在过去传统的开发中,使用的是Virtual IP 虚拟IP。客户端访问的是虚拟IP对应的服务,虚拟IP这个服务再找到具体的节点服务器资源,虚拟IP服务器肯定会涉及到分发,保活,注册服务器节点等,但是这一切对于客户端来说,是无感知的,它是不知道服务器节点的切换的。

这些都可以通过早期的一些技术完成:如lvs,keepavlid。

方法2:IP飘逸

当发现节点服务器坏了,那么偷偷的把这台服务器节点ip给复制给另外一台服务器节点,这就是ip飘逸,这样也做到了客户端无感知!

解决技术:HAproxy

方法3:云原生OPS自动化运维

这些技术实际上都是运维的技术,现如今微服务为什么说是云原生的一个分支,因为云原生主张OPS,自动化运维,就是要取代运维!

抛开上述三种运维方式,仅仅是站在java层面去开发一个程序去实现这一客户端无感:

其实思路很简单,就是做一个redis缓存,缓存的是ip-主机节点的ip映射集合。

然后在client与server端分别开一个agent,agent之间做网络通信,然后agent再与client或server端内部做交互传递数据。

8.Reactor模型的实现

8.1 多线程Reactor模型的实现(一个Boss线程,一个Worker线程)

Reactor模型:多个客户端连接同一服务端。在服务端做了Reactor模型的编程逻辑。

通过一个Boss线程(可以有多个,看具体业务),处理多个NIO的接收连接的操作,也就是ServerSocketChannel的ACCEPT事件

通过一个worker线程(可以有多个,看具体业务),处理读写的操作,也就是SocketChannel的READ或WRITE事件。

重点:你处理的read或write事件一定是前面对应这次连接的boss线程的连接得来的,你不能前后没联系。是一种传递的关系。但是每一个线程,无论是boss还是worker,都具有独立的Selector监管器

  • 第一版本代码
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~}}}}
}

对于TODO位置,我们要进行调用worker线程去进行处理读写事件。但是具体要开启多少个worker线程呢?是过来一个客户端连接就需要开启一个worker线程去处理该连接对应的读写事件吗?

当然不是,在之前,我们使用一个线程处理所有注册在复用器上的channel对象对应的所有事件,这里我们使用Boss线程处理连接事件。worker线程处理读写事件,但是也没必要一个线程对应一个客户端连接的后续读写事件啊,如果线程过大,内存占用过大,线程之间切换也会更加频繁,性能很低的。

下面暂且把worker线程设为开启一个,那么我们需要把创建Worker的代码设为公共的,而不能每过来一个客户端连接后,就进行创建一份新的Worker。具体见下面代码

  • 第二版代码
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面,做一个共享变量Worker2 worker2 = new Worker2();while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~worker2.register(socketChannel);}}}}
}

  • 第三版代码
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面,做一个共享变量Worker2 worker2 = new Worker2("worker-01");while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~worker2.register(socketChannel);}}}}
}
package com.messi.netty_basic_01.Reactor_prac;import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;public class Worker2 implements Runnable{private String name;private Thread thread ;private Selector selector;private volatile boolean isCreated;public Worker2(String name) {this.name = name;}public void register(SocketChannel socketChannel) {if (!isCreated) {this.thread = new Thread(this,name);thread.start();selector = Selector.open();isCreated = true;}}@Overridepublic void run() {}
}

为什么Worker2线程类构造方法就一个参数name?

对于其他属性,thread,selector的创建,完全是可以在Worker2内部完成的。woker2线程本身就是Boss线程调用的,Boss线程只初始化worker线程的name,这样解耦合,降低代码的侵入性。你想想,并发场景下,在Boss线程创建多个Thread或Selector,这样是不是Boss线程端代码太难看,造成代码的极大污染。

为什么把thread,selector的初始化创建放在register方法中?

其实也是可以放在构造方法中的。

为什么要使用isCreated标记位?

保证thread线程创建和启动只进行一次!使用标志位isCreated的方式在register中进行初始化。使用标记位isCreated一定要记住:isCreated在完成一次初始化后,要置为true!如果不使用isCreated,boss线程在触发连接事件后调用worker的register方法。但此时还是main线程在执行register方法。那么boss线程触发一次客户端连接事件,就会对应创建一个新线程来处理该客户端连接对应后续的读写事件。

补充:

在多线程并发共享isCreated这个变量时,要加上volatile关键字,volatile可以保证当其中一个线程修改isCreated的值的时候,会及时通知告知其他线程,"isCreated的最新值为xxx"。保证了isCreated的可见性,因为标记位通常做if判断,对可见性要求高。

但此时boss为单线程,并且由于是迭代器一个个遍历触发的事件的,所以同一时刻只有一个boss线程调用register方法,所以isCreated并不存在多线程共享的并发安全问题。养成良好习惯,说不准后续boss变为多线程了。。。

worker2.register(socketChannel)接着调用:

为什么出现空指针异常?

  • 第四版代码
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {private static final Logger log = LoggerFactory.getLogger(ReactorBossServer2.class);public static void main(String[] args) throws Exception{log.debug("boss线程开启 >>>");ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面,做一个共享变量Worker2 worker2 = new Worker2("worker-01");while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~log.debug("boss线程即将要调用Worker2类的register方法 >>>");worker2.register(socketChannel);log.debug("boss线程完成调用Worker2类的register方法 >>>");}}}}
}
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;public class Worker2 implements Runnable{private static final Logger log = LoggerFactory.getLogger(Worker2.class);private String name;private Thread thread ;private Selector selector;private volatile boolean isCreated;public Worker2(String name) {this.name = name;}public void register(SocketChannel socketChannel) throws Exception{if (!isCreated) {selector = Selector.open();this.thread = new Thread(this,name);thread.start();isCreated = true;}log.debug("boss线程将要执行socketChannel#register方法 >>>");socketChannel.register(selector, SelectionKey.OP_READ);}@Overridepublic void run() {log.debug("开启一个新线程worker-01 >>>");while (true) {try {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(30);int read = channel.read(buffer);if (read == -1) {log.debug("客户端断开 >>>");key.cancel();} else {buffer.flip();CharBuffer decode = Charset.forName("UTF-8").decode(buffer);log.info("worker-01线程读取的结果为:{}",decode.toString());}}}} catch (IOException e) {e.printStackTrace();}}}
}
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class MyClient2 {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(Charset.forName("UTF-8").encode("梅西"));System.out.println("MyClient2.main");}}

测试:

  • 但是代码在多线程场景下,存在bug:因为多线程环境下,一切皆有可能,如果新开启的线程执行的run方法(selector.select())在register方法之前执行。由于selector阻塞等待,导致register注册不上。最终就一直阻塞等待着,就死掉了。。。。。下面详细分析一下:

补充一点:

为什么创建一个线程后thread.start()不会立即执行?

因为真正创建出一个线程并且切换都是需要一定时间的,最重要的是当前main线程的时间片可能还没使用完,所以CPU是不会轻易切换线程的,但是在多线程环境下一切是皆有可能,可能start后就切换到该新线程了。但是当main线程时间片耗尽后,就一定会切换到新创建的线程的,即调用run方法中的业务逻辑。通常main线程时间片耗尽的原因是因为执行的业务线复杂导致时间片耗尽,这里我们可以模拟一下,Thread.sleep(2000)休眠2s,此时main线程时间片耗尽,然后就完成了线程切换,线程切换到新创建的线程,并且执行run方法!

但就是因为先执行了run方法,后执行了register方法,代码就出现了bug!

出现bug的原因症结就是因为:run方法(selector.select())在register方法之前执行。由于selector阻塞等待,导致register注册不上。但是在多线程环境下,一切皆有可能,所以你无法保证run方法(selector.select())一定在register方法之后执行!说白了,就是因为register方法和run方法(select()方法调用)二者不是在同一线程中,你无法百分百做到控制方法的执行顺序。

演示一下这个bug:

睡眠模拟bug》》》》

断点设为Thread级别,当前线程挂起时只挂自己。如果设为ALL,那么当前线程挂起时,会挂起所有的线程

debug测试:

切换到main线程,往下执行,发现也阻塞了,阻塞在register这句代码:

分析如下:

解决方法1:

搞一个同步锁,把selector.wakeup()和register方法的调用搞一块:

为什么要搞一个同步锁,为何多此一举呢?

搞同步锁为的就是保证wakeup唤醒和register注册之间没有其他情况的发生,不然不可能保证有意外的发生,如下:

但是同步锁依旧存在很大问题,其实问题就是在于:性能低下。所以后续我们模仿Netty做一个队列,把要执行的代码传递给新开启的线程中,让selector.select()和register方法在同一个线程中执行调用!!并且加一个wakeup做兜底操作,就完美的解决了这个问题。其实就是为了保证在selector.select阻塞selector之前完成register注册READ事件的操作,如果selector因为多线程环境下的因素导致在register方法之前阻塞(一切皆有可能),所以使用wakeup方法做兜底,wakeup像做了一个标记,无论在wakeup前还是后做阻塞了,都可以唤醒一次!

解决方法2:

所以最终解决方案:模仿Netty底层做一个并发队列,把调用register的代码当作一个任务存储到该并发队列中,然后在run方法对应的线程中,再把该队列中存储的register调用代码取出来,这样就保证了register方法和selector.select()在同一个线程中调用。那么你就可以控制先后顺序了。最后再补充一个selector.wakeup()的唤醒作为兜底操作。当selector.select()再次阻塞时,直接唤醒,然后就可以接着在同一线程中执行register方法啦。

具体如下:

ConcurrentLinkedQueue队列可以在两个线程之间进行传递一些代码,功能,知道这些就够了。

测试:

多线程环境下,一切皆有可能,因为你也不知道走到哪一句代码CPU就调度执行其他线程了,所以要考虑更多的情况,然后保证并发安全。

  • 第五版代码(最终代码)
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {private static final Logger log = LoggerFactory.getLogger(ReactorBossServer2.class);public static void main(String[] args) throws Exception{log.debug("boss线程开启 >>>");ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面,做一个共享变量Worker2 worker2 = new Worker2("worker-01");while (true) {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~log.debug("boss线程即将要调用Worker2类的register方法 >>>");worker2.register(socketChannel);log.debug("boss线程完成调用Worker2类的register方法 >>>");}}}}
}
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;public class Worker2 implements Runnable{private static final Logger log = LoggerFactory.getLogger(Worker2.class);private String name;private Thread thread ;private Selector selector;private volatile boolean isCreated;public Worker2(String name) {this.name = name;}private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public void register(SocketChannel socketChannel) throws Exception{if (!isCreated) {selector = Selector.open();this.thread = new Thread(this,name);thread.start();isCreated = true;}log.debug("boss线程将要执行socketChannel#register方法 >>>");//使用睡眠的方式模拟出:在register方法执行前执行了繁重的业务代码逻辑Thread.sleep(2000);log.debug("main线程睡眠 >>>");
//        synchronized (this) {
//            selector.wakeup();
//            socketChannel.register(selector, SelectionKey.OP_READ);
//        }queue.add(()->{try {socketChannel.register(selector,SelectionKey.OP_READ);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});selector.wakeup();}@Overridepublic void run() {while (true) {log.debug("开启一个新线程worker-01 >>>");try {selector.select();Runnable poll = queue.poll();if (poll != null) {poll.run();}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(30);int read = channel.read(buffer);if (read == -1) {log.debug("客户端断开 >>>");key.cancel();} else {buffer.flip();CharBuffer decode = Charset.forName("UTF-8").decode(buffer);log.info("worker-01线程读取的结果为:{}",decode.toString());}}}} catch (IOException e) {e.printStackTrace();}}}
}
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class MyClient2 {public static void main(String[] args) throws Exception{SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(Charset.forName("UTF-8").encode("梅西"));System.out.println("MyClient2.main");}}

8.2 多线程Reactor模型的实现(一个Boss线程,多个Worker线程)

  • 相比之前的代码,我们要做的就是增加Worker线程的数量 然后随机选举出一个Worker线程进行register

  • 代码
package com.messi.netty_basic_01.reactor;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;public class ReactorBossServer {private static final Logger log = LoggerFactory.getLogger(ReactorBossServer.class);public static void main(String[] args) throws Exception{log.debug("boss线程开启 >>>");//建立ServerSocketChannelServerSocketChannel ssc = ServerSocketChannel.open();//设置为非阻塞ssc.configureBlocking(false);//监听端口8000ssc.bind(new InetSocketAddress(8000));//创建Selector监管者Selector selector = Selector.open();//把ServerSocketChannel注册到Selector监管者上的keys这一HashSet中,对应注册事件为ACCEPTssc.register(selector, SelectionKey.OP_ACCEPT);//        Worker worker = new Worker("worker-01");//模拟多线程的情况,一个Boss线程,多个Worker线程。在实际开发中,要使用线程池Worker[] workers = new Worker[2];//模拟出多个Worker线程for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker - " + i);}//原子操作的变量AtomicInteger index = new AtomicInteger();while (true) {//轮询selector.select();//得到Selector上注册的Channel对应的所有事件 ---》Boss(main)线程对应注册的只有ACCEPT事件,Worker线程处理READ,WRITE事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();//遍历每个事件while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {//拿到的肯定是ACCEPT事件对应的ServerSocketChannelServerSocketChannel channel = (ServerSocketChannel) key.channel();//accept方法接收得到SocketChannel 这个是建立连接后,服务端与客户端之间进行READ或WRITE的通道SocketChannel socketChannel = channel.accept();//同样,设为非阻塞socketChannel.configureBlocking(false);log.debug("boss线程准备调用worker线程的register方法 >>>");//在Boss单线程的情况下,由于是while迭代器逐个遍历,所以这句代码同一时刻只有一个线程访问,不存在并发安全问题
//                    worker.register(socketChannel);//index.getAndIncrement(): 获取到index的值并且+1,线程安全的//index.getAndIncrement() % workers.length 得到的结果为:[0,workers.length-1]//注意哈:一次拿到一个事件去处理,而不是一次拿一堆,由于Boss是单线程, 所以同一时刻register方法只有一个线程去访问,无并发安全问题。workers[index.getAndIncrement() % workers.length].register(socketChannel) ;log.debug("boss线程调用结束worker线程的register方法 >>>");}}}}}
package com.messi.netty_basic_01.reactor;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;public class Worker implements Runnable{private static final Logger log = LoggerFactory.getLogger(Worker.class);//worker的名称private String name;//worker是一个子线程的任务,所以应该有一个线程,其实就是实际干活的private Thread thread;//每一个worker子线程都会有一个Selector轮询器。一定注意,每一个线程都具有独立的Selector复用器,和主线程不是同一个Selectorprivate Selector selector;//加上一个volatile来解决多线程环境下isCreated值的可见性问题//但是在我们这个程序中,并不存在多线程安全问题,因为Boss线程为单线程,并且迭代器遍历客户端事件时是逐个遍历请求register//所以不存在多线程并发安全问题。只不过这里养成良好习惯给标记位加上volatile。为了保证后续多个Boss线程同时访问的环境下的可用性private volatile boolean isCreated ; //标记位//并发队列,负责传递[代码 或 功能]到另外一个线程private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>() ;public Worker(String name) {this.name = name;}public void register(SocketChannel socketChannel) throws Exception{log.debug("worker#register invoke");//使用标记位方式,保证下列初始化只执行一次!// 分析一下:为什么只让执行一次?// Boss(main)线程调用register方法中的下列这些代码。其实就是Boss线程在与处理完客户端建立连接的事件后的调用// 如果每一次处理完一个客户端连接就开启一个新Thread去处理该客户端后续的READ或Write事件,那么是不是倒退了?// 所以要控制,每一个Worker对象才会异步开启一个线程去执行处理很多客户端的读写事件,后续压力大了,可以搞一个Worker线程池,多个Worker线程去处理所有客户端的读写事件!// 绝对不是一个Worker线程处理一个客户端的读写事件,那样性能耗费太大了,而且没必要!// 但是还有一个细节需要思考:// 对于同一Worker对象而言,异步线程Thread只创建启动一个。但是对于不同的Worker对象,异步线程Thread会重新创建启动一个// 为什么会这样呢?因为isCreated标记位,对于一个新Worker对象,isCreated初始值为false, 那么自然会执行下列这段代码!// 下列这个if包围的代码还可以放在哪?// 放在Worker的构造方法中,只在对象创建初始化执行一次if (!isCreated) {//创建线程this.thread = new Thread(this,name);//先对Selector进行初始化,避免后续NUll Exceptionselector = Selector.open();//异步起一个线程this.thread.start();//标志位设为true 保证初始化代码只执行一次isCreated = true;System.out.println("我是梅西");}log.debug("socketChannel#register >>>");//使用睡眠模拟繁杂的业务逻辑Thread.sleep(2000);
//        synchronized (this) {
//            //其实就是打了一个标记。即调用一次wakeup后,无论此时selector.select()阻塞还是之后才阻塞,都会减少阻塞一次
//            selector.wakeup();
//            socketChannel.register(selector, SelectionKey.OP_READ);
//        }//把SocketChannel注册到Selector并且设置READ事件的代码放到并发队列中 传递给后续新开启的线程queue.add(()->{try {socketChannel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}});//兜底操作:唤醒一次Selector。无论Selector在wakeup方法执行前阻塞,还是在wakeup执行后阻塞,wakeup都可以唤醒一次Selector阻塞,像是一个标记计数selector.wakeup();System.out.println("Worker.register");}@Overridepublic void run() {while (true) {log.debug("worker线程的run方法开始执行 >>>");try {//应用层等待。内核层轮询监听事件的发生selector.select();//从并发队列中获取任务Runnable poll = queue.poll();//任务不为空,则执行该任务if (poll != null) {poll.run();}//一旦Selector监听到keys这一HashSet中注册的Channel对应事件的发生,就会把这些触发的事件对应的Channel的引用复制copy到SelectionKeys这一HashSet中//这一copy迁移Channel引用的工作是selector.select()做的Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();//遍历Selector监听到的事件集合迭代器while (iterator.hasNext()) {//一个一个的来SelectionKey key = iterator.next();//使用完后就删除掉 避免空指针异常iterator.remove();//Worker线程监听Read事件if (key.isReadable()) {//获取到SocketChannel 该SocketChannel为客户端与服务端之间进行读写操作的管道SocketChannel socketChannel = (SocketChannel) key.channel();//创建Buffer缓冲区 默认为写模式ByteBuffer buffer = ByteBuffer.allocate(30);//把SocketChannel中的数据读取到buffer缓冲区socketChannel.read(buffer);//切换到读模式buffer.flip();//解码CharBuffer result = Charset.forName("UTF-8").decode(buffer);//输出读取到的结果数据log.info("worker线程读取的结果result为 : {}",result);}}} catch (IOException e) {e.printStackTrace();}}}
}
package com.messi.netty_basic_01.reactor;import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class MyClient {public static void main(String[] args) throws Exception{//客户端创建SocketChannelSocketChannel socketChannel = SocketChannel.open();//客户端与服务端之间通过SocketChannel进行连接socketChannel.connect(new InetSocketAddress(8000));//客户端写数据给服务端socketChannel.write(Charset.forName("UTF-8").encode("leomessi")) ;//.....System.out.println("MyClient.main");}}
  • 测试

debug的方式进行开启多个客户端与服务端建立通信

Server端:每次都会选一个Worker线程执行register方法

  • 讨论交流

worker线程搞多少个比较好 ? ? ?

CPU核数减1。现代计算机一般都是单CPU多个核,也就是俗称的多核CPU。假设目前为八核,那么开启8-1=7个worker线程,为什么开7个worker线程,一个线程给boss线程,一共8个线程,更好的利用这八个核,实现真正意义上的并行。截止到目前有好几个问题:(1)为什么worker线程数要多于boss线程?(2)为什么正好开启核数个线程效率最佳?(即核数=boss线程数+worker线程数)(3)能不能开启线程个数超过最大核数,最大开启线程数为多少?

依次回答:(1)对于一个客户端而言,连接只进行一次,连接之后的读写可能回触发多次!而我们又知道,boss线程是处理连接的,worker线程处理连接后的读写事件的,答案显而易见了。(2)为了实现真正意义上的并行,因为当线程数超过CPU核数时,CPU需要不断的频繁调度切换。并且线程创建是占用内存的,线程越多,内存消耗越大。(3)当然可以,最大开启线程数取决于具体硬件配置,你要看硬件的并发能力,你要CPU切换能力强,内存大,线程开多点其实也没毛病的。

9.零拷贝

一定要看这篇文章:

一文彻底弄懂零拷贝原理 - 掘金

  • 基本知识

无论是磁盘IO还是网络IO,实际上最终都需要操作具体的硬件,磁盘IO--->磁盘文件,网络IO--->网络网卡

但是呢,我们编写的JVM应用程序是无法直接操控硬件的,中间需要通过操作系统做中间者,具体原因这里简单说一下,因为对于磁盘,网络资源,我们不能让用户程序直接操控,一方面是硬件版本问题,一方面是安全问题(怎么可能让用户直接操控内存呢,这多不安全呀!)

具体的调用过程为:JVM用户程序--->操作系统--->硬件驱动程序--->具体的硬件

总结一下:

1.JVM中的java应用程序不能直接操作硬件,需要操作系统的介入

2.内存分为两种,也可以理解成内存空间分为两部分:

一部分为用户地址空间,这部分的内存空间通常给应用程序(如:JVM进行使用),应用程序通常是进程级别(我们常说的线程,线程实际上是共享进程的内存资源的一种抽象细化,线程是利用进程空间的)。应用程序其实就是我们平常编写的各种代码动态运行起来的一种概念统称。

另外一部分为内核地址空间,这部分的内存空间通常给操作系统,操作系统也是占用内存的一种特殊的程序,操作系统通常会提供一系列公共的功能API。啥是公共的功能API?比如:应用程序需要申请内存空间,应用程序就需要先调用操作系统提供的公共功能API。

3.为什么socket网络连接,io流资源,线程资源都是珍贵的资源,使用完毕后要及时关闭?

对于这些资源,是对安全性要求极高的资源,你仅仅通过用户程序是无法完成这些资源的操控的,必须陷入操作系统内核让OS去真正调用申请。所以这些资源是珍贵的, 是位于OS内存的公共资源。

并且申请的这些公共资源都是占用着操作系统所对应的内核地址空间内存,我们应用程序是陷入内核使用这部分内核内存空间的。

假如说当前用户程序使用完毕后不断开对这部分操作系统内存的引用(即不断开socket网络连接,IO流),会影响这台计算机其他应用程序对操作系统这部分公共资源内存的调用和使用的,因为不可能一台计算机只有一个进程应用程序!甚至导致其他应用程序无法申请到这些珍贵的资源。所以使用完后,应用程序要及时断开对珍贵资源的连接。

并且要减少应用程序与这部分资源的交互,因为需要陷入内核,切换的时候性能耗费一定是很大的

  • 传统方式进行拷贝,不进行零拷贝优化:

下面分析涉及到的代码程序

总结:

参看文章:一文彻底弄懂零拷贝原理 - 掘金

分析:

注释:创建直接内存的API就是:ByteBuffer buffer = ByteBuffer.allocateDirect(20);

  • 零拷贝优化1:mmap+write

涉及到的代码:

通过NIO的API创建直接内存空间,所谓直接内存就是应用程序不再开辟用户地址内存空间(堆内存)了,而是直接使用操作系统的内存。用户程序可以编写代码直接操控这一块直接内存,可以读写,清除等操作。

这也叫做内存映射,好像是把操作系统的内存映射到用户地址内存空间上,减少了数据的拷贝。

但应用程序一味的使用直接内存也存在很大弊端,因为OS不具备JVM的GC机制,所以如果应用程序不及时释放操作系统的直接内存,那么可能会存在很大隐患。

eg:调用NIO-API:ByteBuffer.allocateDirect(10),这句代码创建的就是10字节大小的操作系统内存:高速页缓存这种。然后应用程序获取到这块直接内存后,在应用程序中就可以使用java代码直接操作这块直接内存了。

总结:

见文章:一文彻底弄懂零拷贝原理 - 掘金

补充:

但是注意,内存映射/开辟OS直接内存是针对于文件的读写操作。对于网络相关的数据读写操作我们不可以使用直接内存映射,为什么?因为你无法把这一台JVM的相应内容数据给另外一台JVM。

  • 零拷贝优化2:linux内核2.1版本的零拷贝 - sendfile方式

涉及到的代码:

所有的零拷贝都是文件相关的,socket不涉及零拷贝。零拷贝是指不进行JVM虚拟机层面的拷贝,只进行系统级别的拷贝.

总结:

见文章:一文彻底弄懂零拷贝原理 - 掘金

  • 零拷贝优化3:linux内核2.4版本的零拷贝- 带有 scatter/gather 的 sendfile方式

对Linux2.4内核进行sendFile方法的优化的一些理解:这次优化直接让保存到高速页缓存的文件读取数据直接发送到网卡对应的驱动程序,再给到网卡。而无需拷贝到socket缓存。

为什么可以这样?因为无论是文件对应的高速页缓存还是网卡对应的socket缓存,其实内部结构都是一样的,只不过可能标记位不同,既然差不多一样,把页缓存的内存地址,偏移量记录传输到socket缓存,然后直接把高速页缓存的数据给到网卡驱动程序,然后再给网卡,这有问题吗?

总结:

见文章:一文彻底弄懂零拷贝原理 - 掘金

  • 零拷贝优化4 - splice 方式

splice 调用和sendfile 非常相似,用户应用程序必须拥有两个已经打开的文件描述符,一个表示输入设备,一个表示输出设备。与sendfile不同的是,splice允许任意两个文件互相连接,而并不只是文件与socket进行数据传输。对于从一个文件描述符发送数据到socket这种特例来说,一直都是使用sendfile系统调用,而splice一直以来就只是一种机制,它并不仅限于sendfile的功能。也就是说 sendfile 是 splice 的一个子集。

在 Linux 2.6.17 版本引入了 splice,而在 Linux 2.6.23 版本中, sendfile 机制的实现已经没有了,但是其 API 及相应的功能还在,只不过 API 及相应的功能是利用了 splice 机制来实现的。

总结:

见文章:一文彻底弄懂零拷贝原理 - 掘金

  • 总结

无论是传统的 I/O 方式,还是引入了零拷贝之后,2 次 DMA copy是都少不了的。因为两次 DMA 都是依赖硬件完成的。所以,所谓的零拷贝,都是为了减少 CPU copy 及减少了上下文的切换。

sendfile局限于文件与socket进行数据传输。与sendfile不同的是,splice允许任意两个文件互相连接,而并不只是文件与socket进行数据传输。

下图展示了各种零拷贝技术的对比图:

  • 补充

我们java平常做的优化都是在JVM层面进行优化的。对于OS操作系统层面的优化,需要OS开发维护者去优化的,比如说linux2.4内核完成了sendFile方法的重构,epoll模型的完善。对于我们java程序来说,只要你部署在linux操作系统上,你就可以享用OS优化带来的好处,因为你java程序对于一些系统级别的操作,如文件读取,其实是调用OS的sendFile,那么OS对sendFile做了优化,是不是等价于你java程序性能也进一步优化了?对吧,答案显而易见。

这就是为什么后面我们的服务一般都部署在linux操作系统服务器上,它在IO处理方面是要比市面上其他操作系统服务器要强的多的,但是异步处理层面上要比win操作系统差一些的。。。所以现在都研究linux内核

10.硬件驱动程序与软件驱动程序(数据库驱动程序)的关系

11.linux内核与centos,redhat等这些产品有啥区别和关系?

linux内核是开源的,是linux最核心的代码。后续centos,redhat这些厂商基于linux内核进行开发,进行套壳,增加了稳定性,可靠性,或一些美丽的图形化界面。但是这些厂商也会把产品分为很多发行版,比如说这一个发行版做的不好或差一些,那么就开源不收费。如果这一个发行版做的好,那么就收费闭源。。。

这就好比:安卓操作系统与小米的MIUI,华为的一些发行版产品等,一样的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/255121.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python程序员面试题精选及解析(2)

本文精心挑选了10道Python程序员面试题&#xff0c;覆盖了Python的多个核心领域&#xff0c;包括装饰器、lambda函数、列表推导式、生成器、全局解释器锁(GIL)、单例模式以及上下文管理器等。每道题都附有简洁的代码示例&#xff0c;帮助读者更好地理解和应用相关知识点无论是对…

【Java EE初阶十一】文件操作(IO)

1. 认识文件 所谓的文件是一个广义的概念&#xff0c;可以代表很多东西&#xff1b;在操作系统里面&#xff0c;会把很多的硬件设备和软件设备都抽象成“文件”&#xff0c;统一进行管理&#xff1b;但是大部分情况下&#xff0c;我们读到的文件&#xff0c;都是指硬盘的文件&a…

Codeforces Round 106 D. Coloring Brackets 【区间DP + 记忆化搜索实现】

D. Coloring Brackets 约定 ∣ S ∣ ≤ 700 |S| \leq 700 ∣S∣≤700 题意 给定一个正则括号序列 s s s&#xff0c;我们需要求出合法的染色方案数。合法的条件为&#xff1a; 每个符号要么不染色&#xff0c;要么染红色&#xff0c;要么染蓝色对于每对配对的括号&#xf…

单片机与外设的交互

单片机与外设的交互是嵌入式系统中非常重要的一个基础知识点。单片机是一个集成在同一芯片上的中央处理器、存储器和输入/输出接口,它可以根据用户编写的程序与各种外部设备即外设进行交互。单片机与外设之间的交互主要通过单片机上的输入/输出口(I/O口)来实现。 I/O口的工作原…

【golang】24、go get 和 go mod:indrect 与 go mod tidy

文章目录 go get 会执行如下操作&#xff1a; 操作 go.mod 文件&#xff08;add、update、remove&#xff09;下载依赖到 $GOPATH/pkg/mod 中若已安装&#xff0c;则更新该包&#xff0c;到最新版本 试验前置准备&#xff1a;首先删除已下载的依赖&#xff0c;rm -rf $GOPATH…

零基础学python之高级编程(1)---面向对象编程及其类的创建

面向对象编程及其类的创建 文章目录 面向对象编程及其类的创建前言一、面向过程编程和面向对象编程的概念1.面向过程编程(Procedural Programming)2.面向对象编程(Object-Oriented Programming&#xff0c;OOP) 二、面向对象编程基础1.初识类(class)和对象调用方法 2.类中的两种…

Redis(三)主从架构、Redis哨兵架构、Redis集群方案对比、Redis高可用集群搭建、Redis高可用集群之水平扩展

转自 极客时间 Redis主从架构 redis主从架构搭建&#xff0c;配置从节点步骤&#xff1a; 1、复制一份redis.conf文件2、将相关配置修改为如下值&#xff1a; port 6380 pidfile /var/run/redis_6380.pid # 把pid进程号写入pidfile配置的文件 logfile "6380.log" …

JAVA设计模式之策略模式详解

策略模式 1 策略模式概述 策略模式(strategy pattern)的原始定义是&#xff1a;定义一系列算法&#xff0c;将每一个算法封装起来&#xff0c;并使它们可以相互替换。策略模式让算法可以独立于使用它的客户端而变化。 其实我们在现实生活中常常遇到实现某种目标存在多种策略…

【机器学习】机器学习简单入门

&#x1f388;个人主页&#xff1a;甜美的江 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步…

CSP-202112-2-序列查询新解

CSP-202112-2-序列查询新解 【70分思路】 【暴力枚举】按照题目思路遍历一遍f(x)和g(x)&#xff0c;计算error(A)&#xff0c;时间复杂度为O(N)&#xff0c;时间超限。 #include <iostream> using namespace std; int main() {long long n, N, sum 0;cin >> n …

【SpringBoot】application配置(5)

type-aliases-package: com.rabbiter.cm.domaintype-aliases-package: 这个配置用于指定mybatis的别名&#xff0c;别名是一个简化的方式&#xff0c;让你在Mapper xml 文件中引用java类型&#xff0c;而不需要使用使用完整的类名。例如&#xff0c;如果你在 com.rabbiter.cm.d…

谷歌 DeepMind 联合斯坦福推出了主从式遥操作双臂机器人系统增强版ALOHA 2

谷歌 DeepMind 联合斯坦福推出了 ALOHA 的增强版本 ——ALOHA 2。与一代相比&#xff0c;ALOHA 2 具有更强的性能、人体工程学设计和稳健性&#xff0c;且成本还不到 20 万元人民币。并且&#xff0c;为了加速大规模双手操作的研究&#xff0c;ALOHA 2 相关的所有硬件设计全部开…

数据结构|对称矩阵压缩存储的下标公式推导|如何求对称矩阵压缩存储对应的一维数组下标

因为考试的时候可能会给很多情况的变式题&#xff0c;所以要会推导而不是背公式&#xff0c;情况变了&#xff0c;公式就不管用了。 行优先、只存储主对角线下三角区&#xff1a; 矩阵下标 ai,j(i>j)->一维数组下标 B[k] 按照行优先的原则&#xff0c;确定 ai,j 是一维数…

搭建yum仓库服务器

安装 1.安装linux 1.1安装依赖 yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel 1.2下载 cd /opt/nginx wget http://nginx.org/download/nginx-1.25.3.tar.gz 1.3解压 tar -xvf nginx-1.25.3.tar.gz 1.4配置 cd nginx-1.25.3 ./configure --pre…

NLP_引入注意力机制

文章目录 点积注意力创建两个张量x1和x2计算张量点积&#xff0c; 得到原始权重对原始权重进行归一化求出注意力分布的加权和 缩放点积注意力编码器-解码器注意力定义Attention类重构Decoder类重构Seq2Seq类可视化注意力权重 注意力机制中的 Q、K、V自注意力多头自注意力注意力…

【MATLAB】使用随机森林在回归预测任务中进行特征选择(深度学习的数据集处理)

1.随机森林在神经网络的应用 当使用随机森林进行特征选择时&#xff0c;算法能够为每个特征提供一个重要性得分&#xff0c;从而帮助识别对目标变量预测最具影响力的特征。这有助于简化模型并提高其泛化能力&#xff0c;减少过拟合的风险&#xff0c;并且可以加快模型训练和推理…

阿里云游戏服务器租用价格表,2024最新报价

阿里云游戏服务器租用价格表&#xff1a;4核16G服务器26元1个月、146元半年&#xff0c;游戏专业服务器8核32G配置90元一个月、271元3个月&#xff0c;阿里云服务器网aliyunfuwuqi.com分享阿里云游戏专用服务器详细配置和精准报价&#xff1a; 阿里云游戏服务器租用价格表 阿…

【Linux系统学习】5.Linux实用操作 下

7.虚拟机配置固定IP 7.1 为什么需要固定IP 当前我们虚拟机的Linux操作系统&#xff0c;其IP地址是通过DHCP服务获取的。 DHCP&#xff1a;动态获取IP地址&#xff0c;即每次重启设备后都会获取一次&#xff0c;可能导致IP地址频繁变更 原因1&#xff1a;办公电脑IP地址变化无所…

顺序表、链表(ArrayList、LinkedList)

目录 前言&#xff1a; 顺序表&#xff08;ArrayList&#xff09;&#xff1a; 顺序表的原理&#xff1a; ArrayList源码&#xff1a; 的含义&#xff1a;​编辑 ArrayList的相关方法&#xff1a;​编辑 向上转型List&#xff1a; 练习题&#xff08;杨辉三角&#x…

Go 语言中如何大小端字节序?int 转 byte 是如何进行的?

嗨&#xff0c;大家好&#xff01;我是波罗学。 本文是系列文章 Go 技巧第十五篇&#xff0c;系列文章查看&#xff1a;Go 语言技巧。 我们先看这样一个问题&#xff1a;“Go 语言中&#xff0c;将 byte 转换为 int 时是否涉及字节序&#xff08;endianness&#xff09;&#x…