Netty源码—4.客户端接入流程二

大纲

1.关于Netty客户端连接接入问题整理

2.Reactor线程模型和服务端启动流程

3.Netty新连接接入的整体处理逻辑

4.新连接接入之检测新连接

5.新连接接入之创建NioSocketChannel

6.新连接接入之绑定NioEventLoop线程

7.新连接接入之注册Selector和注册读事件

8.注册Reactor线程总结

9.新连接接入总结

6.新连接接入之绑定NioEventLoop线程

(1)将新连接绑定到Reactor线程的入口

(2)服务端Channel的Pipeline介绍

(3)服务端Channel默认的Pipeline处理器

(4)服务端Channel处理新连接的步骤

(5)总结

(1)将新连接绑定到Reactor线程的入口

创建完NioSocketChannel后,接下来便要对NioSocketChannel进行一些设置,并且需要将它绑定到一个正在执行的Reactor线程中。

NioMessageUnsafe.read()方法里的readBuf容器会承载着所有新建的连接,如果某个时刻Netty轮询到多个连接,那么通过使用for循环就可以批量处理这些NioSocketChannel连接。

处理每个NioSocketChannel连接时,是通过NioServerSocketChannel的pipeline的fireChannelRead()方法来处理的。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {Selector selector;private SelectedSelectionKeySet selectedKeys;private boolean needsToSelectAgain;private int cancelledKeys;...@Overrideprotected void run() {for (;;) {...//1.调用select()方法执行一次事件轮询select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}...//2.处理产生IO事件的ChannelneedsToSelectAgain = false;processSelectedKeys();...//3.执行外部线程放入TaskQueue的任务runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}private void processSelectedKeys() {if (selectedKeys != null) {//selectedKeys.flip()会返回一个数组processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {//1.首先取出IO事件final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//Help GC//2.然后获取对应的Channel和处理该Channel//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {//网络事件的处理processSelectedKey(k, (AbstractNioChannel) a);} else {//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}//3.最后判断是否应该再进行一次轮询if (needsToSelectAgain) {for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();//selectedKeys.flip()会返回一个数组selectedKeys = this.selectedKeys.flip();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();...try {int readyOps = k.readyOps();//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise//the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入//此时将调用Channel的unsafe变量来进行实际操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//进行新连接接入处理unsafe.read();if (!ch.isOpen()) {//Connection already closed - no need to handle write.return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}...
}//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {//临时存放读到的连接NioSocketChannelprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {//断言确保该read()方法必须来自Reactor线程调用assert eventLoop().inEventLoop();//获得Channel对应的Pipelinefinal ChannelPipeline pipeline = pipeline();//获得Channel对应的RecvByteBufAllocator.Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();do {//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channelint localRead = doReadMessages(readBuf);if (localRead == 0) {break;}} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接//2.设置并绑定NioSocketChannelint size = readBuf.size();for (int i = 0; i < size; i ++) {//调用DefaultChannelPipeline的fireChannelRead()方法//开始处理每个NioSocketChannel连接pipeline.fireChannelRead(readBuf.get(i));}//3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法readBuf.clear();//结束处理每个NioSocketChannel连接pipeline.fireChannelReadComplete();}}//Read messages into the given array and return the amount which was read.protected abstract int doReadMessages(List<Object> buf) throws Exception;...
}

(2)服务端Channel的Pipeline介绍

在Netty的各种类型的Channel中,都会包含一个Pipeline。Pipeline可理解为一条流水线,流水线有起点有结束,中间还会有各种各样的流水线关卡。对Channel的处理会在流水线的起点开始,然后经过各个流水线关卡的加工,最后到达流水线的终点结束。

流水线Pipeline的开始是HeadContext,结束是TailContext。HeadContext中会调用Unsafe进行具体的操作,TailContext中会向用户抛出流水线Pipeline中未处理异常和未处理消息的警告。

在服务端的启动过程中,Netty会给服务端Channel自动添加一个Pipeline处理器ServerBootstrapAcceptor,并且会将用户代码中设置的一系列参数传入到这个ServerBootstrapAcceptor的构造方法中。

服务端Channel的Pipeline如下所示:

图片

所以服务端Channel的Pipeline在传播ChannelRead事件时首先会从HeadContext处理器开始,然后传播到ServerBootstrapAcceptor处理器,最后传播到TailContext处理器结束。

(3)服务端Channel默认的Pipeline处理器

首先,服务端启动时会给服务端Channel的Pipeline添加一个ServerBootstrapAcceptor处理器。

//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...@Overridevoid init(Channel channel) throws Exception {//1.设置服务端Channel的Option与Attrfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}//2.设置客户端Channel的Option与Attrfinal EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}//3.配置服务端启动逻辑ChannelPipeline p = channel.pipeline();//p.addLast()用于定义服务端启动过程中需要执行哪些逻辑p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {//一.添加用户自定义的Handler,注意这是handler,而不是childHandlerfinal ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) pipeline.addLast(handler);//二.添加一个特殊的Handler用于接收新连接//自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptorch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//调用DefaultChannelPipeline的addLast()方法pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final DefaultChannelPipeline pipeline;...//Creates a new instance.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}@Overridepublic ChannelPipeline pipeline() {return pipeline;}...
}//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {...for (ChannelHandler h: handlers) {if (h == null) break;addLast(executor, null, h);}return this;}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);...}...}//往Pipeline中添加ChannelHandler处理器private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}...
}

