本篇文章对生产者消费者(模型)问题进行了详解。其中给出了基于阻塞队列的生产者消费者模型demo代码和对涉及到的条件变量与互斥锁的操作也进行了详细解释。解释了条件变量等待时,为什么还需要一把锁的问题。对生产者消费者(模型)问题进行总结,且解释了生产者消费者(模型)问题效率提升的所在。希望本篇文章会对你有所帮助。
文章目录
一、生产者消费者模型
1、1 什么是生产者消费者模型
1、2 再次生产者消费者模型
1、2、1 生产者与生产者
1、2、2 消费者与消费者
1、2、3 消费者与生产者
二、条件变量
2、1 条件变量引入
2、1、1 线程同步
2、1、2 多线程竞争引起饥饿等问题
2、2 什么是条件变量
2、2、1 pthread_cond_t
2、2、1 pthread_cond_init
2、2、2 pthread_cond_wait
2、2、3 pthread_cond_signal
2、2、4 pthread_cond_broadcast
2、2、5 pthread_cond_destroy
三、生产者消费者模型demo代码
3、1 基于阻塞队列生产者消费者模型
3、2 派发任务不同
3、3 LockGuard(RAII)
四、生产者消费者模型总结
生产者消费者模型的效率提升到底提升在哪里呢?
🙋♂️ 作者:@Ggggggtm 🙋♂️
👀 专栏:Linux从入门到精通 👀
💥 标题:生产者消费者(模型)问题💥
❣️ 寄语:与其忙着诉苦,不如低头赶路,奋路前行,终将遇到一番好风景 ❣️
一、生产者消费者模型
1、1 什么是生产者消费者模型
生产者消费者模型是一种并发编程中常用的设计模式,也被称为生产者-消费者问题。它基于生产者和消费者之间共享一个缓冲区(是由一个数据结构来维护的),并通过该缓冲区来实现数据的交换和协调。
在生产者消费者模型中,生产者负责生成数据或任务,并将其放入缓冲区,而消费者则从缓冲区中获取数据或任务并进行处理。这种模型主要解决了生产者和消费者之间数据交互的同步和协作问题。
以下是生产者消费者模型的基本流程:
- 生产者生成数据或任务。
- 如果缓冲区未满,生产者将数据或任务放入缓冲区;否则,生产者等待直到缓冲区有空间。
- 消费者从缓冲区获取数据或任务。
- 如果缓冲区不为空,消费者处理数据或任务;否则,消费者等待直到缓冲区有数据。
- 当生产者将数据或任务放入缓冲区后,通知等待中的消费者继续处理。
- 当消费者处理完数据或任务后,通知等待中的生产者继续生成。
1、2 再次生产者消费者模型
其实对上述的概念理解后,我们也不难发现:生产者和消费者彼此之间不直接通讯,而通过特定数据结构来进行通讯。所以生产者生产完数据之后不用等待消费者处理,直接扔给特定数据结构,消费者不找生产者要数据,而是直接从特定数据结构里取,而这个特定的数据结构就相当于一个缓冲区,平衡了生产者和消费者的处理能力,从而提高了效率,也完成了生产和消费的解耦。
对应的并发编程怎么理解呢?通俗理解:消费者就是由单线程或者多个线程来取数据,生产者是由单线程或者多个线程来生产数据,中间会有一个特定数据结构供消费者与生产者数据交换与协调。我们接着探究一下消费者和生产者之间的关系。
1、2、1 生产者与生产者
其实我们不难发现:生产数据时,肯定是存在竞争关系的。其次为了保护数据安全,必须是互斥的。当然,只有一个消费者,也就是这有一个生产数据的线程时,不存在所谓的竞争关系。
1、2、2 消费者与消费者
一个消费者与另一个消费者存在竞争关系吗?答案是存在的!假如两个消费者同时看到一份数据,那么由谁来取呢?同时也必须存在互斥关系,保证数据的安全。只有一个消费者,也就是只有一个消费数据的线程时,也不存在竞争关系。
1、2、3 消费者与生产者
生产者和消费者之间的互斥与同步关系。所谓互斥,就是生产数据过程中,不能消费数据。目的就是保证读写安全。所谓同步,当缓冲区数据满了或空了,进行阻塞等待另一方通知,而不是一直在申请和释放资源,判断资源是否存在。
二、条件变量
2、1 条件变量引入
上述的关系维护中,互斥关系可由互斥锁来维护,进而保证数据的安全。消费者与生产者的同步关系怎么维护呢?
2、1、1 线程同步
线程同步是指在多线程编程中,通过各种机制来协调和控制线程的执行顺序,以确保它们按照特定的规则访问共享资源或执行特定的操作。线程同步的主要目的是避免竞争条件(Race Condition)和保证数据的一致性。下面我们看一下多线程竞争引起的不合理情况。
2、1、2 多线程竞争引起饥饿等问题
我们看下图例子:
这也是我们讲解互斥锁时候举的例子。当一个线程申请了锁资源后,他会首先会对临界资源进行检测。假如检测资源不存在,则会释放当前的锁资源。但是,由于进程时间片和优先级等问题,该线程仍然会取申请锁资源,然后再次进行检测、释放、申请……进入循环。当然,该线程被切换后依然可以与其他线程竞争锁资源!!!那么这个线程有错吗?并没有错误,只是不合理!!!
该线程在竞争资源时,同时也频繁申请释放锁资源。有可能使其他线程申请不到资源,同时也处于等待状态,这也就造成了其他线程的饥饿问题。同时,该线程已经知道临界资源不存在后,还在频繁的申请资源,自己本身不就是一种浪费吗!!!
这时就需要引入同步的情况,来控制线程的执行顺序。通俗理解线程同步:让线程按照一定顺序来执行。合理的情况是:当该线程检测到临界资源不存在后,可以进行等待。等待其他线程提供资源后,再将该线程唤醒。
而条件变量就可以很好的控制线程的执行顺序。
2、2 什么是条件变量
当我们在申请临界资源前,首先要检测临界资源是否存在。检测的本质就是在访问临界资源。所以:对临界资源的检测也一定是需要在加锁解锁之间的。
而常规的对临界资源检测是否就绪,就必须频繁的申请和释放锁。那能不能在检测到临界资源条件不就绪时:不要再频繁申请资源和自己检测了,进行等待。同时当条件就绪时,由某一线程来通知该线程来申请临界资源和访问。条件变量就可以很好的做到。
在多线程编程中,条件变量是一种同步机制,它用于协调多个线程之间的操作。条件变量的作用是让线程在某个条件满足的情况下等待或唤醒。
具体来说,如果一个线程需要等待某个特定的条件被满足才能继续执行,那么该线程可以调用条件变量上的等待函数,这样线程会被阻塞,直到其他线程通过调用条件变量上的唤醒函数来唤醒它。条件变量可以避免线程在等待时浪费 CPU 资源,从而提高程序的效率。
此外,条件变量还可以用于解决多线程中的竞态条件问题。竞态条件是指多个线程在访问共享资源时发生争用的情况,如果不加以控制,容易导致不可预期的结果。条件变量可以和互斥量一起使用,通过在互斥量上加锁来保护共享资源的访问,并使用条件变量来等待或唤醒其他线程,从而避免竞态条件的发生。
总结来说,引入条件变量可以提供线程之间的协作和通信机制,避免竞争条件和死锁情况的发生,提高多线程程序的效率和可靠性。接下来看一下条件变量的相关操作的使用。
2、2、1 pthread_cond_t
pthread_cond_t是一个条件变量类型,用于线程间的同步和通信。它是POSIX线程库提供的一种机制,用于解决多个线程之间共享数据时可能出现的竞态条件问题。
条件变量(Condition Variable)可以让线程在满足特定条件之前进入等待状态,并在条件发生变化时被唤醒。它通常与互斥锁(Mutex)结合使用,以确保在线程等待条件变量的同时,对共享资源进行安全的访问。
pthread_cond_t类型的变量需要通过一些函数来进行初始化、销毁和操作,常用的函数有:
- pthread_cond_init():用于初始化条件变量。参数为pthread_cond_t类型的指针,该函数会将条件变量初始化为默认值。在使用条件变量之前,必须先进行初始化。
- pthread_cond_destroy():用于销毁条件变量。参数为pthread_cond_t类型的指针,该函数会释放相关资源。
- pthread_cond_wait():用于等待条件变量满足特定条件,将线程挂起。参数包括条件变量和互斥锁,函数会自动释放互斥锁,并在接收到信号时重新获取互斥锁后继续执行。
- pthread_cond_signal():用于发送信号,唤醒一个等待条件变量的线程。参数为条件变量,会选择一个处于等待状态的线程进行唤醒。
- pthread_cond_broadcast():用于发送广播,唤醒所有等待条件变量的线程。参数为条件变量,会将所有等待的线程都唤醒。
条件变量的使用流程一般如下:
- 创建并初始化互斥锁和条件变量。
- 持有互斥锁,访问共享资源。
- 如果某个条件不满足,调用pthread_cond_wait()等待条件满足,并释放互斥锁。
- 其他线程改变了条件后,通过pthread_cond_signal()或pthread_cond_broadcast()发送信号。
- 等待的线程被唤醒后,重新获取互斥锁,并继续执行之后的操作。
条件变量的引入可以有效避免线程的忙等待,减少了系统资源的占用,并提高了程序的效率和性能。它是多线程编程中重要的同步机制之一,可以用于各种场景,如生产者-消费者模型、线程池等。
2、2、1 pthread_cond_init
pthread_cond_init()函数用于初始化条件变量,它的函数原型如下:
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- 返回值:函数执行成功返回0,失败返回错误码。
- 参数:
- cond:指向待初始化的条件变量的指针。
- attr:指向条件变量属性的指针,通常可以将其设置为NULL,表示使用默认属性。
上述情况是pthread_cond_t 对象是局部变量。当pthread_cond_t 对象是全局时,我们一般用:PTHREAD_COND_INITIALIZER。
2、2、2 pthread_cond_wait
pthread_cond_wait()函数是用于等待条件变量的特定状态发生,并挂起线程的执行。pthread_cond_wait()函数有两个参数和一个返回值,具体含义如下:
参数cond:条件变量
- 类型:pthread_cond_t*。
- 简介:表示要等待的条件变量。
参数mutex:互斥锁
- 类型:pthread_mutex_t*。
- 简介:与条件变量结合使用,保证在等待前后对共享资源的访问是线程安全的。
有很多人都有一个疑问:为什么使用条件变量等待时,还需要一把互斥锁呢?主要原因就是对临界资源的判断是在锁之间的。后文会结合实际应用对此详细解释。
需要注意的是,在pthread_cond_wait()函数中,线程被挂起,直到收到由其他线程调用pthread_cond_signal()或者pthread_cond_broadcast()发出的信号。并且,当线程被唤醒继续执行时,它会自动重新获得互斥锁,以继续对被保护资源进行访问。
2、2、3 pthread_cond_signal
pthread_cond_signal()函数用于向指定正在等待条件变量的线程发送信号,以唤醒其线程继续执行。函数定义如下:
int pthread_cond_signal(pthread_cond_t *cond);
返回值:
- 如果成功发送信号,则返回0;
- 如果发生错误,则返回非零错误代码。常见的错误包括传入无效的条件变量指针或未初始化的条件变量。
下面我们结合一个例子来再次理解我们上述的所有概念。代码如下:
#include <stdio.h> #include <stdlib.h> #include <pthread.h>int condition = 0;pthread_mutex_t mutex; pthread_cond_t cond; void* thread1(void* arg) {pthread_mutex_lock(&mutex);// 检查条件是否满足,如果不满足则等待条件变量while (condition == 0) {pthread_cond_wait(&cond, &mutex);}printf("Thread 1: Condition is now fulfilled.\n");pthread_mutex_unlock(&mutex);return NULL; }void* thread2(void* arg) {pthread_mutex_lock(&mutex);// 修改条件为满足condition = 1;printf("Thread 2: Condition has been changed.\n");// 通知所有等待该条件的线程pthread_cond_signal(&cond);pthread_mutex_unlock(&mutex);return NULL; }int main() {pthread_t tid1, tid2;// 初始化互斥锁和条件变量pthread_mutex_init(&mutex, NULL);pthread_cond_init(&cond, NULL);// 创建线程1pthread_create(&tid1, NULL, thread1, NULL);// 创建线程2pthread_create(&tid2, NULL, thread2, NULL);// 等待线程1和线程2结束pthread_join(tid1, NULL);pthread_join(tid2, NULL);// 销毁互斥锁和条件变量pthread_mutex_destroy(&mutex);pthread_cond_destroy(&cond);return 0; }
此示例中,创建了两个线程thread1和thread2,并在主函数中等待它们的结束。线程1等待条件变量condition满足后继续执行,而线程2修改condition为满足,并使用pthread_cond_signal()通知等待该条件的线程。
在thread1线程中,我们首先使用pthread_mutex_lock()锁住互斥锁mutex,然后使用while循环来检查条件是否满足。如果条件不满足,则调用pthread_cond_wait()函数来等待条件变量。调用pthread_cond_wait()函数进行阻塞等待后,回去执行其他线程(thread2)。但是线程thread1 还带着互斥锁呢!线程thread2怎么执行呢?
其实在调用pthread_cond_wait()函数时,pthread_cond_wait()函数会自动释放互斥锁mutex并挂起线程的执行,直到条件变量被其他线程发出信号来通知该线程条件已经满足。一旦收到通知,该线程会重新获取互斥锁并继续执行。这正是等待时需要传入一把互斥锁的原因。
在thread2线程中,我们首先使用pthread_mutex_lock()锁住互斥锁mutex,然后修改条件变量condition为满足,并使用pthread_cond_signal()函数通知等待该条件的线程。最后,释放互斥锁并结束线程。
大多数情况下,都是一个线程进入阻塞等待后,需要另一个线程检测临界资源是否满足条件,进而是否去唤醒指定等待的线程。当被唤醒的时候,还是在临界区唤醒。但是这里有一个细节:
我们检测条件标量否满足时,采用的时循环式检测。为什么要采用循环式检测呢?直接采用一个if 判断不就完了。原因有如下两点:
- pthread_cond_wait() 也是一个函数调用,只要是函数调用,就有可能调用失败!
- 当有多个线程进行等待时,且使用同一条件变量,这时候有肯能存在伪唤醒的情况!
- 防止只用thread_cond_broadcast()进行唤醒。
2、2、4 pthread_cond_broadcast
pthread_cond_broadcast()函数是用于广播条件变量,唤醒所有等待该条件变量的线程。函数的原型如下:
int pthread_cond_broadcast(pthread_cond_t *cond);
返回值:
- 函数返回值为0表示成功。
- 如果返回值为正数,则表示失败,并且返回的值对应于错误代码。
参数:
- 参数cond:表示需要广播的条件变量。它是一个指向pthread_cond_t类型的指针,该类型在头文件pthread.h中定义。 注意:使用pthread_cond_broadcast()函数之前,条件变量必须使用pthread_cond_init()函数进行初始化。
2、2、5 pthread_cond_destroy
pthread_cond_destroy()函数,该函数用于销毁条件变量,释放相关资源。函数的原型如下:
int pthread_cond_destroy(pthread_cond_t *cond);
它的返回值是整型,并且有两种可能的取值:
- 如果成功执行,返回值为0。
- 如果出现错误,返回值为一个非零的错误码。
参数详解如下:
pthread_cond_t *cond
:指向条件变量的指针。该参数表示需要销毁的条件变量。在调用该函数之前,必须确保没有任何线程正在等待该条件变量。如果有线程在等待条件变量上阻塞,会导致未定义行为。因此,在调用pthread_cond_destroy()之前,必须先调用pthread_cond_broadcast()或pthread_cond_signal()函数唤醒所有等待的线程,或者使用其他同步机制来确保没有线程在等待条件变量。
当条件变量销毁成功后,相关的资源将被释放,可以通过重新初始化(pthread_cond_init())来再次使用该条件变量。
三、生产者消费者模型demo代码
当我们了解完上述的概念后,我们再来看一下生产者和消费者模型的德莫代码。
3、1 基于阻塞队列生产者消费者模型
当我们在生产数据时,一定是有限制的。我们这里采用阻塞队列,当数据满的时候就不可再生产数据。当数据为空的时候,就不可再消费数据。我们先看一下阻塞队列的代码:
const int DefCapacity=5; template<class T> class BlockQueue { private:bool isEmpty(){return bq_.size()==0;}bool isFull(){return bq_.size()==capacity_;} public:BlockQueue(int capacity=DefCapacity):capacity_(capacity){pthread_mutex_init(&mtx_,nullptr);pthread_cond_init(&Full_,nullptr);pthread_cond_init(&Empty_,nullptr);}void push(const T& in){pthread_mutex_lock(&mtx_);while(isFull())pthread_cond_wait(&Full_,&mtx_);bq_.push(in);pthread_cond_signal(&Empty_);pthread_mutex_unlock(&mtx_);}void pop(T* out){pthread_mutex_lock(&mtx_);while(isEmpty())pthread_cond_wait(&Empty_,&mtx_);*out=bq_.front();bq_.pop();pthread_cond_signal(&Full_);pthread_mutex_unlock(&mtx_);}~BlockQueue(){pthread_mutex_destroy(&mtx_);pthread_cond_destroy(&Empty_);pthread_cond_destroy(&Full_);} private:queue<T> bq_;int capacity_;pthread_mutex_t mtx_;pthread_cond_t Full_;pthread_cond_t Empty_; };void* consumer(){} void* producter(){} int main() {BlockQueue<Task> *bqueue=new BlockQueue<Task>();pthread_t c,p;pthread_create(&c,nullptr,consumer,(void*)bqueue);pthread_create(&p,nullptr,producter,(void*)bqueue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0; }
BlockQueue
类是一个模板类,支持线程安全的阻塞队列。它使用互斥锁(pthread_mutex_t
)对队列的访问进行保护,同时使用条件变量(pthread_cond_t
)进行线程间的同步。其中push
用于向队列中添加元素,如果队列已满,则等待非满条件;pop
用于从队列中取出元素,如果队列为空,则等待非空条件。我们使用了两个不同的条件变量和一把锁维护了生产者线程(Full_)与消费者线程(Empty_)的同步与互斥。BlockQueue
的容量由构造函数参数指定,默认为5。在析构函数中对使用的互斥锁和条件变量进行了销毁操作。我们在main()函数中,创建一个
BlockQueue<Task>
对象作为生产者和消费者之间的共享阻塞队列。然后创建两个线程,分别执行consumer
和producter
函数,将共享队列作为参数传递给这两个线程。最后使用pthread_join
函数等待线程的结束,释放线程资源。
3、2 派发任务不同
当我们学习到了生产者消费者模型后,他是有类模板来实现的。所以我们生产不同的任务类型。具体如下:
- 生产消费数据类型为int:
BlockQueue<int> *bqueue=new BlockQueue<int>();void* consumer(void* args) {BlockQueue<int> *bqueue=(BlockQueue<int>*) args;while(true){int a;bqueue->pop(&a);cout<<"consumer use data: "<< a <<endl;sleep(1);}return nullptr; }void* producter(void* args) {BlockQueue<int> *bqueue=(BlockQueue<int>*) args;int a=100;while(true){bqueue->push(a);cout<<"producter make data: "<<a<<endl;a--;}return nullptr; }
- 生产消费数据类型为class:
typedef std::function<int(int, int)> func_t;class Task {public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}int operator ()(){return func_(x_, y_);} public:int x_;int y_;// int type;func_t func_; };BlockQueue<Task> *bqueue=new BlockQueue<Task>();int myAdd(int x, int y) {return x + y; }void* consumer(void* args) {BlockQueue<Task> *bqueue=(BlockQueue<Task>*) args;while(true){Task t;bqueue->pop(&t);std::cout <<" consumer use data: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;}return nullptr; }void* producter(void* args) {BlockQueue<Task> *bqueue=(BlockQueue<Task>*) args;while(true){int x = rand()%10 + 1;usleep(rand()%1000);int y = rand()%5 + 1;// int x, y;// std::cout << "Please Enter x: ";// std::cin >> x;// std::cout << "Please Enter y: ";// std::cin >> y;Task t(x, y, myAdd);bqueue->push(t);cout<<"producter make data: "<<t.x_ << "+" << t.y_ << " = ?"<<endl;sleep(1);}return nullptr; }
当然,生产和消费的数据可以是任意类型。我们只需要修改所线程所对应的函数就行。
3、3 LockGuard(RAII)
当然,我们也可采用RAII的思想对互斥锁进行封装。这样我们就不需要担心释放锁资源的问题了。具体代码如下:
class Mutex { public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){} private:pthread_mutex_t *pmtx_; };// RAII风格的加锁方式 class lockGuard { public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();} private:Mutex mtx_; };
四、生产者消费者模型总结
生产者消费者模型是一种常见的并发编程模式,用于处理生产者和消费者之间的数据共享和同步。它可以有效地解决在多线程或多进程环境下,生产者与消费者之间的数据竞争和资源冲突问题。下面是对生产者消费者模型的总结:
模型介绍: 生产者消费者模型涉及两个主要角色:生产者和消费者。同时包含了一个交易场所:仓库(特定的数据结构,也可视为数据缓冲区)。生产者负责生成数据并将其放入共享缓冲区,而消费者负责从共享缓冲区中获取数据并进行处理。
共享缓冲区: 共享缓冲区是生产者和消费者之间进行数据交换的中介。生产者将数据放入缓冲区,消费者从缓冲区中获取数据。缓冲区可以是有界(固定大小)或无界(动态增长)的。
同步与互斥: 为了保证生产者和消费者之间的正确协作,需要使用同步机制和互斥机制来处理数据的操作和访问。同步机制用于确保生产者和消费者之间的正确顺序执行,互斥机制用于防止多个线程同时访问共享资源。
解决方案: 有多种方式可以实现生产者消费者模型,其中包括使用信号量、互斥锁、条件变量等同步和互斥机制。常见的解决方案包括使用线程库提供的线程同步原语,如pthread库中的pthread_cond_t、pthread_mutex_t等。
解决方法的选择: 在选择解决方案时,需要考虑具体的应用场景和需求。根据并发程度、性能要求、资源限制等因素,可以选择合适的解决方案。同时,还需注意解决方案的正确性、可靠性和可维护性。
避免常见问题: 在实现生产者消费者模型时,需要注意避免常见的问题,如死锁(deadlock)、资源饥饿(resource starvation)和活锁(live lock)。这些问题可能导致程序无法正常执行或性能下降。
生产者消费者模型的效率提升到底提升在哪里呢?
我们学完生产者消费者模型后,发现生产者和消费者也是需要竞争锁资源的。简单来说,即使有多个生产者和多个消费者(多线程),最终不也是串行访问临界资源吗? 当然,因为条件变量的使用,可以在一定程度上提高效率。但是这并不明显,因为本质上还是在串行执行。
我们不应该局限于模型本身上。试想正常情况下数据的来源需要耗费时间吗?数据的处理需要消耗时间吗?答案是要的!生产者消费者模型给我们提供了数据缓冲区,从而实现了生产者与消费者的解耦。此时我们应该分清生产数据和向缓冲区放数据是两个概念。生产者消费者模型本省是放数据和拿数据。当然,在放数据和拿数据时,也并不影响我们放数据前的生产数据和拿数据后的对数据的处理。这样也就从一定意义上实现了生产数据和对数据处理的并发!从而大大提高了效率。