Netty应用(七) ----MQTT编解码器

目录

  • 0.前言
  • 1. MqttEncoder--编码器
    • 1.1 构造方法
    • 1.2 encodeConnectMessage -- 连接消息
    • 1.3 encodeConnAckMessage - 确认连接
    • 1.4 encodePublishMessage -- 发布消息
    • 1.5 encodeSubscribeMessage - 订阅主题
    • 1.6 encodeUnsubscribeMessage - 取消订阅
    • 1.7 encodeSubAckMessage - 订阅应答
    • 1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId
    • 1.9 encodeMessageWithOnlySingleByteFixedHeader
  • 2. MqttDecoder--解码器
    • 2.1 构造方法
    • 2.2 READ_FIXED_HEADER - 固定报头解码
    • 2.3 READ_VARIABLE_HEADER - 可变报头解码
    • 2.4 READ_PAYLOAD- 有效载荷解码

0.前言

这里梳理下netty中对mqtt协议的编码和解码的处理。一方面对mqtt协议的结构再巩固些,另一方面就是学习下netty中对字节的处理。对于MQTT协议,可以参考前一篇文章MQTT协议详解。

1. MqttEncoder–编码器

1.1 构造方法

对于编码器,构造方法是私有的。我们可以通过其提供的静态常量INSTANCE访问。

    public static final MqttEncoder INSTANCE = new MqttEncoder();private MqttEncoder() { }

1.2 encodeConnectMessage – 连接消息

编码器,我们重点看一下doEncode方法。通过固定报头中的消息类型来对不同类型的消息进行特定编码。

    static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {switch (message.fixedHeader().messageType()) {case CONNECT:return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);case CONNACK:return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);case PUBLISH:return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);case SUBSCRIBE:return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);case UNSUBSCRIBE:return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);case SUBACK:return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);case UNSUBACK:case PUBACK:case PUBREC:case PUBREL:case PUBCOMP:return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);case PINGREQ:case PINGRESP:case DISCONNECT:return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);default:throw new IllegalArgumentException("Unknown message type: " + message.fixedHeader().messageType().value());}}

