简介:
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅模式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的。
MQTT有三种角色的存在:
Broker代理:很多人理解为中间件,当然可以这样子认为。他就是一个中间件。用于处理信息并发送到相应的订阅者。其实就是相当于服务端,一般是第三方程序。
发布者:用于发布信息到代理上面。注意:发布者也可以是订阅者。
订阅者:就是用于接受信息的客户端。
生产者和消费者都可以视同broker的客户端,自己写代码实现。
Qos(服务质量)
Qos 0
这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。
Qos 1
承诺消息将至少传送一次给订阅者。
QoS 2
使用 QoS 2,我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。
MQTT协议中的方法:
Connect:等待与服务器建立连接
Disconnect:等待MQTT客户端完成所作的工作,并于服务器断开TCP/IP会话
Subscribe:等待完成订阅
UnSubscribe:等待服务器取消客户端的一个活多个和topics订阅
Publish:MQTT客户端发送消息请求,发送完成后返回应用程序线程
优点:
低开销、低带宽占用,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
好处:解耦,异步调用,削峰。
解耦:通过中间件,各个系统之间可以独立运行,不会因为其中一个系统的崩溃影响其他系统,且整个系统的可拓展性也大大加强。
异步:发送方的消息推入了中间件,这条消息可以被所有相关的接收方看到,因此它们可以同时开始处理,这种串联的结构的时间消耗比其他的串行结构小得多。
削峰:在高并发环境下,短时间的大量请求会导致系统和数据库发生很多问题,所以需要对流量进行控制,通过消息队列设置每秒向消费者投递的消息数量,可以控制并发环境下的系统稳定性。
MQTT控制报文格式
MQTT控制报文结构 = 固定报头 + 可变报头 + 有效载荷
注意,所有的控制报文都有固定报头,但不一定有可变报头和有效载荷。
固定报头
固定报头共2个字节,一个字节占8位。第一个字节前4位是用于指定控制报文类型的标志位,后4位是控制报文类型;第二个字节是剩余长度。剩余长度表示当前报文剩余部分的字节数,包括可变报头和有效负载。
剩余长度 = 可变报头长度 + 有效载荷
控制报文类型
控制报文类型标志位
剩余长度
剩余长度(Remaining Length)表示当前报文剩余部分的字节数,包括可变报头和负载的数据。剩余长度不包括用于编码剩余长度字段本身的字节数。剩余长度字段使用一个变长度编码方案,对小于128的值它使用单字节编码。更大的值按下面的方式处理。低7位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个字节可以编码128个数值和一个延续位(continuation bit)。剩余长度字段最大4个字节。
可变报头
同样占2个字节,2个字节共同作用表示报文标识符,第一个字节表示报文标识符的最高有效字节MSB(most significant bit),第二个字节表示报文标识符的最低有效字节(least significant bit)。
有效载荷
CONNECT控制报文的有效载荷字段长度,首先要看 可变报头的标志 是否包含这些字段,如果包含的话,有效载荷就有:客户端标识符,遗嘱主题,遗嘱消息,用户名,密码。
具体控制报文
CONNECT控制报文
客户端与服务端建立连接后,客户端发送给服务端的第一个报文,一旦建立连接之后,CONNECT报文只会被发送一次。
表格最左边数据位,表示数据的第7位,最右端表示第0位
第1Bit:前4位,用来决定命令类型
后4位,保留
第2Bit:剩余长度,计算方法-放最后
剩余长度 = 可变报头长度 + 有效载荷
第3Bit:可变报头的开始
CONNECT控制报文的可变报头包含四个字段:协议名,协议级别,连接标志和保持连接。
协议名
3Bit到8Bit
协议级别
9Bit
协议级别的字段的值是0x04
连接标志
第10Bit:
第0位:保留位
第1位:清理会话
为0时,
为1时,客户端和服务端必须丢弃之前的任何会话并开始一个新会话
第2位:遗嘱标志
为1时,设置遗嘱
第3/4位:发布遗嘱使用的服务等级
如果遗嘱标志位为0,3/4必须为0
如果遗嘱标志位为1,Qos可等于0, 1, 2。
第5位:遗嘱保留
第6位:用户名,有用户名,标志位为1,否则为0
第7位:密码,有密码,标志位为1,否则为0
保持连接
11Bit到12Bit
保持连接是一个以秒为单位的时间间隔,16位(最大设定为16位全1,即18个小时),表示在客户端传输完成一个控制报文的时刻到发送下一个报文的时刻,两者之间允许空闲的最大时间间隔。
客户端负责保证 控制报文 发送的时间间隔不超过设定的保持连接的值,如果没有任何其他的控制报文可以发送,则达到时间间隔尽头,发送PINGREQ控制报文(*就是心跳请求控制报文,当然也可以不在时间间隔尽头发送,只是几乎所有的项目代码都设定为时间间隔尽头而已)
CONNACK确认连接请求
服务端发送CONNACK报文响应从客户端收到的CONNECT报文。服务端发送给客户端的第一个报文必须是CONNACK报文。CONNACK报文只有固定报头和可变报头。
如果客户端在设定的保持连接时间段内没有收到服务端的CONNACK报文,客户端应该关闭网络连接
前2Bit为固定报文
后2Bit为可变报头:由连接确认标志和连接返回码组成
第3Bit:第0位是当前会话标志位
如果服务端收到清理会话标志为 1 的连接,除了将 CONNACK 报文中的返回码设置为 0之外,还必须将 CONNACK 报文中的当前会话设置标志为 0
第4Bit:连接返回码
PUBLISH控制报文
PUBLISH控制报文是指客户端向服务端或者服务端向客户端传输一个应用消息。
前2 Bit 为固定报头
DUP:重发标志,如果该标志被置0,表示这是客户端或服务端第一次请求发送这个PUBLISH报文。如果该标志被置1,表示这可能是一个早前报文请求的重发。客户端或服务端请求重发一个PUBLISH报文时,必须将DUP标志置1
Qos等级为服务质量等级
3Bit到9Bit为可变报头
PUBLISH的可变报头包含主题名和报文标识符。
10Bit为有效载荷
有效载荷包含将被发布的应用消息。数据内容和格式是应用特定的 。
PUBLISH报文的接受者必须按照 PUBLISH报文中的Qos等级发送响应
PUBACK控制报文
PUBACK报文是对 Qos 1 等级的PUBLISH报文的响应。PUBACK没有有效载荷。前2Bit为固定报头,后2Bit为可变报头
PUBREC控制报文 → PUBREL控制报文 → PUBCOMP控制报文
这三个报文是对 Qos 2 等级的PUBLISH报文的响应,所有的回应控制报文都没有有效载荷。
PUBREC – 发布收到(QoS 2,第一步)
PUBREL – 发布释放(QoS 2,第二步)
PUBCOMP – 发布完成(QoS 2,第三步)
SUBSCRIBE控制报文
客户端向服务器发送SUBSCRIBE报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给与那些订阅匹配的主题,服务器发送PUBLISH报文给客户端。SUBSCRIBE报文为每个订阅指定了最大的Qos等级,服务端根据这个发送应用消息给客户端。
SUBSCRIBE控制报文固定报头的保留位必须为0010,服务端收到其他值均不合法,都会断开连接。
SUBSCRIBE报文的有效载荷包含了一个主题过滤器列表,它们表示客户端想要订阅的主题。
SUBSCRIBE报文的有效载荷必须包含至少一个主题过滤器和 Qos等级字段。
SUBACK控制报文
服务端发送SUBACK报文给客户端,用于确认它已收到并且正在处理SUBSCRIBE报文。
SUBACK报文包含一个返回码清单,它们指定了SUBSCRIBE请求的每个订阅被授权的最大Qos等级。
5Bit是返回码,允许的返回码值:
0x00 - 最大 QoS 0
0x01 - 成功 – 最大 QoS 1
0x02 - 成功 – 最大 QoS 2
0x80 - Failure 失败
UNSUBSCRIBE控制报文
客户端发送 UNSUBSCRIBE报文给服务端,用于取消订阅主题。
UNSUBSCRIBE报文固定报头的保留位必须是0010,否则服务端判定其不合法。
UNSUBSCRIBE报文的有效载荷包含客户端想要取消订阅的主题过滤器列表。
UNSUBACK控制报文
服务端发送UNSUBACK报文给客户端用于确认收到UNSUBSCRIBE报文,其没有有效载荷。
DISCONNECT控制报文
DISCONNECT报文是客户端发送给服务端的最后一个控制报文,表示客户端正常断开连接。该报文没有可变报头和有效负载。
部署mqtt服务端
Windows部署mosquitto
Mosquitto是一款实现了消息推送协议MQTT 3.1的开源消息代理软件,提供轻量级的、支持可订阅/可发布的消息推送模式,是设备与设备之间的短消息通信变得简单,广泛应用于低功耗传感器、手机(app消息推送是场景之一)、嵌入式电脑、微型控制器等移动设备。
- 下载地址:https://mosquitto.org/download/
- 安装后,程序介绍
mosquitto:代理器主程序
mosquitto.conf:配置文件【路径:/etc/mosquitto】
mosquitto_passwd:用户密码管理工具
mosquitto_pub:用于发布消息的命令行客户端
mosquitto_sub:用于订阅消息的命令行客户端
mqtt:MQTT的后台进程
libmosquitto:客户端编译的库文件
本地使用 Mosquitto
默认按照好的mosquitto只能在本地使用。
- 修改配置文件mosquitto.conf
如:
allow_anonymous true //表示允许匿名登录
listener 1883 0.0.0.0 //配置端口和ip地址
max_connections -1 //连接数不限制
或者:
allow_anonymous false
listener 1883 0.0.0.0
password_file E:\code\mqtt\windows\server\mosquitto\pwfile.example
- 本地启动mqtt服务
在cmd,输入.\mosquitto.exe
- 从本地mqtt服务器订阅消息
- 本地发送消息到本地mqtt服务器
在新的cmd,输入发布命令,如
远程使用 Mosquitto
将服务装在远程计算机上,在本地发送mqtt消息
正常用的话,mqtt客户端和mqtt服务器都不在一台计算机上。
默认安装的 mosquitto,是不支持从本地mqtt客户端访问远程mqtt客户端的。
- 修改配置文件mosquitto.conf
allow_anonymous true //表示允许匿名登录
listener 19999 10.4.22.37 //配置端口和ip地址
max_connections -1 //连接数不限制
- 启动Mosquitto
使用管理员权限打开cmd, 运行.\mosquitto.exe -c .\mosquitto.conf
- 添加用户
.\mosquitto_passwd.exe .\pwfile.example user1
- 订阅消息
在新的cmd,输入订阅命令,如
.\mosquitto_sub.exe -h 10.4.22.37 -p 19999 -u user1 -P user1 -t "test/topic" -v
- 发布消息
在新的cmd,输入发布命令,如
.\mosquitto_pub.exe -h 10.4.22.37 -p 19999 -u user1 -P user1 -t "test/topic" -m "hello"
常用指令:
- 开启Mosquitto服务器
在Powershell,输入./mosquitto.exe -p 1883,表示就开启了MQTT服务,监听的地址是127.0.0.1,端口是1883;
-c 指定配置文件路径
-p 指定端口号,不推荐跟-c配置文件指定端口号一起使用
- mosquitto_pub命令
-t 指定消息所发布到哪个主题。
-m 从命令行发送一条消息,-m后面跟发送的消息内容。
如: .\mosquitto_pub.exe -t "test/topic" -m "1234"
- mosquitto_sub
-h 说明所连接到的域名,默认是localhost
-p 说明客户端连接到的端口,默认是1883
-u 指定用户名用于代理认证。
-P 指定密码用于代理认证,使用此选项时必须有有效的用户名。
-t 指定订阅的消息主题,允许同时订阅到多个主题
-v, 冗长地打印收到的消息。若指定该选项,打印消息时前面会打印主题名——“主题 消息内容”,否则,只打印消息内容
如: .\mosquitto_sub.exe -t "test/topic" -v
.\mosquitto_sub.exe -h 10.4.22.37 -p 19999 -u user1 -P user1 -t "test/topic" -v
- mosquitto_passwd
mosquitto_passwd [-c | -D] passwordfile username
-c: 创建一个新的password文件,如果文件已经存在,会被新文件替换
-D: 从password文件删除一个指定的用户
如:.\mosquitto_passwd.exe .\pwfile.example user1
- mqtt服务器订阅消息
打开新的cmd,输入订阅命令,如
Linux部署Mosquitto
- 更新包列表
sudo apt update
- 安装Mosquitto
sudo apt install mosquitto
- 安装客户端
sudo apt-get install mosquitto-clients
- 安装完成后查看服务器状态
sudo service mosquitto start #启动服务器
sudo service mosquitto stop #停止服务器
sudo service mosquitto restart #重启服务器
sudo service mosquitto status #查看服务器当前状态
sudo systemctl enable mosquitto.service #设置开机启动
- 允许匿名用户发布和订阅 可选
编辑配置文件 /etc/mosquitto/mosquitto.conf,在文件加上allow_anonymous true
/etc/mosquitto/mosquitto.conf
- 添加账户及密码: 可选
编辑配置文件 /etc/mosquitto/mosquitto.conf,在文件加上password_file /etc/mosquitto/passwd passwd表示文件,需要新建一个passwd文件,并且添加用户
sudo mosquitto_passwd /etc/mosquitto/passwd username
- 订阅
mosquitto_sub -h localhost -t "test"
- 发布
mosquitto_pub -h localhost -t "test" -m "Hello MQTT"
mqtt客户端
编译qtmqtt
Qt开发MQTT程序有两种方式,一个是Qt官方提供的基于MQTT的封装,一个是第三方(EMQ)开发的用于Qt调用MQTT的接口,二者使用方法大同小异,并且均提供了源码。
Qt官方提供的基于MQTT的封装
源码地址:https://github.com/qt/qtmqtt
一般是qt使用哪个版本,mqtt就使用哪个版本。
1.先下载安装Perl (解决编译qtmqtt报perl错误)
下载地址:https://www.perl.org/get.html
ActivePerl_x64_5.24.1.2402.exe
2.下载安装qtmqtt
下载地址:https://github.com/qt/qtmqtt
这里下载的版本是
使用QT5.15.2 MSVC2019 64Bit,打开qmqtt.pro,然后点击构建src即可生成相应的库文件.
这里现在的源码直接编译报错,原因是引用的头文件路径不对。需要改,改用(EMQ)开发
第三方(EMQ)开发的用于Qt调用MQTT的接口
下载地址:
- 先下载安装Perl (解决编译qtmqtt报perl错误)
下载地址:https://www.perl.org/get.html
ActivePerl_x64_5.24.1.2402.exe
2.下载安装qtmqtt
下载地址:https://github.com/emqx/qmqtt
这里下载的版本是1.0.3
使用QT5.15.2 MSVC2019 64Bit,打开qmqtt.pro,然后点击构建src即可生成相应的库文件.
常用函数:
QMQTT::Client类
函数:
void connectToHost(); 连接服务端,连接成功会触发connected信号
void disconnectFromHost(); 断开连接服务端,断开成功会触发disconnected信号
void subscribe(const QString& topic, const quint8 qos = 0);订阅,参数为订阅主题,字符串的形式,订阅成功会触发subscribed信号
void unsubscribe(const QString& topic); 取消订阅,取消订阅成功会触发unsubscribed信号
quint16 publish(const QMQTT::Message& message); 发布,参数为 QMQTT::Message,包括主题和内容,发布成功会触发published信号
信号:
void connected(); 连接服务端成功,会触发该信号
void disconnected();断开连接服务端会触发该信号
void subscribed(const QString& topic, const quint8 qos = 0); 订阅成功会触发该信号
void received(const QMQTT::Message& message); 有数据接收,会触发该信号,在该信号对应的槽函数进行数据处理。
void published(const QMQTT::Message& message, quint16 msgid = 0); 发布成功,触发该信号
Connect using TCP:
#include "qmqtt.h"
QMQTT::Client *client = new QMQTT::Client(QHostAddress::LocalHost, 1883);
client->setClientId("clientId");
client->setUsername("user");
client->setPassword("password");
client->connectToHost();
Connect using WebSockets:
QMQTT::Client *client = new QMQTT::Client("ws://www.example.com/mqtt", "<origin>", QWebSocketProtocol::VersionLatest);
client->setClientId("clientId");
client->setUsername("user");
client->setPassword("password");
client->connectToHost();
创建mqtt客户端工程
- 新建一个Qt工程mqttclient。
在mqttclient目录下新建mqtt目录,mqtt目录下新建include和lib文件夹。
- 然后将刚刚编译的源码生成目录下的lib文件夹中以2个lib文件拷贝到mqttClient\mqtt\lib目录下。
在qtmqtt源码目录下(src\mqtt)的所有.h头文件拷贝到\mqttClient\mqtt\include目录。
- pro 文件增加
QT += network
引用库文件和include文件。
INCLUDEPATH += $$PWD/mqtt/include
CONFIG(debug,debug|release){
LIBS+= -L$$PWD/mqtt/lib -lQt5Qmqttd
}
CONFIG(release,debug|release){
LIBS+= -L$$PWD/mqtt/lib -lQt5Qmqtt
}
源码
#ifndef MQTTCLIENT_H#define MQTTCLIENT_H#include <QObject>#include "qmqtt.h"class mqttClient : public QObject{Q_OBJECTpublic:explicit mqttClient(QObject *parent = nullptr);static mqttClient& getInstance();void init(QString sIp, quint16 nPort);signals:void sig_result_connect();void sig_result_disconnect();void sig_result_dataReceived(QMQTT::Message message);public slots:void slot_connect();void slot_disconnect();void slot_publish(QString sTopic, QString sPayload);void slot_subscribe(QString sTopic);void slot_unSubscribe(QString sTopic);void on_dataReceived(QMQTT::Message message);void slot_result_subscribed(QString topic,quint8 qos);void slot_result_publish(QMQTT::Message message, quint16 msgid);private:QMQTT::Client *m_pClient;};#endif // MQTTCLIENT_H#include "mqttclient.h"mqttClient::mqttClient(QObject *parent): QObject{parent}{m_pClient = nullptr;}mqttClient &mqttClient::getInstance(){static mqttClient s_obj;return s_obj;}void mqttClient::init(QString sIp, quint16 nPort){if(m_pClient){delete m_pClient;m_pClient = nullptr;}m_pClient = new QMQTT::Client(QHostAddress(sIp), nPort);if(m_pClient){qDebug() << "m_pClient Create Success.";// m_pClient->setClientId();// m_pClient->setUsername();// m_pClient->setPassword();m_pClient->setKeepAlive(60);connect(m_pClient, &QMQTT::Client::connected, this, &mqttClient::sig_result_connect);connect(m_pClient, &QMQTT::Client::disconnected, this, &mqttClient::sig_result_disconnect);connect(m_pClient, &QMQTT::Client::subscribed, this, &mqttClient::slot_result_subscribed);connect(m_pClient, &QMQTT::Client::published, this, &mqttClient::slot_result_publish);connect(m_pClient, SIGNAL(received(QMQTT::Message)), this, SLOT(on_dataReceived(QMQTT::Message)));}}void mqttClient::slot_connect(){if(m_pClient){if(!m_pClient->isConnectedToHost()){m_pClient->connectToHost();}}}void mqttClient::slot_disconnect(){if(m_pClient){if(m_pClient->isConnectedToHost()){m_pClient->disconnectFromHost();}}}void mqttClient::slot_publish(QString sTopic, QString sPayload){QMQTT::Message message(136, sTopic, sPayload.toUtf8(),1);if(m_pClient){int n = 0;while(n<10){m_pClient->publish(message);n++;}}}void mqttClient::slot_subscribe(QString sTopic){if(m_pClient)m_pClient->subscribe(sTopic);}void mqttClient::slot_unSubscribe(QString sTopic){if(m_pClient)m_pClient->unsubscribe(sTopic);}void mqttClient::on_dataReceived(QMQTT::Message message){qDebug() << "on_dataReceived";emit sig_result_dataReceived(message);}void mqttClient::slot_result_subscribed(QString topic,quint8 qos){qDebug() << topic << ":" << qos;}void mqttClient::slot_result_publish(QMQTT::Message message, quint16 msgid){qDebug() << message.topic() << ":" << message.payload();}#ifndef WIDGET_H#define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACEnamespace Ui { class Widget; }QT_END_NAMESPACE#include "mqttclient.h"class Widget : public QWidget{Q_OBJECTpublic:Widget(QWidget *parent = nullptr);~Widget();signals:void sig_btn_connect();void sig_btn_disconnect();void sig_btn_publish(QString sTopic, QString sPayload);void sig_btn_subscribe(QString sTopic);void sig_btn_unSubscribe(QString sTopic);private slots:void on_pushButton_Connect_clicked();void on_pushButton_Disconnect_clicked();void on_pushButton_subcribe_clicked();void on_pushButton_unsubcrible_clicked();void on_pushButton_publish_clicked();void slot_result_connect();void slot_result_dataReceived(QMQTT::Message message);void slot_result_disconnect();private:Ui::Widget *ui;};#endif // WIDGET_H#include "widget.h"#include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget){ui->setupUi(this);ui->lineEdit_IP->setText("127.0.0.1");ui->lineEdit_Port->setText("1883");connect(this, SIGNAL(sig_btn_connect()),&mqttClient::getInstance(), SLOT(slot_connect()));connect(this, SIGNAL(sig_btn_disconnect()), &mqttClient::getInstance(), SLOT(slot_disconnect()));connect(this, SIGNAL(sig_btn_publish(QString, QString)),&mqttClient::getInstance(), SLOT(slot_publish(QString, QString)));connect(this, SIGNAL(sig_btn_subscribe(QString)), &mqttClient::getInstance(), SLOT(slot_subscribe(QString)));connect(this, SIGNAL(sig_btn_unSubscribe(QString)), &mqttClient::getInstance(), SLOT(slot_unSubscribe(QString)));connect(&mqttClient::getInstance(), SIGNAL(sig_result_connect()),this, SLOT(slot_result_connect()));connect(&mqttClient::getInstance(), SIGNAL(sig_result_disconnect()),this, SLOT(slot_result_disconnect()));connect(&mqttClient::getInstance(), SIGNAL(sig_result_dataReceived(QMQTT::Message)),this, SLOT(slot_result_dataReceived(QMQTT::Message)));}Widget::~Widget(){delete ui;}void Widget::on_pushButton_Connect_clicked(){QString sIp = ui->lineEdit_IP->text().trimmed();quint16 nPort = (quint16)ui->lineEdit_Port->text().toInt();mqttClient::getInstance().init(sIp, nPort);emit sig_btn_connect();}void Widget::slot_result_connect(){ui->label_status->setText("连接服务器成功");}void Widget::slot_result_dataReceived(QMQTT::Message message){QString sMes = QString(message.topic()) + ":" + message.payload();ui->textEdit_revcMsg->append(sMes);}void Widget::slot_result_disconnect(){ui->label_status->setText("断开连接服务器");}void Widget::on_pushButton_subcribe_clicked(){QString sTopic = ui->lineEdit_sub->text();emit sig_btn_subscribe(sTopic);}void Widget::on_pushButton_unsubcrible_clicked(){QString sTopic = ui->lineEdit_sub->text();emit sig_btn_unSubscribe(sTopic);}void Widget::on_pushButton_publish_clicked(){QString sTopic = ui->lineEdit_topic->text();QString sPayload = ui->textEdit_Msg->toPlainText();emit sig_btn_publish(sTopic, sPayload);}void Widget::on_pushButton_Disconnect_clicked(){emit sig_btn_disconnect();}