然后,新连接接入调用到服务端Channel的Pipeline的fireChannelRead()方法时,便会触发调用ServerBootstrapAcceptor处理器的channelRead()方法。最终会调用NioEventLoop的register()方法注册这个新连接Channel,即给新连接Channel绑定一个Reactor线程。

//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {//从Pipeline的第一个HeadContext处理器开始调用AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//调用AbstractChannelHandlerContext的fireChannelRead()方法ctx.fireChannelRead(msg);}@Overridepublic ChannelHandler handler() {return this;}...}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {...static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}}private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {//首先调用的是Pipeline的第一个处理器HeadContext的channelRead()方法//注意:HeadContext继承了AbstractChannelHandlerContext((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}}@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(), msg);return this;}private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {//注意:HeadContext继承了AbstractChannelHandlerContext//所以如果this是HeadContext,那么这里会获取下一个节点ServerBootstrapAcceptorctx = ctx.next;} while (!ctx.inbound);return ctx;}...
}public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;...//channelRead()方法在新连接接入时被调用@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;//1.给新连接的Channel添加用户自定义的Handler处理器//这里的childHandler其实是一个特殊的Handler: ChannelInitializerchild.pipeline().addLast(childHandler);//2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关for (Entry<ChannelOption<?>, Object> e: childOptions) {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}}//3.设置新连接Channel的属性for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}//4.绑定Reactor线程//childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法childGroup.register(child);}...}...
}// MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {......
}//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {...@Overridepublic ChannelFuture register(Channel channel) {//最终会调用NioEventLoop的register()方法注册这个新连接Channelreturn next().register(channel);}@Overridepublic EventLoop next() {//获取一个NioEventLoopreturn (EventLoop) super.next();}...
}//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutorChooserFactory.EventExecutorChooser chooser;...//Create a new instance.protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);}//Create a new instance.protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {...children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);...}//创建chooserchooser = chooserFactory.newChooser(children);...}@Overridepublic EventExecutor next() {//调用chooser的next()方法获得一个NioEventLoopreturn chooser.next();}...
}public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();private DefaultEventExecutorChooserFactory() { }@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTowEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}}
}

(4)服务端Channel处理新连接的步骤

ServerBootstrapAcceptor处理新连接的步骤:

一.给客户端Channel添加childHandler

给客户端Channel添加childHandler也就是将用户自定义的childHandler添加到新连接的pipeline里。

