netty之数据读写源码阅读

数据读写

write

从client端的写开始看

client与服务端建立完connect后可以从future里拿到连接的channel对象。这里的channel是io.netty.channel.Channel对象。

调用其channel.writeAndFlush(msg);方法可以进行数据发送。

writeAndFlush会调用pipeline的writeAndFlush方法

public ChannelFuture writeAndFlush(Object msg) {return pipeline.writeAndFlush(msg);
}

pipeline实现是DefaultChannelPipeline类,其writeAndFlush方法如下

public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg);
}

我们回顾下pipeline的初始化,默认会设置两个handler,tail和head。tail是Inbound类型的handler。head既是outbound又是inbound类型的handler

protected DefaultChannelPipeline(Channel channel) {tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

写数据是从handler的tail开始的。

tail里的write方法会先创建一个promise方法,然后调用write方法,最后返回promise。

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {write(msg, true, promise);return promise;
}

write方法在父类AbstractChannelHandlerContext的默认实现。这里也是handler责任链式递归调用主要方法。每一个handler都有该write方法(都包装成HandlerContext),当自身invokeWriteAndFlush自行完后会继续调用write方法获取next handler。

private void write(Object msg, boolean flush, ChannelPromise promise) {//从tail往前找outBound类型的handlerfinal AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);//拿出来handler绑定的线程executorEventExecutor executor = next.executor();if (executor.inEventLoop()) { //判断是不是当前线程是不是eventLoop绑定的线程if (flush) {//需要flush调用writeflush方法next.invokeWriteAndFlush(m, promise);} else {//不需要flushnext.invokeWrite(m, promise);}} else {//不是同一个线程,构造task放入线程执行队列里final WriteTask task = WriteTask.newInstance(next, m, promise, flush);if (!safeExecute(executor, task, promise, m, !flush)) {task.cancel();}}
}

write方法首先从tail往前找下一个outBound类型的handler。如果我们在初始化client连接的时候没有往pipeline里新加入outBound类的handler,那么这里找到的就是head。

再往下拿出的executor这里是channel绑定的nioEvenLoop对象。在前面channel启动过程我们知道,Bootstrap会绑定一个EventLoopGroup。新一个channel,EventLoopGroup会拿出一个child与之进行绑定,child是单线程的executor实现。需要执行的task会先加入taskQueue。这里的executor就是一个child,NioEventLoop类型。

由于我们当前调用write的线程是业务线程,executor.inEventLoop()这一步判断(判断当前线程和NioEventLoop线程池中的线程是否是同一个线程)是不成立的,所以会走else,构造一个task添加到taskQueue里。然后wakeup NioEventLoop里的监听线程执行任务。这些都是前面分析server启动代码流程。

再来看WriteTask里的run方法

public void run() {try {decrementPendingOutboundBytes();if (size >= 0) {ctx.invokeWrite(msg, promise);} else {ctx.invokeWriteAndFlush(msg, promise);}} finally {recycle();}
}

也是调用当前handler的invokeWrite或invokeWriteAndFlush方法。和上面if成立是逻辑一致。

HeadContext-write

假如我们这里没有往pipeline里添加任何handler。按照逻辑找到的next就是head。会调用head的invokeWriteAndFlush方法。

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);}
}

这里invokehandler()是成立的,主要判断当前handler的状态。

invokeWrite0方法就是调用Context对应的handler的write方法。

headhandlerwrite方法

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);
}

unsafe的write

