消息队列
消息队列是一种在多任务操作系统中广泛使用的通信机制。它可以用于不同任务之间的消息传递,从而实现数据共享和协调处理任务之间的顺序。
消息队列通常具有以下基本特点:
- 消息队列的大小有限:消息队列被设计为一种缓冲区,用于存储消息数据。在创建消息队列时,需要指定其大小。当消息队列中的消息数量达到队列的最大容量时,将无法再添加新的消息,在此之后的消息将被忽略或者阻塞等待。
- 先进先出的顺序:消息队列通常采用先进先出(FIFO)的顺序处理消息,即先放入队列的消息优先被处理,后放入队列的消息后被处理。
- 独立于任务的数据传输:消息队列允许任务之间以独立的方式发送和接收消息,任务不需要了解对方的细节,只需要知道发送和接收消息的队列即可。
在多任务操作系统中,消息队列被广泛应用于任务之间的通信与同步,例如在实时系统中,可以使用消息队列来实现数据流水线,从而提高数据处理的效率。
FIFO和LIFO
FIFO即First In First Out,就是先进先出的意思,是一种队列管理方式,另外还有一种是LIFO即Last
In First Out,后进先出,比如用在单片机的栈操作就是LIFO的模式(与之对应的堆不是FIFO的模式,如有有人跟你们这样讲过,那他肯定是错的,堆是另外一种存储模式,是一种数据结构,他更像一种二叉树的结构,等讲到的时候你们就明白了),而我们今天讲到的消息队列是FIFO的模式。
消息队列的结构
消息队列和计数器信号量有些类似,也有个容量(学名叫水位线),和当前数据量,大小使用Length表示长度,但其中存储的数据并不是一个简单的数字,而是一组数据,数据也有大小叫做size
在这个图中,消息队列的长度是Length,表示可以容纳多少个消息,而每个消息都有自己的一个size,表示一条消息所占的内存大小。
消息队列在FreeRTOS中通过三个函数进行简单操作:
xQueueCreate(Length, Size) 用于创建消息队列,传入的两个参数分别表示消息队列的长度和单个消息的大小。
xQueueReceive 用于从消息队列中读取一条消息,接收三个参数,依次是消息队列句柄指针,消息指针,等待超时时间。
xQueueSend 用于向消息队列中发送一条消息,接收三个参数,依次是消息队列句柄指针,消息指针,等待超时时间。
用消息队列处理单类型数据
代码共享位置:https://wokwi.com/projects/362797906911699969
#include <Wire.h>
#include <LiquidCrystal_I2C.h>
#define SCL 41
#define SDA 42
LiquidCrystal_I2C lcd(0x27, 20, 4);
QueueHandle_t queueMsg = xQueueCreate(8, sizeof(char[20])); // 创建消息队列,长度为8,每条消息大小为20字节
char user_id = 'A'; // 用户ID
// 随机返回一段文字
String randomMsg() {static const String myStrings[] = {"Nice to meet you","Where are U from?","What do you do?","What do U like?","What is UR num?","Do U have FB?","Thanks so much.","I am Chinese.","I do not KNOW.","Thank you.","That helps.","I Love U","Do U miss me?","Be careful.","Don't worry.","Good idea.","He's right.","I ate already.","More than that.","Nothing else.","See you later.","Take it outside.",};return myStrings[random(0, 22)];
}
// 用户线程
void user_task(void *param_t){char msg[20];String prefix= String(user_id++);prefix += ":";while(1){(prefix + randomMsg()).toCharArray(msg, 20); // 拼接消息体// 向消息队列中发送一条消息, 如果满了,则无限期等待if (xQueueSend(queueMsg, &msg, portMAX_DELAY) == pdPASS) {Serial.println(msg);}else{Serial.println("消息队列已满!");}vTaskDelay(pdMS_TO_TICKS(random(2000,5000)));}
}
// 显示屏线程
void lcd_task(void *param_t){Wire.begin(SDA, SCL);lcd.init();lcd.backlight();// LCD每行显示的内容char line0[20] = {' '};char line1[20] = {' '};char line2[20] = {' '};char line3[20] = {' '};char * lines[] = { line0, line1, line2, line3 };while(1){//文字向上滚动strcpy(line0, line1);strcpy(line1, line2);strcpy(line2, line3);// 从消息队列中取出一条消息,如果成功则显示到屏幕上if (xQueueReceive(queueMsg, lines[3], 1000) == pdPASS) {for (int i = 3; i >= 0; i--) {lcd.setCursor(0, i); // 定位文字打印位置lcd.print(" "); // 清空这一行内容,向这一行发送20个空格即可清空lcd.setCursor(0, i); // 重新定位文字打印位置,因为print操作后光标会变lcd.print(lines[i]); // 在这行位置上输出内容}}else {Serial.println("消息队列没有内容...");};vTaskDelay(100);}
}
void setup() {Serial.begin(115200);Serial.println("Hello, ESP32-S3!");// 创建LCD显示任务xTaskCreate(lcd_task, "LCD", 1024 * 8, NULL, 1, NULL);// 创造一些用户for(int i=0; i<3; i++){xTaskCreate(user_task, "USER", 1024 * 8, NULL, 1, NULL);}
}
void loop() {delay(10);
}
该例程使用聊天室的方式演示了消息队列收发消息。
首先创建了多个 user_task 用于模拟用户向大屏幕发送消息,任务随机生成一条消息,通过 xQueueSend 发送出去,这里采用了最大限度的时间等待,如果消息队列满了,则等待一定时间(这个时间选择的是最大时长,50天,并不是无限期等待,所以必须对超时做处理),在等待期间程序基本上是不消耗CPU资源的。
lcd_task 现成模拟了显示屏,每间隔100ms就向消息队列请求,看是否有新的消息到达,如果在1秒钟之内获得不了新的消息,则取消等待,如果发现有新的消息后,将消息取出并打印在屏幕上。
单数据类型消息和多数据类型消息
通过上一个例子我们知道,在消息队列中传输的消息(数据)必须是定长的,所以上一个例程中我们用的是char[20]的字符数组进行消息的传递,但在实际项目中,要传输的数据可能多种多样,那我们需要用什么类型进行传输呢?
消息队列中数据的传输方式
在单数据传输的例程中我们可以得知,传输的字符串虽然把指针扔给了Send函数,我们接收的时候也把一个字符串指针扔给了Receive函数,不用测试我们就知道,这两个字符串首地址肯定是不同的。所以我们大概能判断出来,消息队列是按值进行传递的,也就是他内部运行机制其实就是从我们传入地址所指向的内容中,把符合长度的内复制了一份,这就是为什么结构体数据需要一个Size的原因。
如果我们需要传输多种数据类型,就必须使用结构体进行数据传输,但在结构体中可以存储多种数据类型,消息队列使用结构体传输数据的时候尽量不要使用指针类型的数据,如果需要使用,则指针指向的地址应该开在堆空间中,否则可能会导致内存溢出。
以下是结构体数据传输的例程:
代码共享位置:https://wokwi.com/projects/362852236497013761
typedef struct{uint16_t from_id;uint16_t to_id;uint8_t type;char data[20];char *p_data;
}Message;
QueueHandle_t queueMsg = xQueueCreate(8, sizeof(Message)); // 创建消息队列,长度为8,每条消息大小为20字节
// 模拟数据分发服务器动作
void server_task(void* param_t){char data[20]="This is message!";Message msg;msg.from_id=1;msg.to_id=100;msg.type=2;strcpy(msg.data, data);msg.p_data = data;if(xQueueSend(queueMsg, &msg, portMAX_DELAY) == pdPASS){printf("[SEND] 消息发送成功: 0x%p\n", &msg);printf("[SEND] data指针:0x%p\n",msg.data);printf("[SEND] p_data指针:0x%p\n",&msg.p_data);printf("[SEND] p_data值:0x%p\n",msg.p_data);}vTaskDelete(NULL);
}
// 模拟客户端运算器,用于获取消息
void client_task(void* param_t){Message msg;if(xQueueReceive(queueMsg, &msg, portMAX_DELAY)==pdPASS){printf("[RECV] 消息接收完毕: 0x%p\n", &msg);printf("[RECV] data指针:0x%p\n",msg.data);printf("[RECV] p_data指针:0x%p\n",&msg.p_data);printf("[RECV] p_data值:0x%p\n",msg.p_data);}vTaskDelete(NULL);
}
void setup() {Serial.begin(115200);Serial.println("Hello, ESP32-S3!");// 创建发送和接收任务xTaskCreate(server_task, "Sender", 1024 * 8, NULL, 1, NULL);xTaskCreate(client_task, "Receiver", 1024 * 8, NULL, 1, NULL);
}
void loop() {delay(10);
}
例程中我能自定义了一个Message结构体,结构体中前三个数据属于基础数据类型,第四个是一个长度为20的char数组,第五个是一个char型指针。
从运行的输出结果看,发送时候msg的指针和接收时的指针完全不同,说明这两个变量不是同一个(从定义时候的作用域也可以得知,他们分别属于两个不同的函数),这说明结构体也是被复制过去的,而不是简单的指针拷贝。
收发任务两个Message的结构体中的data变量指向的存储位置也是不同的,所以我们也可以断定data也是被复制过去的。
p_data变量有些不一样,打印p_data的地址发现,两个变量的地址是不同的,也验证了他们分别属于不同的结构体,但这两个变量的值是相同的,说明他们指向了同一个地址,而这个地址就是在server_task 函数开头定义的内部变量 data 的地址。
如果我们在 client_task 接收到消息体后尝试输出msg.p_data所指向的字符串,必定会内存溢出,因为在 server_task 消息发送完毕后,改地址内容已经被清空了,不可能读到准确的数据。
由此可见,在消息队列中传递消息时,请尽可能少的使用指针变量
实际中的应用
在实际项目开发中,消息队列用的最多的地方就是与外部的通讯,因为在代码中,不同的线程都可能用到同一个外设,之前我们的做法是通过互斥信号量的方式对资源进行保护,我们也可以通过消息队列等方式实现,把所有操作设备的行为封装在一个任务中,其他如果有需要操作设备的,都以消息的方式发送到消息队列中,设备任务依次对消息队列中消息进行处理。
一般对于复杂类型的设备操作,我们用互斥信号量实现,比如LCD、Flash、Wifi、蓝牙等,这些设备一般系统都对其做了OOB封装,暴露给我们的是各种操作函数,这类的设备更适用于用互斥信号量控制。
而对于一些流设备,比如串口、SPI、各种传感器、缓冲等简单类型的设备,建议使用消息队列方式进行数据首发操作。
使用消息队列模拟邮箱
关于消息队列的所有API,可以参考:https://www.freertos.org/zh-cn-cmn-s/a00018.html