pipeline.fireChannelRead(NioSocketChannel)最终会调用到ServerBootstrapAcceptor的channelRead()方法,而且这个channelRead()方法一上来就会把入参的msg强制转换为Channel。

拿到新连接的Channel后就可以拿到其对应的Pipeline,这个Pipeline是在调用AbstractChannel构造方法时创建的。于是可以将用户代码中的childHandler添加到Pipeline中,而childHandler其实就是用户代码中的ChannelInitializer。所以新连接Channel的Pipeline的构成是:Head -> ChannelInitializer -> Tail。

二.设置客户端Channel的options和attr

所设置的childOptions和childAttrs也是在用户代码中设置的,这些设置项最终会传递到ServerBootstrapAcceptor的channelRead()方法中进行具体设置。

三.选择NioEventLoop绑定客户端Channel

childGroup.register(child)中的childGroup就是用户代码里创建的workerNioEventLoopGroup。NioEventLoopGroup的register()方法会调用next()由其父类通过线程选择器chooser返回一个NioEventLoop。所以childGroup.register(child)最终会调用到NioEventLoop的register()方法,这和注册服务端Channel时调用config().group().register(channel)一样。

(5)总结

服务端Channel在检测到新连接并且创建完客户端Channel后,会通过服务端Channel的Pipeline的一个处理器ServerBootstrapAcceptor做一些处理。这些处理包括:给客户端Channel的Pipeline添加childHandler处理器、设置客户端Channel的options和attrs、调用线程选择器chooser选择一个NioEventLoop进行绑定。绑定时会将该客户端Channel注册到NioEventLoop的Selector上,此时还不会关心事件。

7.新连接接入之注册Selector和注册读事件

NioEventLoop的register()方法是由其父类SingleThreadEventLoop实现的,并最终调用到AbstractChannel的内部类AbstractUnsafe的register0()方法。

步骤一:注册Selector

和服务端启动过程一样,先调用AbstractNioChannel的doRegister()方法进行注册。其中javaChannel().register()会将新连接NioSocketChannel绑定到Reactor线程的Selector上,这样后续这个新连接NioSocketChannel所有的事件都由绑定的Reactor线程的Selector来轮询。

步骤二:配置自定义Handler

此时新连接NioSocketChannel的Pipeline中有三个Handler:Head -> ChannelInitializer -> Tail。invokeHandlerAddedIfNeeded()最终会调用ChannelInitializer的handlerAdded()方法。

步骤三:传播ChannelRegistered事件

pipeline.fireChannelRegistered()会把新连接的注册事件从HeadContext开始往下传播,调用每一个ChannelHandler的channelRegistered()方法。

步骤四:注册读事件

接着还会传播ChannelActive事件。传播完ChannelActive事件后,便会继续调用HeadContetx的readIfIsAutoRead()方法注册读事件。由于创建NioSocketChannel时已将SelectionKey.OP_READ的事件代码保存到其成员变量中,所以AbstractNioChannel的doBeginRead()方法,就可以将SelectionKey.OP_READ事件注册到Selector中完成读事件的注册。

