一,MQTT 及其在物联网中的应用
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,设计用于低带宽、延迟高、不稳定的网络环境,特别适合物联网(IoT)应用。它采用了发布/订阅(Pub/Sub)模型,以简化设备之间的消息交换,是物联网领域广泛采用的通信协议之一。
MQTT 的核心概念:
- Broker(消息代理):MQTT Broker 是 MQTT 通信的核心,负责接收、存储并分发消息。所有客户端通过 Broker 进行通信,客户端之间不会直接相互通信。
- Publisher(发布者):客户端可以作为发布者,向某个主题(Topic)发布消息。
- Subscriber(订阅者):客户端可以订阅一个或多个主题,Broker 会将对应主题的消息推送给订阅者。
- Topic(主题):消息通过主题组织,Publisher 发布消息时会指定主题,Subscriber 订阅相应主题来接收消息。
- QoS(服务质量):MQTT 提供三种消息传递服务质量:
QoS 0:消息最多传递一次,不保证消息会被成功接收。
QoS 1:消息至少传递一次,保证消息至少会被接收到一次。
QoS 2:消息传递一次且仅一次,保证消息不会重复或丢失。
MQTT 专为物联网(IoT)设备和低带宽、高延迟或不可靠的网络设计。它的主要特点包括:
- 轻量级: 协议简单,消息开销小,适合资源受限的设备。
- 发布/订阅模式: 允许一对多的消息分发和应用程序解耦。
- 可靠性: 提供三种服务质量级别(QoS),确保消息传递。
- 安全性: 支持TLS加密和用户名/密码认证。
- 保留消息: 可以存储最后一条消息,新订阅者可立即获得最新状态。
由于其轻量级的特性,MQTT 广泛应用于物联网中,适用于以下场景:
- 远程监控与管理:通过 MQTT 协议,物联网设备(如传感器、智能家居设备等)可以定期向服务器上传数据,实现远程监控。服务器也可以通过 MQTT 向设备发送控制命令,进行远程管理。
- 实时数据传输:MQTT 支持低延迟的消息传输,适合需要实时数据更新的场景,如工业自动化、智能电网、车联网等。
- 低功耗设备通信:由于 MQTT 协议的低带宽和低开销,适合电池供电的物联网设备,通过节省功耗延长设备的使用寿命。
- 智能家居:MQTT 协议广泛应用于智能家居系统中,例如控制灯光、恒温器、安防设备等,用户可以通过手机或其他终端远程控制家中的设备。
- 车联网(V2X):在车联网中,MQTT 可以用于车辆与后台服务器之间的数据传输,如状态监控、远程控制、紧急事件处理等。
在典型的物联网架构中,MQTT 作为一种协议桥接了物联网设备与云平台之间的数据传输,架构通常包括以下主要部分:
- 物联网设备(IoT Devices):如传感器、智能家居设备等,设备通过 MQTT 协议将数据发布到 MQTT Broker,也可以订阅主题接收命令。
- MQTT Broker:位于物联网系统的核心位置,负责管理客户端的连接和消息的传输。所有设备和服务器的通信都通过 Broker 进行,Broker 根据主题分发消息。
- 云平台(Cloud Platform):云平台通常订阅设备的数据主题,接收到设备上传的数据进行存储和处理。云平台也可以通过 Broker 向设备发布控制命令。
- 用户终端(User Interface):通过手机应用、网页等形式,用户可以远程查看设备状态和数据,并发送控制命令,控制物联网设备的运行。
+--------------------+ +--------------------+| | | || User Interface | | Cloud Platform || (Mobile) | |(Web App, Analytics)|+--------------------+ +--------------------+^ ^| || Subscribe/Publish || |v v+----------------------------------------------+| MQTT Broker || (Mosquitto, EMQX, HiveMQ, etc.) |+----------------------------------------------+^ ^| |Publish| |Subscribe| |v v+--------------------+ +--------------------+| IoT Device 1 | | IoT Device 2 || (Sensors, Actuators)| | (Sensors, Actuators)|+--------------------+ +--------------------+
- 物联网设备(如 IoT Device 1 和 IoT Device 2)通过 MQTT 协议连接到 Broker,并且分别发布数据或订阅控制命令的主题。
- MQTT Broker 是中心节点,负责接收物联网设备发布的消息,并将消息推送给订阅者(如云平台或用户终端)。
- 云平台 订阅设备的数据,并通过处理这些数据实现监控、分析或控制操作。云平台也可以通过发布控制命令,控制物联网设备的行为。
- 用户终端(如移动应用或 Web 界面)允许用户查看设备的状态和数据,并发送控制命令到云平台,云平台再通过 MQTT Broker 传递给相应的物联网设备。
二,在 Windows 10 中安装与测试 MQTT broker
-
下载 Mosquitto:
- 访问 https://mosquitto.org/download/
- 下载最新版本的 Windows 安装程序 (64-bit) -
安装 Mosquitto:
- 运行下载的安装程序
- 按照安装向导的提示进行操作 -
添加 添加 Mosquitto 安装路径到系统路径。
-
配置 Mosquitto:
- 打开记事本,以管理员身份运行
- 打开安装路径的文件: mosquitto.conf
- 添加或修改以下行:listener 1883 # MQTT默认端口allow_anonymous true # 允许匿名连接(仅用于测试)persistence true # 开启持久化,重启后保留消息persistence_location G:\mosquitto\data # 持久化文件存储路径
-
启动 Mosquitto 服务:
- 打开命令提示符(以管理员身份运行)
- 输入以下命令:
net start mosquitto
-
设置 Mosquitto 为自动启动:
- 打开 “Services” (服务)
- 找到 “Mosquitto Broker” 服务
- 右击并选择 “Properties”
- 将 “Startup type” 设置为 “Automatic”
- 点击 “Apply” 然后 “OK” -
测试 Mosquitto:
- 打开两个命令提示符窗口
- 在第一个窗口中,输入订阅命令: mosquitto_sub -t test/topic
- 在第二个窗口中,输入发布命令: mosquitto_pub -t test/topic -m “Hello MQTT”
- 如果在第一个窗口中看到 “Hello MQTT”,则说明 Mosquitto 运行正常 -
安全配置(生产环境)——启用用户认证:
- 添加用户: mosquitto_passwd -c G:\mosquitto\passwd username
- 在配置文件中启用认证: password_file G:\mosquitto\password.txt -
监控和日志
- 查看Mosquitto日志文件: G:\mosquitto\log\mosquitto.log
- 使用工具如Prometheus+Grafana对Mosquitto进行监控
三,物联网设备通过 MQTT 发布与订阅数据
(一)模拟
由于手边暂时没有设备,这里就用python写一个程序来模拟。
1,首先开一个命令行窗口启动 MQTT:
G:\mosquitto>mosquitto -v
1725282189: mosquitto version 2.0.18 starting
1725282189: Using default config.
1725282189: Starting in local only mode. Connections will only be possible from clients running on this machine.
1725282189: Create a configuration file which defines a listener to allow remote access.
1725282189: For more details see https://mosquitto.org/documentation/authentication-methods/
1725282189: Opening ipv4 listen socket on port 1883.
1725282189: Opening ipv6 listen socket on port 1883.
1725282189: mosquitto version 2.0.18 running
- 此命令会启动 Mosquitto 并以详细模式输出日志。
2,编写模拟物联网设备的 Python 客户端。
用 paho-mqtt Python 库,它是一个流行的 MQTT 客户端库,可以方便地发布和订阅消息。
import json
import random
import timeimport paho.mqtt.client as mqtt# MQTT Broker 的地址和端口
broker_address = "localhost" # 本地服务器
broker_port = 1883# 设备 ID
device_id = "device_001"
data_topic = f"devices/{device_id}/data"
control_topic = f"devices/{device_id}/control"# MQTT 客户端
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, device_id)# 连接回调函数
def on_connect(client, userdata, flags, rc):print(f"Connected with result code {rc}")# 连接成功后,订阅控制命令主题client.subscribe(control_topic)# 消息回调函数
def on_message(client, userdata, msg):print(f"Message received from {msg.topic}: {msg.payload.decode()}")# 连接 MQTT 服务器
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, 60)# 启动 MQTT 客户端的网络循环
client.loop_start()# 模拟设备定期发送数据
try:while True:# 模拟传感器数据sensor_data = {"temperature": round(random.uniform(20.0, 30.0), 2),"humidity": round(random.uniform(30.0, 70.0), 2),"timestamp": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}# 将数据发布到 data_topicclient.publish(data_topic, json.dumps(sensor_data))print(f"Published data: {sensor_data}")# 每隔 5 秒发送一次数据time.sleep(5)except KeyboardInterrupt:print("Simulation stopped.")# 停止 MQTT 客户端
client.loop_stop()
client.disconnect()
- 运行 Python 脚本,模拟设备会定期向 devices/device_001/data 主题发布传感器数据
3,测试设备与 MQTT Broker 的通信。
开一个命令行窗口监听设备数据:
mosquitto_sub -h localhost -t "devices/device_001/data"
开一个命令行窗口发送控制命令:
mosquitto_pub -h localhost -t "devices/device_001/control" -m '{"command": "turn_on", "target": "fan"}'
(二)使用 ESP8266 + DHT11 + MicroPython
使用ESP8266外接一个DHT11温湿度传感器,用MicroPython编程,让ESP8266采集温湿度数据,并通过MQTT发送到部署在电脑上的MQTTbroker。
1,硬件:
- ESP8266 板子(如 NodeMCU 或 Wemos D1 Mini)
- DHT11 温湿度传感器
2,接线:
- DHT11 VCC → ESP8266 3.3V
- DHT11 GND → ESP8266 GND
- DHT11 数据引脚 → ESP8266 GPIO 引脚(如 GPIO 2/D4)
3,下载并将 MicroPython 固件刷入 ESP8266 板子上:
- 安装 esptool.py:pip install esptool
- 刷入固件:
esptool.py --port /dev/ttyUSB0 erase_flash esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash --flash_size=detect 0 esp8266-<version>.bin
4,连接到 ESP8266:
通过串口工具(如 PuTTY 或 Thonny),连接到 ESP8266,使用 REPL 进行调试和执行 MicroPython 代码。
5,安装 MQTT 和 DHT11 驱动:
MicroPython 中已经有内置的 umqtt 和 dht 模块,用于 MQTT 通信和 DHT11 传感器数据采集。
6,编写 MicroPython 代码:
import network
import time
import dht
import machine
from umqtt.simple import MQTTClient# Wi-Fi 配置
SSID = 'your_ssid'
PASSWORD = 'your_password'# MQTT Broker 配置
MQTT_BROKER = '192.168.1.100' # 电脑上的 MQTT Broker IP
MQTT_TOPIC = 'home/temperature'
CLIENT_ID = 'esp8266_dht11'# 连接 Wi-Fi
def connect_wifi(ssid, password):wlan = network.WLAN(network.STA_IF)wlan.active(True)if not wlan.isconnected():print('Connecting to network...')wlan.connect(ssid, password)while not wlan.isconnected():passprint('Network connected:', wlan.ifconfig())# 初始化 DHT11 传感器
dht_pin = machine.Pin(2) # DHT11 数据引脚接到 GPIO2/D4
sensor = dht.DHT11(dht_pin)# 连接 Wi-Fi
connect_wifi(SSID, PASSWORD)# 连接 MQTT
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.connect()try:while True:# 采集数据sensor.measure()temperature = sensor.temperature()humidity = sensor.humidity()# 创建消息msg = f'{{"temperature": {temperature}, "humidity": {humidity}}}'print('Publishing:', msg)# 发布到 MQTT 服务器:消息会发布到 home/temperature 主题上。client.publish(MQTT_TOPIC, msg)# 每隔10秒发送一次数据time.sleep(10)except KeyboardInterrupt:print("Stopping...")client.disconnect()
- 连接到 Wi-Fi。
- 使用 DHT11 传感器采集温湿度数据。
- 将数据通过 MQTT 协议发送到本地的 MQTT Broker。
四,云平台通过 MQTT 订阅数据与发布命令
云平台要做的就是:
- 接收 MQTT 消息:Django 应用需要订阅 MQTT Broker 的温度数据。
- 分析数据并作出决策:当接收到的温度数据超过 50°C 时,发送命令打开风扇,否则关闭风扇。
- 通过 MQTT 发送控制命令:使用 Django 应用中的 MQTT 客户端发送控制命令(如 fan/on 或 fan/off)到 MQTT Broker,控制风扇。
1,接收温度数据并分析。
创建一个管理命令(management command)来启动 MQTT 客户端并订阅主题。
# myapp/
# management/
# commands/
# mqtt_subscriber.pyimport json
import paho.mqtt.client as mqtt
from django.core.management.base import BaseCommandMQTT_BROKER = 'localhost' # MQTT Broker 地址
TEMPERATURE_TOPIC = 'home/temperature' # 订阅的温度主题
FAN_CONTROL_TOPIC = 'home/fan/control' # 风扇控制命令主题
CLIENT_ID = 'django_subscriber'# 温度阈值
TEMP_THRESHOLD = 50 # 摄氏度# 定义 MQTT 客户端回调函数
def on_connect(client, userdata, flags, rc):print(f"Connected with result code {rc}")client.subscribe(TEMPERATURE_TOPIC)def on_message(client, userdata, msg):print(f"Message received from {msg.topic}: {msg.payload.decode()}")try:# 解析消息并提取温度数据data = json.loads(msg.payload.decode())temperature = data.get('temperature')# 检查温度是否超过阈值,并发送控制命令if temperature is not None:if temperature > TEMP_THRESHOLD:print("Temperature is above 50°C. Turning on the fan.")client.publish(FAN_CONTROL_TOPIC, 'on')else:print("Temperature is below 50°C. Turning off the fan.")client.publish(FAN_CONTROL_TOPIC, 'off')else:print("No temperature data found.")except Exception as e:print(f"Error processing message: {e}")class Command(BaseCommand):help = 'Start MQTT subscriber and control fan based on temperature'def handle(self, *args, **kwargs):# 创建 MQTT 客户端并设置回调client = mqtt.Client(CLIENT_ID)client.on_connect = on_connectclient.on_message = on_message# 连接 MQTT Brokerclient.connect(MQTT_BROKER, 1883, 60)# 开始 MQTT 客户端的循环try:client.loop_forever()except KeyboardInterrupt:client.disconnect()print("MQTT client disconnected.")
在项目根目录下运行以下命令启动 MQTT 订阅服务:
python manage.py mqtt_subscriber
2,ESP8266 接收风扇控制命令并控制风扇。
在 ESP8266 上编写代码,订阅 home/fan/control 主题,并根据接收到的消息来控制电机(模拟风扇)。
import machine
import network
import time
from umqtt.simple import MQTTClient# Wi-Fi 配置
SSID = 'your_ssid'
PASSWORD = 'your_password'# MQTT 配置
MQTT_BROKER = '192.168.1.100' # 电脑上的 MQTT Broker IP
FAN_CONTROL_TOPIC = 'home/fan/control'
CLIENT_ID = 'esp8266_fan_controller'# 定义 GPIO 引脚来控制电机(风扇)
fan_pin = machine.Pin(5, machine.Pin.OUT) # GPIO5/D1# 连接 Wi-Fi
def connect_wifi(ssid, password):wlan = network.WLAN(network.STA_IF)wlan.active(True)if not wlan.isconnected():print('Connecting to network...')wlan.connect(ssid, password)while not wlan.isconnected():passprint('Network connected:', wlan.ifconfig())# MQTT 回调函数
def on_message(topic, msg):print(f"Message received on topic {topic}: {msg}")if msg == b'on':fan_pin.on() # 打开风扇print("Fan turned on")elif msg == b'off':fan_pin.off() # 关闭风扇print("Fan turned off")# 连接 Wi-Fi
connect_wifi(SSID, PASSWORD)# 连接 MQTT Broker 并订阅主题
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.set_callback(on_message)
client.connect()
client.subscribe(FAN_CONTROL_TOPIC)try:while True:client.check_msg() # 检查是否有新消息time.sleep(1)except KeyboardInterrupt:print("Disconnecting from broker.")client.disconnect()