21.Netty源码之编码器


highlight: arduino-light

Netty如何实现自定义通信协议

在学习完如何设计协议之后,我们又该如何在 Netty 中实现自定义的通信协议呢?其实 Netty 作为一个非常优秀的网络通信框架,已经为我们提供了非常丰富的编解码抽象基类,帮助我们更方便地基于这些抽象基类扩展实现自定义协议。

首先我们看下 Netty 中常用的编解码器有哪些。

一次编解码器和二次编解码器

Netty中的编解码器分为一次编解码和二次编解码。

一次编解码器:MessageToByteEncoder、ByteToMessageDecoder/ReplyingDecoder

二次编解码器:MessageToMessageEncoder、MessageToMessageDecoder

以解码为例,一次解码器用于解决TCP拆包/粘包问题,解析得到字节数据。

如果需要对解析后的字节数据做对象转换,需要使用二次解码器。同理,编码器是相反过程。

为什么需要二次码/编码

假设我们把解决半包粘包问题的常用三种解码器叫一次解码器:

image.png

那么我们在项目中,除了可选的的压缩解压缩之外,还需要一层解码,因为一次解码的结果是字节,需要和项目中所使用的对象做转化,方便使用,这层解码器可以称为“二次解码器”。

相应的,对应的二次编码器是为了将 Java 对象转化成字节流方便存储或传输。

为什么不合并一次二次解码器

思考:是不是也可以一步到位? 合并 1 次解码(解决粘包、半包)和 2 次解码(解决可操作问题)

可以,但是不建议: •没有分层,不够清晰;分层可以组合。 •耦合性高,不容易置换方案。

常用的编解码方式

-Java 序列化

-Marshaling

-XML

-JSON

-MessagePack

-Protobuf

-其他

选择编解码方式的因素

-空间:编码后占用空间 -时间:编解码速度 -是否追求可读性 -是否支持多语言,例如msgpack的支持:Java\C\Python等

Protobuf

-Protobuf 是一个灵活的、高效的用于序列化数据的协议。

-相比较 XML 和 JSON 格式,Protobuf 更小、更快、更便捷。

-Protobuf 是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以自动生成 Java、python、C++ 等代码,不需要再写其他代码。

Protobuf使用步骤

第1步:在Maven 项目中引入 Protobuf 坐标,下载相关的jar包。

在pom.xml中 添加依赖

xml <dependencies> <dependency>    <groupId>com.google.protobuf</groupId>    <artifactId>protobuf-java</artifactId>    <version>3.6.1</version> </dependency> </dependencies>

第 2 步: 编写proto文件:Student.proto。

Student.proto的内容

```java syntax = "proto3"; //版本 option javaouterclassname = "StudentPoJO"; //指定生成的Java类名

//内部类的名称,是真正的PoJo 类 message Student{ // message 的规定的   int32 id = 1; //PoJo 类的属性数据类型类型和 序号(不是属性值)   string name = 2; } ```

第 3 步:通过 protoc.exe 根据描述文件生成 Java 类。

说明:protoc-3.6.1-win32 是从网上下载的 google 提供的文件.

cmd执行命令生成StudentPoJO.java: C:\Users\Administrator\Desktop\Netty资料\我的\资料\protoc-3.6.1-win32\bin>protoc.exe --java_out=. Student.proto

第4步:把生成的 StudentPoJo.java 拷贝到自己的项目中打开。

第 5 步:在 Netty 中使用。

java ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(StudentPoJO.Student.getDefaultInstance())); ​ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder());

抽象编码类

所有的解码器都继承了ChannelInBoundHandler。因为解码是需要解码接收的数据。所以使用In。

所有的编码器都继承了ChannelOutBoundHandler。因为编码是需要将对外发送的数据编码。所以使用Out。

image.png

通过抽象编码类的继承图可以看出编码类是 ChanneOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。

常用编码器类型

  • MessageToByteEncoder 对象编码成字节流;
  • MessageToMessageEncoder 一种对象消息类型编码成另外一种对象消息类型。

使用一次编码器IntegerEncoder和二次编码器IntegerToStringEncoder,将消息从Integer编码为String。

java class IntegerEncoder extends MessageToByteEncoder<Integer> {    @Override    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {        out.writeInt(msg);   } } ​ class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {    @Override    public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out) throws Exception {        out.add(message.toString());   } }