//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final ChannelId id;private final Unsafe unsafe;private final DefaultChannelPipeline pipeline;private volatile EventLoop eventLoop;...//Creates a new instance.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}//Returns a new DefaultChannelPipeline instance.protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}//Unsafe implementation which sub-classes must extend and use.protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//绑定事件循环器,即绑定一个NioEventLoop到该Channel上AbstractChannel.this.eventLoop = eventLoop;//注册Selector,并启动一个NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通过启动这个NioEventLoop线程来调用register0()方法将这个Channel注册到Selector上eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}private void register0(ChannelPromise promise) {...boolean firstRegistration = this.neverRegistered;//1.调用JDK底层注册Channel到Selector上doRegister();this.neverRegistered = false;this.registered = true;//2.配置自定义Handlerthis.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);//3.传播channelRegisterd事件this.pipeline.fireChannelRegistered();//4.注册读事件if (isActive()) {if (firstRegistration) {//会进入这个方法,传播完ChannelActive事件后,再注册读事件this.pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}...}@Overridepublic final void beginRead() {...//调用AbstractNioChannel实现的doBeginRead()方法doBeginRead();...}...}//Is called after the Channel is registered with its EventLoop as part of the register process.//Sub-classes may override this methodprotected void doRegister() throws Exception {// NOOP}//Schedule a read operation.protected abstract void doBeginRead() throws Exception;@Overridepublic Channel read() {//调用DefaultChannelPipeline的read()方法pipeline.read();return this;}...
}//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {private final SelectableChannel ch;//这是NIO中的Channelprotected final int readInterestOp;volatile SelectionKey selectionKey;...//Create a new instance//@param parent,the parent Channel by which this instance was created. May be null.//@param ch,he underlying SelectableChannel on which it operates//@param readInterestOp,the ops to set to receive data from the SelectableChannelprotected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中//可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量chthis.ch = ch;this.readInterestOp = readInterestOp;...//设置Channel对象为非阻塞模式ch.configureBlocking(false);...}@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {...//首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法,//将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上;//这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来;//而且注册的ops值是0,表示此时还不关注任何事件;selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;...}}protected SelectableChannel javaChannel() {return ch;}@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {//将SelectionKey.OP_READ读事件注册到Selector上,表示这个客户端Channel可以处理读事件了selectionKey.interestOps(interestOps | readInterestOp);}}...
}//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...@Overridepublic final ChannelPipeline fireChannelActive() {//调用HeadContext的channelActive()方法AbstractChannelHandlerContext.invokeChannelActive(head);return this;}@Overridepublic final ChannelPipeline read() {//从TailContext开始,最终会调用到HeadContext的read()方法tail.read();return this;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;...@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();//传播ChannelActive事件readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {//调用AbstractChannel的read()方法channel.read();}}@Overridepublic void read(ChannelHandlerContext ctx) {//调用AbstractChannel.AbstractUnsafe的beginRead()方法unsafe.beginRead();}...}
}

8.注册Reactor线程总结

一.首先当boss Reactor线程在检测到有ACCEPT事件之后,会创建JDK底层的Channel。

二.然后使用一个NioSocketChannel包装JDK底层的Channel,把用户设置的ChannelOption、ChannelAttr、ChannelHandler都设置到该NioSocketChannel中。

三.接着从worker Reactor线程组中,也就是worker NioEventLoopGroup中,通过线程选择器chooser选择一个NioEventLoop出来。

四.最后把NioSocketChannel包装的JDK底层Channel当作key,自身NioSocketChannel当作attachment,注册到NioEventLoop对应的Selector上。这样后续有读写事件发生时,就可以从底层Channel直接获得attachment即NioSocketChannel来进行读写数据的逻辑处理。

9.新连接接入总结

新连接接入整体可以分为两部分:一是检测新连接,二是注册Reactor线程。

一.首先在Netty服务端的Channel(也就是NioServerSocketChannel)绑定的NioEventLoop(也就是boss线程)中,轮询到ACCEPT事件。

二.然后调用JDK的服务端Channel的accept()方法获取一个JDK的客户端Channel,并且将其封装成Netty的客户端Channel(即NioSocketChannel)。

三.封装过程中会创建这个NioSocketChannel一系列的组件,如unsafe组件和pipeline组件。unsafe组件主要用于进行Channel的读写,pipeline组件主要用于处理Channel数据的业务逻辑。

四.接着Netty服务端Channel的Pipeline的一个处理器ServerBootstrapAcceptor,会给当前Netty客户端Channel分配一个NioEventLoop并将客户端Channel绑定到Selector上。

五.最后会传播ChannelRegistered事件和ChannelActive事件,并将客户端Channel的读事件注册到Selector上。

至此,新连接NioSocketChannel便可以开始正常读写数据了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/38640.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025.3.17-2025.3.23学习周报