public final void write(Object msg, ChannelPromise promise) {ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;int size;try {//过滤消息msg = filterOutboundMessage(msg);size = pipeline.estimatorHandle().size(msg);if (size < 0) {size = 0;}} catch (Throwable t) {try {ReferenceCountUtil.release(msg);} finally {safeSetFailure(promise, t);}return;}outboundBuffer.addMessage(msg, size, promise);
}

断点跟踪发现,在执行filterOutboundMessage()方法这里就异常终止了。

protected final Object filterOutboundMessage(Object msg) {if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;if (buf.isDirect()) {return msg;}return newDirectBuffer(buf);}if (msg instanceof FileRegion) {return msg;}throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

这个方法主要对msg类型进行了判断。在最开始调用channel发送数据的时候传入的一个字符串,不符合可以传输的两个类型,抛出了UnsupportedOperationException。

数据包装

看来在初始化pipeline的时候还是需要搞一个outboudhandler进行数据的包装。这里我们使用netty自带的StringEncoder进行数据包装

channel.pipeline().addLast(new StringEncoder()).addLast(new StringDecoder());

那这样从tail找outbound就找到了StringEncoder。StringEncoder继承自MessageToMessageEncoder。

StringEncoder extends MessageToMessageEncoder<CharSequence>{}

这里有个泛型类型,下面会校验当前消息类型是该泛型的子类

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out = null;try {if (acceptOutboundMessage(msg)) {//判断消息类型是否可处理,这里判断需要是CharSequenceout = CodecOutputList.newInstance();//I是泛型CharSequenceI cast = (I) msg;//encode编码转换encode(ctx, cast, out);   }} else {ctx.write(msg, promise);}} finally {if (out != null) {try {//out的size是1,sizeMinusOne = 0final int sizeMinusOne = out.size() - 1;if (sizeMinusOne == 0) { //走if//这里write方法就会递归的调用下一个handler的write方法ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne > 0) {if (promise == ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {out.recycle();}}}
}

encode方法只是将msg转成bytebuf类型,放到out里

protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}

被包装成ByteBuf后,调用ctx.write进行调用下一个handler。这里StringEncoder只是对msg进行了封装,就又回到了head handler里。

数据缓存

在回到head handler里。这时候filterOutboundMessage()过滤消息就不会报错了。然后会调用outboundBuffer.addMessage(msg, size, promise);

这里addMessage就是构造一个entry,然后将entry放到链表尾部。到这里整个write方法就执行完了,从头到尾只是把数据组装,并没有数据流的操作.

数据发送

context的invokeWriteAndFlush方法有两步。上面看完了invokeWrite0方法只是组装数据,发送其实在invokeFlush0方法里

invokeWriteAndFlush(){invokeWrite0(msg, promise);invokeFlush0();
}

这里invokeWriteAndFlush的开始,是从tail开始找的next,也就是对应我们这里设定的StringEncoder。invokeWrite0()和invokeFlush0()方法都是递归往后调,直到head。write看完了,下面看invokeFlush0方法。

invokeFlush0()方法会调用当前Context对应handler的flush方法。

((ChannelOutboundHandler) handler()).flush(this);

StringEncoder的flush方法

public void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();
}

这里什么也没做,只是调ctx.flush方法调起下一个handler。

flush实现在里AbstractChannelHandlerContext里

public ChannelHandlerContext flush() {final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeFlush();} else {Tasks tasks = next.invokeTasks;if (tasks == null) {next.invokeTasks = tasks = new Tasks(next);}safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);}return this;
}

这里和write方法是相似的,也是MASK_FLUSH匹配的outbound然后递归调用invokeFlush方法,这里最终会调到handler的flush方法。

下一个HeadContext的flush方法

public void flush(ChannelHandlerContext ctx) {unsafe.flush();
}

最后还是unsafe的flush方法

public final void flush() {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;//好像只是做了一些标记outboundBuffer.addFlush();flush0();
}

outboundBuffer.addFlush()这里标记了flushedEntry位置。将flushedEntry指向unflushedEntry。并修改每个entry对应的promise为不可取消

再往下调用调用unsafe.flush0()

flush0()方法

protected void flush0() {//...try {//dowrite方法了doWrite(outboundBuffer);} catch (Throwable t) {handleWriteError(t);} finally {inFlush0 = false;}
}

然后doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {SocketChannel ch = javaChannel();//获取写循环的次数,默认16int writeSpinCount = config().getWriteSpinCount();do {//循环处理待写数据if (in.isEmpty()) {// All written so clear OP_WRITEclearOpWrite();// Directly return here so incompleteWrite(...) is not called.return;}// 获取每个ByteBuf最大字节数int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();//转换成ByteBufferByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);int nioBufferCnt = in.nioBufferCount();// Always use nioBuffers() to workaround data-corruption.// See https://github.com/netty/netty/issues/2761switch (nioBufferCnt) {case 0:// We have something else beside ByteBuffers to write so fallback to normal writes.writeSpinCount -= doWrite0(in);break;case 1: {// Only one ByteBuf so use non-gathering write// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need// to check if the total size of all the buffers is non-zero.ByteBuffer buffer = nioBuffers[0];int attemptedBytes = buffer.remaining();//channel写数据final int localWrittenBytes = ch.write(buffer);if (localWrittenBytes <= 0) {incompleteWrite(true);return;}adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);in.removeBytes(localWrittenBytes);--writeSpinCount;break;}default: {// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need// to check if the total size of all the buffers is non-zero.// We limit the max amount to int above so cast is safelong attemptedBytes = in.nioBufferSize();final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);if (localWrittenBytes <= 0) {incompleteWrite(true);return;}// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,maxBytesPerGatheringWrite);in.removeBytes(localWrittenBytes);--writeSpinCount;break;}}} while (writeSpinCount > 0);//循环16次还未写完生成task执行incompleteWrite(writeSpinCount < 0);
}

