文章目录
- 8.4 生产者消费者模型
- 8.4.1 为何要使用生产者消费者模型
- 8.4.2 生产者消费者模型优点
- 8.5 基于BlockingQueue的生产者消费者模型
- 8.5.1 C++ queue模拟阻塞队列的生产消费模型
- 8.6. 为什么pthread_cond_wait 需要互斥量?
- 8.7 条件变量使用规范
- 8.8 条件变量的封装
- 8.9 POSIX信号量
- 8.9.1 基于环形队列的生产消费模型
- 实现一个信号量的封装
- 实现一个线程安全的环形队列
8.4 生产者消费者模型
8.4.1 为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
8.4.2 生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
8.5 基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
8.5.1 C++ queue模拟阻塞队列的生产消费模型
单生产者,单消费者
代码:
#ifndef __BLOCK_QUEUE_HPP__ // 头文件防重包含宏
#define __BLOCK_QUEUE_HPP__#include <iostream>
#include <queue>
#include <pthread.h> // POSIX线程库头文件// 阻塞队列类模板,实现单生产者-单消费者模型
template <typename T>
class BlockQueue
{
private:// 检查队列是否已满bool IsFull(){return _block_queue.size() == _cap;}// 检查队列是否为空bool IsEmpty(){return _block_queue.empty();}public:// 构造函数:初始化队列容量和同步原语BlockQueue(int cap) : _cap(cap){// 单生产者-单消费者模型不需要记录等待数量pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁pthread_cond_init(&_product_cond, nullptr); // 初始化生产者条件变量pthread_cond_init(&_consum_cond, nullptr); // 初始化消费者条件变量}// 生产者接口 - 仅供单个生产者线程调用// 参数in: 要入队的元素void Push(const T& in){pthread_mutex_lock(&_mutex); // 获取互斥锁,进入临界区// 如果队列满,生产者需要等待// 单生产者场景使用if而不是while,因为不会有虚假唤醒if(IsFull()){// pthread_cond_wait会:// 1. 释放互斥锁// 2. 阻塞等待条件变量// 3. 被唤醒后重新获取互斥锁pthread_cond_wait(&_product_cond, &_mutex);}// 将数据放入队列_block_queue.push(in);// 唤醒可能在等待的消费者// 单消费者场景下最多只有一个线程在等待pthread_cond_signal(&_consum_cond);pthread_mutex_unlock(&_mutex); // 释放互斥锁,离开临界区}// 消费者接口 - 仅供单个消费者线程调用// 参数out: 用于存储出队元素的指针void Pop(T* out){pthread_mutex_lock(&_mutex); // 获取互斥锁,进入临界区// 如果队列空,消费者需要等待// 单消费者场景使用if而不是while,因为不会有虚假唤醒if(IsEmpty()){pthread_cond_wait(&_consum_cond, &_mutex);}// 从队列取出数据*out = _block_queue.front(); // 获取队首元素_block_queue.pop(); // 移除队首元素// 唤醒可能在等待的生产者// 单生产者场景下最多只有一个线程在等待pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex); // 释放互斥锁,离开临界区}// 析构函数:清理同步原语~BlockQueue(){pthread_mutex_destroy(&_mutex); // 销毁互斥锁pthread_cond_destroy(&_product_cond); // 销毁生产者条件变量pthread_cond_destroy(&_consum_cond); // 销毁消费者条件变量}private:std::queue<T> _block_queue; // 底层队列容器int _cap; // 队列最大容量pthread_mutex_t _mutex; // 互斥锁,保护共享资源pthread_cond_t _product_cond; // 生产者条件变量,用于生产者等待pthread_cond_t _consum_cond; // 消费者条件变量,用于消费者等待
};#endif
多生产者,多消费者
代码:
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>// 阻塞队列类模板,用于实现生产者-消费者模型
template <typename T>
class BlockQueue
{
private:// 检查队列是否已满bool IsFull(){return _block_queue.size() == _cap;}// 检查队列是否为空bool IsEmpty(){return _block_queue.empty();}public:// 构造函数:初始化队列容量和同步原语BlockQueue(int cap) : _cap(cap){_productor_wait_num = 0; // 等待的生产者数量_consumer_wait_num = 0; // 等待的消费者数量pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁pthread_cond_init(&_product_cond, nullptr); // 初始化生产者条件变量pthread_cond_init(&_consum_cond, nullptr); // 初始化消费者条件变量}// 入队方法:生产者接口void Enqueue(T &in){pthread_mutex_lock(&_mutex); // 获取互斥锁,进入临界区// 当队列满时,生产者需要等待while(IsFull()) // 使用while而不是if,防止虚假唤醒{// 生产者等待流程:// 1. 增加等待计数// 2. 释放互斥锁并等待条件变量// 3. 被唤醒后减少等待计数_productor_wait_num++;pthread_cond_wait(&_product_cond, &_mutex);_productor_wait_num--;}// 将数据放入队列_block_queue.push(in);// 如果有消费者在等待,唤醒其中一个if(_consumer_wait_num > 0)pthread_cond_signal(&_consum_cond); // 也可以用broadcast唤醒所有pthread_mutex_unlock(&_mutex); // 释放互斥锁}// 出队方法:消费者接口void Pop(T *out){pthread_mutex_lock(&_mutex); // 获取互斥锁,进入临界区// 当队列空时,消费者需要等待while(IsEmpty()) // 使用while防止虚假唤醒{// 消费者等待流程:// 1. 增加等待计数// 2. 释放互斥锁并等待条件变量// 3. 被唤醒后减少等待计数_consumer_wait_num++;pthread_cond_wait(&_consum_cond, &_mutex);_consumer_wait_num--;}// 从队列取出数据*out = _block_queue.front();_block_queue.pop();// 如果有生产者在等待,唤醒其中一个if(_productor_wait_num > 0)pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex); // 释放互斥锁}// 析构函数:清理同步原语~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}private:std::queue<T> _block_queue; // 底层队列容器int _cap; // 队列容量pthread_mutex_t _mutex; // 互斥锁,保护共享资源pthread_cond_t _product_cond; // 生产者条件变量pthread_cond_t _consum_cond; // 消费者条件变量int _productor_wait_num; // 等待的生产者数量int _consumer_wait_num; // 等待的消费者数量
};#endif
这里采用模版,是想告诉我们,队列中不仅仅可以放置内置类型,比如int, 对象也可以作为任务来参与生产消费的过程。
8.6. 为什么pthread_cond_wait 需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了?如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {pthread_mutex_unlock(&mutex);//解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过pthread_cond_wait(&cond);pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作。调用解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。
- int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复成原样。
8.7 条件变量使用规范
等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
8.8 条件变量的封装
#pragma once // 防止头文件重复包含,比#ifndef更现代的方式#include <iostream>
#include <string>
#include <pthread.h>
#include "Lock.hpp" // 包含互斥锁的封装类// 条件变量模块命名空间
namespace CondModule
{// 使用互斥锁模块的命名空间using namespace LockModule;// 条件变量封装类// 目的:将pthread_cond_t的C接口封装为C++类,实现RAII机制class Cond{public:// 构造函数:初始化条件变量Cond(){// pthread_cond_init返回0表示成功,非0表示失败int n = pthread_cond_init(&_cond, nullptr); // 默认属性初始化(void)n; // 暂时忽略返回值,实际使用时应该添加错误处理和日志}// 等待条件变量// 参数:互斥锁的引用,必须在调用Wait前已经获得锁void Wait(Mutex &mutex){// pthread_cond_wait会自动:// 1. 释放互斥锁// 2. 等待条件// 3. 被唤醒后重新获取锁int n = pthread_cond_wait(&_cond, mutex.GetMutexOriginal());(void)n; // 暂时忽略返回值,实际使用时应该添加错误处理}// 唤醒一个等待的线程void Notify(){// 如果有多个线程在等待,则随机唤醒其中一个int n = pthread_cond_signal(&_cond);(void)n; // 暂时忽略返回值}// 唤醒所有等待的线程void NotifyAll(){// 唤醒所有等待该条件变量的线程// 被唤醒的线程仍需要重新获得互斥锁才能继续执行int n = pthread_cond_broadcast(&_cond);(void)n; // 暂时忽略返回值}// 析构函数:销毁条件变量~Cond(){// 销毁条件变量,释放相关资源int n = pthread_cond_destroy(&_cond);(void)n; // 暂时忽略返回值,实际使用时应该添加错误处理和日志}private:pthread_cond_t _cond; // 底层条件变量// 禁止复制和赋值// C++11前的方式:声明为private但不实现Cond(const Cond&);Cond& operator=(const Cond&);};
}
为了让条件变量更具有通用性,建议封装的时候,不要在Cond类内部引用对应的封装互斥量,要不然后面组合的时候,会因为代码耦合的问题难以初始化,因为一般而言Mutex和Cond基本是一起创建的。
8.9 POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。
信号量的本质是一把计数器,那么这把计数器的本质是什么?
是用来描述资源数目的,把资源是否就绪放在了临界区之外。申请信号量时,其实就间接的已经在做判断了。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):
8.9.1 基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
实现一个信号量的封装
#pragma once // 防止头文件重复包含#include <iostream>
#include <semaphore.h> // POSIX信号量头文件// 信号量封装类
// 将POSIX信号量封装为C++类,实现RAII机制
// P-V操作源自荷兰语Proberen(尝试)/Verhogen(增加)
class Sem
{
public:// 构造函数:初始化信号量// 参数n: 信号量初始值,表示可用资源数量Sem(int n){// sem_init参数说明:// &_sem: 信号量对象// 0: 信号量类型,0表示线程间共享,非0表示进程间共享// n: 信号量初始值sem_init(&_sem, 0, n);}// P操作(等待操作)// 使信号量值减1,如果信号量值为0则阻塞// 对应生产者-消费者模型中的取走资源操作void P(){// sem_wait会导致:// 1. 信号量值大于0:将其减1并继续执行// 2. 信号量值等于0:阻塞等待,直到信号量大于0sem_wait(&_sem);}// V操作(释放操作)// 使信号量值加1,如果有线程阻塞则唤醒一个// 对应生产者-消费者模型中的放入资源操作void V(){// sem_post会:// 1. 将信号量值加1// 2. 如果有线程因为sem_wait阻塞,则唤醒其中一个sem_post(&_sem);}// 析构函数:销毁信号量~Sem(){// 释放信号量相关的资源sem_destroy(&_sem);}private:sem_t _sem; // POSIX信号量对象// 禁止拷贝构造和赋值操作Sem(const Sem&) = delete;Sem& operator=(const Sem&) = delete;
};
核心功能:
实现了一个计数器,用于控制对共享资源的访问
比如:有3个座位的餐厅,这个信号量初始值就设为3
主要操作:
P()
操作(等待):
- 想进入餐厅时调用
- 如果还有座位(信号量>0),就能直接进入,计数器-1
- 如果没座位(信号量=0),就要在门口等待
V()
操作(释放):
- 离开餐厅时调用
- 释放一个座位,计数器+1
- 如果有人在等待,会唤醒一个等待的人
实现一个线程安全的环形队列
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>// 环形队列实现 - 支持多生产者多消费者
// 设计思路 "321":
// 3: 需要处理三种同步关系
// a) 生产者和消费者之间的互斥与同步
// b) 多个生产者之间的互斥
// c) 多个消费者之间的互斥
// 2: 使用两把互斥锁
// - 一把用于生产者间互斥
// - 一把用于消费者间互斥
// 1: 一个循环队列作为数据缓冲区template<typename T>
class RingQueue
{
private:// 辅助函数:加锁void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}// 辅助函数:解锁void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:// 构造函数:初始化环形队列和同步原语// 参数cap: 环形队列容量RingQueue(int cap): _ring_queue(cap), // 初始化vector大小为cap_cap(cap), // 保存容量_room_sem(cap), // 空闲位置信号量,初值为容量_data_sem(0), // 数据项信号量,初值为0_productor_step(0), // 生产位置,从0开始_consumer_step(0) // 消费位置,从0开始{// 初始化生产者和消费者互斥锁pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}// 入队方法 - 生产者调用// 参数in: 要入队的数据void Enqueue(const T &in){_room_sem.P(); // 等待空闲位置Lock(_productor_mutex); // 生产者间互斥// 此时一定有空间可用(由信号量保证)_ring_queue[_productor_step++] = in; // 放入数据_productor_step %= _cap; // 循环更新位置Unlock(_productor_mutex); // 解除生产者间互斥_data_sem.V(); // 通知有新数据可用}// 出队方法 - 消费者调用// 参数out: 存储出队数据的指针void Pop(T *out){_data_sem.P(); // 等待有数据可用Lock(_consumer_mutex); // 消费者间互斥*out = _ring_queue[_consumer_step++]; // 取出数据_consumer_step %= _cap; // 循环更新位置Unlock(_consumer_mutex); // 解除消费者间互斥_room_sem.V(); // 通知有新空位可用}// 析构函数:清理同步原语~RingQueue(){pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}private:// 1. 环形队列存储结构std::vector<T> _ring_queue; // 存储数据的环形缓冲区int _cap; // 队列容量// 2. 生产和消费位置int _productor_step; // 生产者放入位置int _consumer_step; // 消费者取出位置// 3. 信号量 - 用于生产者消费者同步Sem _room_sem; // 空闲位置信号量,控制生产者Sem _data_sem; // 数据项信号量,控制消费者// 4. 互斥锁 - 用于多生产者/消费者间互斥pthread_mutex_t _productor_mutex; // 生产者间互斥pthread_mutex_t _consumer_mutex; // 消费者间互斥
};
用一个餐厅自助取餐区的例子来解释:
基本结构:
想象一个传送带上有固定数量(cap)的餐盘位置
_ring_queue
就是这个传送带
_productor_step
是厨师放餐盘的位置
_consumer_step
是顾客取餐盘的位置核心功能:
Enqueue()
: 厨师(生产者)放餐
- 检查是否有空位置(
_room_sem
)- 确保只有一个厨师在放餐(
_productor_mutex
)- 放入餐品
- 通知有新餐品可取(
_data_sem
)Pop()
: 顾客(消费者)取餐
- 检查是否有餐品可取(
_data_sem
)- 确保只有一个顾客在取餐(
_consumer_mutex
)- 取走餐品
- 通知有新空位可用(
_room_sem
)
同步机制:
使用两个信号量控制生产和消费
使用两个互斥锁确保多个生产者/消费者之间不冲突
环形特性:
- 位置到达末尾后回到开头继续使用(像传送带一样循环)