highlight: arduino-light
Netty如何实现自定义通信协议
在学习完如何设计协议之后,我们又该如何在 Netty 中实现自定义的通信协议呢?其实 Netty 作为一个非常优秀的网络通信框架,已经为我们提供了非常丰富的编解码抽象基类,帮助我们更方便地基于这些抽象基类扩展实现自定义协议。
首先我们看下 Netty 中常用的编解码器有哪些。
一次编解码器和二次编解码器
Netty中的编解码器分为一次编解码和二次编解码。
一次编解码器:MessageToByteEncoder、ByteToMessageDecoder/ReplyingDecoder
二次编解码器:MessageToMessageEncoder、MessageToMessageDecoder
以解码为例,一次解码器用于解决TCP拆包/粘包问题,解析得到字节数据。
如果需要对解析后的字节数据做对象转换,需要使用二次解码器。同理,编码器是相反过程。
为什么需要二次码/编码
假设我们把解决半包粘包问题的常用三种解码器叫一次解码器:
那么我们在项目中,除了可选的的压缩解压缩之外,还需要一层解码,因为一次解码的结果是字节,需要和项目中所使用的对象做转化,方便使用,这层解码器可以称为“二次解码器”。
相应的,对应的二次编码器是为了将 Java 对象转化成字节流方便存储或传输。
为什么不合并一次二次解码器
思考:是不是也可以一步到位? 合并 1 次解码(解决粘包、半包)和 2 次解码(解决可操作问题)
可以,但是不建议: •没有分层,不够清晰;分层可以组合。 •耦合性高,不容易置换方案。
常用的编解码方式
-Java 序列化
-Marshaling
-XML
-JSON
-MessagePack
-Protobuf
-其他
选择编解码方式的因素
-空间:编码后占用空间 -时间:编解码速度 -是否追求可读性 -是否支持多语言,例如msgpack的支持:Java\C\Python等
Protobuf
-Protobuf 是一个灵活的、高效的用于序列化数据的协议。
-相比较 XML 和 JSON 格式,Protobuf 更小、更快、更便捷。
-Protobuf 是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以自动生成 Java、python、C++ 等代码,不需要再写其他代码。
Protobuf使用步骤
第1步:在Maven 项目中引入 Protobuf 坐标,下载相关的jar包。
在pom.xml中 添加依赖
xml <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency> </dependencies>
第 2 步: 编写proto文件:Student.proto。
Student.proto的内容
```java syntax = "proto3"; //版本 option javaouterclassname = "StudentPoJO"; //指定生成的Java类名
//内部类的名称,是真正的PoJo 类 message Student{ // message 的规定的 int32 id = 1; //PoJo 类的属性数据类型类型和 序号(不是属性值) string name = 2; } ```
第 3 步:通过 protoc.exe 根据描述文件生成 Java 类。
说明:protoc-3.6.1-win32 是从网上下载的 google 提供的文件.
cmd执行命令生成StudentPoJO.java: C:\Users\Administrator\Desktop\Netty资料\我的\资料\protoc-3.6.1-win32\bin>protoc.exe --java_out=. Student.proto
第4步:把生成的 StudentPoJo.java 拷贝到自己的项目中打开。
第 5 步:在 Netty 中使用。
java ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(StudentPoJO.Student.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder());
抽象编码类
所有的解码器都继承了ChannelInBoundHandler。因为解码是需要解码接收的数据。所以使用In。
所有的编码器都继承了ChannelOutBoundHandler。因为编码是需要将对外发送的数据编码。所以使用Out。
通过抽象编码类的继承图可以看出编码类是 ChanneOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。
常用编码器类型
- MessageToByteEncoder 对象编码成字节流;
- MessageToMessageEncoder 一种对象消息类型编码成另外一种对象消息类型。
使用一次编码器IntegerEncoder和二次编码器IntegerToStringEncoder,将消息从Integer编码为String。
java class IntegerEncoder extends MessageToByteEncoder<Integer> { @Override public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); } } class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> { @Override public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out) throws Exception { out.add(message.toString()); } }
使用一次编码器StringEncoder和二次编码器StringToIntegerEncoder,将消息从String编码为Integer。
class StringEncoder extends MessageToByteEncoder<String> { @Override public void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { out.writeCharSequence(msg, Charset.defaultCharset()); } } class StringToIntegerEncoder extends MessageToMessageEncoder<String> { @Override public void encode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception { out.add(Integer.parseInt(message)); } }
编码器MessageToByteEncoder
MessageToByteEncoder用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我们需要实现encode 方法即可完成自定义编码。那么encode() 方法是在什么时候被调用的呢?
我们一起看下MessageToByteEncoder 的核心源码片段,如下所示。
MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。
ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。
这里使用了模板模式:encode方法交给具体的子类实现。
java public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { // 1.消息类型是否匹配 不匹配不会处理 // 即传入的是String if (acceptOutboundMessage(msg)) { //I是泛型 @SuppressWarnings("unchecked") I cast = (I) msg; // 2. 分配 ByteBuf 资源 buf = allocateBuffer(ctx, cast, preferDirect); try { // 3. 执行 encode 方法完成数据编码 encode(ctx, cast, buf); } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { // 4. 向后传递写事件 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release(); } } } } //供子类重写 protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:
- acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler。
- 分配 ByteBuf 资源,默认使用堆外内存。
- 调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用ReferenceCountUtil.release(cast) 自动释放。
- 如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点。如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。
实现类:StringToByteEncoder
编码器实现非常简单,不需要关注拆包/粘包问题。
如下例子,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。
java package io.netty.example.Encode; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class StringToByteEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(data.getBytes()); } }
编码器MessageToMessageEncoder
https://www.javajike.com/book/essential-netty-in-action/chapter4/66cf00f545a4c73fa3fd2fad8d0b7a1d.html
MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法。
与 MessageToByteEncoder 不同的是,MessageToMessageEncoder 是将一种格式的消息转换为另外一种格式的消息。
其中第二个 Message 所指的可以是任意一个对象,如果该对象是 ByteBuf 类型,那么和 MessageToByteEncoder 的实现原理是一致的。
MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter。
ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。
这里使用了模板模式:encode方法交给具体的子类实现。
java @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { CodecOutputList out = null; try { // 1. 消息类型是否匹配 不匹配不会处理 // 即传入的是String if (acceptOutboundMessage(msg)) { out = CodecOutputList.newInstance(); //I是泛型 @SuppressWarnings("unchecked") I cast = (I) msg; try { //执行子类encode完成具体编码操作 encode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } //如果输出结果是对象列表out是空 if (out.isEmpty()) { out.recycle(); out = null; throw new EncoderException(" must produce at least one message."); } } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable t) { throw new EncoderException(t); } finally { if (out != null) { final int sizeMinusOne = out.size() - 1; // sizeMinusOne等于0说明 out长度是1 if (sizeMinusOne == 0) { //写出去 ctx.write(out.getUnsafe(0), promise); } else if (sizeMinusOne > 0) { //遍历写出去 if (promise == ctx.voidPromise()) { writeVoidPromise(ctx, out); } else { writePromiseCombiner(ctx, out, promise); } } //回收 out.recycle(); } } }
此外 MessageToByteEncoder 的输出结果是对象列表out,编码后的结果属于中间对象,最终仍然会转化成 ByteBuf 进行传输。
MessageToMessageEncoder 常用的实现子类有 StringEncoder、LineEncoder、Base64Encoder 等。以 StringEncoder 为例看下 MessageToMessageEncoder 的用法。
实现类:StringEncoder
java @Sharable public class StringEncoder extends MessageToMessageEncoder<CharSequence> { // TODO Use CharsetEncoder instead. private final Charset charset; /** * Creates a new instance with the current system character set. */ public StringEncoder() { this(Charset.defaultCharset()); } /** * Creates a new instance with the specified character set. */ public StringEncoder(Charset charset) { if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; } @Override protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception { if (msg.length() == 0) { return; } //编码以后加入out列表 由父类写出即可 out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset)); } }
思考:现在有1个Java对象要编码为json字符串后转换为byte传输 如何做呢?
1.继承MessageToMessageEncoder
2.重写encode方法
3.将对象序列化为json
4.将json转为ByteBuffer发送:ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(json), charset)