一、select和epoll原理分析
外设设备网卡、鼠标、键盘等通过总线写到内存中,中间就有DMA拷贝,操作系统怎么知道内存中有数据了,这就需要操作系统通过中断机制确定,如果有中断信号过来,cpu会首先打断用户程序执行,响应硬件的程序的信号,然后再恢复用户程序的执行;不同外设设备对应的驱动程序不同,发送的中断信号也就不同,操作系统根据对应信号做出相应的处理
1、操作系统如何处理中断请求
内核和设备驱动是通过中断方式来处理的。所谓中断,可以理解当设备上有数据到达时,会给cpu的相关引脚上发一个电压变化,以通知cpu来处理数据;硬件产生的信号需要cpu立马处理,否则数据可能丢失
网络模块比较耗时,中断时会过度占用cpu,导致cpu无法响应其它设备,因此在Linux中段处理函数分上半部和下半部,上半部通知cpu,下半部响应
2、进程阻塞
操作系统为了支持多任务,实现进程调度功能,会把进程分为“运行”和“等待”等几种状态。运行状态是获取cpu使用权,正在执行的代码是运行状态;等待状态是阻塞状态,详情参照下图(个人理解)
3、内核接收网络数据
一旦有cpu响应中断操作就会拷贝数据到内存,经过协议层的解析到socket应用层就有了数据,就会唤醒进程A,重新进入到工作队列中
4、同时监视多个socket的简单方法
select实现思路很直接。假如程序同时监视socket1、socket2、socket3,那么调用select之后,操作系统把进程A分别加入到这三个socket的等待列表中,当任何一个socket收到数据,都会中断程序唤醒进程A,所有的socket的等待队列中的进程A都会被移除,加入到工作队列中
如上图所示,进程A只知道Socket有数据过来,并不知道哪些Socket有数据,所以就需要遍历Socket列表;并且处于遍历的性能考虑,select最大只能监视1024个Socket
5、epoll的原理
当执行epoll_create方法时,内核会创建一个eventpoll对象;当Socket收到数据后,中断程序会操作eventpoll对象,而不直接操作进程;中断程序会给rdlist引用收到的数据Socket2、Socket3,当执行epoll_wait,如果rdlist已经引用了Socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程
当Socket接收到数据,中断程序一方面修改rdlist,另一个方面唤醒 eventpoll等待队列的线程,线程A回到工作队列中去,由于rdlist引用了接收了数据的Socket,所以不用对所有的进行遍历
二、Netty基础
1、Netty的组件
- Bootstrap是Netty框架的启动类和主入口类,分为客户端类Bootstrap和服务器类ServerBootstrap两种。
- EventLoop暂时可以看成一个线程、EventLoopGroup 自然就可以看成线程组。
- Channel是Java NIO的一个基本构造。
- ChannelHandler和ChannelPipeline:每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法,既然事件分为 入站和出站,用来处理事件的 ChannelHandler 也被分为可以处理入站事件的 Handler 和出站 事件的 Handler,当然有些 Handler 既可以处理入站也可以处理出站;这些 ChannelHandler 都放在 ChannelPipeline 中统一管理
- ChannelFuture:异步获取结果的类,类似于JDK的java.util.concurrent.Future类
2、使用示例
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId<version>4.1.42.Final </version><scope>compile</scope>
</dependency>
服务端
public class EchoServer {private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);private final int port;public EchoServer(int port) {this.port = port;}public static void main(String[] args) throws InterruptedException {int port = 9999;EchoServer echoServer = new EchoServer(port);LOG.info("服务器即将启动");echoServer.start();LOG.info("服务器关闭");}public void start() throws InterruptedException {/*线程组*/EventLoopGroup group = new NioEventLoopGroup();try {/*服务端启动必备*/ServerBootstrap b = new ServerBootstrap();b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoServerHandler());}});/*异步绑定到服务器,sync()会阻塞到完成*/ChannelFuture f = b.bind().sync();LOG.info("服务器启动完成。");/*阻塞当前线程,直到服务器的ServerChannel被关闭*/f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}}
客户端
public class EchoClient {private final int port;private final String host;public EchoClient(int port, String host) {this.port = port;this.host = host;}public void start() throws InterruptedException {/*线程组*/EventLoopGroup group = new NioEventLoopGroup();try {/*客户端启动必备,和服务器的不同点*/Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class)/*指定使用NIO的通信模式*//*指定服务器的IP地址和端口,和服务器的不同点*/.remoteAddress(new InetSocketAddress(host,port))/*和服务器的不同点*/.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoClientHandler());}});/*异步连接到服务器,sync()会阻塞到完成,和服务器的不同点*/ChannelFuture f = b.connect().sync();f.channel().closeFuture().sync();/*阻塞当前线程,直到客户端的Channel被关闭*/} finally {group.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {new EchoClient(9999,"127.0.0.1").start();}
}
服务端的ChannelHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;System.out.println("server accept :" + in.toString(CharsetUtil.UTF_8));ctx.writeAndFlush(in);//ctx.close();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("连接已建立");super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
客户端的ChannelHandler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {/*读取到网络数据后进行业务处理,并关闭连接*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept"+msg.toString(CharsetUtil.UTF_8));//关闭连接///ctx.close();}/*channel活跃后,做业务处理*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty",CharsetUtil.UTF_8));
// ctx.pipeline().write()
// ctx.channel().write()ctx.alloc().buffer();}
}