MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息协议。它基于发布/订阅模式,专为低带宽、高延迟或不可靠网络设计。它主要用于物联网(IoT)设备之间的通信,但也广泛应用于其他需要高效消息传递的场景。
特点:
- 轻量级,头部开销小,适合资源受限的设备。
- 支持发布/订阅模式,适合一对多的消息传递。
- 支持QoS(服务质量)级别,确保消息的可靠传递。
- 支持持久会话,断线后可以恢复未完成的消息传递。
- 支持TLS/SSL加密,确保安全传输。
适用场景:
- 物联网设备之间的通信。
- 传感器网络和智能设备的管理。
- 低带宽、低功耗的嵌入式系统。
1、基本概念
(1)、发布/订阅模型
MQTT使用发布/订阅模型进行通信,而不是传统的客户端-服务器请求/响应模型。这种模型的优点在于解耦了消息的生产者和消费者。
主要涉及3个概念:
- 发布者(Publisher):发送消息到特定主题(Topic)。
- 订阅者(Subscriber):订阅特定主题并接收该主题的所有消息。
- 代理(Broker):负责管理和分发消息的中间件。
(2)、主题(Topic)
主题是消息的分类方式,类似于文件系统中的路径。主题可以使用斜杠/分隔层次结构,例如home/livingroom/temperature。
主题支持通配符订阅:
- 单层通配符+:匹配单个层级的主题(即:+只能匹配一个单词),如home/+/temperature可以匹配home/livingroom/temperature和home/kitchen/temperature。
- 多层通配符* :匹配多个层级的主题(即:*可以匹配多个单词),如home/*可以匹配 home/livingroom和home/kitchen/light等。
(3)、QoS(Quality of Service)
QoS定义了消息传递的服务质量级别,有三种级别。
级别:
- QoS 0:最多一次(At most once)。消息可能丢失,但不会重复发送。
- QoS 1:至少一次(At least once)。消息保证送达,但可能会重复发送。
- QoS 2:恰好一次(Exactly once)。消息保证仅送达一次,可靠性最高。
2、MQTT协议的工作流程
原理示例图:
(1)、连接建立
客户端通过TCP连接到Broker,并发送CONNECT消息。
CONNECT消息包含以下字段:
- Client Identifier (ClientId):唯一标识客户端。
- Clean Session:决定是否清除会话状态。
- Will Topic:遗嘱消息的主题。
- Will Message:遗嘱消息的内容。
- Username/Password:用于认证。
Broker收到CONNECT消息后,返回CONNACK消息确认连接成功或失败。
(2)、订阅主题
客户端向Broker发送SUBSCRIBE消息以订阅一个或多个主题。SUBSCRIBE消息包含主题列表和对应的QoS级别。
Broker返回SUBACK消息确认订阅成功,并指明每个主题的实际QoS级别。
(3)、发布消息
客户端向Broker发送PUBLISH消息以发布消息到指定主题。
PUBLISH消息包含以下字段:
- Topic Name:消息发布的主题。
- Payload:消息内容。
- QoS:服务质量级别。
- Retain Flag:是否保留消息。
Broker将消息转发给所有订阅了该主题的客户端。
(4)、取消订阅
客户端向Broker发送UNSUBSCRIBE消息以取消订阅一个或多个主题。UNSUBSCRIBE消息包含主题列表。
Broker返回UNSUBACK 消息确认取消订阅成功。
(5)、断开连接
客户端向Broker发送DISCONNECT消息断开连接。如果Clean Session设置为true,Broker将清除与该客户端相关的会话状态。
3、MQTT协议的优势
(1)、轻量级
MQTT的头部非常小,通常只有2字节,这使得它非常适合资源受限的设备,如传感器和嵌入式系统。
(2)、高效
MQTT使用发布/订阅模型,减少了不必要的网络流量和计算资源消耗。同时,QoS机制确保了消息的可靠传输。
(3)、可扩展性
MQTT支持大规模分布式系统中的消息传递。通过使用Broker,可以在不同地理位置的设备之间实现高效的通信。
(4)、安全性
MQTT支持多种安全机制。
例如:
- TLS/SSL:加密传输通道。
- 用户名/密码认证:基于凭证的身份验证。
- ACL(Access Control List):细粒度的权限控制。
4、MQTT实际应用案例
(1)、物联网设备管理
在智能家居、工业自动化等场景中,MQTT广泛应用于设备状态监控、远程控制和数据采集。例如,智能温控器可以发布当前温度到home/livingroom/temperature主题,用户可以通过订阅该主题获取最新温度数据。
(2)、移动应用
移动应用可以使用MQTT实现即时通讯功能。由于其轻量级特性,即使在网络条件不佳的情况下,也能保证消息的及时传递。
(3)、边缘计算
边缘计算设备可以使用MQTT实现与云端的高效通信。例如,在自动驾驶汽车中,车载传感器可以将实时数据发布到云端进行分析和处理。
5、MQTT实现与工具
(1)、MQTT Broker
常见的MQTT Broker实现有:
- EMQ X:高性能、可扩展的MQTT Broker。(最常用)
- Mosquitto:开源、轻量级的MQTT Broker。
- HiveMQ:企业级MQTT Broker,支持集群部署。
(2)、MQTT客户端库
主流编程语言都有相应的MQTT客户端库。
例如:
- Java:Eclipse Paho
- Python:paho-mqtt
- JavaScript:mqtt.js
(3)、MQTT测试工具
常用的MQTT测试工具有:
- MQTTX:图形化MQTT客户端,适合手动测试。
- mosquitto_pub/mosquitto_sub:命令行工具,适合脚本化测试。
6、代码示例
在Spring Boot应用中集成MQTT,通常使用Eclipse Paho的MQTT客户端库。
(1)、添加依赖
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Eclipse Paho MQTT Client --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Lombok (可选) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>
(2)、MQTT发布者
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttPublisher {public static void main(String[] args) {String broker = "tcp://broker.hivemq.com:1883"; // emq服务器地址信息String clientId = "publisher-client"; // 客户端的唯一标识符,尽量设置唯一String topic = "home/livingroom/message"; // 主题try {// 创建客户端实例MqttClient client = new MqttClient(broker, clientId);// 设置连接选项MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true); // 表示每次连接时清除会话状态。// 设置回调函数client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) { //方法在连接丢失时调用System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception { // 方法在收到消息时调用。System.out.println("Message arrived from topic: " + topic + " -> " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) { // 方法在消息发送完成时调用。System.out.println("Delivery complete: " + token.isComplete());}});// 连接到Brokerclient.connect(options);System.out.println("Connected to broker");// 创建消息String content = "我想吃火锅,速去!!";MqttMessage message = new MqttMessage(content.getBytes());message.setQos(1); // 设置 QoS 级别为 1// 发布消息client.publish(topic, message);System.out.println("Message published");// 断开连接client.disconnect();System.out.println("Disconnected");} catch (MqttException me) {System.out.println("Reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
(3)、MQTT消费者
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttSubscriber {public static void main(String[] args) {String broker = "tcp://broker.hivemq.com:1883";String clientId = "subscriber-client";String topic = "home/livingroom/message";try {// 创建客户端实例MqttClient client = new MqttClient(broker, clientId);// 设置连接选项MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);// 设置回调函数client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Message arrived from topic: " + topic + " -> " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("Delivery complete: " + token.isComplete());}});// 连接到 Brokerclient.connect(options);System.out.println("Connected to broker");// 订阅主题int qos = 1; // 设置QoS级别为 1client.subscribe(topic, qos); // 订阅主题System.out.println("Subscribed to topic: " + topic);// 保持程序运行以接收消息while (true) {Thread.sleep(1000);}} catch (MqttException | InterruptedException me) {System.out.println("Reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
7、QoS(Quality of Service)可靠性原理
MQTT的QoS机制确保消息在不同网络条件下以不同的可靠性级别进行传输。QoS分为三个级别:0、1和2,每个级别提供不同程度的消息传递保证。
(1)、QoS 0–最多一次(At most once)
- 描述:消息发送后不进行确认,可能会丢失。
- 适用场景:适用于对数据丢失容忍度较高的应用,如环境传感器的数据采集,偶尔丢失一条数据不会影响整体分析结果。
- 工作原理:
- 发布者将消息发送到Broker。
- Broker收到消息后直接转发给订阅者。
- 没有确认机制,因此消息可能丢失。
流程示例图:
(2)、QoS 1–至少一次(At least once)
- 描述:消息至少会被传递一次,但可能会重复传递。
- 适用场景:适用于对数据丢失敏感但可以容忍重复的应用,如智能设备的状态报告。
- 工作原理:
- 发布者发送消息时附带一个消息ID。
- Broker收到消息后,向发布者发送PUBACK确认消息已接收。
- 如果发布者在一定时间内没有收到PUBACK,它会重新发送消息,直到收到确认。
- 订阅者接收到消息后也会向Broker发送确认(PUBACK),Broker再次确认消息已被处理。
- 可能导致消息重复,但不会丢失。
示例图:
(3)、QoS 2–恰好一次(Exactly once)
- 描述:消息只会被传递一次,确保无重复和无丢失。
- 适用场景:适用于对数据准确性要求极高的应用,如金融交易系统或关键任务控制系统。
- 工作原理:
- 发布者发送消息时附带一个消息ID。
- Broker收到消息后,向发布者发送PUBREC确认消息已接收。
- 发布者收到PUBREC后,发送PUBREL请求释放消息。
- Broker收到PUBREL后,发送PUBCOMP确认消息已完成,并将消息转发给订阅者。
- 订阅者接收到消息后,向Broker发送PUBREC确认消息已接收,Broker再次确认消息已被处理。
- 这种机制确保消息只会被传递一次,不会重复也不会丢失。
示例图:
注意:
QoS 2虽然消息处理是最好的,但是消耗也是最厉害的,性能也比较差,如果非必要,不建议设置这么高的消息级别。
8、订阅主题和发布消息的QoS设置
在MQTT中,订阅主题和发布消息都可以设置QoS级别。它们之间的关系决定了实际消息传递的QoS级别。
(1)、订阅者的QoS设置
当客户端订阅某个主题时,它可以指定一个QoS级别。这表示该客户端希望从该主题接收到的消息的最低QoS级别。
(2)、发布者的QoS设置
当客户端发布消息到某个主题时,它也可以指定一个QoS级别。这表示该消息将以何种QoS级别传递给Broker。
(3)、实际QoS级别的确定
实际传递给订阅者的QoS级别由以下规则决定:
- 订阅者的QoS >发布者的QoS:实际传递的QoS级别为发布者的QoS级别。
- 订阅者的QoS <=发布者的QoS:实际传递的QoS级别为订阅者的QoS级别。
简单说:
以qos较小的为准,提供性能更加的服务。
例如:
- 订阅者QoS = 0,无论发布者设置的QoS是什么,订阅者都将按QoS 0接收消息。
- 订阅者QoS = 1,如果发布者设置的QoS是0,1,2,订阅者将按QoS 1接收消息;
- 订阅者QoS = 2,只有当发布者设置的QoS是2 时,订阅者才能按QoS 2接收消息;否则,订阅者将按发布者的QoS级别接收消息。
9、MQTT报文格式详解
MQTT(Message Queuing Telemetry Transport)协议使用一种紧凑的二进制报文格式进行通信。MQTT报文分为多种类型,每种类型的报文都有其特定的结构和用途。
(1)、MQTT报文结构
每个MQTT报文由以下部分组成:
- 固定报头(Fixed Header):所有MQTT报文都包含固定报头。
- 可变报头(Variable Header):某些类型的报文包含可变报头。
- 有效载荷(Payload):某些类型的报文包含有效载荷。
1、固定报头(Fixed Header)
固定报头是每个 MQTT 报文的必需部分,它包含以下字段:
- 报文类型(Type):4位,表示报文的类型,如CONNECT、PUBLISH、SUBSCRIBE等。
- 标志位(Flags):4位,用于指定报文的具体行为。
- 剩余长度(Remaining Length):表示可变报头和有效载荷的总长度。
2、可变报头(Variable Header)
可变报头的内容取决于报文类型。例如,PUBLISH报文的可变报头包含主题名称和消息ID。
3、有效载荷(Payload)
有效载荷的内容也取决于报文类型。例如,PUBLISH报文的有效载荷包含实际的消息内容。
(2)、PUBLISH报文示例
假设发送一条消息:“我想吃火锅,速去!!”,可以通过PUBLISH报文来实现。
1、固定报头示例
- 报文类型:0x30(PUBLISH报文)
- 标志位:根据QoS和其他标志位设置
- 剩余长度:计算可变报头和有效载荷的总长度
假设QoS = 0,不保留消息,则固定报头为0x30。
2、可变报头示例
- 主题名称(Topic Name):消息发布的主题,例如home/livingroom/message
- 消息ID(Message ID):仅当QoS > 0时需要
3、有效载荷示例
有效载荷包含实际的消息内容,即 “我想吃火锅,速去!!”。
4、完整报文示例
Fixed Header: 0x30 0x24
Variable Header:Topic Name Length MSB: 0x00Topic Name Length LSB: 0x1CTopic Name Content: 0x68 0x6F 0x6D 0x65 0x2F 0x6C 0x69 0x76 0x69 0x6E 0x67 0x72 0x6F 0x6F 0x6D 0x2F 0x6D 0x65 0x73 0x73 0x61 0x67 0x65
Payload:0xE6 0x88 0x91 0xE6 0x83 0xB3 0xE5 0x90 0x83 0xE7 0x82 0x8A 0xE9 0xA3 0x9F 0xEF 0xBC 0x8C 0xE9 0x80 0x9F 0xE5 0x8E 0xBB 0xEF 0xBC 0x81 0xEF 0xBC 0x81
解释:
Fixed Header为固定报头。
Variable Header为可变报头。
Payload为有效载荷。
5、消费者接收报文
消费者(订阅者)接收到的报文与发布者发送的报文相同。订阅者需要解析固定报头、可变报头和有效载荷以获取消息内容。
解析步骤:
(1)、读取固定报头:确定报文类型和剩余长度。
(2)、读取可变报头:获取主题名称的长度和内容。
(3)、读取有效载荷:获取实际的消息内容。
10、总结
MQTT是一种高效、可靠的轻量级消息协议,特别适用于物联网和移动应用领域。通过发布/订阅模型和 QoS 机制,MQTT 提供了灵活且强大的消息传递能力。无论是简单的设备状态监控还是复杂的分布式系统,MQTT 都能提供有效的解决方案。
乘风破浪!Dare to Be!!!