数据读写
write
从client端的写开始看
client与服务端建立完connect后可以从future里拿到连接的channel对象。这里的channel是io.netty.channel.Channel对象。
调用其channel.writeAndFlush(msg);方法可以进行数据发送。
writeAndFlush会调用pipeline的writeAndFlush方法
public ChannelFuture writeAndFlush(Object msg) {return pipeline.writeAndFlush(msg);
}
pipeline实现是DefaultChannelPipeline类,其writeAndFlush方法如下
public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);
}
我们回顾下pipeline的初始化,默认会设置两个handler,tail和head。tail是Inbound类型的handler。head既是outbound又是inbound类型的handler
protected DefaultChannelPipeline(Channel channel) {tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}
写数据是从handler的tail开始的。
tail里的write方法会先创建一个promise方法,然后调用write方法,最后返回promise。
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {write(msg, true, promise);return promise;
}
write方法在父类AbstractChannelHandlerContext的默认实现。这里也是handler责任链式递归调用主要方法。每一个handler都有该write方法(都包装成HandlerContext),当自身invokeWriteAndFlush自行完后会继续调用write方法获取next handler。
private void write(Object msg, boolean flush, ChannelPromise promise) {//从tail往前找outBound类型的handlerfinal AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);//拿出来handler绑定的线程executorEventExecutor executor = next.executor();if (executor.inEventLoop()) { //判断是不是当前线程是不是eventLoop绑定的线程if (flush) {//需要flush调用writeflush方法next.invokeWriteAndFlush(m, promise);} else {//不需要flushnext.invokeWrite(m, promise);}} else {//不是同一个线程,构造task放入线程执行队列里final WriteTask task = WriteTask.newInstance(next, m, promise, flush);if (!safeExecute(executor, task, promise, m, !flush)) {task.cancel();}}
}
write方法首先从tail往前找下一个outBound类型的handler。如果我们在初始化client连接的时候没有往pipeline里新加入outBound类的handler,那么这里找到的就是head。
再往下拿出的executor这里是channel绑定的nioEvenLoop对象。在前面channel启动过程我们知道,Bootstrap会绑定一个EventLoopGroup。新一个channel,EventLoopGroup会拿出一个child与之进行绑定,child是单线程的executor实现。需要执行的task会先加入taskQueue。这里的executor就是一个child,NioEventLoop类型。
由于我们当前调用write的线程是业务线程,executor.inEventLoop()这一步判断(判断当前线程和NioEventLoop线程池中的线程是否是同一个线程)是不成立的,所以会走else,构造一个task添加到taskQueue里。然后wakeup NioEventLoop里的监听线程执行任务。这些都是前面分析server启动代码流程。
再来看WriteTask里的run方法
public void run() {try {decrementPendingOutboundBytes();if (size >= 0) {ctx.invokeWrite(msg, promise);} else {ctx.invokeWriteAndFlush(msg, promise);}} finally {recycle();}
}
也是调用当前handler的invokeWrite或invokeWriteAndFlush方法。和上面if成立是逻辑一致。
HeadContext-write
假如我们这里没有往pipeline里添加任何handler。按照逻辑找到的next就是head。会调用head的invokeWriteAndFlush方法。
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);}
}
这里invokehandler()是成立的,主要判断当前handler的状态。
invokeWrite0方法就是调用Context对应的handler的write方法。
headhandlerwrite方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);
}
unsafe的write
public final void write(Object msg, ChannelPromise promise) {ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;int size;try {//过滤消息msg = filterOutboundMessage(msg);size = pipeline.estimatorHandle().size(msg);if (size < 0) {size = 0;}} catch (Throwable t) {try {ReferenceCountUtil.release(msg);} finally {safeSetFailure(promise, t);}return;}outboundBuffer.addMessage(msg, size, promise);
}
断点跟踪发现,在执行filterOutboundMessage()方法这里就异常终止了。
protected final Object filterOutboundMessage(Object msg) {if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;if (buf.isDirect()) {return msg;}return newDirectBuffer(buf);}if (msg instanceof FileRegion) {return msg;}throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
这个方法主要对msg类型进行了判断。在最开始调用channel发送数据的时候传入的一个字符串,不符合可以传输的两个类型,抛出了UnsupportedOperationException。
数据包装
看来在初始化pipeline的时候还是需要搞一个outboudhandler进行数据的包装。这里我们使用netty自带的StringEncoder进行数据包装
channel.pipeline().addLast(new StringEncoder()).addLast(new StringDecoder());
那这样从tail找outbound就找到了StringEncoder。StringEncoder继承自MessageToMessageEncoder。
StringEncoder extends MessageToMessageEncoder<CharSequence>{}
这里有个泛型类型,下面会校验当前消息类型是该泛型的子类
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out = null;try {if (acceptOutboundMessage(msg)) {//判断消息类型是否可处理,这里判断需要是CharSequenceout = CodecOutputList.newInstance();//I是泛型CharSequenceI cast = (I) msg;//encode编码转换encode(ctx, cast, out); }} else {ctx.write(msg, promise);}} finally {if (out != null) {try {//out的size是1,sizeMinusOne = 0final int sizeMinusOne = out.size() - 1;if (sizeMinusOne == 0) { //走if//这里write方法就会递归的调用下一个handler的write方法ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne > 0) {if (promise == ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {out.recycle();}}}
}
encode方法只是将msg转成bytebuf类型,放到out里
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
被包装成ByteBuf后,调用ctx.write进行调用下一个handler。这里StringEncoder只是对msg进行了封装,就又回到了head handler里。
数据缓存
在回到head handler里。这时候filterOutboundMessage()过滤消息就不会报错了。然后会调用outboundBuffer.addMessage(msg, size, promise);
这里addMessage就是构造一个entry,然后将entry放到链表尾部。到这里整个write方法就执行完了,从头到尾只是把数据组装,并没有数据流的操作.
数据发送
context的invokeWriteAndFlush方法有两步。上面看完了invokeWrite0方法只是组装数据,发送其实在invokeFlush0方法里
invokeWriteAndFlush(){invokeWrite0(msg, promise);invokeFlush0();
}
这里invokeWriteAndFlush的开始,是从tail开始找的next,也就是对应我们这里设定的StringEncoder。invokeWrite0()和invokeFlush0()方法都是递归往后调,直到head。write看完了,下面看invokeFlush0方法。
invokeFlush0()方法会调用当前Context对应handler的flush方法。
((ChannelOutboundHandler) handler()).flush(this);
StringEncoder的flush方法
public void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();
}
这里什么也没做,只是调ctx.flush方法调起下一个handler。
flush实现在里AbstractChannelHandlerContext里
public ChannelHandlerContext flush() {final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeFlush();} else {Tasks tasks = next.invokeTasks;if (tasks == null) {next.invokeTasks = tasks = new Tasks(next);}safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);}return this;
}
这里和write方法是相似的,也是MASK_FLUSH匹配的outbound然后递归调用invokeFlush方法,这里最终会调到handler的flush方法。
下一个HeadContext的flush方法
public void flush(ChannelHandlerContext ctx) {unsafe.flush();
}
最后还是unsafe的flush方法
public final void flush() {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;//好像只是做了一些标记outboundBuffer.addFlush();flush0();
}
outboundBuffer.addFlush()这里标记了flushedEntry位置。将flushedEntry指向unflushedEntry。并修改每个entry对应的promise为不可取消
再往下调用调用unsafe.flush0()
flush0()方法
protected void flush0() {//...try {//dowrite方法了doWrite(outboundBuffer);} catch (Throwable t) {handleWriteError(t);} finally {inFlush0 = false;}
}
然后doWrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {SocketChannel ch = javaChannel();//获取写循环的次数,默认16int writeSpinCount = config().getWriteSpinCount();do {//循环处理待写数据if (in.isEmpty()) {// All written so clear OP_WRITEclearOpWrite();// Directly return here so incompleteWrite(...) is not called.return;}// 获取每个ByteBuf最大字节数int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();//转换成ByteBufferByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);int nioBufferCnt = in.nioBufferCount();// Always use nioBuffers() to workaround data-corruption.// See https://github.com/netty/netty/issues/2761switch (nioBufferCnt) {case 0:// We have something else beside ByteBuffers to write so fallback to normal writes.writeSpinCount -= doWrite0(in);break;case 1: {// Only one ByteBuf so use non-gathering write// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need// to check if the total size of all the buffers is non-zero.ByteBuffer buffer = nioBuffers[0];int attemptedBytes = buffer.remaining();//channel写数据final int localWrittenBytes = ch.write(buffer);if (localWrittenBytes <= 0) {incompleteWrite(true);return;}adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);in.removeBytes(localWrittenBytes);--writeSpinCount;break;}default: {// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need// to check if the total size of all the buffers is non-zero.// We limit the max amount to int above so cast is safelong attemptedBytes = in.nioBufferSize();final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);if (localWrittenBytes <= 0) {incompleteWrite(true);return;}// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,maxBytesPerGatheringWrite);in.removeBytes(localWrittenBytes);--writeSpinCount;break;}}} while (writeSpinCount > 0);//循环16次还未写完生成task执行incompleteWrite(writeSpinCount < 0);
}
这里终于看到了channel.write()方法进行写数据。数据都存储在ChannelOutboundBuffer中。通过其nioBuffers()方法将缓冲数据转换成ByteBuffer[] buffers。将buffers写出到channel。最后调用removeBytes()方法将已写出数据从缓冲区刷出清理。
数据写出整个流程
read
读和写类似,selector监听到read事件后最终调用unsafe.read()进行读数据操作。这里unsafe的实例是NioByteUnsafe类型。首先读取数据到ByteBuf,然后从head开始递归调用pipleine里的handler进行消息的处理。
流程如下: