Netty-2-数据编解码

解析编解码支持的原理

以编码为例,要将对象序列化成字节流,你可以使用MessageToByteEncoder或MessageToMessageEncoder类。

在这里插入图片描述
这两个类都继承自ChannelOutboundHandlerAdapter适配器类,用于进行数据的转换。

其中,对于MessageToMessageEncoder来说,如果把口标设置为ByteBuf,那么效果等同于使用MessageToByteEncodero这就是它们都可以进行数据编码的原因。

//MessageToMessageEncoder@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {// 创建一个CodecOutputList对象,并将其初始化为nullCodecOutputList out = null;try {// 检查消息是否满足输出条件if (acceptOutboundMessage(msg)) {// 创建一个CodecOutputList对象,并将其赋值给out变量out = CodecOutputList.newInstance();// 将msg强制转换为I类型,并赋值给cast变量@SuppressWarnings("unchecked")I cast = (I) msg;try {// 调用encode方法,将ctx、cast和out作为参数传入encode(ctx, cast, out);} catch (Throwable th) {// 释放cast的引用计数ReferenceCountUtil.safeRelease(cast);// 抛出异常PlatformDependent.throwException(th);}// 释放cast的引用计数ReferenceCountUtil.release(cast);// 检查out是否为空if (out.isEmpty()) {// 抛出编码异常throw new EncoderException(StringUtil.simpleClassName(this) + " must produce at least one message.");}} else {// 直接将msg写入通道ctx.write(msg, promise);}} catch (EncoderException e) {// 抛出编码异常throw e;} catch (Throwable t) {// 抛出编码异常throw new EncoderException(t);} finally {// 最终,释放out的引用计数if (out != null) {try {// 获取out的元素个数final int sizeMinusOne = out.size() - 1;if (sizeMinusOne == 0) {// 将out的第一个元素直接写入通道ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne > 0) {// 检查promise是否为voidPromiseif (promise == ctx.voidPromise()) {// 使用voidPromise来减少GC压力writeVoidPromise(ctx, out);} else {// 使用writePromiseCombiner方法来减少GC压力writePromiseCombiner(ctx, out, promise);}}} finally {// 释放out的资源out.recycle();}}}}protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

最终的目标是把对象转换为ByteBuf,具体的转换代码则委托子类继承的encode方法来实现。

Netty提供了很多子类来支持前面提及的各种数据编码方式。
在这里插入图片描述

解析典型Netty数据编解码的实现

HttpObjectEncoder编码器

//HttpObjectEncoder编码器@Override@SuppressWarnings("ConditionCoveredByFurtherCondition")protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {// 为了处理不需要类检查的常见模式的fast-pathif (msg == Unpooled.EMPTY_BUFFER) {out.add(Unpooled.EMPTY_BUFFER);return;}// 以这种顺序进行instanceof检查的原因是,不依赖于ReferenceCountUtil::release作为一种通用释放机制,// 参见https://bugs.openjdk.org/browse/JDK-8180450。// https://github.com/netty/netty/issues/12708包含有关先前版本的此代码如何与JIT instanceof优化交互的更多详细信息。if (msg instanceof FullHttpMessage) {encodeFullHttpMessage(ctx, msg, out);return;}// 判断msg是否为HttpMessage的实例if (msg instanceof HttpMessage) {final H m;try {// 将msg转换为H类型m = (H) msg;} catch (Exception rethrow) {// 出现异常时,释放msg的引用计数并抛出异常ReferenceCountUtil.release(msg);throw rethrow;}// 判断m是否为LastHttpContent的实例if (m instanceof LastHttpContent) {// 调用encodeHttpMessageLastContent方法对LastHttpContent进行编码encodeHttpMessageLastContent(ctx, m, out);} // 判断m是否为HttpContent的实例else if (m instanceof HttpContent) {// 调用encodeHttpMessageNotLastContent方法对HttpContent进行编码encodeHttpMessageNotLastContent(ctx, m, out);} // m既不是LastHttpContent也不是HttpContent的实例else {// 调用encodeJustHttpMessage方法对m进行编码encodeJustHttpMessage(ctx, m, out);}} // msg不是HttpMessage的实例else {// 调用encodeNotHttpMessageContentTypes方法对非HttpMessage的内容类型进行编码encodeNotHttpMessageContentTypes(ctx, msg, out);}}

HttpObjectDecoder解码器

//HttpObjectDecoder.java/*** 定义了一个私有枚举类型State,表示不同的状态*/private enum State {/*** 用于跳过控制字符*/SKIP_CONTROL_CHARS,/*** 读取初始内容*/READ_INITIAL,/*** 读取头部信息*/READ_HEADER,/*** 读取可变长度的内容*/READ_VARIABLE_LENGTH_CONTENT,/*** 读取固定长度的内容*/READ_FIXED_LENGTH_CONTENT,/*** 读取分块大小*/READ_CHUNK_SIZE,/*** 读取分块内容*/READ_CHUNKED_CONTENT,/*** 读取分块分隔符*/READ_CHUNK_DELIMITER,/*** 读取分块脚注*/READ_CHUNK_FOOTER,/*** 错误消息*/BAD_MESSAGE,/*** 升级协议*/UPGRADED}//解码器相应实现@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {// 如果 resetRequested 为真if (resetRequested) {// 调用 resetNow() 方法resetNow();}switch (currentState) {case SKIP_CONTROL_CHARS:// 跳过控制字符case READ_INITIAL: try {// 解析缓冲区中的数据AppendableCharSequence line = lineParser.parse(buffer);if (line == null) {return;}// 拆分初始行String[] initialLine = splitInitialLine(line);if (initialLine.length < 3) {// 初始行无效 - 忽略currentState = State.SKIP_CONTROL_CHARS;return;}// 创建消息对象message = createMessage(initialLine);currentState = State.READ_HEADER;// 继续读取头部} catch (Exception e) {// 处理异常情况out.add(invalidMessage(buffer, e));return;}case READ_HEADER: try {State nextState = readHeaders(buffer);if (nextState == null) {return;}currentState = nextState;switch (nextState) {case SKIP_CONTROL_CHARS:// 快速路径// 无需期望任何内容out.add(message);out.add(LastHttpContent.EMPTY_LAST_CONTENT);resetNow();return;case READ_CHUNK_SIZE:if (!chunkedSupported) {throw new IllegalArgumentException("不支持分块消息");}// 分块编码 - 首先生成HttpMessage。后续将跟随HttpChunks。out.add(message);return;default:/*** <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> 规定,如果请求没有传输编码头或内容长度头,则消息体长度为0。* 但是对于响应,body长度是在服务器关闭连接之前接收到的字节数目。因此我们将此情况视为可变长度的分块编码。*/long contentLength = contentLength();if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {out.add(message);out.add(LastHttpContent.EMPTY_LAST_CONTENT);resetNow();return;}assert nextState == State.READ_FIXED_LENGTH_CONTENT ||nextState == State.READ_VARIABLE_LENGTH_CONTENT;out.add(message);if (nextState == State.READ_FIXED_LENGTH_CONTENT) {// 随着READ_FIXED_LENGTH_CONTENT状态逐块读取数据,分块大小将减小。chunkSize = contentLength;}// 在这里返回,这将强制再次调用解码方法,在那里我们将解码内容return;}} catch (Exception e) {out.add(invalidMessage(buffer, e));return;}case READ_VARIABLE_LENGTH_CONTENT: {// 一直读取数据直到连接结束。int toRead = Math.min(buffer.readableBytes(), maxChunkSize);if (toRead > 0) {// 从缓冲区中读取指定长度的数据,并以保留引用的形式分割成多个片段ByteBuf content = buffer.readRetainedSlice(toRead);out.add(new DefaultHttpContent(content));}return;}case READ_FIXED_LENGTH_CONTENT: {int readLimit = buffer.readableBytes();// 首先检查缓冲区是否可读,因为我们使用可读字节计数来创建HttpChunk。需要这样做,以防止创建包含空缓冲区的HttpChunk,从而被当作最后一个HttpChunk进行处理。// 参见:https://github.com/netty/netty/issues/433if (readLimit == 0) {return;}int toRead = Math.min(readLimit, maxChunkSize);if (toRead > chunkSize) {toRead = (int) chunkSize;}ByteBuf content = buffer.readRetainedSlice(toRead);chunkSize -= toRead;if (chunkSize == 0) {// 读取所有内容。out.add(new DefaultLastHttpContent(content, validateHeaders));resetNow();} else {out.add(new DefaultHttpContent(content));}return;}/*** 从这里开始处理读取分块的内容。基本上,读取分块大小,读取分块,忽略CRLF,然后重复直到分块大小为0*/case READ_CHUNK_SIZE: try {AppendableCharSequence line = lineParser.parse(buffer);if (line == null) {return;}int chunkSize = getChunkSize(line.toString());this.chunkSize = chunkSize;if (chunkSize == 0) {currentState = State.READ_CHUNK_FOOTER;return;}currentState = State.READ_CHUNKED_CONTENT;// fall-through} catch (Exception e) {out.add(invalidChunk(buffer, e));return;}case READ_CHUNKED_CONTENT: {// 判断chunkSize是否小于等于Integer的最大值assert chunkSize <= Integer.MAX_VALUE;// 计算本次需要读取的字节数,取chunkSize和maxChunkSize中的较小值int toRead = Math.min((int) chunkSize, maxChunkSize);// 如果不允许部分chunk,且buffer中可读取的字节数小于toRead,则返回if (!allowPartialChunks && buffer.readableBytes() < toRead) {return;}// 如果buffer中可读取的字节数小于toRead,则将toRead更新为buffer中可读取的字节数toRead = Math.min(toRead, buffer.readableBytes());// 如果toRead为0,则返回if (toRead == 0) {return;}// 从buffer中获取长度为toRead的slice,并用其创建HttpContent对象HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));// 更新剩余的chunkSizechunkSize -= toRead;// 将chunk添加到out中// 如果chunkSize不为0,则返回if (chunkSize != 0) {return;}// 设置当前状态为READ_CHUNK_DELIMITERcurrentState = State.READ_CHUNK_DELIMITER;// 继续执行下一个case语句// fall-through}case READ_CHUNK_DELIMITER: {// 读取分隔符final int wIdx = buffer.writerIndex();int rIdx = buffer.readerIndex();while (wIdx > rIdx) {byte next = buffer.getByte(rIdx++);if (next == HttpConstants.LF) {currentState = State.READ_CHUNK_SIZE;break;}}buffer.readerIndex(rIdx);return;}case READ_CHUNK_FOOTER: {try {// 读取尾部的Http头部信息LastHttpContent trailer = readTrailingHeaders(buffer);if (trailer == null) {return;}out.add(trailer);resetNow();return;} catch (Exception e) {// 发生异常时,将异常信息和当前buffer一起添加到输出channelout.add(invalidChunk(buffer, e));return;}}case BAD_MESSAGE: {// 直到断开连接为止,丢弃消息buffer.skipBytes(buffer.readableBytes());break;}case UPGRADED: {int readableBytes = buffer.readableBytes();if (readableBytes > 0) {// 读取可读字节数,如果大于0,则执行以下操作// 由于否则可能会触发一个DecoderException异常,其他处理器会在某个时刻替换此codec为升级的协议codec来接管流量。// 参见 https://github.com/netty/netty/issues/2173out.add(buffer.readBytes(readableBytes));}break;}default:break;}}

自定义编解码

下面先实现一个Netty编码处理程序。

public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {/*** 编码器类,用于将ResponseMessage对象编码为ByteBuf对象并添加到输出列表中*/@Overrideprotected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {/*** 获取一个ByteBuf对象用于存储编码后的数据*/ByteBuf buffer = ctx.alloc().buffer();/*** 对ResponseMessage对象进行编码,并将编码后的数据写入ByteBuf对象中*/responseMessage.encode(buffer);/*** 将编码后的ByteBuf对象添加到输出列表中*/out.add(buffer);}
}

接下来,再实现对应的Netty解码处理程序。

/*** 订单协议解码器*/
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {// 创建一个请求消息对象RequestMessage requestMessage = new RequestMessage();// 对字节缓冲区进行解码,将解码后的消息填充到请求消息对象中requestMessage.decode(byteBuf);// 将请求消息对象添加到输出列表中out.add(requestMessage);}
}

最后,将这对编解码处理程序添加到处理程序流水线(pipeline)中就可以完成集成工作了。

这是我们第一次提及处理程序流水线这个概念。在这里,只需要将它理解成"一串”有序的处理程序集合并有一个初步印象即可,后续会详细介绍相关内容。

为了完成处理程序流水线的设置,还要构建ServerBootstrap这个“启动”对象。

        ServerBootstrap serverBootstrap = new ServerBootstrap();  // 创建一个ServerBootstrap对象serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {  // 为子通道设置ChannelInitializer处理器@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {  // 初始化连接通道ChannelPipeline pipeline = ch.pipeline();  // 获取通道的编排器// 省略其他非核心代码pipeline.addLast("protocolDecoder", new OrderProtocolDecoder());  // 添加一个解码器到通道的最后pipeline.addLast("protocolEncoder", new OrderProtocolEncoder());  // 添加一个编码器到通道的最后// 省略其他非核心代码}});

