RT-Thread中使用Mqtt

环境:

开发板:Panduola(stm32L475)
KEIL5 开发环境
rtthread 4.0.3内核
使用ENV 配置Rtt


MQTT

1.MQTT介绍
客户端 Client

  • 使用MQTT的程序或设备。客户端总是通过网络连接到服务端。
  • 它可以发布应用消息给其它相关的客户端。
  • 订阅以请求接受相关的应用消息。
  • 取消订阅以移除接受应用消息的请求。
  • 从服务端断开连接。服务端 Server一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。

服务端

  • 接受来自客户端的网络连接。

  • 接受客户端发布的应用消息。

  • 处理客户端的订阅和取消订阅请求。

  • 转发应用消息给符合条件的已订阅客户端。

订阅 Subscription

  • 订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。

  • QoS0,At most once,至多一次;Sender 发送的一条消息,Receiver 最多能收到一次,如果发送失败,也就算了。

  • QoS1,At least once,至少一次;Sender 发送的一条消息,Receiver 至少能收到一次,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但Receiver 有可能会收到重复的消息

  • QoS2,Exactly once,确保只有一次。Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
    2.MQTT协议数据包结构

一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、有效载荷(payload)三部分构成。

(1) 固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
(3)有效载荷(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

操作:

添加AT指令部分,使用ESP8266连接网络:

RT-Thread online packages  --->IoT - internet of things  --->[*] AT DEVICE: RT-Thread AT component porting or samples for different device[*]   Espressif ESP8266  --->

在这里插入图片描述

1. 添加RTT组件包:

MQTT
在这里插入图片描述
CJSON
在这里插入图片描述
AHT10:使用AHT10还需要打开I2c的驱动部分,使用旧版本1.0的AHT驱动,可以避免使用Sensor的框架
在这里插入图片描述
启用I2c总线:
在这里插入图片描述

2.使用scons --target=mdk5生成mdk工程

在这里插入图片描述

3.使用MQtt客户端:

修改连接参数:

#define MQTT_URI “tcp://192.168.1.110:1883”
#define MQTT_USERNAME “panduola”
#define MQTT_PASSWORD “panduola”
#define MQTT_SUBTOPIC “/test/topic2”
#define MQTT_PUBTOPIC “/test/topic1”

可在rtconfig.h中修改wifi连接参数:
在这里插入图片描述

  1. 创建一个客户端:
    static MQTTClient client;

  2. 初始化客户端:


/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{/* 初始 condata 参数 */MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;static char cid[20] = {0};static int is_started = 0;if (is_started){return;}/* 配置 MQTT 文本参数 */{client.isconnected = 0;client.uri = MQTT_URI;/* 生成随机客户端 ID */rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());// rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);/* 配置连接参数 */memcpy(sup_pub_topic, MQTT_SUBTOPIC, sizeof(MQTT_SUBTOPIC));memcpy(&client.condata, &condata, sizeof(condata));client.condata.clientID.cstring = cid;client.condata.keepAliveInterval = 60;client.condata.cleansession = 1;client.condata.username.cstring = MQTT_USERNAME;client.condata.password.cstring = MQTT_PASSWORD;/* 配置 mqtt 参数 */client.condata.willFlag = 0;client.condata.will.qos = 1;client.condata.will.retained = 0;client.condata.will.topicName.cstring = sup_pub_topic;client.buf_size = client.readbuf_size = 1024;client.buf = malloc(client.buf_size);client.readbuf = malloc(client.readbuf_size);if (!(client.buf && client.readbuf)){LOG_E("no memory for MQTT client buffer!");goto _exit;}/* 设置事件回调 */client.connect_callback = mqtt_connect_callback;client.online_callback = mqtt_online_callback;client.offline_callback = mqtt_offline_callback;/* 设置要订阅的 topic 和 topic 对应的回调函数 */client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;client.messageHandlers[0].callback = mqtt_sub_callback;client.messageHandlers[0].qos = QOS1;/* 设置默认订阅回调函数 */client.defaultMessageHandler = mqtt_sub_default_callback;}/* 启动 MQTT 客户端 */LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);paho_mqtt_start(&client);is_started = 1;_exit:return;
}
  1. 设置收到信息后的回调:
static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{printf("Receive message ,Topic: %.*s,payload:\n", msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data);// 解析JSON消息cJSON *json_obj = cJSON_Parse((const char *)msg_data->message->payload);if (json_obj == NULL){printf("Failed to parse JSON message\n");return;}cJSON *object = RT_NULL;object = cJSON_GetObjectItem(json_obj, "location");if (object != NULL){printf("Location: %s\n", object->valuestring);}object = cJSON_GetObjectItem(json_obj, "led");if (object != NULL){if(object->type == cJSON_True)printf("led: ture\n");elseprintf("led: false\n");}cJSON_Delete(json_obj);return;
}
  1. 发布消息:

/* MQTT 消息发布函数 */
static void mq_publish(const char *send_str)
{MQTTMessage message;const char *msg_str = send_str;const char *topic = MQTT_PUBTOPIC;message.qos = QOS1;message.retained = 0;message.payload = (void *)msg_str;message.payloadlen = strlen(message.payload);MQTTPublish(&client, topic, &message);return;
}

常用的cjson函数

void cJSON_Delete(cJSON *c)

删除 cJSON 指针,释放空间

char *cJSON_Print(cJSON *item)

cJSON数据解析成JSON字符串,并会在堆中开辟一块char *的内存空间,存放JSON字符串。
函数成功后会返回一个char *指针,该指针指向位于堆中JSON字符串。

cJSON *cJSON_Parse(const char *value)

将一个JSON数据包,按照cJSON结构体的结构序列化整个数据包,并在堆中开辟一块内存存储cJSON结构体
返回值:成功返回一个指向内存块中的cJSON的指针,失败返回NULL

cJSON *cJSON_GetObjectItem(cJSON *object,const char *string)

获取JSON字符串字段值,成功返回一个指向cJSON类型的结构体指针,失败返回NULL

常用的mqtt的API

/*** This function send an MQTT subscribe packet and wait for suback before returning.** @param client the pointer of MQTT context structure* @param qos MQTT Qos type, only support QOS1* @param topic topic filter name* @param callback the pointer of subscribe topic receive data function** @return the error code, 0 on start successfully.*//*订阅主题,最后一个形参是void (*subscribe_cb)(MQTTClient *client, MessageData *data);类型的函数指针*/
int paho_mqtt_subscribe(MQTTClient *client, enum QoS qos, const char *topic, subscribe_cb callback);
/*** This function publish message to specified mqtt topic.* @note it will be discarded, recommend to use "paho_mqtt_publish"** @param c the pointer of MQTT context structure* @param topicFilter topic filter name* @param message the pointer of MQTTMessage structure** @return the error code, 0 on subscribe successfully.*/
int MQTTPublish(MQTTClient *client, const char *topic, MQTTMessage *message);

source:

#include <rtthread.h>
#include <rtdevice.h>
#include <board.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "paho_mqtt.h"
// #include "wifi_config.h"#include "aht10.h"
#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>#include <cJSON.h>/*** MQTT URI farmat:* domain mode* tcp://iot.eclipse.org:1883** ipv4 mode* tcp://192.168.10.1:1883* ssl://192.168.10.1:1884** ipv6 mode* tcp://[fe80::20c:29ff:fe9a:a07e]:1883* ssl://[fe80::20c:29ff:fe9a:a07e]:1884*/
#define MQTT_URI "tcp://192.168.1.110:1883"
#define MQTT_USERNAME "panduola"
#define MQTT_PASSWORD "panduola"
#define MQTT_SUBTOPIC "/test/topic2"
#define MQTT_PUBTOPIC "/test/topic1"#define LED_PIN GET_PIN(E, 8)/* define MQTT client context */
static MQTTClient client;
static void mq_start(void);
static void mq_publish(const char *send_str);char sup_pub_topic[48] = MQTT_PUBTOPIC;
char sup_sub_topic[48] = MQTT_SUBTOPIC;static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{printf("Receive message ,Topic: %.*s,payload:\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);// 解析JSON消息cJSON *json_obj = cJSON_Parse((const char *)msg_data->message->payload);if (json_obj == NULL){printf("Failed to parse JSON message\n");return;}cJSON *object = RT_NULL;object = cJSON_GetObjectItem(json_obj, "location");if (object != NULL){printf("Location: %s\n", object->valuestring);}object = cJSON_GetObjectItem(json_obj, "led");if (object != NULL){if (object->type == cJSON_True){printf("led: ture\n");rt_pin_write(LED_PIN, PIN_LOW);}else{printf("led: false\n");rt_pin_write(LED_PIN, PIN_HIGH);}}cJSON_Delete(json_obj);return;
}static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';LOG_D("mqtt sub default callback: %.*s %.*s",msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data,msg_data->message->payloadlen,(char *)msg_data->message->payload);return;
}static void mqtt_connect_callback(MQTTClient *c)
{LOG_I("Start to connect mqtt server");
}static void mqtt_online_callback(MQTTClient *c)
{LOG_D("Connect mqtt server success");LOG_D("Publish message: Hello,RT-Thread! to topic: %s", sup_pub_topic);mq_publish("Hello,RT-Thread!");
}static void mqtt_offline_callback(MQTTClient *c)
{LOG_I("Disconnect from mqtt server");
}/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{/* 初始 condata 参数 */MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;static char cid[20] = {0};static int is_started = 0;if (is_started){return;}/* 配置 MQTT 文本参数 */{client.isconnected = 0;client.uri = MQTT_URI;/* 生成随机客户端 ID */rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());// rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);/* 配置连接参数 */memcpy(sup_pub_topic, MQTT_SUBTOPIC, sizeof(MQTT_SUBTOPIC));memcpy(&client.condata, &condata, sizeof(condata));client.condata.clientID.cstring = cid;client.condata.keepAliveInterval = 60;client.condata.cleansession = 1;client.condata.username.cstring = MQTT_USERNAME;client.condata.password.cstring = MQTT_PASSWORD;/* 配置 mqtt 参数 */client.condata.willFlag = 0;client.condata.will.qos = 1;client.condata.will.retained = 0;client.condata.will.topicName.cstring = sup_pub_topic;client.buf_size = client.readbuf_size = 1024;client.buf = malloc(client.buf_size);client.readbuf = malloc(client.readbuf_size);if (!(client.buf && client.readbuf)){LOG_E("no memory for MQTT client buffer!");goto _exit;}/* 设置事件回调 */client.connect_callback = mqtt_connect_callback;client.online_callback = mqtt_online_callback;client.offline_callback = mqtt_offline_callback;/* 设置要订阅的 topic 和 topic 对应的回调函数 */client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;client.messageHandlers[0].callback = mqtt_sub_callback;client.messageHandlers[0].qos = QOS1;/* 设置默认订阅回调函数 */client.defaultMessageHandler = mqtt_sub_default_callback;}/* 启动 MQTT 客户端 */LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);paho_mqtt_start(&client);is_started = 1;_exit:return;
}/* MQTT 消息发布函数 */
static void mq_publish(const char *send_str)
{MQTTMessage message;const char *msg_str = send_str;const char *topic = MQTT_PUBTOPIC;message.qos = QOS1;message.retained = 0;message.payload = (void *)msg_str;message.payloadlen = strlen(message.payload);MQTTPublish(&client, topic, &message);return;
}rt_thread_t TH_Get_HU;
rt_thread_t Publish_value;
float humidity, temperature;void get_humi_temp(void *parameter)
{aht10_device_t dev;const char *i2c_bus_name = "i2c2";int count = 0;rt_thread_mdelay(2000);dev = aht10_init(i2c_bus_name);if (dev == RT_NULL){LOG_E(" The sensor initializes failure");}while (count++ < 100){humidity = aht10_read_humidity(dev);// LOG_D("humidity   : %d.%d %%", (int)humidity, (int)(humidity * 10) % 10);temperature = aht10_read_temperature(dev);// LOG_D("temperature: %d.%d", (int)temperature, (int)(temperature * 10) % 10);rt_thread_mdelay(1000);}
}void Publish_Date(void *parameter)
{char send_str[128];while (1){// sprintf(send_str, "{\"temperature\":%d.%d,\"humidity\":%d.%d}", (int)temperature, (int)(temperature * 10) % 10, (int)humidity, (int)(humidity * 10) % 10);sprintf(send_str, "{\"location\":\"10#A401\",\"led\":true,\"environment\":{\"temperature\":%d.%d,\"humidity\":%d.%d}}", (int)temperature, (int)(temperature * 10) % 10, (int)humidity, (int)(humidity * 10) % 10);mq_publish(send_str);rt_thread_mdelay(1000);}
}int main(void)
{mq_start();TH_Get_HU = rt_thread_create("get_humi_temp", get_humi_temp, RT_NULL, 1024, 20, 10);Publish_value = rt_thread_create("Publish_value", Publish_Date, RT_NULL, 1024, 20, 10);rt_pin_mode(LED_PIN, PIN_MODE_OUTPUT);rt_thread_startup(TH_Get_HU);rt_thread_mdelay(1000);rt_thread_startup(Publish_value);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/328220.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message

场景 想创建一个新的consumer去消费一个已经再使用的topic时&#xff0c;默认情况下会从topic中的第一条消息开始消费&#xff0c;大多数情况是需要从最新的消息开始。然后再使用CONSUME_FROM_LAST_OFFSET设置时并不会对新的consumer生效&#xff0c;它只是在停用consumer重新启…

Google I/O 2024:探索未来AI技术的无限可能

近日&#xff0c;Google I/O 2024大会圆满落幕&#xff0c;带给我们一场关于人工智能的盛宴。在这场大会上&#xff0c;Google推出了一系列令人激动的AI新功能和工具&#xff0c;让我们得以一窥未来的科技发展。今天&#xff0c;就让我来为大家总结一下这些亮点吧&#xff01; …

压力测试及常用的压测工具!

前言 压力测试是一种评估系统性能的方法&#xff0c;通过模拟大量用户同时访问系统或执行特定操作&#xff0c;以测试系统的负载能力和稳定性。 压力测试可以帮助发现系统在高负载情况下的性能瓶颈、错误或故障&#xff0c;从而提前进行优化和改进。在进行压力测试时&#xf…

吴恩达深度学习笔记:优化算法 (Optimization algorithms)2.7

目录 第二门课: 改善深层神经网络&#xff1a;超参数调试、正 则 化 以 及 优 化 (Improving Deep Neural Networks:Hyperparameter tuning, Regularization and Optimization)第二周&#xff1a;优化算法 (Optimization algorithms)2.7 RMSprop 第二门课: 改善深层神经网络&am…

「Python绘图」绘制同心圆

python 绘制同心圆 一、预期结果 二、核心代码 import turtle print("开始绘制同心圆") # 创建Turtle对象 pen turtle.Turtle() pen.shape("turtle") # 移动画笔到居中位置 pen.pensize(2) #设置外花边的大小 # 设置填充颜色 pen.fillcolor("green&…

【大数据】计算引擎MapReduce

目录 1.概述 1.1.前言 1.2.大数据要怎么计算&#xff1f; 1.3.什么是MapReduce&#xff1f; 2.架构 3.工作流程 4.shuffle 4.1.map过程 4.2.reduce过程 1.概述 1.1.前言 本文是作者大数据系列专栏的其中一篇&#xff0c;专栏地址&#xff1a; https://blog.csdn.ne…

NVM镜像源报错:Could not retrieve https://npm.taobao.org/mirrors/node/index.json.

NVM镜像源报错&#xff1a;Could not retrieve https://npm.taobao.org/mirrors/node/index.json. 淘宝前端node镜像源已更换 NVM安装教程&#xff1a;http://t.csdnimg.cn/dihmG 背景 笔者在安装版本切换工具NVM时&#xff0c;配置完镜像源后&#xff0c;在控制台输入&#x…

使用FFmpeg处理RTSP视频流并搭建RTMP服务器实现图片转直播全流程

目录 一、FFmpeg安装与配置二、搭建并配置Nginx RTMP服务器三、从RTSP视频流提取帧并保存为图片四、将图片序列转换为视频五、将视频推送为直播流六、将图片序列推送为直播流七、播放实时流 场景&#xff1a;如何通过FFmpeg工具链&#xff0c;从RTSP视频流中按秒抽取帧生成图片…

六西格玛管理培训对企业有哪些实际帮助?

当下&#xff0c;企业要想脱颖而出&#xff0c;不仅要有创新思维和敏锐的市场洞察力&#xff0c;更要有高效的管理体系和严谨的质量控制手段。而六西格玛管理培训正是这样一项能够帮助企业实现提质增效、提升竞争力的关键举措。那么&#xff0c;六西格玛管理培训对企业究竟有哪…

超级数据查看器 教程合集 整理版本 pdf格式 1-31集

点击下载 超级数据查看器 教程合集整理版本 pdf格式https://download.csdn.net/download/qq63889657/89311725?spm1001.2014.3001.5501

SQL Server共享功能目录显示灰色无法自行选择

SQL Server共享功能目录显示灰色无法自行调整 一、 将之前安装SQL Server卸载干净 二、 清空注册表 1. 打开注册表&#xff0c;winR&#xff0c;输入regedit 2. 注册表-》编辑-》查找&#xff0c;输入C:\Program Files\Microsoft SQL Server\ 3. 注册表-》编辑-》查找&#x…

量子计算机接入欧洲最快超算!芬兰加快混合架构算法开发

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 文丨浪味仙 排版丨沛贤 深度好文&#xff1a;1900字丨7分钟阅读 摘要&#xff1a;芬兰技术研究中心&#xff08;VTT&#xff09;与 CSC 展开合作&#xff0c;基于量子计算机超算架构进行算法开…

Git大文件无法直接push用git lfs track 上传大文件具体操作

Git 因为大文件push失败 回退到git add前用git lfs track单独添加大文件 以下work flow仅代表个人解决问题的办法&#xff0c;有优化流程的欢迎交流 回退到git add前 以下指令回退一个commit git reset --soft HEAD~1以下指令撤销所有git add操作&#xff0c;但不删除本地修…

泰盈科技IPO终止:客户集中度高,业绩未达目标,高管薪酬较高

近日&#xff0c;上海证券交易所披露的信息显示&#xff0c;泰盈科技集团股份有限公司&#xff08;下称“泰盈科技”&#xff09;及其保荐人中金公司撤回上市申请文件。因此&#xff0c;上海证券交易所决定终止对该公司首次公开发行股票并在主板上市的审核。 据贝多财经了解&am…

STM32 HAL TM1638读取24个按键

本文分享一下天微电子的另一款数码管按键驱动芯片TM1638的单片机C语言驱动程序。 笔者采用的MCU是STM32单片机&#xff0c;STM32CubeMX Keil5开发&#xff0c;使用了HAL库。 一、TM1638介绍 1、基础信息 TM1638属于一款LED驱动控制专用电路&#xff0c;其特性如下&#xf…

【文献阅读】李井林等2021ESG促企业绩效的机制研究——基于企业创新的视角

ESG促进企业绩效的机制 摘要 0.引言与文献综述 1.理论分析与研究假设 1.1企业ESG表现与企业绩效 假设1a&#xff1a;企业的环境表现对企业绩效存在正向影响效应。 假设1b&#xff1a;企业的社会表现对企业绩效存在正向影响效应。 假设1c&#xff1a;企业的公司治理表现对企业…

PCIE V3.0物理层协议学习笔记

一、说明 PCI-Express(peripheral component interconnect express)是一种高速串行计算机扩展总线标准&#xff0c;它原来的名称为“3GIO”&#xff0c;是由英特尔在2001年提出的&#xff0c;旨在替代旧的PCI&#xff0c;PCI-X和AGP总线标准。 PCIe属于高速串行点对点双通道高…

JVM---垃圾回收

目录 一、C/C的内存管理 二、Java的内存管理 三、垃圾回收的对比 四、Java的内存管理和自动垃圾回收 五、方法区的回收 手动触发回收 六、堆回收 如何判断堆上的对象可以回收&#xff1f; 引用计数法 可达性分析算法 五种对象引用 软引用 软引用的使用场景-缓存 弱引用 虚…

【Python-爬虫】

Python-爬虫 ■ 爬虫分类■ 1. 通用网络爬虫&#xff1a;&#xff08;搜索引擎使用&#xff0c;遵守robots协议&#xff09;■ robots协议&#xff08;君子协议&#xff09; ■ 2. 聚集网络爬虫&#xff1a;自己写的爬虫程序 ■ urllib.request&#xff08;请求模块&#xff09…

详解typora配置亚马逊云科技Amazon S3图床

欢迎免费试用亚马逊云科技产品&#xff1a;https://mic.anruicloud.com/url/1333 当前有很多不同的博客社区&#xff0c;不同的博客社区使用的编辑器也不尽相同&#xff0c;大概可以分为两种&#xff0c;一种是markdown格式&#xff0c;另外一种是富文本格式。例如华为云开发者…