1. 背景概念
假设现在有多个线程,一部分线程负责生产任务,称为生产者productor,另一部线程负责执行任务,称为消费者consumer,他们之间是一对一一对一一对一的关系。
现在生产者productor-3
有任务要派发,但是consumer-3
正在执行上一个任务,于是productor-3
就只能等待。但是此时明明有两个线程consumer-1
和consumer-2
是空闲的,它们却不能执行这个任务,这就出现了很明显的资源分配不合理问题。
于是提出了生产者消费者模型,即通过一个容器来解决生产者和消费者的强耦合问题:
我们可以把这个任务队列看成一个缓冲区,生产者负责生成数据或任务,并将其放入该共享的缓冲区中;消费者则从共享缓冲区中取出数据或任务进行处理。
这个模型的目的是解决生产者和消费者之间的协调问题,尤其是在多线程或多进程环境中,当两者的工作速率不一致时,如何有效地管理资源和避免数据竞争或死锁。
这个任务队列的实现上我们有两种形式,分别是阻塞队列BlockQueue以及环形队列。
2. 阻塞队列BlockQueue
阻塞队列是实现生产者消费者模型的一种常用数据结构。在阻塞队列中,当队列为空时,消费者尝试获取元素的操作会被阻塞,直到生产者向队列中添加了新的元素。同样,当队列已满时,生产者尝试添加新元素的操作会被阻塞,直到消费者从队列中取出了元素。这种机制自然地实现了生产者和消费者之间的同步。
阻塞队列BlockQueue的成员变量
queue<T> _q;//队列int _capacity;//阻塞队列的容量pthread_mutex_t _mutex;pthread_con_t _consume_cond;pthread_con_t _produce_cond;
由于这个阻塞队列为临界资源,并且生产者productor线程和消费者consumer线程都要访问阻塞队列,所以我们需要一把锁mutex来保护阻塞队列,保证线程互斥。即生产者和生产者,消费者和消费者,生产者和消费者之间均为互斥关系。
同时,当队列为空时,消费者需要到等待队列排队等待直到生产者向阻塞队列添加元素,同理,当队列为满时,生产者需要到等待队列排队等待直到消费者从阻塞队列获取元素,因为生产者和消费者排队等待的并不是同一个队列,所以我们需要两个队列,即两个条件变量produce_cond和consume_cond。
BlockQueue.hpp完整代码
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class blockqueue
{public:blockqueue(int capacity):_capacity(capacity),_q(capacity){pthread_mutex_init(&mutex,nullptr);pthread_cond_init(&consume_cond,nullptr);pthread_cond_init(&produce_cond,nullptr);}~blockqueue(){pthread_mutex_destroy(&mutex);pthread_cond_destroy(&consume_cond);pthread_cond_destroy(&produce_cond);}void push(T val){pthread_mutex_lock(&mutex);//加锁while(_q.size()==_capacity)//如果队列满了,进入生产者等待队列阻塞{pthread_cond_wait(&produce_cond,&mutex);}_q.push(val);pthread_cond_signal(&consume_cond);//唤醒消费者pthread_mutex_unlock(&mutex);//解锁}T pop(){pthread_mutex_lock(&mutex);//加锁while(_q.empty())//如果队列为空,进入消费者等待队列阻塞{pthread_cond_wait(&consume_cond,&mutex);}T val=_q.front();_q.pop();pthread_cond_signal(&produce_cond);//唤醒生产者pthread_mutex_unlock(&mutex);//解锁return val;}private:queue<T> _q;//队列int _capacity;//阻塞队列的容量pthread_mutex_t _mutex;pthread_con_t _consume_cond;pthread_con_t _produce_cond;
};
我们写一个main.cpp来测试基于阻塞队列BlockQueue实现的生产者消费者模型,main.cpp如下
#include "BlockQueue.hpp"
#include <string>
#include <ctime>
#include <unistd.h>
struct ThreadData
{blockqueue<int> *bq;pthread_t tid;string name;
};
void *ProductorRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){sleep(1);int num = rand() % 10 + 1;td->bq->push(num);cout << "[" << td->name << "] push: " << num << endl;}
}
void *ConsumerRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){int num = td->bq->pop();cout << "[" << td->name << "] pop: " << num << endl;}
}
int main()
{srand(time(nullptr));blockqueue<int> bq;ThreadData productors[3], consumers[3];for (int i = 0; i < 3; i++){sleep(1);productors[i].bq = &bq;productors[i].name = "Productor-" + to_string(i + 1);pthread_create(&productors[i].tid, nullptr, ProductorRoutine, (void *)&productors[i]);}for (int i = 0; i < 3; i++){consumers[i].bq = &bq;consumers[i].name = "Consumer-" + to_string(i + 1);pthread_create(&consumers[i].tid, nullptr, ConsumerRoutine, (void *)&consumers[i]);}for (int i = 0; i < 3; i++){pthread_join(productors[i].tid, nullptr);pthread_join(consumers[i].tid, nullptr);}return 0;
}
makefile如下
main.exe:main.cpp BlockQueue.hppg++ -o $@ $^ -l pthread
.PHONEY:clean
clean:rm -f main.exe
编译运行后结果如下
生产者往阻塞队列里投放数据,消费者从阻塞队列里获取数据,并且保持了生产者和生产者,消费者和消费者,生产者和消费者之间的互斥关系以及生产者和生产者,消费者和消费者之间的同步关系。
3. 环形队列RingQueue
环形队列是另一种用于生产者消费者模型的数据结构,它通过循环利用固定大小的数组来模拟无限大的队列。在环形队列中,我们需要使用POSIX信号量,用于控制生产者和消费者对缓冲区的访问,确保在任何时刻只有一个生产者或消费者能够修改缓冲区中的数据。与阻塞队列不同的是,环形队列通过使用POSIX信号量允许生产者和消费者同时进行生产和消费操作,从而提高资源利用率。
POSIX信号量
POSIX信号量是一种同步机制,可以用来实现生产者消费者模型中的同步。信号量本质上是一个计数器,表示某种资源的可用数量,用于控制对共享资源的访问。生产者和消费者可以通过增加或减少信号量的值来协调对数据缓冲区的访问。
例如,当一个线程想要访问资源时,它会检查信号量的计数器。如果计数器大于 0,则表示有可用资源,线程可以访问资源并使计数器减 1。如果计数器为 0,则表示没有可用资源,线程会进入等待状态,直到有其他线程释放资源;当一个线程完成对资源的访问后,它会将计数器加 1,并唤醒一个等待的线程。
POSIX信号量的使用
POSIX信号量的类型为 sem_t ,在使用POSIX信号量时,通常需要包含头文件 <semaphore.h>,并且在编译时链接pthread库,以支持多线程操作。
sem_init
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_init 函数用于初始化一个POSIX信号量。
参数:
sem
:指向要初始化的信号量的指针。pshared
:一个标志位,用于指定信号量的类型。如果pshared
为0,信号量是进程本地的,只能在单个进程内部的线程之间共享;如果pshared
非0,信号量是进程共享的,可以在不同进程之间共享。value
:信号量的初始值。
函数调用成功时返回0,失败时返回-1,并设置errno
以指示错误。
sem_destroy
int sem_destroy(sem_t *sem);
sem_destroy函数用于销毁一个POSIX信号量。
sem
:指向要销毁的信号量的指针。
函数返回0表示成功,非0值表示失败,并设置errno
以指示错误。
sem_wait & sem_trywait
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
sem_wait函数用于请求信号量,如果信号量的值大于0,则函数将信号量的值减1并立即返回。如果信号量的值为0,则当前线程将被阻塞,直到信号量的值变为正数.
sem_trywait函数是sem_wait的非阻塞版本,它尝试减少信号量的值。如果信号量的当前值大于0,则函数成功并将信号量的值减1。如果信号量的当前值为0,则sem_trywait不会阻塞调用线程,而是立即返回一个错误码EAGAIN
,表示操作不能立即执行.
sem_post
int sem_post(sem_t *sem);
sem_post函数用于释放信号量并增加信号量的值,并唤醒一个等待该信号量的线程。当信号量的值增加后,如果有线程因为 sem_wait 或 sem_trywait 而被阻塞,这些线程中的至少一个将被唤醒并重新竞争信号量。
sem_getvalue
int sem_getvalue(sem_t *sem, int *sval);
sem_getvalue函数用于获取信号量的当前值,并将该值存储在提供的整数指针指向的变量中。
sem
:指向要查询的信号量的指针。sval
:指向整数的指针,用于接收信号量的当前值。
函数返回0表示成功,返回-1表示出现错误,并设置errno
以指示错误类型.
环形队列的实现
上图是一个环形队列,生产者与消费者都绕着环顺时针走,黑色代表这个位置有数据,白色代表这个位置没有数据。生产者在前面生产,消费者跟在后面消费,这就是基于环形队列的生产消费模型。
RingQueue的成员变量
vector<T> _q;//数组模拟环形队列int _capacity;//队列容量int _productor_index;//生产者当前位置int _consumer_index;//消费者当前位置pthread_mutex_t _productor_mutex;//生产者锁pthread_mutex_t _consumer_mutex;//消费者锁sem_t _space_sem;sem_t _data_sem;
由于生产者只需要关注环形队列剩余空间数量,消费者只需要关注剩余数据数量,所以在这个模型中,通常会有两个信号量:一个用于控制生产者对环形队列剩余空间的访问 space_sem(通常初始化为环形队列大小),另一个用于控制消费者对环形队列剩余数据的访问 data_sem(初始化为0)。
就上图来说: space_sem = 4
,data_sem = 4
。
- 只有 space_sem 的值大于
0
,表示还有空间可以放数据,此时生产者才可以生产 - 只有 data_sem 的值大于
0
,表示当前有数据可以读取,此时消费者才可以消费
所以环形队列的判空和判满问题也能很好解决
而且在该种情况下,我们就可以直接保证了生产者和消费者之间的互斥关系,因为只有当环形队列为空或者为满的时候生产者和消费者的位置才重合在一起,而为空时,data_sem == 0,消费者不能再获取数据,为满时,space_sem == 0,生产者不能在添加数据,这样就天然维护了生产者和消费者之间的互斥关系。
而我们还要维护生产者和生产者,消费者和消费者之间互斥关系,就需要两把锁 productor_mutex 和 consumer_mutex,这样生产者和消费者就可以同时访问同一个队列的不同部分,这就可以让生产和消费同时进行提高效率。
RingQueue.hpp完整代码
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
template <class T>
class ringqueue
{
public:ringqueue(int capacity = 5): _capacity(capacity), _productor_index(0), _consumer_index(0),{_q.resize(capacity);pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);sem_init(&_space_sem, 0, capacity);sem_init(&_data_sem, 0, 0);}~ringqueue(){pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);sem_destroy(&_space_sem);sem_destroy(&_data_sem);}void push(T data){// 因为信号量本身是互斥的,所以可以把申请信号量放在加锁之前sem_wait(&_space_sem); // 空间不足时等待pthread_mutex_lock(&_productor_mutex); // 申请锁_q[_productor_index++] = data; // 添加数据_productor_index %= _capacity; // 更新位置pthread_mutex_unlock(&_productor_mutex); // 释放锁sem_post(&_data_sem); // 数据添加完成,通知消费者}T pop(){sem_wait(&_data_sem); // 数据不足时等待pthread_mutex_lock(&_consumer_mutex); // 申请锁T data = _q[_consumer_index++]; // 取出数据_consumer_index %= _capacity; // 更新位置pthread_mutex_unlock(&_consumer_mutex); // 释放锁sem_post(&_space_sem); // 空间释放完成,通知生产者return data;}private:vector<T> _q; // 数组模拟环形队列int _capacity; // 队列容量int _productor_index; // 生产者当前位置int _consumer_index; // 消费者当前位置pthread_mutex_t _productor_mutex; // 生产者锁pthread_mutex_t _consumer_mutex; // 消费者锁sem_t _space_sem;sem_t _data_sem;
};
同理,我们把main.cpp的代码改用RingQueue.hpp来测试一下
main.cpp如下
#include "BlockQueue.hpp"
#include "RingQueue.hpp"
#include <string>
#include <ctime>
#include <unistd.h>
struct ThreadData
{ringqueue<int> *rq;pthread_t tid;string name;
};
void *ProductorRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){sleep(1);int num = rand() % 10 + 1;td->rq->push(num);cout << "[" << td->name << "] push: " << num << endl;}
}
void *ConsumerRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){int num = td->rq->pop();cout << "[" << td->name << "] pop: " << num << endl;}
}
int main()
{srand(time(nullptr));ringqueue<int> rq;ThreadData productors[3], consumers[3];for (int i = 0; i < 3; i++){sleep(1);productors[i].rq = &rq;productors[i].name = "Productor-" + to_string(i + 1);pthread_create(&productors[i].tid, nullptr, ProductorRoutine, (void *)&productors[i]);}for (int i = 0; i < 3; i++){consumers[i].rq = &rq;consumers[i].name = "Consumer-" + to_string(i + 1);pthread_create(&consumers[i].tid, nullptr, ConsumerRoutine, (void *)&consumers[i]);}for (int i = 0; i < 3; i++){pthread_join(productors[i].tid, nullptr);pthread_join(consumers[i].tid, nullptr);}return 0;
}
测试结果如下