目录
信号量的原理
信号量函数
使用信号量实现线程互斥功能
基于环形队列的生产消费模型
生产者和消费者必须遵守的两个规则
信号量的原理
通过之前的学习,我们知道有的资源可能会被多个执行流同时申请访问,我们将这种资源叫做临界资源,临界资源需要被保护,否则可能会出现数据不一致的问题。当一份临界资源只能被当作整体来访问时,也就是同一时刻只允许一个执行流对这个临资源进行访问,这时只需用一个互方锁就可对临界资源进行保护。但对于一些资源我们可以将它们再细分为多个区域, 这时临界资源就可以同时被多年执行流访问,但在访问前要申请信号量。
信号量的概念:本质是一个计数器,用于述描临界资源中资源数目,信号量能够更细粒度的对临界资源进行管理。
每个执行流在对临界资源进行访问前,都应申请信号量即P操作,申请成功后,执行流就拥有对该临界资源的访问权限,当操作完成后,应释放信号量即V操作。
PV操作:
P操作:执行流申请信号量的操作称为P操作,申请信号量的本质就是申请使用临界资源中某一区域资源的权限,当申请成功时,信号量的数目减一。
V操作:执行流释放信号量的操作称为V操作,释放信号量的本质就是归还使用临界资源中是一区域资源的权限, 当释放完成后,信号量的数目加一。
注意:PV操作为原子性。
当执行流申请信号量失败时被挂起等待
当执行流在申请信号量时,可能会出现信号量的值为0的情况,也就是说临界资源中的所有区域都在被其它执行流在使用中,这时该执行流会申请信号量失败,而被放到申请信号量的队列中进行申请等待,直到有其它执行流释放信号量。
信号量函数
初始化信号量:sem_init()。
函数原型:
int sem_init(sem_t *sem,int pshared,unsigned int value);
参数说明:
sem:初始化的信号量。
pshared:传入0表示用于线程间通信,传入非0表示用于进程间通信。
value:信号量的初始值(计数器的不始值)。
返回值:
初始做功返回0、失败返回1。
销毁信号量:sem_destroy()。
函数原型:
int sem_destroy(sem_t *sem);
参数说明:
sem:需要全销毁的信号量。
返回值:销毁信号量成功返回0,失败返回-1。
申请信号量:sem_wait()。
函数原型:
int sem_wait(sem_t*sem);
参数说明:
sem申请的信号量。
返回值说明:
申请成功返回0,信号量值减一,申请失败返回-1,信号量值保持不变。
释放信号量:sem_post( )。
函数原型:
int sem_post(sem_t*sem);
参数说明:
sem需要释放的信号量。
返回值说明:
释放成功返回0,信号值加一,释放失败返回一,信号量值保持不变。
使用信号量实现线程互斥功能
信号量本质是一个计数器,如果将信号量的初始值设置为1,那么该信号量就叫二元信号量。
当信号量的值为1,说明该信号量表示的临界资源只有一份,只能被整体访问,那么描述该资源的信号量就相当于互斥锁,使用二元信号量实现多线程互斥。
未引入二元信号量时,多线程对临界资源的访问操作会导致数据不一致。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>using namespace std;int tickets = 3000;void *RobTickest(void *arg)
{string name = (char *)arg;while (true){if (tickets > 0){usleep(1000);cout << name << "抢到了一张票,剩余票数:" << --tickets << endl;}else{break;}}cout << name << "退出...!" << endl;pthread_exit((void *)0);
}int main()
{pthread_t tid1, tid2, tid3, tid4;pthread_create(&tid1, nullptr, RobTickest, (void *)"线程 1");pthread_create(&tid2, nullptr, RobTickest, (void *)"线程 2");pthread_create(&tid3, nullptr, RobTickest, (void *)"线程 3");pthread_create(&tid4, nullptr, RobTickest, (void *)"线程 4");pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);return 0;
}
在引入二元信号量后,多线程对临界资源的访问操作具有互斥性。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>using namespace std;int tickets = 3000;class Sem
{
public:Sem(int nums){sem_init(&_sem, 0, nums);}~Sem(){sem_destroy(&_sem);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}private:sem_t _sem;
};
Sem sem(1);
void *RobTickest(void *arg)
{string name = (char *)arg;while (true){sem.P();if (tickets > 0){usleep(1000);cout << name << "抢到了一张票,剩余票数:" << --tickets << endl;sem.V();}else{sem.V();break;}}cout << name << "退出...!" << endl;pthread_exit((void *)0);
}int main()
{pthread_t tid1, tid2, tid3, tid4;pthread_create(&tid1, nullptr, RobTickest, (void *)"线程 1");pthread_create(&tid2, nullptr, RobTickest, (void *)"线程 2");pthread_create(&tid3, nullptr, RobTickest, (void *)"线程 3");pthread_create(&tid4, nullptr, RobTickest, (void *)"线程 4");pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);return 0;
}
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
using namespace std;
#define NUM 8template <class T>
class RingQueue
{
private:// p操作void P(sem_t &p){sem_wait(&p);}//V操作void V(sem_t &v){sem_post(&v);}public:RingQueue(int cap = NUM) : _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap);sem_init(&_blank_sem, 0, _cap);//构造时空间信号量为环形队列的容量sem_init(&_data_sem, 0, 0);//构造时数据信号量为0}~RingQueue(){sem_destroy(&_blank_sem);sem_destroy(&_data_sem);}//生产者生产数据void Push(const T&data){P(_blank_sem);//申请空间资源_q[_p_pos]=data;V(_data_sem);//释放数据信号_p_pos++;_p_pos=_p_pos%_cap;}void Pop(T&data){P(_data_sem);//申请数据资源data=_q[_c_pos];V(_blank_sem);//释放空间资源_c_pos++;_c_pos=_c_pos%_cap;}private:vector<T> _q; // 使用vector实现环形队列int _cap; // 环形队列的容量int _p_pos; // 生产的位置int _c_pos; // 消费的位置sem_t _blank_sem; // 空间信号量sem_t _data_sem; // 数据信号量
};
#include "RingQueue.hpp"void *Producer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Push(data);cout << "生产数据:" << data << endl;}
}
void *Consumer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Pop(data);cout << "消费数据:" << data << endl;}
}int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;RingQueue<int> *rq = new RingQueue<int>;pthread_create(&producer, nullptr, Producer, rq);pthread_create(&consumer, nullptr, Consumer, rq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete rq;return 0;
}
生产快,消费慢。
#include "RingQueue.hpp"void *Producer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){int data = rand() % 100 + 3;rq->Push(data);cout << "生产数据:" << data << endl;}
}
void *Consumer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);//生产1秒后,才消费int data = rand() % 100 + 3;rq->Pop(data);cout << "消费数据:" << data << endl;}
}
消费快,生产慢。
void *Producer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Push(data);cout << "生产数据:" << data << endl;}
}
void *Consumer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){int data = rand() % 100 + 3;rq->Pop(data);cout << "消费数据:" << data << endl;}
}
基于环形队列的生产消费模型
在使用环形队列实现生产消费模型的实现设计中,我们应明白生产者和消费者关心的资源是什么?
对于生产者和消费者它们关注的资源是不同的,生产者关注的是环形队列中是否有空间可以存放生产者生产的数据,只要有空间生产者可以执行生产任务。
消费者关注的是环形队列中是否有数据可以行消费,只要有数据消费者就可以获取数据。
对空间信号量的初始化和对数据信号量的初始化的设置
使用信号量在环形队列中描述空间资源和数据资源时,它们的信号量的初始值是不同的。
我们用blank_sem表示空间信号量,用data_sem表示数据信号量。
blank_sem空间信号量的初始值应为环形队列的容量,因为在生产初期,环形队列中没有数据,dota_sem数据信号量的初始值应为0,因为生产者没有执行生产,环形队列中没有数据。
生产者和消费者对信号量的申请与释放。
1.生产者申请空间资源,释放数据资源。
对生产者来说,在生产前都需要生申请空间资源blank_sem信号量。
如果生产者申请信号量成功,则可以执行生产操,如果请失败,则需要在blank_sem的等待队列下进行阻塞等待,直到环形队列中有可以存放生产数据的空间,才会被唤醒,执行生产操作,当生产者执行完生产操作后,应释放数据信号量data_sem。虽然生产者在执行生产操作前申请的是空间资源,但生产完数据后,应对数据资源data_sem信号量进行释放操作,而不是释放空间资源,因为此时空间资源的数目还是环形队列容量-1,即生产者生产完数据后,空间资源blank_sem信号量少了一个,而数据资源data_sem信号量多了一个,因此对data_sem资源进行释放才是合理。
2.消费者申请 数据资源,释放空间资。
对消费者来说,在消费前都要申请数据资源data_sem信号量信号量。
如果消费者申请信号量成功,则同以执行消费操作,如果申请失则,则需要在data_sem的等待队列下进行阻塞等待,直前环形队列中有可以进行消费的数据资源才会被唤醒,执行消费操作,当消费者执行完消费操作后,应释放空间信号量blank_sem;
虽然消费者在执行消费操作前申请的是数据资源,但消费完数据后,应对空间资源blank_sem信号量进行释放操作,而不是释放数据信号量。此时数据资源的数目为0,即消费者消费完数据后,数据资源就少了一个,而空间资源blank_sem信号量多了一个,因此空间资源信号量由消费者进行释放才是合理。
生产者和消费者必须遵守的两个规则
规则1:消费者和生产者不能同时访问同一个位置。
生产者和消费者在访问环形队列时,如果访的是同一个位置, 这是不允许的,可能会出现数据不一致性。
规则2:无论是生产者还是消费者释放操作不能使对方的信号量超过对方的的初始值,即不能套圈生产或套圈消费。
生产者从消费者的位置,一直按顺时针方向进行生产,如果生产的速度大于消费的速度,那么用于存放生产数据的位置,很快就会和消费数据的位置重叠了。此时生产者就不该执行生产操作了,因为再生产就会覆盖还未被消费的数据,造成数据丢失。
同理,消费者从生产者的位置,一直按顺时针方向进生消费,如果消费的速度大于生产的速度,那么消费的位置很快就会和用于存放生产数据的位置重叠。此时消费者应停止消费操作,因为再消费就会就会造二次消费 。
设计思想:
默认将环形队列的容量上限设置为6。
代码中使用STL库中的vector实现环形队列,生产者每次生产的数据放到vector下标为_p_pos的位置,消费者每次消费的数据来源于vector下标为_c_pos的位置。
生产者每次生产数据后_p_pos都会进行++,标记下一次生产数据的存放位置,++后的下标通过进行取模运算实现“环形”的效果。
消费者每次消费数据后_c_pos都会进行++,标记下一次消费数据的来源位置,++后的下标通过进行取模运算实现“环形”的效果。
_p_pos由生产者线程进行更新,_c_pos由消费者线程进行更新,对这两个变量访问时不需要进行保护,因此代码中将_p_pos和_c_pos的更新放到了V操作之后,就是为了尽量减少临界区的代码。
为了方便理解,我们这里实现单生产者、单消费者的生产者消费者模型。于是在主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费。
基于环形队列的单生产者、单消费者的生产者消费者模型
实现代码:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
using namespace std;
#define NUM 6template <class T>
class RingQueue
{
private:// p操作void P(sem_t &p){sem_wait(&p);}//V操作void V(sem_t &v){sem_post(&v);}public:RingQueue(int cap = NUM) : _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap);sem_init(&_blank_sem, 0, _cap);//构造时空间信号量为环形队列的容量sem_init(&_data_sem, 0, 0);//构造时数据信号量为0}~RingQueue(){sem_destroy(&_blank_sem);sem_destroy(&_data_sem);}//生产者生产数据void Push(const T&data){P(_blank_sem);//申请空间资源_q[_p_pos]=data;V(_data_sem);//释放数据信号_p_pos++;_p_pos=_p_pos%_cap;}void Pop(T&data){P(_data_sem);//申请数据资源data=_q[_c_pos];V(_blank_sem);//释放空间资源_c_pos++;_c_pos=_c_pos%_cap;}private:vector<T> _q; // 使用vector实现环形队列int _cap; // 环形队列的容量int _p_pos; // 生产的位置int _c_pos; // 消费的位置sem_t _blank_sem; // 空间信号量sem_t _data_sem; // 数据信号量
};
生产和消费同步
#include "RingQueue.hpp"void *Producer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Push(data);cout << "生产数据:" << data << endl;}
}
void *Consumer(void *arg)
{sleep(1);RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Pop(data);cout << "消费数据:" << data << endl;}
}int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;RingQueue<int> *rq = new RingQueue<int>;pthread_create(&producer, nullptr, Producer, rq);pthread_create(&consumer, nullptr, Consumer, rq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete rq;return 0;
}
生产快,消费慢。
void *Producer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){int data = rand() % 100 + 3;rq->Push(data);cout << "生产数据:" << data << endl;}
}
void *Consumer(void *arg)
{sleep(1);RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Pop(data);cout << "消费数据:" << data << endl;}
}
消费快,生产慢。
void *Producer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){sleep(1);int data = rand() % 100 + 3;rq->Push(data);cout << "生产数据:" << data << endl;}
}
void *Consumer(void *arg)
{RingQueue<int> *rq = (RingQueue<int> *)arg;while (true){int data = rand() % 100 + 3;rq->Pop(data);cout << "消费数据:" << data << endl;}
}
即使消费再快也要等到队列中有数据才能消费。
总结:
在用c++语言实现基于信号量和环形队列的生产消费模型的原理中,只声明空间信号量和数据信号量如何实现同步与互斥?
在用c++语言实现基于信号量和环形队列的生产消费模型中,只声明空间信号量和数据信号量,可以通过以下方式实现同步和互斥:
-
空间信号量:表示可用空间的数量。在生产者线程中,每次放入一个元素时,空间信号量减1;在消费者线程中,每次取出一个元素时,空间信号量加1。当空间信号量为0时,表示环形队列已满,生产者线程需要等待;当空间信号量为环形队列长度时,表示环形队列为空,消费者线程需要等待。
-
数据信号量:表示已有数据的数量。在生产者线程中,每次放入一个元素时,数据信号量加1;在消费者线程中,每次取出一个元素时,数据信号量减1。当数据信号量为0时,表示环形队列为空,消费者线程需要等待;当数据信号量为环形队列长度时,表示环形队列已满,生产者线程需要等待。
-
互斥:为了避免多个线程同时对环形队列进行操作,可以使用互斥锁来实现。在生产者和消费者线程中,通过获取互斥锁来独占对环形队列的访问权。当一个线程拥有互斥锁时,其他线程需要等待。
由于只使用了空间信号量和数据信号量来实现同步和互斥,这种实现方法需要考虑很多特殊情况,比如队列为空/满时的特殊处理、互斥锁的释放等问题。如果使用信号量和互斥锁的完整实现方式,可以更加简单和可靠地实现生产者-消费者模型。