目录
1. 生产者消费者模型
1.1 阻塞队列(BlockingQueue)
1.2 一个实际应用的例子
2. POSIX信号量
2.1 引入
2.2 回顾加深理解信号量
2.3 信号量的操作接口
3. 基于循环队列的生产消费模型
3.1 循环队列
3.2 整个项目
4. 线程池
4.1 概念
4.2 线程池实现
1. 生产者消费者模型
超市(交易场所):
-
定义:超市是数据“交易”的场所,即共享资源或临界资源的存储空间(也可以叫缓冲区)。在多线程编程中,这通常是一个数据结构(如队列、缓冲区等),用于临时存储数据,供生产者和消费者线程进行访问。
一般我们使用阻塞队列作为缓冲区
-
功能:作为生产者和消费者之间数据传递的桥梁。生产者线程在此处添加(生产)数据,消费者线程在此处取走(消费)数据。
生产者(Producer):
-
定义:生产者线程负责生成数据并将其放入超市(共享资源)中。
-
并发度:生产者线程可以并发地运行,以提高数据的生成速度。但需要注意同步和互斥问题,以避免多个生产者同时写入数据导致的冲突。
-
生产者之间都是互斥的:不能多个生产者同时都在往共享资源里面写
消费者(Consumer):
-
定义:消费者线程负责从超市(共享资源)中取出数据并进行处理。
-
并发度:消费者线程也可以并发地运行,以提高数据的处理速度。同样需要注意同步和互斥问题。
-
消费者之间都是互斥的:不能多个消费者同时都在从共享资源里面拿数据
3种关系:
生产者 vs 生产者 — 互斥
多个生产者线程可能同时试图向共享缓冲区(如队列或数组)中写入数据。为了防止数据竞争和不一致,我们需要使用互斥机制来确保同一时间只有一个生产者线程能够访问共享资源。
互斥通常通过互斥锁(Mutex)来实现。当一个生产者线程获得互斥锁时,其他生产者线程将被阻塞,直到锁被释放。这样,每个生产者线程在写入缓冲区时都能独占资源,从而避免了数据竞争。
消费者 vs 消费者 — 互斥
多个消费者线程可能同时试图从共享缓冲区中读取数据。为了确保数据的正确性和一致性,我们同样需要使用互斥机制来防止多个消费者线程同时访问缓冲区。
互斥锁在这里同样起到关键作用。当一个消费者线程获得互斥锁时,其他消费者线程将被阻塞,直到锁被释放。这样,每个消费者线程在读取缓冲区时都能独占资源,避免了潜在的冲突和不一致。
生产者 vs 消费者 — 互斥 && 同步
生产者线程和消费者线程需要共享一个缓冲区。这要求我们使用互斥机制来确保同一时间只有一个线程(生产者或消费者)能够访问缓冲区,以避免数据竞争和不一致。
但是,仅仅互斥是不够的。我们还需要使用同步机制来确保生产者和消费者之间的协调。例如,当缓冲区为空时,消费者线程应该被阻塞,直到生产者线程向其中添加了数据。同样地,当缓冲区满时,生产者线程也应该被阻塞,直到消费者线程从中取走了数据。
同步通常通过条件变量(Condition Variables)来实现。生产者线程在添加数据到缓冲区后,会向条件变量发送信号(signal),以唤醒等待的消费者线程。类似地,消费者线程在取走数据后,也会向条件变量发送信号,以唤醒等待的生产者线程。通过这种方式,生产者和消费者线程能够协调地工作,确保缓冲区的有效使用和数据的一致性。
优点:
解耦:由于引入了一个缓冲区作为中介,生产者和消费者之间并不直接相互调用,从而降低了它们之间的耦合度。这使得生产者和消费者的代码发生变化时,不会对对方产生直接影响,提高了系统的灵活性和可维护性。
支持并发:生产者和消费者是两个独立的并发体,它们之间通过缓冲区进行通信。生产者只需将数据放入缓冲区,就可以继续生产下一个数据;消费者只需从缓冲区中取出数据,就可以继续处理。这种并发处理的方式可以避免因生产者和消费者速度不匹配而导致的阻塞问题
支持忙闲不均:在生产者和消费者模型中,生产者和消费者的速度可以不相同。当生产者生产数据的速度过快,而消费者处理数据的速度较慢时,未处理的数据可以暂时存储在缓冲区中,等待消费者处理。这种机制可以平衡生产者和消费者之间的速度差异,避免资源的浪费和瓶颈的产生。
1.1 阻塞队列(BlockingQueue)
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
BlockQueue.hpp
#include<iostream>
#include<pthread.h>
#include<queue>using namespace std;template<class T>
class BlockQueue
{
public://初始化BlockQueue(int max=20):_max(max){pthread_cond_init(&_cond1,nullptr);pthread_cond_init(&_cond2,nullptr);pthread_mutex_init(&_mutex,nullptr);}//头删,并且返回这个数,生产者使用的接口T pop(){pthread_mutex_lock(&_mutex);//加锁while(_q.size()==0)//如果队列中没有数据,则让线程等待{pthread_cond_wait(&_cond1,&_mutex);}T data=_q.front();_q.pop();pthread_cond_signal(&_cond2);//唤醒正在等待的尾插线程,可以插入了pthread_mutex_unlock(&_mutex);//解锁return data;}//尾插,消费者使用的接口void push(T& data){pthread_mutex_lock(&_mutex);//加锁while(_q.size()==_max)//如果队列中数据满了,则让线程等待{pthread_cond_wait(&_cond2,&_mutex);}_q.push(data);pthread_cond_signal(&_cond1);//唤醒正在等待的头删线程,可以删除了pthread_mutex_unlock(&_mutex);//解锁}//销毁~BlockQueue(){pthread_cond_destroy(&_cond1);pthread_cond_destroy(&_cond2);pthread_mutex_destroy(&_mutex);}private:pthread_cond_t _cond1;//头删条件变量pthread_cond_t _cond2;//尾插条件变量pthread_mutex_t _mutex;//定义锁queue<T> _q;//定义队列int _max;//队列的空间大小
};
1.2 一个实际应用的例子
BlockQueue.hpp:封装的阻塞队列
text.cc:测试代码
Task.hpp:任务类(这里只是进行一个加法)
Task.hpp
#include<string>std::string oper="+-*/%";class Task
{
public://初始化Task(int x,int y,char oper):_x(x),_y(y),_oper(oper),_result(0),_correct(0){}//运行int run(){switch (_oper){case '+':_result=_x+_y;break;case '-':_result=_x-_y;break;case '*':_result=_x*_y;break;case '/':if(_y==0) _correct=1;else _result=_x/_y;break;case '%':if(_y==0) _correct=1;else _result=_x%_y;break;default:_correct=2;break;}}//消费者拿的任务std::string GetResult(){std::string rs=std::to_string(_x);rs+=_oper;rs+=std::to_string(_y);rs+="=";rs+=std::to_string(_result);rs+="[";rs+=std::to_string(_correct);rs+="]";return rs;}//生产者生产的任务std::string GetTask(){std::string r = std::to_string(_x);r += _oper;r += std::to_string(_y);r += "=?";return r;}~Task(){}private:int _x; int _y;int _result; //结果char _oper; //运算符int _correct; //值是否正确
};
text.cc
#include<iostream>
#include"BlockQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>
#include"Task.hpp"//消费者
void* conmuser(void* args)
{BlockQueue<Task>* td=static_cast<BlockQueue<Task>*>(args);while(true){Task t=td->pop();t.run();cout<<"conmuser get a task:"<<t.GetResult()<<endl;sleep(1);}
}//生产者
void* producer(void* args)
{int len=oper.size();srand(time(nullptr));BlockQueue<Task>* td=static_cast<BlockQueue<Task>*>(args);while(true){int data1=rand()%5;usleep(100);int data2=rand()%10;char op=oper[rand()%len];Task t(data1,data2,op);td->push(t);cout<<"producer a task:"<<t.GetTask()<<endl;sleep(1);}
}int main()
{BlockQueue<Task>* q=new BlockQueue<Task>;//创建队列pthread_t c,p;//定义生产者和消费者pthread_create(&c,nullptr,producer,(void*)q);//创建生产者的线程pthread_create(&c,nullptr,conmuser,(void*)q);//创建消费者的线程//回收线程pthread_join(c,nullptr);pthread_join(p,nullptr);//销毁队列delete q;return 0;
}
2. POSIX信号量
2.1 引入
上次我们使用了阻塞队列的生产消费模型,在先前的生产者-消费者模型代码中,当一个线程想要操作临界资源时,必须确保临界资源处于满足条件的状态才能进行修改;否则无法修改。例如,在push接口中,当队列已满时,临界资源处于条件不可用的状态,无法继续进行push操作。此时,线程应该进入条件变量队列cond中等待。如果队列未满,即临界资源条件已准备好,那么可以继续push,调用队列_q的push接口。
观察代码可以看到,在判断临界资源是否就绪之前,必须先获取锁,因为判断临界资源实质上就是对临界资源的访问,而访问临界资源自然需要加锁以保护。因此,代码通常会先获取锁,然后手动检查临界资源的就绪状态,根据状态判断是等待还是直接操作临界资源。
但是如果事先知道临界资源的状态是否就绪,则无需一上来就加锁。一旦提前知道临界资源的就绪状态,便不再需要手动检查资源状态。在这种情况下,若有一个计数器来表示临界资源中小块资源的数量(如队列中每个空间),线程在访问临界资源前会先请求该计数器。若计数器大于0,则表明队列中有空余位置,可以直接向队列push数据;若计数器等于0,则说明队列已满,不能继续push数据,应该阻塞等待,直至计数器再次大于0,方可继续向队列push数据。
void push(T& data){pthread_mutex_lock(&_mutex);//加锁while(_q.size()==_max)//如果队列中数据满了,则生产者等待{pthread_cond_wait(&_cond2,&_mutex);}_q.push(data);if(_q.size()>0){pthread_cond_signal(&_cond1);//通知消费者可以消费了}pthread_mutex_unlock(&_mutex);//解锁}
2.2 回顾加深理解信号量
信号量是一种用于进程间通信和同步的机制。它本质上是一个计数器,用于衡量系统中的资源可用数量。通过信号量,可以实现对临界资源的访问控制,确保多个进程或线程能够安全地共享资源而不发生冲突。
在访问临界资源之前,程序可以通过申请信号量来获取对资源的访问权限。如果信号量的值大于0,表示资源可用,程序可以继续访问资源;如果信号量的值等于0,表示资源已被占用,程序需要等待,直到资源可用为止。
信号量并不仅仅是简单的计数器,它是通过原子操作实现的,确保信号量的操作是线程安全的。常用的信号量操作包括P操作(等待操作)和V操作(释放操作),也称为PV操作。P操作会将信号量的值减1,用于占用资源;V操作会将信号量的值加1,用于释放资源。
通过合理地使用信号量和PV操作,可以实现多线程或多进程之间的同步和互斥,避免资源竞争和死锁等并发问题。信号量是操作系统中重要的同步工具,广泛应用于进程间通信、资源管理、线程同步等场景。
system信号量和POSIX信号量都是用于进程间通信和同步的机制,但它们之间存在一些区别。
1. 系统信号量:
系统信号量是Linux中的一种系统调用,用于进程间通信和同步。
系统信号量是以系统级资源的形式存在,可以跨越进程边界,不仅可以用于线程之间的同步,也可以用于进程之间的同步。
系统信号量是一个全局的计数器,可以通过系统调用函数来创建、初始化、P操作(等待操作)和V操作(释放操作)等。
系统信号量的操作是通过系统调用函数来实现的,如semget、semop等。
2. POSIX信号量:
POSIX信号量是基于POSIX标准的一种同步机制
POSIX信号量与系统信号量类似,但是在接口和使用上有些许差异。
POSIX信号量允许用于进程间通信和线程间同步。
POSIX信号量通过调用相关的POSIX函数来创建、初始化、等待和释放,如sem_open、sem_wait、sem_post等。
系统信号量是Linux系统提供的一种进程间通信和同步机制,而POSIX信号量是基于POSIX标准的一种同步机制,二者都可以实现进程或线程间的同步和互斥操作
2.3 信号量的操作接口
初始化信号量:
使用sem_init
函数可以初始化信号量,给定的value
值会成为信号量的初始值。如果信号量是线程间共享的,可以被多个线程同时使用;如果是进程间共享的,可以被多个进程使用
#include <semaphore.h>//下面的函数都这此头文件int sem_init(sem_t *sem, int pshared, unsigned int value);
-
sem
: 指向要初始化的信号量的指针(我们使用sem_t 类型直接定义) -
pshared
: 0 表示该信号量为线程间共享;非零值表示信号量为进程间共享 -
value
: 信号量的初始值
若成功,返回值为0,表示初始化信号量成功。
若出现错误,返回值为-1,表示初始化失败,并设置errno来指示具体错误。(下面都是一样的)
销毁信号量:
使用sem_destroy
函数可以销毁之前初始化的信号量。在销毁信号量之前,要确保所有线程或进程都已经停止使用该信号量。
int sem_trywait(sem_t *sem);
-
sem
: 要销毁的信号量的指针
等待信号量:(P操作- -)
使用sem_wait
函数可以等待信号量,即执行P操作。如果信号量的值大于0,则将其减1并立即返回,否则线程(或进程)会阻塞等待信号量变为大于0。
int sem_wait(sem_t *sem);
-
sem
: 要等待的信号量的指针
发布信号量:(V操作++)
使用sem_post
函数可以发布(释放)信号量,即执行V操作。对信号量执行V操作会将其值加1,并唤醒可能正在等待该信号量的线程(或进程)。
int sem_post(sem_t *sem);
-
sem
: 要发布的信号量的指针
3. 基于循环队列的生产消费模型
3.1 循环队列
之前在阻塞队列里,我们不能实现出队列与入队列的同时进行。现在因为是循环队列我们使用了两个索引,而两个索引不同时可以同时进行出和入
当为空时或者满时,二者只能有一个开始执行。然后就不再相等了,也是能分开进行了
#include<iostream>
#include<string>
#include<vector>
#include<pthread.h>
#include<semaphore.h>using namespace std;template<class T>
class RoundQwueue
{
private://信号量--void P(sem_t& sub){sem_wait(&sub);}//信号量++void V(sem_t& add){sem_post(&add);}//加锁void Lock(pthread_mutex_t& mutex){pthread_mutex_lock(&mutex);}//解锁void Unlock(pthread_mutex_t& mutex){pthread_mutex_unlock(&mutex);}
public://初始化RoundQwueue(int max=20):_q(max),_max(max),_con(0),_pro(0){pthread_mutex_init(&_mutex1,nullptr);pthread_mutex_init(&_mutex2,nullptr);sem_init(&_sem1,0,max);sem_init(&_sem2,0,0);}//生产者生产数据void push(T& data){P(_sem1); //生产者的信号量--,也就是可放数据的空间减一Lock(_mutex1);_q[_pro++]=data; //将数据添加到队列中//_pro++;_pro%=_q.size(); //保证队列的循环,使_pro的下标不会超过队列的最大空间值Unlock(_mutex1);V(_sem2); //消费者的信号量++,也就是可拿数据的空间加一}//消费者拿数据void pop(T& data){P(_sem2); //消费者的信号量--,也就是可拿数据的空间减一Lock(_mutex2);data=_q[_con]; //将队列中的数据往外拿_con++;_con%=_q.size(); //保证队列的循环,使_con的下标不会超过队列的最大空间值Unlock(_mutex2);V(_sem1); //生产者的信号量++,也就是可放数据的空间加一}//回收~RoundQwueue(){pthread_mutex_destroy(&_mutex1);pthread_mutex_destroy(&_mutex2);sem_destroy(&_sem1);sem_destroy(&_sem2);}private:vector<T> _q; //这个一定要初始化int _max; //队列空间的最大值int _con; //消费者的下标int _pro; //生产者的下标pthread_mutex_t _mutex1; //生产者的锁pthread_mutex_t _mutex2; //消费者的锁sem_t _sem1; //生产者信号量,一开始为队列的最大值sem_t _sem2; //消费者信号量,一开始为0
};
3.2 整个项目
RingQueue.hpp:封装的循环队列
text.cc:程序的主体
Task.hpp:任务类(这里只是一个function包装器)
Task.hpp
#include<string>std::string oper="+-*/%";class Task
{
public://初始化Task(){}Task(int x,int y,char oper):_x(x),_y(y),_oper(oper),_result(0),_correct(0){}//运行void run(){switch (_oper){case '+':_result=_x+_y;break;case '-':_result=_x-_y;break;case '*':_result=_x*_y;break;case '/':if(_y==0) _correct=1;else _result=_x/_y;break;case '%':if(_y==0) _correct=1;else _result=_x%_y;break;default:_correct=2;break;}}//消费者拿的任务std::string GetResult(){std::string rs=std::to_string(_x);rs+=_oper;rs+=std::to_string(_y);rs+="=";rs+=std::to_string(_result);rs+="[";rs+=std::to_string(_correct);rs+="]";return rs;}//生产者生产的任务std::string GetTask(){std::string r = std::to_string(_x);r += _oper;r += std::to_string(_y);r += "=?";return r;}~Task(){}private:int _x; int _y;int _result; //结果char _oper; //运算符int _correct; //值是否正确
};
text.cc
#include<iostream>
#include"RoundQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>using namespace std;//生产者
void* producer(void* args)
{srand(time(nullptr));int len=oper.size();RoundQwueue<Task>* td=static_cast<RoundQwueue<Task>*>(args);while(true){int data1=rand()%10;usleep(100);int data2=rand()%5;char op=oper[rand()%len];Task t(data1,data2,op);td->push(t);cout<<"producer give a task:"<<t.GetTask()<<endl;sleep(1);}
}//消费者
void* consumer(void* args)
{RoundQwueue<Task>* td=static_cast<RoundQwueue<Task>*>(args);while(true){Task t;td->pop(t);t.run();cout<<"consumer get a task:"<<t.GetResult()<<endl;}
}int main()
{RoundQwueue<Task>* tr=new RoundQwueue<Task>;//创造队列pthread_t tid1;pthread_t tid2;pthread_create(&tid1,nullptr,producer,(void*)tr);pthread_create(&tid2,nullptr,consumer,(void*)tr);pthread_join(tid1,nullptr);pthread_join(tid2,nullptr);delete tr;return 0;
}
4. 线程池
4.1 概念
线程池:见名知义,就是多个线程构成的集合。其中线程的个数是确定的,并不是固定的
为什么要有线程池?
如果每次都只创建一个线程,首先当用户请求过多时,每次都需要创建一个线程,创建线程需要时间和调度开销,这样会影响缓存的局部性和整体的性能。其次,如果无上限一直创建线程,还会导致CPU的过分调度。
线程池已经创建好了一定数量的线程,等待着分配任务,这样避免了处理任务时的线程创建和销毁。线程池里线程个数确定,能够保证内核的充分利用,还能防止过分调度。
线程池中可用线程数量取决于可用额并发处理器,处理器内核,内存,网络socket等的数量。
线程池的应用场景:
需要大量的线程来完成任务,且完成人物的时间比较短。比如:WEB服务器完成网页请求这样的任务,因为当个任务小,并且任务量巨大,你可以想象一个热门网站的请求次数。但是对于长时间的任务,线程池的优先就不明显了。比如:一个Telnet连接请求,因为Telnet会话时间比线程创建时间大多了。
对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
接收突发性的大量请求,但是不至于使服务器因此产生大量线程应用。突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量的线程可能使内存到达极限,出现错误。
4.2 线程池实现
线程池实际也是一个生产者消费者模型,接收任务,往任务队列中放任务的是生产者,从任务队列中拿任务并执行的是消费者。
主线程是生产者,用来接收任务和放任务。
Task.hpp
#include<string>std::string oper="+-*/%";class Task
{
public://初始化Task(int x,int y,char oper):_x(x),_y(y),_oper(oper),_result(0),_correct(0){}//运行int run(){switch (_oper){case '+':_result=_x+_y;break;case '-':_result=_x-_y;break;case '*':_result=_x*_y;break;case '/':if(_y==0) _correct=1;else _result=_x/_y;break;case '%':if(_y==0) _correct=1;else _result=_x%_y;break;default:_correct=2;break;}}//消费者拿的任务std::string GetResult(){std::string rs=std::to_string(_x);rs+=_oper;rs+=std::to_string(_y);rs+="=";rs+=std::to_string(_result);rs+="[";rs+=std::to_string(_correct);rs+="]";return rs;}//生产者生产的任务std::string GetTask(){std::string r = std::to_string(_x);r += _oper;r += std::to_string(_y);r += "=?";return r;}~Task(){}private:int _x; int _y;int _result; //结果char _oper; //运算符int _correct; //值是否正确
};
threadpool.hpp
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>using namespace std;class threadname
{
public:string _name; //线程的名字pthread_t tid; //线程的tid
};template <class T>
class Threadpool
{
private://加锁void Lock(){pthread_mutex_lock(&_mutex);}//解锁void Unlock(){pthread_mutex_unlock(&_mutex);}//线程等待void Wait(){pthread_cond_wait(&_cond, &_mutex);}//唤醒线程void signal(){pthread_cond_signal(&_cond);}//输出线程的名字string Getthreadname(pthread_t tid){for (auto ti : _v){if (ti.tid == tid){return ti._name;}}return nullptr;}public://构造函数Threadpool(int max = 10): _max(max), _v(max) //初始化数组和_max{pthread_mutex_init(&_mutex, nullptr); //初始化锁pthread_cond_init(&_cond, nullptr); //初始化条件变量}static void *hander(void *args){Threadpool<T> *td = static_cast<Threadpool<T> *>(args);string name = td->Getthreadname(pthread_self()); //通过tid,将该线程的名字从数组中拿出while (true){td->Lock();while (td->_q.empty()) //队列如果为空,则进行线程等待{td->Wait();}T t = td->pop(); //将任务从队列中拿出td->Unlock();t.run();cout << name << " a result:" << t.GetResult() << endl;}}//创造线程void Create(){int num = _v.size();for (int i = 0; i < num; i++){_v[i]._name = "thread_" + to_string(i + 1); //将线程的名字存入数组中pthread_create(&(_v[i].tid), nullptr, hander, (void *)this);cout<<_v[i]._name<<endl; //打印创造的线程名字}}//将任务添加到队列中void push(T &data){Lock();_q.push(data);cout << "thread produser a task:" << data.GetTask() << endl;signal();Unlock();}//从队列中拿出任务T pop(){T t = _q.front();_q.pop();return t;}//析构函数~Threadpool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:vector<threadname> _v; //定义一个数组,用来存放线程的名字和tidqueue<T> _q; //队列,用来存放任务int _max; // 数组的大小pthread_mutex_t _mutex; // 锁pthread_cond_t _cond;
};
text.cc
#include "threadpool.hpp"
#include "Task.hpp"
#include <ctime>using namespace std;int main()
{Threadpool<Task> *tp = new Threadpool<Task>;tp->Create();srand(time(nullptr));int len = oper.size();while (true){int x = rand() % 10 + 1;usleep(100);int y = rand() % 20 + 1;char op = oper[rand() % len];Task t(x, y, op);tp->push(t);sleep(1);}delete tp;return 0;
}
结果:
线程池是一开始,我们就创建好n个线程,然后将任务添加到线程池中,由之前创建好的线程去挣抢完成任务