常见疑问解析

为什么Netty自带的编解码方案很少有人使用

其中个很重要的因素就是历史原因,但实际上,除历史原因之外,更重要的原因在于Netty自带的编解码方案大多是具有封帧和解帧功能的编解码器,并且融两层编码于一体,因此从结构上看并不清晰。

另外,Netty自带的编解码方案在使用方式上不够灵活。

在进行序列化和反序列时,字段的顺序弄反了

我们在序列化对象的字段时,使用的顺序是a b c;但是,等到我们解析时,顺序可能不小心写成了 c b a, 因此,我们一定要完全对照好顺序才行。

编解码的顺序问题

有时候,我们往往采用多层编解码。
例如,在得到可传输的字节流之后,我们可能想压缩一下以进一步减少所传输内容占用的空间。
此时,多级编解码就可以派上用场了:对于发送者, 先编码后压缩;而对于接收者,先解压后解码。

但是,代码的添加顺序和我们想要的顺序不一定完全匹配。如果顺序错了,那么代码可能无法工作。

if (compressor != null) {pipeline.addLast("frameDecompressorn", new Frame.Decompressor(compressor));pipeline.addLast("frameCompressor", new Frame.Compressor(compressor));pipeline.addLast("messageDecoder", messageDecoder);pipeline.addLast("messageEncoder", messageEncoderFor(protocolversion));
}

在这里插入图片描述
处理程序对于读取操作和写出操作的执行顺序刚好是相反的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/223426.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

字符设备驱动开发-注册-设备文件创建

一、字符设备驱动 linux系统中一切皆文件 1、应用层&#xff1a; APP1 APP2 ... fd open("led驱动的文件"&#xff0c;O_RDWR); read(fd); write(); close(); 2、内核层&#xff1a; 对灯写一个驱动 led_driver.c driver_open(); driver_read(); driver_write(…

GoogLeNet(V1)

