目录
一、POSIX信号量:
接口:
二、基于环形队列的生产消费者模型
环形队列:
单生产单消费实现代码:
RingQueue.hpp:
main.cc:
多生产多消费实现代码:
RingQueue.hpp:
main.cc:
一、POSIX信号量:
在实现线程的同步,互斥不仅仅只有条件变量和锁,还有POSIX信号量,这里学习的POSIX信号量和之前学习的SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX可以用于线程间同步
引入信号量:
对于共享资源,为了保证其并发性,将其分成了几份资源,就允许几个线程进入共享资源访问,此时就引入了信号量来对其进行保护
信号量的本质是一个计数器
这把计数器用来描述临界资源中资源数目的多少,实际上是对资源的预加载机制(这就像在电影院买票,买票的本质就是对电影院座位的预加载机制,当你买到票了,就一定会有位置给你,并且别人即使有票也是坐别人的座位,也不会抢了你的座位)
虽然信号量的本质是一个计数器,当一个线程申请资源成功就将计数器--,当一个线程申请资源失败就将计数器++,但是不能用一个简简单单的普通变量代替信号量,因为变量的--和++操作不是原子的,所以,我们就要使用一个支持PV操作的原子的计数器------信号量
那么什么是PV操作呢?
P:代表申请资源,计数器--
V:代表释放资源,计数器++
当将共享资源分为N份,此时信号量也就是N,这个时候就能够申请资源,再将信号量--,当信号量为0的时候,线程就不能够申请资源了,只能阻塞等待
如上,这是一个多元信号量sem,我们之前学习的锁被叫做二元信号量
在使用多元信号量访问资源的时候,要先申请信号量,只有申请成功了,才能访问资源否则就需要进入阻塞队列等待
接口:
初始化信号量
参数1:需要初始化信号量的地址
参数2:表示的是线程共享还是进程共享,默认为零,线程共享,非零表示进程共享
参数3:设定的信号量的初始值
返回值:初始化成功返回0,失败返回-1,并设置错误码
其中sem_t实际上是一个联合体
销毁信号量
参数:就是需要销毁信号量的地址
返回值:初始化成功返回0,失败返回-1,并设置错误码
申请信号量:
其中下面用的是sem_wait,其功能就是成功将信号量-1,也就是P操作
参数:就是需要销毁信号量的地址
返回值:初始化成功返回0,失败返回-1,并设置错误码
sem_trywait:尝试申请,如果没有申请到资源,就会放弃申请
sem_timedwait:每隔一段时间进行申请
释放信号量(发布信号量)
参数:就是需要销毁信号量的地址
返回值:初始化成功返回0,失败返回-1,并设置错误码
其表示资源使用完毕,归还资源,成功将信号量+1,也就是V操作
二、基于环形队列的生产消费者模型
环形队列:
在实现生产消费者的模型中,不仅仅只有共享队列,还有环形队列,什么是环形队列呢?
虽然它叫环形队列,但是它不是队列,而是用数组实现的,
其中head作为头指针,当申请资源成功的时候就向后移动一位,
tail作为尾指针,当释放资源成功的时候向后移动一位,
首先,如何让数组成环呢?
在每次head++后都进行一次取模操作,这样保证head的大小不会超过这个环形队列的大小
特殊的是,当为空或者为满的时候,头指针和尾指针都指向同一个位置,那么如何证明此时是空还是满呢?
这里有两种方法:
方法一:添加一个计数器,当计数器的值为0的时候,表示当前为空,当计数器的值为容器大小的时候,表示该环形队列为满
判空条件:count == 0
判满条件:count == size方法二:牺牲一个空间的大小,通过预留一个空位,避免head和tail重合时无法区分空和满。此时队列最大容量为size-1
判空条件:head== tail
判满条件:(head+ 1) % size == tail在下面实现的时候采用计数器,毕竟信号量是一个天然的计数器
当数据不为空或者满的时候,此时head指针和tail指针必定不指向同一个位置,
此时就能够进行生产者和消费者的同时访问,
为空的时候,只能生产者访问,生产者只关注还剩多少空间
为满的时候,只能消费者访问,消费者只关注还剩多少数据
所以在使用信号量标识资源的情况下,生产者和消费者关注的资源不一样,所以就需要两个信号量来进行计数:
生产者的信号量:表示当前有多少可用空间
消费者的信号量:表示当前有多少可消费数据
所以以下在实现的时候,定义两个信号量,spacesem = N 和datasem = 0
对于生产者的PV操作:P(spacesem)将空间资源-1,V(datasem)将数据资源+1
对于消费者的PV操作:P(datasem)将数据资源-1,V(spacesem)将空间资源+1
单生产单消费实现代码:
RingQueue.hpp:
首先创建一个实现环形队列的文件:
#pragma once
#include <vector>
#include <iostream>
#include <semaphore.h>template <class T>
class RingQueue
{
private:std::vector<T> _ringqueue; // 用vector模拟环形队列int _maxcap; // 环形队列的最大容量int _p_step; // 生产者下标int _c_step; // 消费者下标sem_t _pspace_sem; // 生产者关注的空间资源sem_t _cdata_sem; // 消费者关注的数据资源
};
接着依次实现其中的接口:
构造与析构
RingQueue(int maxcap = 5): _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0){sem_init(&_pspace_sem,0,maxcap);sem_init(&_cdata_sem,0,0);pthread_mutex_init(&_p_mutex,nullptr);pthread_mutex_init(&_c_mutex,nullptr);}~RingQueue(){sem_destroy(&_pspace_sem);sem_destroy(&_cdata_sem);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}
其中,构造函数的主要作用就是初始化各种变量,析构函数的主要作用就是释放这些变量
push与pop
push的作用是从交易场所中放入数据,pop的作用是从交易场所中拿到数据
void push(const T& in){//生产数据先要申请信号量来预定资源P(_pspace_sem);_ringqueue[_p_step] = in;//将所对应的数据放入到环形队列中_p_step++;//将生产者对应的下标向后移动一位_p_step %= _maxcap;//保证生产者不会超过环形队列的大小V(_cdata_sem);}void pop(T *out){P(_cdata_sem);pthread_mutex_lock(&_c_mutex);*out = _ringqueue[_c_step];//将该位置的数据交给out作为输出型参数带出去_c_step++;//将消费者对应的下标向后移动一位_c_step %= _maxcap;//保证消费者下标不会超过环形队列的大小pthread_mutex_unlock(&_c_mutex);V(_pspace_sem);}
生产者push后,证明环形队列中一定有数据,所以就需要在V后传入消费者关心的信号量,也就是需要传递_cdata_sem
消费者pop后,证明环形队列中一定有空间,所以就需要在V后传入生产者关心的信号量,也就是需要传递_pspace_sem
PV操作:
void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}
P操作代表申请资源,也就是semwait这个函数
V操作就是释放了资源,比如生产者就是释放了一个数据
这里要保证数据在为空的时候只能生产者运行,在数据为满的时候只能消费者去运行,
所以wait是为了保持顺序同步,保证即使消费者先调用,但是没有数据,就将消费者申请资源所关注的数据信号量送去等待队列里去等待
在封装V操作中,post就是释放资源,对于生产者就是给了个数据给消费者
对于消费者post就是释放了空间,生产者就能接着生产了
那消费者一开始调用P操作,没有数据就会阻塞,而生产者这边V了数据,消费者这边P就不会阻塞了可以拿到数据了
所以生产和消费这两者的PV操作是反的
生产者V了,消费者的p就停止阻塞了因为生产者给了消费者资源了
反之同理
main.cc:
void *Productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){int data = rand()%10+1;rq->push(data);std::cout<<"Productor : data = "<< data << std::endl;sleep(1);}return nullptr;
}void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){int data = 0;rq->pop(&data);std::cout<<"Consumer : data = "<< data << std::endl;sleep(1);}return nullptr;
}int main()
{srand(time(nullptr)^getpid());RingQueue<Task> *rq = new RingQueue<Task>();pthread_t c, p;pthread_create(&p, nullptr, Productor, rq);pthread_create(&c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete rq;return 0;
}
或者也可以让消费者疯狂消费数据,生产者疯狂生产
多生产多消费实现代码:
RingQueue.hpp:
在多生产多消费中,需要保证生产者和生产者之间、消费者和消费者之间的互斥关系,生产者和消费者之间的互斥关系已经由信号量承担了
所以在多生产多消费的代码中要加上锁
构造与析构中也要增加初始化锁与释放锁
template <class T>
class RingQueue
{
public:RingQueue(int maxcap = 5): _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0){sem_init(&_pspace_sem,0,maxcap);sem_init(&_cdata_sem,0,0);pthread_mutex_init(&_p_mutex,nullptr);pthread_mutex_init(&_c_mutex,nullptr);}~RingQueue(){sem_destroy(&_pspace_sem);sem_destroy(&_cdata_sem);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}private:std::vector<T> _ringqueue; // 用vector模拟环形队列int _maxcap; // 环形队列的最大容量int _p_step; // 生产者下标int _c_step; // 消费者下标sem_t _pspace_sem; // 生产者关注的空间资源sem_t _cdata_sem; // 消费者关注的数据资源pthread_mutex_t _p_mutex;//保证生产者和生产者之间的互斥pthread_mutex_t _c_mutex;//保证消费者和消费者之间的互斥
};
push与pop
void push(const T& in){//生产数据先要申请信号量来预定资源P(_pspace_sem);//信号量的申请本来就是原子的,所以加锁的时候就需要在这之后pthread_mutex_lock(&_p_mutex);_ringqueue[_p_step] = in;//将所对应的数据放入到环形队列中_p_step++;//将生产者对应的下标向后移动一位_p_step %= _maxcap;//保证生产者下标不会超过环形队列的大小pthread_mutex_unlock(&_p_mutex);V(_cdata_sem);}void pop(T *out){P(_cdata_sem);pthread_mutex_lock(&_c_mutex);*out = _ringqueue[_c_step];//将该位置的数据交给out作为输出型参数带出去_c_step++;//将消费者对应的下标向后移动一位_c_step %= _maxcap;//保证消费者下标不会超过环形队列的大小pthread_mutex_unlock(&_c_mutex);V(_pspace_sem);}
细节:
在加锁的时候要在申请信号量之后,这样能够提高并发度
如果是在申请信号量之前进行加锁,那么申请信号量的线程永远只有一个 不能够提高并发度
理解:
就像在电影院中,是先买票在进行排队的,这样能够加快进场的速度,如果排队后再买票,需要一人一人地进行操作,这相比上一种就会很慢的
申请信号量的操作是原子的,不需要加锁保护也能保证线程安全,所以并发申请信号量,串行访问临界资源能够提高并发度
main.cc:
在进行生产消费者模型中的数据问题,不仅仅是让二者看到同一份资源,更重要的是让消费者拿到资源并对资源进行处理,这里引入上一章的Task文件来进行数据处理
Task.hpp
#include <iostream>
#include <string>std::string opers = "+-*/%";enum
{Divzero = 1,Modzero,Unknow
};class Task
{
public:Task(){}Task(int data1, int data2, char oper): _data1(data1), _data2(data2), _oper(oper),_exitcode(0){}void run(){switch (_oper){case '+':_result = _data1 + _data2;break;case '-':_result = _data1 - _data2;break;case '*':_result = _data1 * _data2;break;case '/':if (_data2 == 0)_exitcode = Divzero;else_result = _data1 / _data2;break;case '%':if (_data2 == 0)_exitcode = Modzero;else_result = _data1 % _data2;break;default:_exitcode = Unknow;break;}}void operator()(){run();}std::string Getresult(){std::string ret = std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=";ret += std::to_string(_result);ret += "[exitcode=";ret += std::to_string(_exitcode);ret += "]";return ret;}std::string GetTask(){std::string ret = std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=?";return ret;}~Task(){}private:int _data1;int _data2;char _oper;int _exitcode;int _result;
};
接着在生产消费者的线程所执行的对应的方法中,基本和上一章类似
void *Productor(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);RingQueue<Task> *rq = td->rq;std::string name = td->threadname;int len = opers.size();while (true){int data1 = rand() % 10 + 1;usleep(10);int data2 = rand() % 10;char op = opers[rand()%len];Task t(data1,data2,op);rq->push(t);std::cout<<"Productor : Task = "<< t.GetTask() << " who "<< name << std::endl;sleep(1);}return nullptr;
}void *Consumer(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);RingQueue<Task> *rq = td->rq;std::string name = td->threadname;while (true){Task t;rq->pop(&t);//处理数据t();std::cout << "Consumer : Task = " << t.GetTask() << " who: " << name << " result: " << t.Getresult() << std::endl;// sleep(1);}return nullptr;
}
我们也可以创建一个结构体来存储线程名称与任务
struct ThreadData
{RingQueue<Task> *rq;std::string threadname;
};
int main()
{srand(time(nullptr));RingQueue<Task> *rq = new RingQueue<Task>();pthread_t c[5], p[3];for(int i = 0;i<3;i++){ThreadData *td = new ThreadData();td->rq = rq;td->threadname = "Productor-" + std::to_string(i);pthread_create(p+i, nullptr, Productor, td);usleep(10);}sleep(1);for(int i = 0;i<5;i++){ThreadData *td = new ThreadData();td->rq = rq;td->threadname = "Consumer-" + std::to_string(i);pthread_create(c+i, nullptr, Consumer, td);usleep(10);}for(int i = 0;i<3;i++){pthread_join(p[i], nullptr);}for(int i = 0;i<5;i++){pthread_join(c[i], nullptr);}return 0;
}
注意:在环形队列中允许多个生产者线程一起进行生活数据,也允许多个消费者线程一起消费数据,多个线程一起操作并非同时操作,任务开始时间有先后,但都是在进行处理的