对于消息的编码,我们可以对照着协议来看,这样会更清晰。
固定报头
在这里插入图片描述
可变报头
在这里插入图片描述
encodeConnectMessage方法

    private static ByteBuf encodeConnectMessage(ByteBufAllocator byteBufAllocator,MqttConnectMessage message) {// 1. 有效载荷初始大小设为0字节int payloadBufferSize = 0;MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttConnectVariableHeader variableHeader = message.variableHeader();MqttConnectPayload payload = message.payload();// 2.public enum MqttVersion {//    MQTT_3_1("MQIsdp", (byte) 3),//    MQTT_3_1_1("MQTT", (byte) 4);// }  对mqtt版本做校验,枚举中只有两种,如果名称和版本不匹配,则报错MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),(byte) variableHeader.version());// 3.如果可变报头中,没有用户名称但是有用户密码,则报错if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {throw new DecoderException("Without a username, the password MUST be not set");}// 4. 校验有效载荷中的clientId,版本MQTT_3_1中clinetId编码字节长度必须为1到23,// 在v3.1.1中允许超过23字节或者长度为0的clientId,所以不为null即可。String clientIdentifier = payload.clientIdentifier();if (!isValidClientId(mqttVersion, clientIdentifier)) {throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);}byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);// 4.1 connect消息的有效载荷,如果都包含的话必须按这个顺序出现:客户端标识符,遗嘱主题,遗嘱消息,用户名,密码// 而这几部分的结构,都是由一个两字节的长度和对应的载荷消息的组成,所以这里都是 2字节 + 消息的字节长度payloadBufferSize += 2 + clientIdentifierBytes.length;// 5.校验有效载荷中的遗嘱主题和遗嘱消息,如果可变报头中的遗嘱标志为1,则表示这两项存在String willTopic = payload.willTopic();byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;byte[] willMessage = payload.willMessageInBytes();byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;if (variableHeader.isWillFlag()) {payloadBufferSize += 2 + willTopicBytes.length;payloadBufferSize += 2 + willMessageBytes.length;}// 6.校验有效载荷中的用户和密码,如果可变报头中的用户名/密码标识为1,则表示用户名/密码存在String userName = payload.userName();byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;if (variableHeader.hasUserName()) {payloadBufferSize += 2 + userNameBytes.length;}byte[] password = payload.passwordInBytes();byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;if (variableHeader.hasPassword()) {payloadBufferSize += 2 + passwordBytes.length;}byte[] protocolNameBytes = mqttVersion.protocolNameBytes();// 7. 可变报头的长度 = 2字节的协议名的长度(v3.1为3,v3.1.1值为4) + 4字节的协议名(MQTT) + 1字段协议级别 + 1字节连接标志 + 2字节保持时间int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;int variablePartSize = variableHeaderBufferSize + payloadBufferSize;// 8.这里计算的是固定头的长度 = 1字节的报文类型(包括类型标志位) + 剩余长度字段,剩余长度=可变报头 + 有效载荷// 由于剩余长度字段中每字节的最高位为进制位,所以每个字节表示的最大值为128(0-127),所以(可变报头+有效载荷)/128即剩余长度所占的字字节int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);// 9.创建一个这Mqtt消息长度的byteBuf,开始写入消息ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);// 9.1 向byteBuf中写入固定报头中的第一个字节,详细介绍见下段代码buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));// 9.2 向byteBuf中写入固定报头中的剩余长度字段,详细介绍见下段代码writeVariableLengthInt(buf, variablePartSize);// 9.3 写入两字节的协议名的长度,再写入协议名buf.writeShort(protocolNameBytes.length);buf.writeBytes(protocolNameBytes);// 9.4 1字节协议版本,1字节连接标志,2字节连接时间buf.writeByte(variableHeader.version());buf.writeByte(getConnVariableHeaderFlag(variableHeader));buf.writeShort(variableHeader.keepAliveTimeSeconds());// 9.5 最后写入有效载荷buf.writeShort(clientIdentifierBytes.length);buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);if (variableHeader.isWillFlag()) {buf.writeShort(willTopicBytes.length);buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);buf.writeShort(willMessageBytes.length);buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);}if (variableHeader.hasUserName()) {buf.writeShort(userNameBytes.length);buf.writeBytes(userNameBytes, 0, userNameBytes.length);}if (variableHeader.hasPassword()) {buf.writeShort(passwordBytes.length);buf.writeBytes(passwordBytes, 0, passwordBytes.length);}return buf;}
    /*** 计算固定报头第一字节** @param header* @return*/private static int getFixedHeaderByte1(MqttFixedHeader header) {int ret = 0;// 1. 将消息类型左移四位,因为第一字节的0-3位为类型对应的标志位ret |= header.messageType().value() << 4;// 2. 如果是重发报文,则重发标志为1,第一字节的第3位if (header.isDup()) {// 采用或运算,将第一字节的第三位这只为1ret |= 0x08;}// 3. Qos等级为第一字节的第1、2位ret |= header.qosLevel().value() << 1;// 4.如果要保留消息,则保留标志为1,第一字节的第0位if (header.isRetain()) {ret |= 0x01;}return ret;}/*** 写入固定报头中的剩余长度字段** @param buf* @param num*/private static void writeVariableLengthInt(ByteBuf buf, int num) {do {// 1. %求余,取前一个字节所表示的数值大小,// 比如num表示200字节,第一次这里digit表示72// 由于第二次,num=1,大于0成立,这里1 % 128 = 1int digit = num % 128;// 2. 第一除以128,即去除前一个字节,num表示1,第二次等于0了,表示没有字节了num /= 128;if (num > 0) {// 3.如果还有字节,则最高位进制位设置为1digit |= 0x80;}// 4. 第一字节写入72,第二字节写入1buf.writeByte(digit);} while (num > 0);}/*** 通过或运算计算连接标志*/private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {int flagByte = 0;if (variableHeader.hasUserName()) {flagByte |= 0x80;}if (variableHeader.hasPassword()) {flagByte |= 0x40;}if (variableHeader.isWillRetain()) {flagByte |= 0x20;}flagByte |= (variableHeader.willQos() & 0x03) << 3;if (variableHeader.isWillFlag()) {flagByte |= 0x04;}if (variableHeader.isCleanSession()) {flagByte |= 0x02;}return flagByte;}

1.3 encodeConnAckMessage - 确认连接

可变报头
在这里插入图片描述
encodeConnAckMessage

    private static ByteBuf encodeConnAckMessage(ByteBufAllocator byteBufAllocator,MqttConnAckMessage message) {// 一共4个字节,两字节的固定头,两字节的可变头,无载荷ByteBuf buf = byteBufAllocator.buffer(4);buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));// 剩余长度字段固定为2,表示可变报头+载荷=2字节buf.writeByte(2);// 服务端如果保存了会话,则置为1,如果没有保存,则置为0,buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);buf.writeByte(message.variableHeader().connectReturnCode().byteValue());return buf;}

1.4 encodePublishMessage – 发布消息

固定报头
在这里插入图片描述

可变报头
只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中
在这里插入图片描述
encodePublishMessage

    private static ByteBuf encodePublishMessage(ByteBufAllocator byteBufAllocator,MqttPublishMessage message) {MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttPublishVariableHeader variableHeader = message.variableHeader();ByteBuf payload = message.payload().duplicate();String topicName = variableHeader.topicName();byte[] topicNameBytes = encodeStringUtf8(topicName);// 1. 可变报头长度 = 2字节长度 + topic长度 + 2字节PacketIdentifier(qos=1或qos=2时存在)int variableHeaderBufferSize = 2 + topicNameBytes.length +(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);int payloadBufferSize = payload.readableBytes();int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variablePartSize);buf.writeShort(topicNameBytes.length);buf.writeBytes(topicNameBytes);if (mqttFixedHeader.qosLevel().value() > 0) {buf.writeShort(variableHeader.packetId());}buf.writeBytes(payload);return buf;}

1.5 encodeSubscribeMessage - 订阅主题

有效载荷
在这里插入图片描述
encodeSubscribeMessage

    private static ByteBuf encodeSubscribeMessage(ByteBufAllocator byteBufAllocator,MqttSubscribeMessage message) {int variableHeaderBufferSize = 2;int payloadBufferSize = 0;MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttMessageIdVariableHeader variableHeader = message.variableHeader();MqttSubscribePayload payload = message.payload();// 1. 订阅消息的载荷中可以包含多个订阅主题for (MqttTopicSubscription topic : payload.topicSubscriptions()) {String topicName = topic.topicName();byte[] topicNameBytes = encodeStringUtf8(topicName);// 2. 每个订阅主题的载荷 = 2字节长度 + topic过滤器的长度 + 1字节的QospayloadBufferSize += 2 + topicNameBytes.length;payloadBufferSize += 1;}int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variablePartSize);// 3. 可变头中包含2字节的packageIdint messageId = variableHeader.messageId();buf.writeShort(messageId);// Payloadfor (MqttTopicSubscription topic : payload.topicSubscriptions()) {String topicName = topic.topicName();byte[] topicNameBytes = encodeStringUtf8(topicName);buf.writeShort(topicNameBytes.length);buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);buf.writeByte(topic.qualityOfService().value());}return buf;}

1.6 encodeUnsubscribeMessage - 取消订阅

有效载荷示例
在这里插入图片描述
encodeUnsubscribeMessage

    private static ByteBuf encodeUnsubscribeMessage(ByteBufAllocator byteBufAllocator,MqttUnsubscribeMessage message) {int variableHeaderBufferSize = 2;int payloadBufferSize = 0;MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttMessageIdVariableHeader variableHeader = message.variableHeader();MqttUnsubscribePayload payload = message.payload();// 1. 要取消的主题列表,每个主题过滤器 = 2字节长度 + 过滤器长度for (String topicName : payload.topics()) {byte[] topicNameBytes = encodeStringUtf8(topicName);payloadBufferSize += 2 + topicNameBytes.length;}int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variablePartSize);// Variable Headerint messageId = variableHeader.messageId();buf.writeShort(messageId);// Payloadfor (String topicName : payload.topics()) {byte[] topicNameBytes = encodeStringUtf8(topicName);buf.writeShort(topicNameBytes.length);buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);}return buf;}

1.7 encodeSubAckMessage - 订阅应答

可变报头
可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符。
有效载荷
有效载荷包含一个返回码清单。每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器。它们指定了 SUBSCRIBE 请求的每个订阅被授予的最大 QoS 等级。
在这里插入图片描述
encodeSubAckMessage

    private static ByteBuf encodeSubAckMessage(ByteBufAllocator byteBufAllocator,MqttSubAckMessage message) {// 1. 可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符,所以固定长度为2int variableHeaderBufferSize = 2;// 2. SUBSCRIBE报文中的每个过滤器,subAck消息中都要给出对应主题的所能赋予的最大Qos等级int payloadBufferSize = message.payload().grantedQoSLevels().size();int variablePartSize = variableHeaderBufferSize + payloadBufferSize;int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));writeVariableLengthInt(buf, variablePartSize);buf.writeShort(message.variableHeader().messageId());for (int qos : message.payload().grantedQoSLevels()) {buf.writeByte(qos);}return buf;}

1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId

对于UNSUBACK、PUBACK、PUBREC、PUBREL、PUBCOMP消息,都是一些确认消息。这类消息中的可变报头需要携带对应消息的packetId,无载荷。

    private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ByteBufAllocator byteBufAllocator,MqttMessage message) {MqttFixedHeader mqttFixedHeader = message.fixedHeader();MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();int msgId = variableHeader.messageId();// 对于此方法,适用于报文结构为: 可变头只有个2字节packetId,并且无有效载荷的int variableHeaderBufferSize = 2; // variable part only has a message idint fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));writeVariableLengthInt(buf, variableHeaderBufferSize);buf.writeShort(msgId);return buf;}

1.9 encodeMessageWithOnlySingleByteFixedHeader

对于PINGREQ、PINGRESP、DISCONNECT消息,既无可变报头,也无有效载荷。

2. MqttDecoder–解码器

2.1 构造方法

对于解码器,netty提供了对外公共的构造方法,无参构造和有参构造。

    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;private final int maxBytesInMessage;public MqttDecoder() {this(DEFAULT_MAX_BYTES_IN_MESSAGE);}public MqttDecoder(int maxBytesInMessage) {super(DecoderState.READ_FIXED_HEADER);this.maxBytesInMessage = maxBytesInMessage;}

采用无参构造,默认消息的最大字节为8092,有参构造则可以指定最大字节数。同时,将initialState设置为固定报头,作用是后面解码时从固定头开始,这里作者注释也有说明。

    /*** Creates a new instance with the specified initial state.*/protected ReplayingDecoder(S initialState) {state = initialState;}/*** States of the decoder.* We start at READ_FIXED_HEADER, followed by* READ_VARIABLE_HEADER and finally READ_PAYLOAD.*/enum DecoderState {READ_FIXED_HEADER,READ_VARIABLE_HEADER,READ_PAYLOAD,BAD_MESSAGE,}

2.2 READ_FIXED_HEADER - 固定报头解码

解码器,我们重点看一下decode方法。这里通过state()方法判断是解析固定头、可变头、载荷、错误消息。new对象的时候,初始状态为固定头,从固定头开始,注意固定头和可变头case中没有break,所以解码顺序固定头 -> 可变头 -> 载荷。在这三个过程中有异常,则 -> BAD_MESSAGE
decodeFixedHeader

    private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {// 1. 取无符号第一字节,固定头第一字节,7-4位为消息类型,所以右移4位short b1 = buffer.readUnsignedByte();MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);// 1.1 重发标志第3位,值为1,标志重发boolean dupFlag = (b1 & 0x08) == 0x08;// 1.2 Qos占1、2位,采用& 0x06获取到这两位值,然后右移一位即Qos的值int qosLevel = (b1 & 0x06) >> 1;// 1.3 会话保留标志站0位boolean retain = (b1 & 0x01) != 0;int remainingLength = 0;int multiplier = 1;short digit;int loops = 0;do {digit = buffer.readUnsignedByte();// 2.由于每个字节的最高位为进制位,所以&127获取除进制位之外的7位remainingLength += (digit & 127) * multiplier;multiplier *= 128;loops++;} while ((digit & 128) != 0 && loops < 4);// 协议规定,剩余长度字段最大为4个字节if (loops == 4 && (digit & 128) != 0) {throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');}MqttFixedHeader decodedFixedHeader =new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);return validateFixedHeader(resetUnusedFields(decodedFixedHeader));}

validateFixedHeader
协议中规定,PUBREL、SUBSCRIBE、UNSUBSCRIBE中固定头中的Qos等级必须是1。
在这里插入图片描述

    static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {switch (mqttFixedHeader.messageType()) {case PUBREL:case SUBSCRIBE:case UNSUBSCRIBE:if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");}default:return mqttFixedHeader;}}

resetUnusedFields
对于协议中规定,固定报头中标志位保留的,一律置为0。

    static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {switch (mqttFixedHeader.messageType()) {case CONNECT:case CONNACK:case PUBACK:case PUBREC:case PUBCOMP:case SUBACK:case UNSUBACK:case PINGREQ:case PINGRESP:case DISCONNECT:if (mqttFixedHeader.isDup() ||mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||mqttFixedHeader.isRetain()) {return new MqttFixedHeader(mqttFixedHeader.messageType(),false,MqttQoS.AT_MOST_ONCE,false,mqttFixedHeader.remainingLength());}return mqttFixedHeader;case PUBREL:case SUBSCRIBE:case UNSUBSCRIBE:if (mqttFixedHeader.isRetain()) {return new MqttFixedHeader(mqttFixedHeader.messageType(),mqttFixedHeader.isDup(),mqttFixedHeader.qosLevel(),false,mqttFixedHeader.remainingLength());}return mqttFixedHeader;default:return mqttFixedHeader;}}

2.3 READ_VARIABLE_HEADER - 可变报头解码

解码和编码是相对应的,这里不针对每种都进行说明。讲一下连接消息,其他的都比较简单。
decodeConnectionVariableHeader

    private static MqttDecoder.Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {// 1. decodeString方法的作用,通过前两个长度字节(MLB,SLB)指定的字节长度来获取数据并装成string// Connect可变头最开始的两个长度字节用来表示协议名称的长度,所以这里通过此方法获取的就是协议名MQTTfinal MqttDecoder.Result<String> protoString = decodeString(buffer);// 2.注意这里的numberOfBytesConsumed字段,用于表示已经读取的字节个数,等解码完有效载荷后,// 会比较固定头里的剩余长度字段和此字段是否完全相等int numberOfBytesConsumed = protoString.numberOfBytesConsumed;final byte protocolLevel = buffer.readByte();numberOfBytesConsumed += 1;final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);// 3.取出连接标志字节,采用&进行解码final int b1 = buffer.readUnsignedByte();numberOfBytesConsumed += 1;final MqttDecoder.Result<Integer> keepAlive = decodeMsbLsb(buffer);numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;final boolean hasUserName = (b1 & 0x80) == 0x80;final boolean hasPassword = (b1 & 0x40) == 0x40;final boolean willRetain = (b1 & 0x20) == 0x20;final int willQos = (b1 & 0x18) >> 3;final boolean willFlag = (b1 & 0x04) == 0x04;final boolean cleanSession = (b1 & 0x02) == 0x02;if (mqttVersion == MqttVersion.MQTT_3_1_1) {final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;if (!zeroReservedFlag) {throw new DecoderException("non-zero reserved flag");}}final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(mqttVersion.protocolName(),mqttVersion.protocolLevel(),hasUserName,hasPassword,willRetain,willQos,willFlag,cleanSession,keepAlive.value);return new MqttDecoder.Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);}

decodeString
在这里插入图片描述
在这里插入图片描述

2.4 READ_PAYLOAD- 有效载荷解码

有效载荷也不针对每种都进行说明。讲一下连接消息的有效载荷解码。
decodeConnectionPayload

    private static MqttDecoder.Result<MqttConnectPayload> decodeConnectionPayload(ByteBuf buffer,MqttConnectVariableHeader mqttConnectVariableHeader) {// 1.解码clientId,对于connect消息,载荷里的clientId是必须有的。final MqttDecoder.Result<String> decodedClientId = decodeString(buffer);final String decodedClientIdValue = decodedClientId.value;final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),(byte) mqttConnectVariableHeader.version());// 1.1 这里前面说过,如果是v3.1.1版本的mqtt,是允许clientId是空字符串或者大于23字节的,但是v3.1的不支持if (!isValidClientId(mqttVersion, decodedClientIdValue)) {throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);}int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;// 2.对于遗嘱topic和遗嘱消息、用户名、密码这几部分需要根据可变头中的标志位是否为1来判断是否存在MqttDecoder.Result<String> decodedWillTopic = null;MqttDecoder.Result<byte[]> decodedWillMessage = null;if (mqttConnectVariableHeader.isWillFlag()) {decodedWillTopic = decodeString(buffer, 0, 32767);numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;decodedWillMessage = decodeByteArray(buffer);numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;}MqttDecoder.Result<String> decodedUserName = null;MqttDecoder.Result<byte[]> decodedPassword = null;if (mqttConnectVariableHeader.hasUserName()) {decodedUserName = decodeString(buffer);numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;}if (mqttConnectVariableHeader.hasPassword()) {decodedPassword = decodeByteArray(buffer);numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;}final MqttConnectPayload mqttConnectPayload =new MqttConnectPayload(decodedClientId.value,decodedWillTopic != null ? decodedWillTopic.value : null,decodedWillMessage != null ? decodedWillMessage.value : null,decodedUserName != null ? decodedUserName.value : null,decodedPassword != null ? decodedPassword.value : null);return new MqttDecoder.Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/220636.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS应用开发实战—开箱即用的应用首页页面【ArkTS】【鸿蒙专栏-34】

一.HarmonyOS应用开发实战—开箱即用的应用首页页面【ArkTS】【鸿蒙专栏-34】 1.1 项目背景 HarmonyOS(鸿蒙操作系统)是华为公司推出的一种分布式操作系统。它被设计为一种全场景、全连接的操作系统,旨在实现在各种设备之间的无缝协同和共享,包括智能手机、平板电脑、智能…

计算机网络(四)

九、网络安全 &#xff08;一&#xff09;什么是网络安全&#xff1f; A、网络安全状况 分布式反射攻击逐渐成为拒绝攻击的重要形式 涉及重要行业和政府部门的高危漏洞事件增多。 基础应用和通用软硬件漏洞风险凸显&#xff08;“心脏出血”&#xff0c;“破壳”等&#x…

出国旅游需要注意些什么

出国旅游是一种令人兴奋、令人期待的经历。然而&#xff0c;在进行这种经历之前&#xff0c;有几件事情是需要注意的。本文将为您介绍出国旅游需要注意的一些重要事项。首先&#xff0c;为了确保您的出国旅行顺利进行&#xff0c;您应该提前办理好您的签证和护照。不同国家对于…

【神器】wakatime代码时间追踪工具

文章目录 wakatime简介支持的IDE安装步骤API文档插件费用写在最后 wakatime简介 wakatime就是一个IDE插件&#xff0c;一个代码时间追踪工具。可自动获取码编码时长和度量指标&#xff0c;以产生很多的coding图形报表。这些指标图形可以为开发者统计coding信息&#xff0c;比如…

头部首发优志愿头部u_sign生成与TLS指纹处理! + 数据可视化技术讲解【Python爬虫】

目录 针对大学名称 大学排名, 综合指数,学校情况等数据进行爬取 找对应得数据包 请求发现数据有加密 发现加密参数 搜索加密参数&#xff0c;好进行分析 分析过程 数据可视化 针对大学名称 大学排名, 综合指数,学校情况等数据进行爬取 首先进行鼠标右键&#xff0c;进行…

Spring Boot+Mybatis设置sql日志打印

在全局配置文件添加以下内容&#xff1a;logging.level.com.demo.mapperdebug&#xff0c;com.demo.mapper&#xff1a;src下的mapper路径&#xff0c;debug&#xff1a;设置日志打印级别为debug&#xff0c;亦可设置为&#xff1a;ERROR、WARN、INFO application.properties …

TikTok获客技巧分享(纯干货)

随着全球短视频的兴起&#xff0c;TikTok已经成为了最受欢迎的社交媒体平台之一&#xff0c;对于企业和个人而言&#xff0c;如何在TikTok上获取更多的客户和粉丝&#xff0c;成为了他们关注的焦点&#xff0c;本文将分享一些TikTok获客技巧&#xff0c;帮助大家在短视频平台上…

初识Redis缓存,一文掌握Redis重要知识文集。

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

云原生基础入门概念

文章目录 发现宝藏云原生的概念云原生的关键技术为何选择云原生&#xff1f;云原生的实际应用好书推荐 发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【宝藏入口】。 云原生的概念 当谈及现…

[23] GaussianAvatars: Photorealistic Head Avatars with Rigged 3D Gaussians

[paper | proj] 给定FLAME&#xff0c;基于每个三角面片中心初始化一个3D Gaussian&#xff08;3DGS&#xff09;&#xff1b;当FLAME mesh被驱动时&#xff0c;3DGS根据它的父亲三角面片&#xff0c;做平移、旋转和缩放变化&#xff1b;3DGS可以视作mesh上的辐射场&#xff1…

智能优化算法应用:基于算术优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于算术优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于算术优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.算术优化算法4.实验参数设定5.算法结果6.…

SD-WAN网络的可扩展性解析

SD-WAN组网以其卓越的可扩展性而脱颖而出&#xff0c;为企业提供了一个灵活适应不断扩张和增长需求的网络解决方案。SD-WAN组网通过轻松实现规模调整、拓扑变更以及多种接入方式的切换&#xff0c;确保网络的高效性和可管理性。对于正处于快速发展时期的企业而言&#xff0c;SD…

在IDEA中使用Git 、远程仓库克隆工程到本地

4.1 在IDEA中配置Git 安装好IntelliJ IDEA后,如果Git安装在默认路径下,那么idea会自动找到git的位置,如果更改了Git的安装位置则需要手动配置下Git的路径。 选择File→Settings打开设置窗口,找到Version Control下的git选项: 选择git的安装目录后可以点击“Test”按钮测…

探讨小鹏汽车CAN通讯协议分析破解过程数据研究技术应用

当前新能源电动汽车设计日益复杂&#xff0c;为提高舒适性、功能性、提升性能和确保更高的安全性&#xff0c;很多汽车的设计中融入了更复杂的功能。包括了雷达、激光雷达、自适应巡航、L2以上自动驾驶系统&#xff0c;高级驾驶辅助系统、盲区监测等等。安装在汽车上的传感器和…

Java中线程状态的描述

多线程-基础方法的认识 截止目前线程的复习 Thread 类 创建Thread类的方法 继承Thread类,重写run方法实现Runnable接口,重写run方法使用匿名内部类继承Thread类,重写run方法使用匿名内部类实现Runnable接口,重写run方法使用Lambda表达式 run方法中的所有的代码是当前线程对…

oracle 锁表解决办法

相关表介绍 V$LOCKED_OBJECT&#xff08;记录锁信息的表&#xff09;v$session&#xff08;记录会话信息的表&#xff09;v$sql&#xff08;记录 sql 执行的表&#xff09;dba_objects&#xff08;用来管理对象&#xff0c;表、库等等&#xff09; 查询锁表的 SID select b.…

滑动窗口最大值(LeetCode 239)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路方法一&#xff1a;暴力法方法二&#xff1a;优先队列方法三&#xff1a;单调队列 参考文献 1.问题描述 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动…

【Trino权威指南(第二版)】Trino的架构、trino架构组件、 trino连接器架构的细节、trino的查询执行模型

文章目录 一. Trino架构1. 架构概览2. 协调器3. 发现服务4. 工作节点 二. 基于连接器的架构三. 查询执行模型1. 解析—>查询计划2. 查询计划 —> 分布式查询计划3. 运行阶段3.1. 基础概念切片&#xff1a;并行单元page 与 exchange算子pipeline切片的driverOperator 3.2.…

大数据技术14:FlinkCDC数据变更捕获

前言&#xff1a;Flink CDC是Flink社区开发的flink-cdc-connectors 组件&#xff0c;这是⼀个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。 https://github.com/ververica/flink-cdc-connectors 一、CDC 概述 CDC 的全称是 Change …

深度学习 Day19——P8YOLOv5-C3模块实现

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 文章目录 前言1 我的环境2 代码实现与执行结果2.1 前期准备2.1.1 引入库2.1.2 设置GPU&#xff08;如果设备上支持GPU就使用GPU,否则使用C…