这里终于看到了channel.write()方法进行写数据。数据都存储在ChannelOutboundBuffer中。通过其nioBuffers()方法将缓冲数据转换成ByteBuffer[] buffers。将buffers写出到channel。最后调用removeBytes()方法将已写出数据从缓冲区刷出清理。

数据写出整个流程

在这里插入图片描述

read

读和写类似,selector监听到read事件后最终调用unsafe.read()进行读数据操作。这里unsafe的实例是NioByteUnsafe类型。首先读取数据到ByteBuf,然后从head开始递归调用pipleine里的handler进行消息的处理。

流程如下:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/137848.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nacos内核设计之一致性协议(上)

Nacos一致性协议 Nacos技术架构 先简单介绍下Nacos的技术架构 从而对nacos有一个整体的认识 如图Nacos架构分为四层 用户层、应用层、核心层、各种插件 再深入分析下nacos一致性协议的发展过程及原理实现 为什么nacos需要一致性协议 Nacos是一个需要存储数据的一个组件 为了实…

uniapp合法域名配置

首先打开微信开发者平台 找到开发管理 打开开发设置 找到服务器域名>修改 request 写入域名前缀即可 > 完成 重启小程序即可 感谢观看

瑞芯微:基于RK3568的ocr识别

光学字符识别&#xff08;Optical Character Recognition, OCR&#xff09;是指对文本资料的图像文件进行分析识别处理&#xff0c;获取文字及版面信息的过程。亦即将图像中的文字进行识别&#xff0c;并以文本的形式返回。OCR的应用场景 卡片证件识别类&#xff1a;大陆、港澳…

无涯教程-JavaScript - CONFIDENCE.NORM函数

描述 CONFIDENCE.NORM函数使用正态分布返回总体平均值的置信区间。 置信区间是一个值范围。您的样本均值x位于此范围的中心,范围为xCONFIDENCE.NORM。 语法 CONFIDENCE.NORM (alpha,standard_dev,size)争论 Argument描述Required/OptionalAlpha 显着性水平,用于计算置信度…

基于GBDT+Tkinter+穷举法按排队时间预测最优路径的智能导航推荐系统——机器学习算法应用(含Python工程源码)+数据集(三)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境Pycharm 环境Scikit-learnt 模块实现1. 数据预处理2. 客流预测3. 百度地图API调用4. GUI界面设计1&#xff09;手绘地图导入2&#xff09;下拉菜单设计3&#xff09;复选框设计4&#xff09;最短路径结果输出界面…

mysql限制用户登录失败次数,限制时间

mysql用户登录限制设置 mysql 需要进行用户登录次数限制,当使用密码登录超过 3 次认证链接失败之后,登录锁住一段时间,禁止登录这里使用的 mysql: 8.1.0 这种方式不用重启数据库. 配置: 首先进入到 mysql 命令行:然后需要安装两个插件: 在 mysql 命令行中执行: mysql> INS…

数据分析与可视化项目技术参考

&#x1f64c;秋名山码民的主页 &#x1f602;oi退役选手&#xff0c;Java、大数据、单片机、IoT均有所涉猎&#xff0c;热爱技术&#xff0c;技术无罪 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; 获取源码&#xff0c;添加WX 目录 1. 考核…

科目二倒车入库

调整座位和后视镜 离合踩到底大腿小腿成130-140 上半身90-100 座椅高度能看到前方全部情况 后视镜调节到能看到后门把手&#xff0c;且后门把手刚好在后视镜上方边缘、离车1/3处。 保持直线&#xff1a; 前进&#xff1a; 车仪表盘中央的原点和地面上的黄线擦边&#xff…

zookeeper可视化工具ZooInspector用法

最近在做银行的项目&#xff0c;用到了thrift&#xff0c;rpc和zookeeper&#xff0c;所有应用都是注册到zookeeper上的&#xff0c;想知道哪些应用注册上了&#xff0c;就用到ZooInspector这个可视化的工具。 1&#xff0c;下载 链接&#xff1a;https://issues.apache.org/…

