目录
- 0.前言
- 1. MqttEncoder--编码器
- 1.1 构造方法
- 1.2 encodeConnectMessage -- 连接消息
- 1.3 encodeConnAckMessage - 确认连接
- 1.4 encodePublishMessage -- 发布消息
- 1.5 encodeSubscribeMessage - 订阅主题
- 1.6 encodeUnsubscribeMessage - 取消订阅
- 1.7 encodeSubAckMessage - 订阅应答
- 1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId
- 1.9 encodeMessageWithOnlySingleByteFixedHeader
- 2. MqttDecoder--解码器
- 2.1 构造方法
- 2.2 READ_FIXED_HEADER - 固定报头解码
- 2.3 READ_VARIABLE_HEADER - 可变报头解码
- 2.4 READ_PAYLOAD- 有效载荷解码
0.前言
这里梳理下netty中对mqtt协议的编码和解码的处理。一方面对mqtt协议的结构再巩固些,另一方面就是学习下netty中对字节的处理。对于MQTT协议,可以参考前一篇文章MQTT协议详解。
1. MqttEncoder–编码器
1.1 构造方法
对于编码器,构造方法是私有的。我们可以通过其提供的静态常量INSTANCE访问。
public static final MqttEncoder INSTANCE = new MqttEncoder();private MqttEncoder() { }
1.2 encodeConnectMessage – 连接消息
编码器,我们重点看一下doEncode方法。通过固定报头中的消息类型来对不同类型的消息进行特定编码。
static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {switch (message.fixedHeader().messageType()) {case CONNECT:return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);case CONNACK:return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);case PUBLISH:return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);case SUBSCRIBE:return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);case UNSUBSCRIBE:return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);case SUBACK:return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);case UNSUBACK:case PUBACK:case PUBREC:case PUBREL:case PUBCOMP:return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);case PINGREQ:case PINGRESP:case DISCONNECT:return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);default:throw new IllegalArgumentException("Unknown message type: " + message.fixedHeader().messageType().value());}}
对于消息的编码,我们可以对照着协议来看,这样会更清晰。
固定报头
可变报头
encodeConnectMessage方法
private static ByteBuf encodeConnectMessage(ByteBufAllocator byteBufAllocator,MqttConnectMessage message) {// 1. 有效载荷初始大小设为0字节int payloadBufferSize = 0;MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttConnectVariableHeader variableHeader = message.variableHeader();MqttConnectPayload payload = message.payload();// 2.public enum MqttVersion {// MQTT_3_1("MQIsdp", (byte) 3),// MQTT_3_1_1("MQTT", (byte) 4);// } 对mqtt版本做校验,枚举中只有两种,如果名称和版本不匹配,则报错MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),(byte) variableHeader.version());// 3.如果可变报头中,没有用户名称但是有用户密码,则报错if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {throw new DecoderException("Without a username, the password MUST be not set");}// 4. 校验有效载荷中的clientId,版本MQTT_3_1中clinetId编码字节长度必须为1到23,// 在v3.1.1中允许超过23字节或者长度为0的clientId,所以不为null即可。String clientIdentifier = payload.clientIdentifier();if (!isValidClientId(mqttVersion, clientIdentifier)) {throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);}byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);// 4.1 connect消息的有效载荷,如果都包含的话必须按这个顺序出现:客户端标识符,遗嘱主题,遗嘱消息,用户名,密码// 而这几部分的结构,都是由一个两字节的长度和对应的载荷消息的组成,所以这里都是 2字节 + 消息的字节长度payloadBufferSize += 2 + clientIdentifierBytes.length;// 5.校验有效载荷中的遗嘱主题和遗嘱消息,如果可变报头中的遗嘱标志为1,则表示这两项存在String willTopic = payload.willTopic();byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;byte[] willMessage = payload.willMessageInBytes();byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;if (variableHeader.isWillFlag()) {payloadBufferSize += 2 + willTopicBytes.length;payloadBufferSize += 2 + willMessageBytes.length;}// 6.校验有效载荷中的用户和密码,如果可变报头中的用户名/密码标识为1,则表示用户名/密码存在String userName = payload.userName();byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;if (variableHeader.hasUserName()) {payloadBufferSize += 2 + userNameBytes.length;}byte[] password = payload.passwordInBytes();byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;if (variableHeader.hasPassword()) {payloadBufferSize += 2 + passwordBytes.length;}byte[] protocolNameBytes = mqttVersion.protocolNameBytes();// 7. 可变报头的长度 = 2字节的协议名的长度(v3.1为3,v3.1.1值为4) + 4字节的协议名(MQTT) + 1字段协议级别 + 1字节连接标志 + 2字节保持时间int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;int variablePartSize = variableHeaderBufferSize + payloadBufferSize;// 8.这里计算的是固定头的长度 = 1字节的报文类型(包括类型标志位) + 剩余长度字段,剩余长度=可变报头 + 有效载荷// 由于剩余长度字段中每字节的最高位为进制位,所以每个字节表示的最大值为128(0-127),所以(可变报头+有效载荷)/128即剩余长度所占的字字节int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);// 9.创建一个这Mqtt消息长度的byteBuf,开始写入消息ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);// 9.1 向byteBuf中写入固定报头中的第一个字节,详细介绍见下段代码buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));// 9.2 向byteBuf中写入固定报头中的剩余长度字段,详细介绍见下段代码writeVariableLengthInt(buf, variablePartSize);// 9.3 写入两字节的协议名的长度,再写入协议名buf.writeShort(protocolNameBytes.length);buf.writeBytes(protocolNameBytes);// 9.4 1字节协议版本,1字节连接标志,2字节连接时间buf.writeByte(variableHeader.version());buf.writeByte(getConnVariableHeaderFlag(variableHeader));buf.writeShort(variableHeader.keepAliveTimeSeconds());// 9.5 最后写入有效载荷buf.writeShort(clientIdentifierBytes.length);buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);if (variableHeader.isWillFlag()) {buf.writeShort(willTopicBytes.length);buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);buf.writeShort(willMessageBytes.length);buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);}if (variableHeader.hasUserName()) {buf.writeShort(userNameBytes.length);buf.writeBytes(userNameBytes, 0, userNameBytes.length);}if (variableHeader.hasPassword()) {buf.writeShort(passwordBytes.length);buf.writeBytes(passwordBytes, 0, passwordBytes.length);}return buf;}
/*** 计算固定报头第一字节** @param header* @return*/private static int getFixedHeaderByte1(MqttFixedHeader header) {int ret = 0;// 1. 将消息类型左移四位,因为第一字节的0-3位为类型对应的标志位ret |= header.messageType().value() << 4;// 2. 如果是重发报文,则重发标志为1,第一字节的第3位if (header.isDup()) {// 采用或运算,将第一字节的第三位这只为1ret |= 0x08;}// 3. Qos等级为第一字节的第1、2位ret |= header.qosLevel().value() << 1;// 4.如果要保留消息,则保留标志为1,第一字节的第0位if (header.isRetain()) {ret |= 0x01;}return ret;}/*** 写入固定报头中的剩余长度字段** @param buf* @param num*/private static void writeVariableLengthInt(ByteBuf buf, int num) {do {// 1. %求余,取前一个字节所表示的数值大小,// 比如num表示200字节,第一次这里digit表示72// 由于第二次,num=1,大于0成立,这里1 % 128 = 1int digit = num % 128;// 2. 第一除以128,即去除前一个字节,num表示1,第二次等于0了,表示没有字节了num /= 128;if (num > 0) {// 3.如果还有字节,则最高位进制位设置为1digit |= 0x80;}// 4. 第一字节写入72,第二字节写入1buf.writeByte(digit);} while (num > 0);}/*** 通过或运算计算连接标志*/private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {int flagByte = 0;if (variableHeader.hasUserName()) {flagByte |= 0x80;}if (variableHeader.hasPassword()) {flagByte |= 0x40;}if (variableHeader.isWillRetain()) {flagByte |= 0x20;}flagByte |= (variableHeader.willQos() & 0x03) << 3;if (variableHeader.isWillFlag()) {flagByte |= 0x04;}if (variableHeader.isCleanSession()) {flagByte |= 0x02;}return flagByte;}
1.3 encodeConnAckMessage - 确认连接
可变报头
encodeConnAckMessage
private static ByteBuf encodeConnAckMessage(ByteBufAllocator byteBufAllocator,MqttConnAckMessage message) {// 一共4个字节,两字节的固定头,两字节的可变头,无载荷ByteBuf buf = byteBufAllocator.buffer(4);buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));// 剩余长度字段固定为2,表示可变报头+载荷=2字节buf.writeByte(2);// 服务端如果保存了会话,则置为1,如果没有保存,则置为0,buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);buf.writeByte(message.variableHeader().connectReturnCode().byteValue());return buf;}
1.4 encodePublishMessage – 发布消息
固定报头
可变报头
只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中。
encodePublishMessage
private static ByteBuf encodePublishMessage(ByteBufAllocator byteBufAllocator,MqttPublishMessage message) {MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttPublishVariableHeader variableHeader = message.variableHeader();ByteBuf payload = message.payload().duplicate();String topicName = variableHeader.topicName();byte[] topicNameBytes = encodeStringUtf8(topicName);// 1. 可变报头长度 = 2字节长度 + topic长度 + 2字节PacketIdentifier(qos=1或qos=2时存在)int variableHeaderBufferSize = 2 + topicNameBytes.length +(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);int payloadBufferSize = payload.readableBytes();int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variablePartSize);buf.writeShort(topicNameBytes.length);buf.writeBytes(topicNameBytes);if (mqttFixedHeader.qosLevel().value() > 0) {buf.writeShort(variableHeader.packetId());}buf.writeBytes(payload);return buf;}
1.5 encodeSubscribeMessage - 订阅主题
有效载荷
encodeSubscribeMessage
private static ByteBuf encodeSubscribeMessage(ByteBufAllocator byteBufAllocator,MqttSubscribeMessage message) {int variableHeaderBufferSize = 2;int payloadBufferSize = 0;MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttMessageIdVariableHeader variableHeader = message.variableHeader();MqttSubscribePayload payload = message.payload();// 1. 订阅消息的载荷中可以包含多个订阅主题for (MqttTopicSubscription topic : payload.topicSubscriptions()) {String topicName = topic.topicName();byte[] topicNameBytes = encodeStringUtf8(topicName);// 2. 每个订阅主题的载荷 = 2字节长度 + topic过滤器的长度 + 1字节的QospayloadBufferSize += 2 + topicNameBytes.length;payloadBufferSize += 1;}int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variablePartSize);// 3. 可变头中包含2字节的packageIdint messageId = variableHeader.messageId();buf.writeShort(messageId);// Payloadfor (MqttTopicSubscription topic : payload.topicSubscriptions()) {String topicName = topic.topicName();byte[] topicNameBytes = encodeStringUtf8(topicName);buf.writeShort(topicNameBytes.length);buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);buf.writeByte(topic.qualityOfService().value());}return buf;}
1.6 encodeUnsubscribeMessage - 取消订阅
有效载荷示例
encodeUnsubscribeMessage
private static ByteBuf encodeUnsubscribeMessage(ByteBufAllocator byteBufAllocator,MqttUnsubscribeMessage message) {int variableHeaderBufferSize = 2;int payloadBufferSize = 0;MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttMessageIdVariableHeader variableHeader = message.variableHeader();MqttUnsubscribePayload payload = message.payload();// 1. 要取消的主题列表,每个主题过滤器 = 2字节长度 + 过滤器长度for (String topicName : payload.topics()) {byte[] topicNameBytes = encodeStringUtf8(topicName);payloadBufferSize += 2 + topicNameBytes.length;}int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variablePartSize);// Variable Headerint messageId = variableHeader.messageId();buf.writeShort(messageId);// Payloadfor (String topicName : payload.topics()) {byte[] topicNameBytes = encodeStringUtf8(topicName);buf.writeShort(topicNameBytes.length);buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);}return buf;}
1.7 encodeSubAckMessage - 订阅应答
可变报头
可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符。
有效载荷
有效载荷包含一个返回码清单。每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器。它们指定了 SUBSCRIBE 请求的每个订阅被授予的最大 QoS 等级。
encodeSubAckMessage
private static ByteBuf encodeSubAckMessage(ByteBufAllocator byteBufAllocator,MqttSubAckMessage message) {// 1. 可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符,所以固定长度为2int variableHeaderBufferSize = 2;// 2. SUBSCRIBE报文中的每个过滤器,subAck消息中都要给出对应主题的所能赋予的最大Qos等级int payloadBufferSize = message.payload().grantedQoSLevels().size();int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));writeVariableLengthInt(buf, variablePartSize);buf.writeShort(message.variableHeader().messageId());for (int qos : message.payload().grantedQoSLevels()) {buf.writeByte(qos);}return buf;}
1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId
对于UNSUBACK、PUBACK、PUBREC、PUBREL、PUBCOMP消息,都是一些确认消息。这类消息中的可变报头需要携带对应消息的packetId,无载荷。
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ByteBufAllocator byteBufAllocator,MqttMessage message) {MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();int msgId = variableHeader.messageId();// 对于此方法,适用于报文结构为: 可变头只有个2字节packetId,并且无有效载荷的int variableHeaderBufferSize = 2; // variable part only has a message idint fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variableHeaderBufferSize);buf.writeShort(msgId);return buf;}
1.9 encodeMessageWithOnlySingleByteFixedHeader
对于PINGREQ、PINGRESP、DISCONNECT消息,既无可变报头,也无有效载荷。
2. MqttDecoder–解码器
2.1 构造方法
对于解码器,netty提供了对外公共的构造方法,无参构造和有参构造。
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;private final int maxBytesInMessage;public MqttDecoder() {this(DEFAULT_MAX_BYTES_IN_MESSAGE);}public MqttDecoder(int maxBytesInMessage) {super(DecoderState.READ_FIXED_HEADER);this.maxBytesInMessage = maxBytesInMessage;}
采用无参构造,默认消息的最大字节为8092,有参构造则可以指定最大字节数。同时,将initialState设置为固定报头,作用是后面解码时从固定头开始,这里作者注释也有说明。
/*** Creates a new instance with the specified initial state.*/protected ReplayingDecoder(S initialState) {state = initialState;}/*** States of the decoder.* We start at READ_FIXED_HEADER, followed by* READ_VARIABLE_HEADER and finally READ_PAYLOAD.*/enum DecoderState {READ_FIXED_HEADER,READ_VARIABLE_HEADER,READ_PAYLOAD,BAD_MESSAGE,}
2.2 READ_FIXED_HEADER - 固定报头解码
解码器,我们重点看一下decode方法。这里通过state()方法判断是解析固定头、可变头、载荷、错误消息。new对象的时候,初始状态为固定头,从固定头开始,注意固定头和可变头case
中没有break
,所以解码顺序固定头 -> 可变头 -> 载荷。在这三个过程中有异常,则 -> BAD_MESSAGE
。
decodeFixedHeader
private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {// 1. 取无符号第一字节,固定头第一字节,7-4位为消息类型,所以右移4位short b1 = buffer.readUnsignedByte();MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);// 1.1 重发标志第3位,值为1,标志重发boolean dupFlag = (b1 & 0x08) == 0x08;// 1.2 Qos占1、2位,采用& 0x06获取到这两位值,然后右移一位即Qos的值int qosLevel = (b1 & 0x06) >> 1;// 1.3 会话保留标志站0位boolean retain = (b1 & 0x01) != 0;int remainingLength = 0;int multiplier = 1;short digit;int loops = 0;do {digit = buffer.readUnsignedByte();// 2.由于每个字节的最高位为进制位,所以&127获取除进制位之外的7位remainingLength += (digit & 127) * multiplier;multiplier *= 128;loops++;} while ((digit & 128) != 0 && loops < 4);// 协议规定,剩余长度字段最大为4个字节if (loops == 4 && (digit & 128) != 0) {throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');}MqttFixedHeader decodedFixedHeader =new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);return validateFixedHeader(resetUnusedFields(decodedFixedHeader));}
validateFixedHeader
协议中规定,PUBREL、SUBSCRIBE、UNSUBSCRIBE中固定头中的Qos等级必须是1。
static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {switch (mqttFixedHeader.messageType()) {case PUBREL:case SUBSCRIBE:case UNSUBSCRIBE:if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");}default:return mqttFixedHeader;}}
resetUnusedFields
对于协议中规定,固定报头中标志位保留的,一律置为0。
static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {switch (mqttFixedHeader.messageType()) {case CONNECT:case CONNACK:case PUBACK:case PUBREC:case PUBCOMP:case SUBACK:case UNSUBACK:case PINGREQ:case PINGRESP:case DISCONNECT:if (mqttFixedHeader.isDup() ||mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||mqttFixedHeader.isRetain()) {return new MqttFixedHeader(mqttFixedHeader.messageType(),false,MqttQoS.AT_MOST_ONCE,false,mqttFixedHeader.remainingLength());}return mqttFixedHeader;case PUBREL:case SUBSCRIBE:case UNSUBSCRIBE:if (mqttFixedHeader.isRetain()) {return new MqttFixedHeader(mqttFixedHeader.messageType(),mqttFixedHeader.isDup(),mqttFixedHeader.qosLevel(),false,mqttFixedHeader.remainingLength());}return mqttFixedHeader;default:return mqttFixedHeader;}}
2.3 READ_VARIABLE_HEADER - 可变报头解码
解码和编码是相对应的,这里不针对每种都进行说明。讲一下连接消息,其他的都比较简单。
decodeConnectionVariableHeader
private static MqttDecoder.Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {// 1. decodeString方法的作用,通过前两个长度字节(MLB,SLB)指定的字节长度来获取数据并装成string// Connect可变头最开始的两个长度字节用来表示协议名称的长度,所以这里通过此方法获取的就是协议名MQTTfinal MqttDecoder.Result<String> protoString = decodeString(buffer);// 2.注意这里的numberOfBytesConsumed字段,用于表示已经读取的字节个数,等解码完有效载荷后,// 会比较固定头里的剩余长度字段和此字段是否完全相等int numberOfBytesConsumed = protoString.numberOfBytesConsumed;final byte protocolLevel = buffer.readByte();numberOfBytesConsumed += 1;final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);// 3.取出连接标志字节,采用&进行解码final int b1 = buffer.readUnsignedByte();numberOfBytesConsumed += 1;final MqttDecoder.Result<Integer> keepAlive = decodeMsbLsb(buffer);numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;final boolean hasUserName = (b1 & 0x80) == 0x80;final boolean hasPassword = (b1 & 0x40) == 0x40;final boolean willRetain = (b1 & 0x20) == 0x20;final int willQos = (b1 & 0x18) >> 3;final boolean willFlag = (b1 & 0x04) == 0x04;final boolean cleanSession = (b1 & 0x02) == 0x02;if (mqttVersion == MqttVersion.MQTT_3_1_1) {final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;if (!zeroReservedFlag) {throw new DecoderException("non-zero reserved flag");}}final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(mqttVersion.protocolName(),mqttVersion.protocolLevel(),hasUserName,hasPassword,willRetain,willQos,willFlag,cleanSession,keepAlive.value);return new MqttDecoder.Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);}
decodeString
2.4 READ_PAYLOAD- 有效载荷解码
有效载荷也不针对每种都进行说明。讲一下连接消息的有效载荷解码。
decodeConnectionPayload
private static MqttDecoder.Result<MqttConnectPayload> decodeConnectionPayload(ByteBuf buffer,MqttConnectVariableHeader mqttConnectVariableHeader) {// 1.解码clientId,对于connect消息,载荷里的clientId是必须有的。final MqttDecoder.Result<String> decodedClientId = decodeString(buffer);final String decodedClientIdValue = decodedClientId.value;final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),(byte) mqttConnectVariableHeader.version());// 1.1 这里前面说过,如果是v3.1.1版本的mqtt,是允许clientId是空字符串或者大于23字节的,但是v3.1的不支持if (!isValidClientId(mqttVersion, decodedClientIdValue)) {throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);}int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;// 2.对于遗嘱topic和遗嘱消息、用户名、密码这几部分需要根据可变头中的标志位是否为1来判断是否存在MqttDecoder.Result<String> decodedWillTopic = null;MqttDecoder.Result<byte[]> decodedWillMessage = null;if (mqttConnectVariableHeader.isWillFlag()) {decodedWillTopic = decodeString(buffer, 0, 32767);numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;decodedWillMessage = decodeByteArray(buffer);numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;}MqttDecoder.Result<String> decodedUserName = null;MqttDecoder.Result<byte[]> decodedPassword = null;if (mqttConnectVariableHeader.hasUserName()) {decodedUserName = decodeString(buffer);numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;}if (mqttConnectVariableHeader.hasPassword()) {decodedPassword = decodeByteArray(buffer);numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;}final MqttConnectPayload mqttConnectPayload =new MqttConnectPayload(decodedClientId.value,decodedWillTopic != null ? decodedWillTopic.value : null,decodedWillMessage != null ? decodedWillMessage.value : null,decodedUserName != null ? decodedUserName.value : null,decodedPassword != null ? decodedPassword.value : null);return new MqttDecoder.Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);}