目录 一、GooLeNet介绍 1、模型设计的motivation 2、Inception块 3、GoogLeNet架构 4、Inception后续变种 5、总结 二、代码实现 1、Inception块 2、GoogLeNet模型 3、训练模型 4、总结 一、GooLeNet介绍 GoogLeNet是由Google团队于2014年提出的深度卷积神经网络架构…

BUG记录 | 使用阿里云OSS实现文件上传后,得到的url无法在浏览器中打开

项目背景 SpringBoot的项目&#xff0c;使用阿里云对象存储OSS对项目中的文件进行存储&#xff0c;所需文件也会通过IDEA中由官方Demo改编而成的工具类作为接口&#xff0c;调用接口后上传 问题描述 使用阿里云OSS实现文件上传后&#xff0c;通过postman测试得到的url无法在…

H266/VVC帧内预测编码

预测编码技术 预测编码&#xff08;Prediction Coding&#xff09;是指利用已编码的一个或多个样本值&#xff0c;根据某种模型或方法&#xff0c;对当前的样本值进行预测&#xff0c;并对样本真实值和预测值之间的差值进行编码。 视频中的每个像素看成一个信源符号&#xff…

【UML】第12篇 序列图(1/2)——基本概念和构成

目录 一、什么是序列图&#xff08;Sequence Diagram&#xff09; 1.1 定义 1.2 主要用途 1.3 序列图和BPMN的区别和联系 二、序列图的构成 2.1 对象 2.2 生命线 2.3 消息 2.4 激活 序列图&#xff0c;是我个人认为的用处最多的一种图。产品和研发的同学&#xff0c;都…

二级指针的作用 -- 将变量从函数中带出