目录 摘要Abstract1 文献阅读1.1 动态图邻接矩阵1.2 总体框架1.2.1 GCAM1.2.2 输出块 1.3 实验分析 总结 摘要 在本周阅读的文献中&#xff0c;作者提出了一种名为TFM-GCAM的模型。TFM-GCAM模型的创新主要分为两部分&#xff0c;一部分是交通流量矩阵的设计&#xff0c;TFM-GC…

生活电子类常识——搭建openMauns工作流+搭建易犯错解析

前言 小白一句话生成一个网站&#xff1f;小白一句话生成一个游戏&#xff1f;小白一句话生成一个ppt?小白一句话生成一个视频&#xff1f; 可以 原理 总体的执行流程是 1&#xff0c;用户下达指令 2&#xff0c;大模型根据用户指令&#xff0c;分解指令任务为多个细分步骤…

深入解析 Uniswap:自动做市商模型的数学推导与智能合约架构

目录 1. 自动做市商&#xff08;AMM&#xff09;模型的数学推导1.1 恒定乘积公式推导1.2 价格影响与滑点 2. Uniswap 智能合约架构解析2.1 核心合约&#xff08;Core&#xff09;2.1.1 工厂合约&#xff08;Factory&#xff09;2.1.2 交易对合约&#xff08;Pair&#xff09; 2…

高频面试题(含笔试高频算法整理)基本总结回顾20

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…

生成模型速通(Diffusion,VAE,GAN)

基本概念 参考视频https://www.bilibili.com/video/BV1re4y1m7gb/?spm_id_from333.337.search-card.all.click&vd_sourcef04f16dd6fd058b8328c67a3e064abd5 生成模型其实是主要是依赖概率分布&#xff0c;对输入特征的概率密度函数建模 隐空间&#xff08;latent space)…

Android在kts中简单使用AIDL

Android在kts中简单使用AIDL AIDL相信做Android都有所了解&#xff0c;跨进程通信会经常使用&#xff0c;这里就不展开讲解原理跨进程通信的方式了&#xff0c;最近项目换成kts的方式&#xff0c;于是把aidl也换成了统一的方式&#xff0c;其中遇到了很多问题&#xff0c;这里…

学习本地部署DeepSeek的过程(基于ollama)

DeepSeek除了支持在线调用服务接口外&#xff0c;还支持本地部署后调用本地服务&#xff0c;这样的好处是不需要api key&#xff0c;且资源独占&#xff0c;还能训练个人知识库。本文学习并记录本地部署DeepSeek的过程。   参考文献3中列出了不同模型对于电脑硬件的要求&…

文献分享: ColXTR——将ColBERTv2的优化引入ColXTR

1. ColXTR \textbf{1. ColXTR} 1. ColXTR原理 1.1. ColBERTv2 \textbf{1.1. ColBERTv2} 1.1. ColBERTv2概述 1.1.1. \textbf{1.1.1. } 1.1.1. 训练优化 1️⃣难负样本生成 初筛&#xff1a;基于 BM-25 \text{BM-25} BM-25找到可能的负样本重排&#xff1a;使用 KL \text{KL} KL…

Altium Designer数模电学习笔记

模电 电容 **退耦&#xff1a;**利用通交阻直&#xff0c;将看似直流的信号中的交流成分滤除 &#xff08;一般用在给MPU供电&#xff0c;尽量小一些&#xff0c;10nf~100nf~1uf以下&#xff09; **滤波&#xff1a;**也可以理解为给电容充电&#xff0c;让电容在电平为低时…

从指令集鸿沟到硬件抽象:AI 如何重塑手机与电脑编程语言差异——PanLang 原型全栈设计方案与实验性探索1

AI 如何跨越指令集鸿沟&#xff1f;手机与电脑编程语言差异溯源与统一路径——PanLang 原型全栈设计方案与实验性探索1 文章目录 AI 如何跨越指令集鸿沟&#xff1f;手机与电脑编程语言差异溯源与统一路径——PanLang 原型全栈设计方案与实验性探索1前言一、手机与电脑编程语言…