关于数据权限的设计

在项目实际开发中我们不光要控制一个用户能访问哪些资源&#xff0c;还需要控制用户只能访问资源中的某部分数据。 控制一个用户能访问哪些资源我们有很成熟的权限管理模型即RBAC&#xff0c;但是控制用户只能访问某部分资源&#xff08;即我们常说的数据权限&#xff09;使用R…

ABB COM0011 2RAA005844A0007J编码器模块

ABB COM0011 2RAA005844A0007J 编码器模块是用于测量和反馈旋转或线性位置信息的设备&#xff0c;通常用于自动化、机器控制和运动控制系统。以下是该编码器模块可能具备的产品功能&#xff1a; 位置测量&#xff1a;ABB COM0011 2RAA005844A0007J 编码器模块的主要功能是测量旋…

【Java 基础篇】Java 线程通信详解

多线程编程在实际应用中非常常见&#xff0c;但随之而来的问题是线程之间的通信。线程通信是多线程编程中一个至关重要的概念&#xff0c;它涉及到线程之间的信息传递、同步和协作。本篇博客将详细解释Java中的线程通信&#xff0c;包括什么是线程通信、为什么需要线程通信、如…

油猴Safari浏览器辅助插件:Tampermonkey for Mac中文版

油猴脚本Tampermonkey是一款油猴Safari浏览器辅助插件&#xff0c;是一款适用于Safari用户的脚本管理&#xff0c;能够方便管理不同的脚本。虽然有些受支持的浏览器拥有原生的用户脚本支持&#xff0c;但tampermonkey油猴插件将在您的用户脚本管理方面提供更多的便利&#xff0…

uni-app 新增 微信小程序之新版隐私协议

一、manifest.json中配置 "__usePrivacyCheck__": true 二、编写封装后的组件 <template><view class"privacy" v-if"showPrivacy"><view class"content"><view class"title">隐私保护指引</…

蓝桥杯每日一题2023.9.21

蓝桥杯2021年第十二届省赛真题-异或数列 - C语言网 (dotcpp.com) 题目描述 Alice 和 Bob 正在玩一个异或数列的游戏。初始时&#xff0c;Alice 和 Bob 分别有一个整数 a 和 b&#xff0c;有一个给定的长度为 n 的公共数列 X1, X2, , Xn。 Alice 和 Bob 轮流操作&#xff0…

恢复数据库 NBU ZDLRA Backup

Duplicate of specific PDB in LOCAL UNDO for specific tablespaces using Tape/ZDLRA Backup (Doc ID 2926039.1)​编辑To Bottom In this Document Goal Solution References APPLIES TO: Oracle Database - Enterprise Edition - Version 12.2.0.1 and later Information…

如何解决 503 Service Temporarily Unavailable?

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f405;&#x1f43e;猫头虎建议程序员必备技术栈一览表&#x1f4d6;&#xff1a; &#x1f6e0;️ 全栈技术 Full Stack: &#x1f4da…

轻松使用androidstudio交叉编译libredwg库

对于安卓或嵌入式开发者而言,交叉编译是再熟悉不过的操作了,可是对于一些刚入门或初级开发者经常会遇到这样的问题:如何交叉编译C++库来生成安卓下的so库呢? 最近有一些粉丝找到我求救,那么我最近刚好有空大致研究了下,帮他们成功编译了其中一个libredwg的C++库,这篇文章…

21天学会C++:Day13----动态内存管理

CSDN的uu们&#xff0c;大家好。这里是C入门的第十三讲。 座右铭&#xff1a;前路坎坷&#xff0c;披荆斩棘&#xff0c;扶摇直上。 博客主页&#xff1a; 姬如祎 收录专栏&#xff1a;C专题 目录 1. 加深对内存四区的理解 2. new-delete 与 malloc-free 2.1 能否用 fre…

滚雪球学Java(31):玩转多维数组:高效访问和遍历

&#x1f3c6;本文收录于「滚雪球学Java」专栏&#xff0c;专业攻坚指数级提升&#xff0c;助你一臂之力&#xff0c;带你早日登顶&#x1f680;&#xff0c;欢迎大家关注&&收藏&#xff01;持续更新中&#xff0c;up&#xff01;up&#xff01;up&#xff01;&#xf…