使用一级指针不能将变量带出 void test(int *p) {static int nub 10; /*使用static是保证函数结束, 变量依然存在, 不然即使将它带出来, 函数结束时这片内存已经被释放了就没有意义了*/p &nub; }int main(void) {int *p NULL;test(p);printf("%d",*p);return …

驱动开发-1

一、驱动课程大纲 内核模块字符设备驱动中断 二、ARM裸机代码和驱动有什么区别&#xff1f; 1、共同点&#xff1a; 都能够操作硬件 2、不同点&#xff1a; 1&#xff09;裸机就是用C语言给对应的寄存器里面写值&#xff0c;驱动是按照一定的套路往寄存器里面写值 2&#xff09…

开关电源厚膜集成电路引脚功能

开关电源厚膜集成电路引脚功能 一、 STR51213、STR50213、STR50103 引脚号 引脚功能 1 接地&#xff0c;内接稳压基准电路 2 开关管基极 3 开关管集电极 4 开关管发射极 5 误差比较电压信号输入&#xff0c;兼待机控制 二、 STR3302、STR3202 引脚号 引脚功能 1内部半…

华为vrrp+mstp+ospf+dhcp+dhcp relay配置案例

1、左边是vlan 10主桥&#xff0c;右边是vlan 20的主桥&#xff0c;并且互为备桥 2、 vlan 10 vrrp网关默认用左边&#xff0c;vlan 20的vrrp 网关默认用右边&#xff0c;对应mstp生成树 3、两边都track检测&#xff0c;不通就把vrrp减掉60&#xff0c;这样就会自动切另一边了 …

四、UART_阻塞发送中断接收

1、开发环境 (1)Keil MDK: V5.38.0.0 (2)MCU: mm320163D7P 2、实验目的&原理图 2.1、实验目的 (1)上位机串口助手给MCU发送信息&#xff0c;MCU串口通过通过串口助手接收后&#xff0c;将接收到的内容通过串口助手发送到上位机。 (2)串口在whil循环中每隔1秒发送一次…

【一起学Rust | 框架篇 | Tauri2.0框架】Tauri2.0环境搭建与项目创建

文章目录 前言一、搭建 Tauri 2.0 开发环境二、创建 Tauri 2.0 项目1.创建项目2.安装依赖4. 编译运行 三、设置开发环境四、项目结构 前言 Tauri在Rust圈内成名已久&#xff0c;凭借Rust的可靠性&#xff0c;使用系统原生的Webview构建更小的App 以及开发人员可以灵活的使用各…

离散型制造企业为什么要注重MES管理系统的实施

离散型制造企业经常面临三个核心问题&#xff1a;生产什么、生产多少以及如何生产。尽管许多企业都实施了ERP系统&#xff0c;但仍然绕不开MES管理系统的话题。本文将从三个方面详细解释为什么离散型企业需要实施MES管理系统。 一、生产线经常出现的问题 在离散型企业中&#…

Blazor 混合开发_MAUI+Vue_WPF+Vue

Blazor 混合开发_MAUIVue_WPFVue 背景混合开发的核心为什么必须使用 wwwroot 文件夹放置 Web 项目文件 创建 MAUI 项目创建 wwwroot 文件夹服务注册创建 _import.razor添加 Main.razor 组件修改 MainPage.xaml 文件 创建 WPF 项目创建 wwwroot 文件夹服务注册创建 _import.razo…

OpenCV利用HSV颜色区间分离不同物体

需求 当前有个需求是从一个场景中将三个不同的颜色的二维码分离出来&#xff0c;如下图所示。 这里有两个思路可以使用 思路一是通过深度学习的方式&#xff0c;训练一个能够识别旋转边界框的模型&#xff0c;但是需要大量的数据进行模型训练&#xff0c;此处缺少训练数据&a…

华清远见嵌入式学习——ARM——作业2

目录 作业要求&#xff1a; 现象&#xff1a; 代码&#xff1a; 思维导图&#xff1a; 模拟面试题&#xff1a; 作业要求&#xff1a; GPIO实验——3颗LED灯的流水灯实现 现象&#xff1a; 代码&#xff1a; .text .global _start _start: 设置GPIOEF时钟使能 0X50000…

Openwrt AP 发射 WiFi 信号

问题 想一次把 OpenWrt 路由器 wifi 问题给解决&#xff0c;完全取代路由器。 使用 倍控的 N5105 设备&#xff0c;有 mPCIe 接口&#xff0c;使用了 intel AX200 无线网卡&#xff0c;支持 2.4G 与 5G。 设置步骤 OpenWrt 镜像 第一次使用的镜像不支持 wifi&#xff0c;在…

量化服务器 - 后台挂载运行

服务器 - 后台运行 pip3命令被kill 在正常的pip命令后面加上 -no-cache-dir tmux 使用教程 https://codeleading.com/article/40954761108/ 如果你希望在 tmux 中后台执行一个 Python 脚本&#xff0c;你可以按照以下步骤操作&#xff1a; 启动 tmux: tmux这将会创建一个新…

Http---HTTP响应报文

1. HTTP响应报文分析 HTTP 响应报文效果图: 响应报文说明: --- 响应行/状态行 --- HTTP/1.1 200 OK # HTTP协议版本 状态码 状态描述 --- 响应头 --- Server: Tengine # 服务器名称 Content-Type: text/html; charsetUTF-8 # 内容类型 Transfer-Encoding: chunked # 发送给客…

HBase基础知识(三):HBase架构进阶、读写流程、MemStoreFlush、StoreFile Compaction、Region Split

1. 架构原理 1&#xff09;StoreFile 保存实际数据的物理文件&#xff0c;StoreFile以HFile的形式存储在HDFS上。每个Store会有一个或多个StoreFile&#xff08;HFile&#xff09;&#xff0c;数据在每个StoreFile中都是有序的。 2&#xff09;MemStore 写缓存&#xff0c;由于…

Hadoop入门学习笔记——三、使用HDFS文件系统

视频课程地址&#xff1a;https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接&#xff1a;https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 Hadoop入门学习笔记&#xff08;汇总&#xff09; 目录 三、使用HDFS文件系统3.1. 使用命令操作HDFS文件系统3.1.…