使用一次编码器StringEncoder和二次编码器StringToIntegerEncoder,将消息从String编码为Integer。

class StringEncoder extends MessageToByteEncoder<String> {    @Override    public void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {        out.writeCharSequence(msg, Charset.defaultCharset());   } } ​ class StringToIntegerEncoder extends MessageToMessageEncoder<String> {    @Override    public void encode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception {        out.add(Integer.parseInt(message));   } }

编码器MessageToByteEncoder

MessageToByteEncoder用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我们需要实现encode 方法即可完成自定义编码。那么encode() 方法是在什么时候被调用的呢?

我们一起看下MessageToByteEncoder 的核心源码片段,如下所示。

MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。

这里使用了模板模式:encode方法交给具体的子类实现。

java public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { ​ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {    ByteBuf buf = null;    try {   // 1.消息类型是否匹配 不匹配不会处理        // 即传入的是String        if (acceptOutboundMessage(msg)) {            //I是泛型            @SuppressWarnings("unchecked")            I cast = (I) msg;            // 2. 分配 ByteBuf 资源            buf = allocateBuffer(ctx, cast, preferDirect);            try {            // 3. 执行 encode 方法完成数据编码                encode(ctx, cast, buf);           } finally {                ReferenceCountUtil.release(cast);           }            if (buf.isReadable()) {            // 4. 向后传递写事件                ctx.write(buf, promise);           } else {                buf.release();                ctx.write(Unpooled.EMPTY_BUFFER, promise);           }            buf = null;       } else {            ctx.write(msg, promise);       }   } catch (EncoderException e) {        throw e;   } catch (Throwable e) {        throw new EncoderException(e);   } finally {        if (buf != null) {            buf.release();       }   } } } //供子类重写 protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:

  1. acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler。
  2. 分配 ByteBuf 资源,默认使用堆外内存。
  3. 调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用ReferenceCountUtil.release(cast) 自动释放。
  4. 如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点。如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。

实现类:StringToByteEncoder

编码器实现非常简单,不需要关注拆包/粘包问题。

如下例子,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。

java package io.netty.example.Encode; ​ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; ​ public class StringToByteEncoder extends MessageToByteEncoder<String> { ​    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {      byteBuf.writeBytes(data.getBytes());   } }

编码器MessageToMessageEncoder

https://www.javajike.com/book/essential-netty-in-action/chapter4/66cf00f545a4c73fa3fd2fad8d0b7a1d.html

MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法。

与 MessageToByteEncoder 不同的是,MessageToMessageEncoder 是将一种格式的消息转换为另外一种格式的消息。

其中第二个 Message 所指的可以是任意一个对象,如果该对象是 ByteBuf 类型,那么和 MessageToByteEncoder 的实现原理是一致的。

MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。

这里使用了模板模式:encode方法交给具体的子类实现。

java    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        CodecOutputList out = null;        try {            // 1. 消息类型是否匹配 不匹配不会处理       // 即传入的是String            if (acceptOutboundMessage(msg)) {                out = CodecOutputList.newInstance();                 //I是泛型                @SuppressWarnings("unchecked")                I cast = (I) msg;                try {                    //执行子类encode完成具体编码操作                    encode(ctx, cast, out);               } finally {                    ReferenceCountUtil.release(cast);               } //如果输出结果是对象列表out是空                if (out.isEmpty()) {                    out.recycle();                    out = null;                    throw new EncoderException(" must produce at least one message.");               }           } else {                ctx.write(msg, promise);           }       } catch (EncoderException e) {            throw e;       } catch (Throwable t) {            throw new EncoderException(t);       } finally {            if (out != null) {                final int sizeMinusOne = out.size() - 1;                // sizeMinusOne等于0说明 out长度是1                if (sizeMinusOne == 0) {                    //写出去                    ctx.write(out.getUnsafe(0), promise);               } else if (sizeMinusOne > 0) {                    //遍历写出去                    if (promise == ctx.voidPromise()) {                        writeVoidPromise(ctx, out);                   } else {                        writePromiseCombiner(ctx, out, promise);                   }               }                //回收                out.recycle();           }       }   }

此外 MessageToByteEncoder 的输出结果是对象列表out,编码后的结果属于中间对象,最终仍然会转化成 ByteBuf 进行传输。

MessageToMessageEncoder 常用的实现子类有 StringEncoder、LineEncoder、Base64Encoder 等。以 StringEncoder 为例看下 MessageToMessageEncoder 的用法。

实现类:StringEncoder

java @Sharable public class StringEncoder extends MessageToMessageEncoder<CharSequence> { ​    // TODO Use CharsetEncoder instead.    private final Charset charset; ​    /**     * Creates a new instance with the current system character set.     */    public StringEncoder() {        this(Charset.defaultCharset());   } ​    /**     * Creates a new instance with the specified character set.     */    public StringEncoder(Charset charset) {        if (charset == null) {            throw new NullPointerException("charset");       }        this.charset = charset;   } ​    @Override    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {        if (msg.length() == 0) {            return;       } //编码以后加入out列表 由父类写出即可        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));   } } ​

思考:现在有1个Java对象要编码为json字符串后转换为byte传输 如何做呢?

1.继承MessageToMessageEncoder

2.重写encode方法

3.将对象序列化为json

4.将json转为ByteBuffer发送:ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(json), charset)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/77034.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 根据图片的EXIF自动调整图片方向

PropertyItems 代码 /// <summary>/// 根据图片exif调整方向/// </summary>/// <param name"img"></param>public void RotateImage(Bitmap img){var exif img.PropertyItems;byte orien 0;var item exif.Where(m > m.Id 274).ToArra…

16、外部配置源与外部配置文件及JSON配置

外部配置源与外部配置文件及JSON配置 application.properties application.yml 这些是配置文件&#xff0c; 命令行配置、环境变量配置、系统属性配置源&#xff0c;这些属于配置源。 外部配置源的作用&#xff1a; Spring Boot相当于对Spring框架进行了封装&#xff0c;Spri…

策略模式(C++)

定义 定义一系列算法&#xff0c;把它们一个个封装起来&#xff0c;并且使它们可互相替换((变化)。该模式使得算法可独立手使用它的客户程序稳定)而变化(扩展&#xff0c;子类化)。 ——《设计模式》GoF 使用场景 在软件构建过程中&#xff0c;某些对象使用的算法可能多种多…

viewerjs 如何新增下载图片功能(npm包补丁)

文章目录 先实现正常的效果实现下载图片改变viewerjs的build函数源码改变之后&#xff0c;执行npm i 之后node_modules源码又变回了原样 1、viwerjs所有功能都很完善&#xff0c;但唯独缺少了图片的下载 2、需求&#xff1a;在用viwerjs旋转图片后&#xff0c;可以直接下载旋转…

规划模型Matlab代码

文章目录 数学规划定义一般形式分类 1.线性规划(linear programming)2.非线性规划(nonlinear programming)3. 整数规划(integer programming)4. 0-1规划(0-1 programming)5. 最大最小化模型6. 多目标规划模型7.敏感性分析&#xff08;对权重&#xff09;[例题] 数学规划定义 数…

Stable Diffusion - SDXL 1.0 全部样式设计与艺术家风格的配置与提示词

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132072482 来源于 Anna Dittmann 安娜迪特曼&#xff0c;艺术家风格的图像&#xff0c;融合幻想、数字艺术、纹理等样式。 SDXL 是 Stable Diffus…

Git常见问题

git clone 提示OpenSSL SSL_read git clone 时提示Connection was reset, errno 10054类错误 fatal: unable to acce ss https://github.com/fex-team/ueditor.git/: OpenSSL SSL_read: Connection was reset, errno 10054 备注&#xff1a;以下方法只是归纳整理&#xff0c;…

【零基础学Rust | 基础系列 | 基础语法】变量,数据类型,运算符,控制流

文章目录 简介&#xff1a;一&#xff0c;变量1&#xff0c;变量的定义2&#xff0c;变量的可变性3&#xff0c;变量的隐藏 二、数据类型1&#xff0c;标量类型2&#xff0c;复合类型 三&#xff0c;运算符1&#xff0c;算术运算符2&#xff0c;比较运算符3&#xff0c;逻辑运算…

Apache Flink概述

Flink 是构建在数据流之上的一款有状态的流计算框架&#xff0c;通常被人们称为第三代大数据分析方案 第一代大数据处理方案&#xff1a;基于Hadoop的MapReduce 静态批处理 | Storm 实时流计算 &#xff0c;两套独立的计算引擎&#xff0c;难度大&#xff08;2014年9月&#x…

Hyperledger Fabric 使用 CouchDB 和复杂智能合约开发

前言 在上个实验中&#xff0c;我们已经实现了简单智能合约实现及客户端开发&#xff0c;但该实验中智能合约只有基础的增删改查功能&#xff0c;且其中的数据管理功能与传统 MySQL 比相差甚远。本文将在前面实验的基础上&#xff0c;将 Hyperledger Fabric 的默认数据库支持 …

Kafka3.0.0版本——Broker(Zookeeper服务端存储的Kafka相关信息)

目录 一、启动zookeeper集群及kafka集群服务启动1.1、先启动三台zookeeper集群服务&#xff0c;再启动三台kafka集群服务1.2、使用PrettyZoo连接zookeeper客户端工具 二、在zookeeper服务端存储的Kafka相关信息 一、启动zookeeper集群及kafka集群服务启动 1.1、先启动三台zook…

C++初阶引用

目录 引用引用的特性使用输出型参数作返回值小总结引用的权限引用和指针 引用 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空间&#xff0c;它和它引用的变量共用同一块内存空间。 比如周树人&#xff0c;在外…

探索创意之路:稳定扩散AI绘画指南

文章目录 引言第一部分&#xff1a;了解稳定扩散AI绘画1.1 稳定扩散AI绘画简介1.2 稳定扩散AI绘画的优势 第二部分&#xff1a;使用稳定扩散AI绘画2.1 获取稳定扩散AI绘画工具2.2 准备绘画素材和设置参数2.3 进行AI绘画 第三部分&#xff1a;发挥创意&#xff0c;创作精彩绘画3…

阿里云AK创建

要在阿里云上创建 Access Key&#xff08;AK&#xff09;&#xff0c;您需要按照以下步骤进行操作&#xff1a; 登录到阿里云控制台&#xff08;[https://www.aliyun.com/?utm_contentse_1014243503)&#xff09;。 点击右上方的主账号&#xff0c;点击“AccessKey管理”。 …

P1064 [NOIP2006 提高组] 金明的预算方案 (依赖背包问题)(内附封面)

[NOIP2006 提高组] 金明的预算方案 题目描述 金明今天很开心&#xff0c;家里购置的新房就要领钥匙了&#xff0c;新房里有一间金明自己专用的很宽敞的房间。更让他高兴的是&#xff0c;妈妈昨天对他说&#xff1a;“你的房间需要购买哪些物品&#xff0c;怎么布置&#xff0…

R语言【Tidyverse、Tidymodel】的机器学习方法

机器学习已经成为继理论、实验和数值计算之后的科研“第四范式”&#xff0c;是发现新规律&#xff0c;总结和分析实验结果的利器。机器学习涉及的理论和方法繁多&#xff0c;编程相当复杂&#xff0c;一直是阻碍机器学习大范围应用的主要困难之一&#xff0c;由此诞生了Python…

python人工智能可以干什么,python人工智能能干什么

大家好&#xff0c;给大家分享一下python做人工智能需要什么水平&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 人工智能包含常用机器学习和深度学习两个很重要的模块&#xff0c;而python拥有matplotlib、Numpy、sklearn、keras等大量的…

【深度学习笔记】深度学习框架

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记&#xff0c;视频由网易云课堂与 deeplearning.ai 联合出品&#xff0c;主讲人是吴恩达 Andrew Ng 教授。感兴趣的网友可以观看网易云课堂的视频进行深入学习&#xff0c;视频的链接如下&#xff1a; 神经网络和…

snap xxx has “install-snap“ change in progress

error description * 系重复安装&#xff0c;进程冲突 solution 展示snap的改变 然后sudo snap abort 22即可终止该进程 之后重新运行install command&#xff5e;&#xff5e; PS: ubuntu有时候加载不出来&#xff0c;执行resolvectl flush-caches&#xff0c;清除dns缓存…

【计算机视觉 | 图像分割】arxiv 计算机视觉关于图像分割的学术速递(8 月 1 日论文合集)

文章目录 一、分割|语义相关(16篇)1.1 DPMix: Mixture of Depth and Point Cloud Video Experts for 4D Action Segmentation1.2 Investigating and Improving Latent Density Segmentation Models for Aleatoric Uncertainty Quantification in Medical Imaging1.3 Domain Ada…