python 实现一个简单的window 任务管理器

import tkinter as tk from tkinter import ttk import psutil# 运行此代码前&#xff0c;请确保已经安装了 psutil 库&#xff0c;可以使用 pip install psutil 进行安装。 # 由于获取进程信息可能会受到权限限制&#xff0c;某些进程的信息可能无法获取&#xff0c;代码中已经…

C之(15)cppcheck使用介绍

C之(15)cppcheck使用介绍 Author: Once Day Date: 2025年3月23日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文章可查看专栏: Linux实践记录_Once_day的博客-CS…

Ant Design Vue Select 选择器 全选 功能

Vue.js的组件库Ant Design Vue Select 选择器没有全选功能&#xff0c;如下图所示&#xff1a; 在项目中&#xff0c;我们自己实现了全选和清空功能&#xff0c;如下所示&#xff1a; 代码如下所示&#xff1a; <!--* 参数配置 - 风力发电 - 曲线图 * 猴王软件学院 - 大强 …

CaiT (Class-Attention in Image Transformers):深度图像Transformer的创新之路

CaiT (Class-Attention in Image Transformers)&#xff1a;深度图像Transformer的创新之路 近年来&#xff0c;Transformers 模型在自然语言处理领域的成功逐渐扩展到了计算机视觉领域&#xff0c;尤其是图像分类任务中&#xff0c;Vision Transformer (ViT) 的提出打破了卷积…

Qt之MVC架构MVD

什么是MVC架构&#xff1a; MVC模式&#xff08;Model–view–controller&#xff09;是软件工程中的一种软件架构模式&#xff0c;把软件系统分为三个基本部分&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&#xff08;Controll…

数组,指针 易混题解析(二)

目录 一.基础 1. 2. 二.中等 1. 坑 2. 3.指针1到底加什么 三.偏难 1.&#xff08;小端 x86&#xff09; 2.通过数组指针进行偏移的时候怎么偏移 3. 大BOSS &#xff08;1&#xff09;**cpp &#xff08;2&#xff09;*-- * cpp 3 &#xff08;3&#xff09;*c…

数据建模流程: 概念模型>>逻辑模型>>物理模型

数据建模流程 概念模型 概念模型是一种高层次的数据模型&#xff0c;用于描述系统中的关键业务概念及其之间的关系。它主要关注业务需求和数据需求&#xff0c;而不涉及具体的技术实现细节。概念模型通常用于在项目初期帮助业务人员和技术人员达成共识&#xff0c;确保对业务需…

spring-security原理与应用系列:建造者

目录 1.构建过程 AbstractSecurityBuilder AbstractConfiguredSecurityBuilder WebSecurity 2.建造者类图 SecurityBuilder ​​​​​​​AbstractSecurityBuilder ​​​​​​​AbstractConfiguredSecurityBuilder ​​​​​​​WebSecurity 3.小结 紧接上一篇文…

结合代码理解Spring AOP的概念(切面、切入点、连接点等)

前情回顾 对AOP的理解 我这篇文章介绍了为什么要有AOP&#xff08;AOP解决了什么问题&#xff09;以及如何实现AOP。但在实现AOP的时候&#xff0c;并未探讨AOP相关概念&#xff0c;例如&#xff1a;切面、切入点、连接点等。因此&#xff0c;本篇文章希望结合代码去理解Spring…

【AI大模型】搭建本地大模型GPT-NeoX:详细步骤及常见问题处理

搭建本地大模型GPT-NeoX:详细步骤及常见问题处理 GPT-NeoX是一个开源的大型语言模型框架,由EleutherAI开发,可用于训练和部署类似GPT-3的大型语言模型。本指南将详细介绍如何在本地环境中搭建GPT-NeoX,并解决过程中可能遇到的常见问题。 1. 系统要求 1.1 硬件要求 1.2 软…