Linux中的多线程剖析

目录

1、前言

2、多线程理解

2.1 线程

2.2 通俗了解进程和线程

2.2.1 进程是资源分配的基本单位

2.2.2 Linux中的线程是一种轻量化进程

2.3 进程和线程详解

2.3.1 创建一个线程 (pthread_create)

2.3.2 线程自己的一部分数据

2.3.3 线程组

2.3.4 关于进程的其他操作

2.4 Linux线程互斥

2.4.1 互斥量mutex

2.4.2 互斥量的接口

2.5 可重入和线程安全

2.5.1 线程安全

2.5.2 重入

2.6 死锁

2.6.1 死锁四个必要条件

2.6.2 避免死锁

2.7 线程同步

2.7.1 条件变量

2.7.2 同步概念与竞态条件

2.7.3 条件变量函数

2.8 POSIX信号量

3、实现简易的线程池(生产者消费着模型)

生产者消费者模型:

线程池:

(1),使用条件变量和阻塞队列实现线程池

(2),使用信号量和循环队列实现线程池

4、结语


1、前言

        今天呢,我们来深度理解一下什么时多线程,还有他的相关操作,可是为什么叫Linux中的多线程剖析呢,因为多线程在Linux和Windows系统底层中的实现并不一样,相比之下,Windows的实现更加的复杂,而Linux中的简单一些,有利于理解,好了下面我们进入正片。注:如果本文中有什么错误,请在评论区指出,或者私信作者,谢谢。

2、多线程理解

2.1 线程

        要理解多线程,那我们就先得知道什么叫做线程,,通俗的讲呢,就是一个程序的执行路线就是一个线程,我们写的程序大多都是只有一条执行路线的,从调用main函数,一直结束,都只有一个路线,这就是一个线程。

        可是我们之前接触过一个概念叫做进程,那么线程和进程有什么区别,又有什么联系呢?

2.2 通俗了解进程和线程

        首先呢,线程是运行在进程中的,       

        进程是资源分配的基本单位,而进程是CPU调度的基本单位。

        接下来我们详细理解:

2.2.1 进程是资源分配的基本单位

        在我之前的文章中讲过,在Linux中呢,进程在内存中是由PCB进行管理的,而具体实现就是task_struct 结构体,这个结构体中保存着对应进程的相关信息,比如地址空间,文件描述相关,详细可以参照我之前的文章,

        如下如,Linux中的进程包括,PCB(task_struct),地址空间,页表,以及页表映射在内存中的数据,这些东西组成了一个进程,当然,下面这张图只是一个单线程进程,

2.2.2 Linux中的线程是一种轻量化进程

        那么在Linux中,是怎么对进程进行实现的呢?

         如上图所示,Linux中在设计线程的时候,采用了和进程管理一样的模块描述符,一个task_struct就是一个线程,但是这些task_struct并没有指向自己独有的地址空间和资源,这样的进程结构我们称作轻量化进程(LWP),也就是Linux中对线程的实现,

        这种实现,让CPU不能分辨自己处理的是进程还是线程,但是这就是它设计的精妙之处。

        对于轻量化进程来说,他们都指向同一个地址空间,所以他们可以很轻松的共一些数据和资源,但是对于Windows来说,他们为进程单独设计了一种结构,那样子的话,工程量就非常大了,具体可以自己了解。

2.3 进程和线程详解

        通过上面的通俗了解,我们就对进程和线程有了一定的了解和认识,那么我们详细看看:

2.3.1 创建一个线程 (pthread_create)

        Linux对外提供了一个用户级别的库,不是系统调用接口,这个库当中,就提供了一些对线程操作的函数:

#include <pthread.h>
int pthread_create(pthread_t *thread, //返回线程IDconst pthread_attr_t *attr, //设置线程的属性,attr为NULL表示使用默认属性void *(*start_routine) (void*), //是个函数地址,线程启动后要执行的函数void *arg //传给线程启动函数的参数);
//返回值:成功返回0,错误返回错误码,
int pthread_join(pthread_t thread, void **value_ptr);

        我们可以使用这个函数来创建一个线程,但是等线程执行万指定的函数之后,我们需要对线程资源进行释放

        要注意的是,这里我们使用函数创建的线程是用户态的线程,对应底层的维护是使用轻量级进程来维护的

#include <iostream>
#include <pthread.h>void* fun1(void* arg){std::cout << (char*)arg << std::endl;return nullptr;
}
void* fun2(void* arg){std::cout << (char*)arg << std::endl;return nullptr;
}int main() {pthread_t p1,p2;//创建并启动线程pthread_create(&p1,nullptr,fun1,(void*)"我是线程p1");pthread_create(&p2,nullptr,fun2,(void*)"我是线程p2");//阻塞等待线程结束,并对其进行资源释放pthread_join(p1, nullptr);pthread_join(p2, nullptr);return 0;
}

        在运行的时候,要注意:要指定pthread库的库名,不然会链接错误

makefile:

.PHONY:all
all:thread01thread01:thread01.cppg++ -o $@ $^ -std=c++11 -lpthread.PHONY:clean
clean:rm -f thread01

运行结果:

2.3.2 线程自己的一部分数据

        大家肯定注意到了一个点,那就是pthread_create中,有一个输出型参数,他返回的是什么呢,我们打印 出来看一下

std::cout << "p1:" << p1 << std::endl;
std::cout << "p2:" << p2 << std::endl;
//运行结果:
//p1:139891316045568
//p2:139891307652864

        这么大长串的东西,叫做他的线程ID,但是进程ID那么一点点,这个东西怎么会那么大,大家不要将这两个东西搞混了,这里所说的线程ID并不是我们在监视窗口上看到的那个ID,在监视窗口上那个是线程的tid,这里的进程ID是他在编码层面上的ID,我们可以详细理解一下:

监视窗口上的进程ID(LWP):

        编码层面上的线程ID到底是个什么鬼,这里我们要知道一个东西,就是线程是CPU调度的最小粒子,就是他是要在单独运行的,那我们知道一个程序都是在栈里运行的,但是现在很多个线程都公用一个地址空间,那栈也是公用的吗,并不是,那样就太复杂了,在弹栈,压栈的时候,并没有那么多的寄存器可以供我们维护这个东西,

        所以呢,在地址空间中,有一块叫做共享区的地方,这块地方上面是栈,下边是堆,他们两个相向而生,所以说这块空间非常大,所以呢,在设计之初,就使用这块地方来为本进程创建的线程开辟栈区和一些线程私有的资源区,

         如上图所示,这就是创建的线程的资源分配,我们使用 pthread_create 的时候,返回给我们的线程ID,其实就是对应的线程资源在地址空间中的地址,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID,

pthread_t pthread_self(void);

        线程自己的东西除了栈还有线程ID,一组寄存器,errno,信号屏蔽字,调度优先级

2.3.3 线程组

        没有线程之前,一个进程对应内核里的一个进程描述符,对应一个进程ID。但是引入线程概念之后,情况发生了变化,一个用户进程下管辖N个用户态线程,每个线程作为一个独立的调度实体在内核态都有自己的进程描述符,进程和内核的描述符一下子就变成了1:N关系,POSIX标准又要求进程内的所有线程调用getpid函数时返回相同的进程ID,如何解决上述问题呢?

        Linux内核引入了线程组的概念。

        多线程的进程,又被称为线程组,线程组内的每一个线程在内核之中都存在一个进程描述符(task_struct)与之对应。进程描述符结构体中的pid,表面上看对应的是进程ID,其实不然,它对应的是线程ID;进程描述符中的tgid,含义是Thread Group ID,该值对应的是用户层面的进程ID

        不同于pthread_t类型的线程ID,和进程ID一样,线程ID是pid_t类型的变量,而且是用来唯一标识线程的一个整型变量。

2.3.4 关于进程的其他操作

线程终止

void pthread_exit(void *value_ptr);
//参数
//value_ptr:value_ptr不要指向一个局部变量。
//返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)

        需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。 

取消一个执行中的线程

int pthread_cancel(pthread_t thread);
//参数
//thread:线程ID
//返回值:成功返回0;失败返回错误码

等待线程结束

int pthread_join(pthread_t thread, void **value_ptr);
//参数
//thread:线程ID
//value_ptr:它指向一个指针,后者指向线程的返回值
//返回值:成功返回0;失败返回错误码

        调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

        1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。

        2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_CANCELED。

        3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传pthread_exit的参数。

        4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。

分离线程

        默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。

        如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。

int pthread_detach(pthread_t thread);

2.4 Linux线程互斥

我们先来了解一些概念性的东西:

        临界资源:多线程执行流共享的资源就叫做临界资源

        临界区:每个线程内部,访问临界自娱的代码,就叫做临界区

        互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

        原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

2.4.1 互斥量mutex

        大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。

多个线程并发的操作共享变量,会带来一些问题。

我们举一个抢票的例子来说

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>
#include <cstdlib>
#include <string>
#define THREADNO 5int residue = 1000;void *loot(void* arg){while (true) {if (residue > 0) {std::cout << "线程" << (long long)arg + 1 << ":" << residue-- << " 余票:" << residue << std::endl;;usleep(1000);}else {break;}usleep(2000);}
}int main() {//创建线程pthread_t tids[THREADNO];for (long long i = 0; i < THREADNO; ++i) {if (pthread_create(tids + i, NULL, loot, (void*)i) != 0){perror("pthread_create");exit(1);}}std::cout << "进程创建完毕" << std::endl;for (int i = 0; i < THREADNO; ++i) {pthread_join(tids[i],nullptr);}return 0;
}

        CPU运算速度是非常快的,所以在访问临界资源的时候,就可能出现数据错误,本来只有一千张票,结果发放了一千多张。

为什么可能无法获得错误结果?

        if 语句判断条件为真以后,代码可以并发的切换到其他线程

        usleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段

        --ticket操作本身就不是一个原子操作

2.4.2 互斥量的接口

初始化互斥量

初始化互斥量有两种方法:

        方法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

        方法2,动态分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr
);
//参数:
//mutex:要初始化的互斥量 
//attr:NULL

销毁互斥量

销毁互斥量需要注意:

        使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁

        不要销毁一个已经加锁的互斥量

        已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

 互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); 
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
//返回值:成功返回0,失败返回错误号

调用pthread_ lock 时,可能会遇到以下情况:

        互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功

        发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

修改上面的抢票代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>
#include <cstdlib>
#include <string>#define THREADNO 2pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
int residue = 100;void *loot(void* arg){while (true) {pthread_mutex_lock(&mtx);if (residue > 0) {std::cout << "线程" << (long long)arg + 1 << ":" << residue << " 余票:" << residue - 1 << std::endl;residue--;usleep(5000);//sleep(1);}else {pthread_mutex_unlock(&mtx);break;}pthread_mutex_unlock(&mtx);usleep(2000);}
}int main() {//创建线程pthread_t tids[THREADNO];for (long long i = 0; i < THREADNO; ++i) {if (pthread_create(tids + i, NULL, loot, (void*)i) != 0){perror("pthread_create");exit(1);}}std::cout << "进程创建完毕" << std::endl;for (int i = 0; i < THREADNO; ++i) {pthread_join(tids[i],nullptr);}return 0;
}

        经过上面的例子,大家已经意识到单纯的i++或者++i都不是原子的,有可能会有数据一致性问题为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

2.5 可重入和线程安全

2.5.1 线程安全

        多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。

2.5.2 重入

        同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

常见的线程不安全的情况

        不保护共享变量的函数

        函数状态随着被调用,状态发生变化的函数

        返回指向静态变量指针的函数

        调用线程不安全函数的函数

常见的线程安全的情况

        每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的

        类或者接口对于线程来说都是原子操作

        多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

        调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的

        调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构

        可重入函数体内使用了静态的数据结构

常见可重入的情况

        不使用全局变量或静态变量

        不使用用malloc或者new开辟出的空间

        不调用不可重入函数

        不返回静态或全局数据,所有数据都有函数的调用者提供

        使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

可重入与线程安全联系

        函数是可重入的,那就是线程安全的

        函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题

        如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

可重入与线程安全区别

        可重入函数是线程安全函数的一种

        线程安全不一定是可重入的,而可重入函数则一定是线程安全的。

        如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。

2.6 死锁

        死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

2.6.1 死锁四个必要条件

        互斥条件:一个资源每次只能被一个执行流使用

        请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

        不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

        循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

2.6.2 避免死锁

        破坏死锁的四个必要条件

        加锁顺序一致

        避免锁未释放的场景

        资源一次性分配

2.7 线程同步

2.7.1 条件变量

        当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

2.7.2 同步概念与竞态条件

        同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步

        竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

2.7.3 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 
//参数:
//cond:要初始化的条件变量 
//attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex
); 
//参数:
//cond:要在这个条件变量上等待 
//mutex:互斥量

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); 
int pthread_cond_signal(pthread_cond_t *cond);

简单案例:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <string>struct data{char name;pthread_mutex_t *mtx;pthread_cond_t *cond;
};void* print(void *arg){data *d = (data*)arg;char name[64];snprintf(name, sizeof(name),"线程%c打印--我是线程%c!",d->name,d->name);std::string threadname = name;while (true) {pthread_cond_wait(d->cond,d->mtx);std::cout << threadname <<std::endl;sleep(1);pthread_cond_signal(d->cond);}
}int main() {pthread_mutex_t mtx;pthread_cond_t cond;pthread_mutex_init(&mtx,nullptr);pthread_cond_init(&cond,nullptr);data d1,d2;d1.name = 'A';d1.mtx = &mtx;d1.cond = &cond;d2.name = 'B';d2.mtx = &mtx;d2.cond = &cond;pthread_t p1,p2;pthread_create(&p1,nullptr,print,(void*)&d1);pthread_create(&p2,nullptr,print,(void*)&d2);sleep(1);pthread_cond_signal(&cond);pthread_join(p1,nullptr);pthread_join(p2,nullptr);while (1);pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}

运行结果:

为什么pthread_ cond_ wait 需要互斥量?

        条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

2.8 POSIX信号量

        POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。  但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); 
//参数:
//pshared:0表示线程间共享,非零表示进程间共享 
//value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem);

发布信号量

//发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);

3、实现简易的线程池(生产者消费着模型)

生产者消费者模型:

        为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

        解耦、支持并发、支持忙闲不均

线程池:

        一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

        1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

        2.  对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

        3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

        下来我们分别用条件变量和信号量实现线程池,单机版的,我们就设计在线程池内部自产自销的方案实现

(1),使用条件变量和阻塞队列实现线程池

//thread.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <string>
#include <cstdio>
//#include <functional>class Thread{//typedef std::function<void* (void*)> function;typedef void*(*function)(void*);public:Thread(){}Thread(int num):_tid(0){char name[64];snprintf(name, sizeof(name),"thread%d",num);_name = name;}void create(function fun,void* arg){pthread_create(&_tid, nullptr, fun, arg);}void join() {pthread_join(_tid,nullptr);}std::string name(){return _name;}~Thread(){}private:pthread_t _tid;std::string _name;
};
//Task.hpp
#pragma once
#include <iostream>template<class T, class D>
class Task{public:Task(T fun, D num1, D num2):_fun(fun),_arg0(num1),_arg1(num2){}T fun() {return _fun;}D arg0() {return _arg0;}D arg1() {return _arg1;}private:T _fun;D _arg0;D _arg1;
};
//mypool.hpp
#pragma once#include "thread.hpp"
#include "Task.hpp"
#include <vector>
#include <queue>
//#include <cstdlib>
#include <ctime>
#include <unistd.h>
#define CAPACITY 10int add(int x, int y)
{return x + y;
}template <class T, class D>
struct poolData
{int _capacity;//线程自身Thread* _self;std::queue<Task<T, D> *> *_task;// 锁pthread_mutex_t *_mtx;// 条件变量pthread_cond_t *_full;  // 满了pthread_cond_t *_empty; // 空了
};template <class T, class D>
class Pool
{
public:Pool(int num) : _capacity(10){_productor = new std::vector<Thread *>(num);_consumer = new std::vector<Thread *>(num);_task = new std::queue<Task<T, D> *>;pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}std::vector<Thread *> *getpro(){return _productor;}std::vector<Thread *> *getcon(){return _consumer;}std::queue<Task<T, D> *> *gettask(){return _task;}void strat(){//poolData<T, D> prodata[_productor->size()];//poolData<T, D> condata[_productor->size()];std::vector<poolData<T, D>> prodata(_productor->size());std::vector<poolData<T, D>> condata(_productor->size());for (int i = 0; i < _productor->size(); ++i){//data[i]._consumer = _consumer;//data[i]._productor = _productor;prodata[i]._task = _task;prodata[i]._full = &_full;prodata[i]._empty = &_empty;prodata[i]._mtx = &_mtx;prodata[i]._capacity = _capacity;(*_productor)[i] = new Thread(i);prodata[i]._self = (*_productor)[i];(*_productor)[i]->create(productor, &prodata[i]);condata[i]._task = _task;condata[i]._full = &_full;condata[i]._empty = &_empty;condata[i]._mtx = &_mtx;condata[i]._capacity = _capacity;(*_consumer)[i] = new Thread(i);condata[i]._self = (*_consumer)[i];(*_consumer)[i]->create(consumer, &condata[i]);}for (int i = 0; i < _productor->size(); ++i){(*_productor)[i]->join();(*_consumer)[i]->join();}for (int i = 0; i < _productor->size(); ++i){delete (*_productor)[i];delete (*_consumer)[i];}}// 生产者static void *productor(void *args){poolData<T, D> *pd = (poolData<T, D> *)args;//std::vector<Thread *> *pro = pd->_productor;// 消费者队列// std::vector<Thread*> *con = pd->_consumer;Thread* self = pd->_self;// 阻塞队列std::queue<Task<T, D> *> *task = pd->_task;// 锁pthread_mutex_t *mtx = pd->_mtx;// 条件变量pthread_cond_t *full = pd->_full;   // 满了pthread_cond_t *empty = pd->_empty; // 空了int capacity = pd->_capacity;srand((uint64_t)time(nullptr) ^ 0x6666);while (true){int x = rand() % 1000 + 10;int y = rand() % 1000 + 10;pthread_mutex_lock(mtx);// 如果满了,就挂起while (task->size() == capacity){std::cout << "生产者挂起!" << std::endl;pthread_cond_wait(full, mtx);}std::cout << "生产者拿到锁,开始生产"<< std::endl;// 添加任务task->push(new Task<int (*)(int, int), int>(add, x, y));std::cout << "生产者" << self->name() << " 生产:" << x << "+" << y << "=?" << std::endl;usleep(rand()%3000);std::cout << "生产者生产完毕,释放锁,唤醒线程"<< std::endl;pthread_cond_signal(empty);pthread_mutex_unlock(mtx);sleep(1);}}// 消费者static void *consumer(void *args){poolData<T, D> *pd = (poolData<T, D> *)args;// std::vector<Thread*>* pro = pd->_productor;// 消费者队列//std::vector<Thread *> *con = pd->_consumer;Thread* self = pd->_self;// 阻塞队列std::queue<Task<T, D> *> *task = pd->_task;// 锁pthread_mutex_t *mtx = pd->_mtx;// 条件变量pthread_cond_t *full = pd->_full;   // 满了pthread_cond_t *empty = pd->_empty; // 空了while (true){pthread_mutex_lock(mtx);while (task->empty()) {std::cout << "消费者挂起!" << std::endl;pthread_cond_wait(empty, mtx);}std::cout << "消费者拿到锁,开始消费"<< std::endl;// 执行任务// task->push(Task(add,x,y));Task<T, D> *t = task->front();task->pop();D num = t->fun()(t->arg0(), t->arg1());std::cout << "消费者" << self->name() << " 消费:" << t->arg0() << "+" << t->arg1() << "=" << num << std::endl;delete t;usleep(rand()%3000);std::cout << "消费者消费完毕,释放锁,唤醒线程"<< std::endl;pthread_cond_signal(full);pthread_mutex_unlock(mtx);//usleep(rand() % 2000);sleep(2);}}~Pool(){if (!_productor->empty()){for (int i = 0; i < _productor->size(); ++i){if ((*_productor)[i] != nullptr){delete (*_productor)[i];}}}if (!_consumer->empty()){for (int i = 0; i < _consumer->size(); ++i){if ((*_consumer)[i] != nullptr){delete (*_consumer)[i];}}}while (!_task->empty()){Task<T,D> *t = _task->front();_task->pop();delete t;}delete _productor;delete _consumer;delete _task;pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}private:int _capacity;// 生产着队列std::vector<Thread *> *_productor;// 消费者队列std::vector<Thread *> *_consumer;// 阻塞队列std::queue<Task<T, D> *> *_task;// 锁pthread_mutex_t _mtx;// 条件变量pthread_cond_t _full;  // 满了pthread_cond_t _empty; // 空了
};
//test.cpp
#include "mypool.hpp"int main() {//std::cout << "hello" << std::endl;Pool<int(*)(int,int),int> p(1);p.strat();return 0;
}

(2),使用信号量和循环队列实现线程池

#pragma once//mutex.hpp
#include <iostream>
#include <pthread.h>class mutex{public:mutex(){pthread_mutex_init(&_mtx,nullptr);}void lock(){pthread_mutex_lock(&_mtx);}void unlock(){pthread_mutex_unlock(&_mtx);}~mutex(){pthread_mutex_destroy(&_mtx);}private:pthread_mutex_t _mtx;
};
//sem.hpp
#pragma once#include <iostream>
#include <semaphore.h>class sem{public:sem(int value){sem_init(&_sem, 0, value);}void p(){sem_wait(&_sem);}void v(){sem_post(&_sem);}~sem(){sem_destroy(&_sem);}private:sem_t _sem;
};
//ringqueue.hpp
#include <iostream>
#include <vector>
#include "sem.hpp"
#include "mutex.hpp"template<class T>
class ringqueue {public:ringqueue(int capacity = 10):_ring_queue(capacity),_start(0),_tail(0),_space_sem(capacity),_data_sem(0),_mtx(){}void push(const T &in){_space_sem.p();_mtx.lock();_ring_queue[_start++] = in;_start %= _ring_queue.size();_data_sem.v();_mtx.unlock();}void pop(T & out){_data_sem.p();_mtx.lock();out = _ring_queue[_tail++];_tail %= _ring_queue.size();_space_sem.v();_mtx.unlock();}~ringqueue(){}private:std::vector<T> _ring_queue;int _start;int _tail;sem _space_sem;sem _data_sem;mutex _mtx;
};
//mypool.hpp
#pragma once#include "thread.hpp"
#include "Task.hpp"
#include "ringQueue.hpp"
//#include <vector>
//#include <queue>
//#include <cstdlib>
#include <ctime>
#include <unistd.h>
//#define CAPACITY 10int add(int x, int y)
{return x + y;
}template <class T, class D>
struct poolData
{Thread* _self;ringqueue<Task<T,D>*>* _rq;
};template <class T, class D>
class Pool
{
public:Pool(int num) :_productor(num),_consumer(num){}void strat(){poolData<T,D> prodata[_productor.size()];poolData<T,D> condata[_consumer.size()];for (int i = 0; i < _productor.size(); ++i) {_productor[i] = new Thread(i);prodata[i]._self = _productor[i];prodata[i]._rq = &_rq;_productor[i]->create(productor,&prodata[i]);_consumer[i] = new Thread(i);condata[i]._self = _consumer[i];condata[i]._rq = &_rq;_consumer[i]->create(consumer,&condata[i]);}for (int i = 0; i < _productor.size(); ++i) {_productor[i]->join();delete _productor[i];_consumer[i]->join();delete _consumer[i];}}// 生产者static void *productor(void *args){poolData<T,D> *pd = (poolData<T,D>*)args;Thread *self = pd->_self;ringqueue<Task<T,D>*> *rq = pd->_rq;srand(time(nullptr) ^ 0x6666);while (true) {int x = rand() % 1000 + 10;int y = rand() % 1000 + 10;rq->push(new Task<T,D>(add,x,y));std::cout << "生产线程" << self->name() << "生产任务:" << x << "+" << y << "=?" << std::endl;usleep(rand() %5000 + 3000);//sleep(1);}}// 消费者static void *consumer(void *args){poolData<T,D> *pd = (poolData<T,D>*)args;Thread *self = pd->_self;ringqueue<Task<T,D>*> *rq = pd->_rq;srand(time(nullptr) ^ 0x6666);while (true) {Task<T,D> *t;rq->pop(t);int z = t->fun()(t->arg0(),t->arg1());std::cout << "消费者" << self->name() << "消费任务:" << t->arg0() << "+" << t->arg1() << "=" << z << std::endl;delete t;usleep(rand() % 5000 + 3000);}}~Pool(){}private:ringqueue<Task<T,D>*> _rq;std::vector<Thread*> _productor;std::vector<Thread*> _consumer;
};
//test.cpp
#include "mypool.hpp"int main() {Pool<int(*)(int,int),int> p(5);p.strat();return 0;
}

4、结语

        好了,今天的分享就到这里了,如果文章对你有帮助,请留下你的评论,如果有错误,也请评论私信作者,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/120115.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【聚类】DBCAN聚类

OPTICS是基于DBSCAN改进的一种密度聚类算法&#xff0c;对参数不敏感。当需要用到基于密度的聚类算法时&#xff0c;可以作为DBSCAN的一种替代的优化方案&#xff0c;以实现更优的效果。 原理 基于密度的聚类算法&#xff08;1&#xff09;——DBSCAN详解_dbscan聚类_root-ca…

python安装wind10

一、下载&#xff1a; 官网:Python Releases for Windows | Python.org 二、安装 双击下载的安装程序文件。这将打开安装向导。安装界面图下方两个框的" Use admin privileges wheninstalling py. exe和” Add python. exe to PATH"都要勾选,一定要勾选!一定要勾选…

5年测试在职经验之谈:2年功能测试、3年自动化测试,从入门到不可自拔...

毕业3年了&#xff0c;学的是环境工程专业&#xff0c;毕业后零基础转行做软件测试。 已近从事测试行业8年了&#xff0c;自己也从事过2年的手工测试&#xff0c;从事期间越来越觉得如果一直在手工测试的道路上前进&#xff0c;并不会有很大的发展&#xff0c;所以通过自己的努…

Linux之基于HTTPS的静态网站

目录 Linux之基于HTTPS的静态网站 定义 SSL协议 使用Apachemod_ssl组件的加密认证网站 mod_ssl模组 安装 配置文件 ssl配置文件的主要参数 案例 案例1 --- 搭建HTTPSSL的加密认证的web服务器 案例2 --- 组建多个子目录的网站www.joker.com&#xff0c;该网站下有2个子…

【docker】Mac M1 构建 x64 linux镜像

亲测教程 文章目录 首先构建环境 首先 首先你需要有一个 Dockerfile 比如&#xff1a;这里以一个 python 项目举例 FROM python:3.10-slimWORKDIR /appCOPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txtCOPY . .CMD [ "pyth…

Spring MVC:域对象共享数据

Spring MVC 前言域对象共享数据使用 ModelAndView 向 request 域对象中共享数据使用 Map 、Model 或 ModelMap 向 request 域对象中共享数据使用 SesionAttributes 注解向 session 域对象中共享数据使用 Servlet API 向 application 域对象中共享数据 附 前言 在上一章中&…

Redis的数据类型到底有什么奥秘

这里我们先只介绍五种常用的数据类型~ 目录 1、string 2、hash 3、list 4、set 5、zset 6、示例 1、string 数据类型&#xff1a;string内部编码&#xff1a;raw、int、embstr 说明&#xff1a; raw是最基本的字符串--底层是一个char数组&#xff08;此处的char是一个字…

【计算机网络】 静态库与动态库

文章目录 静态库实践使用方法总结 动态库实践使用方法总结 静态库与动态库的优缺点静态库优点缺点 动态库缺点优点 库有两种&#xff1a;静态库&#xff08;.a、.lib&#xff09;和动态库&#xff08;.so、.dll&#xff09;。所谓静态、动态是指链接。静态库是将整个库文件都拷…

学习网络编程No.5【TCP套接字通信】

引言&#xff1a; 北京时间&#xff1a;2023/8/25/15:52&#xff0c;昨天刚把耗时3天左右的文章更新&#xff0c;充分说明我们这几天并不是在摆烂中度过&#xff0c;而是在为了更文不懈奋斗&#xff0c;历时这么多天主要是因为该部分知识比较陌生&#xff0c;所以需要我们花费…

京东搜索EE链路演进 | 京东云技术团队

导读 搜索系统中容易存在头部效应&#xff0c;中长尾的优质商品较难获得充分的展示机会&#xff0c;如何破除系统的马太效应&#xff0c;提升展示结果的丰富性与多样性&#xff0c;助力中长尾商品成长是电商平台搜索系统的一个重要课题。其中&#xff0c;搜索EE系统在保持排序…

C#-SQLite-使用教程笔记

微软官网资料链接&#xff08;可下载文档&#xff09; 教程参考链接&#xff1a;SQLite 教程 - SQLite中文手册 项目中对应的system.dat文件可以用SQLiteStudio打开查看 参考文档&#xff1a;https://d7ehk.jb51.net/202008/books/SQLite_jb51.rar 总结介绍 1、下载SQLiteS…

RK3399平台开发系列讲解(内核调试篇)IO 数据工具:iostat和iotop

🚀返回专栏总目录 文章目录 一、iostat 命令二、/proc/diskstats 文件三、iotop 命令沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 在 Linux 系统上,iostat 和 iotop 这两个 IO 数据工具非常常用。它们都是性能分析领域中不可缺少的工具性软件。 一、iostat 命令…

mysql主从复制与读写分离

一&#xff0c;主从复制 1&#xff0c;为什么要做主从复制 单台mysql在安全性&#xff0c;高可用和高并发方面都无法满足实际的需求&#xff0c;所以可以选择配置多台主从数据库服务器以实现读写分离。 2&#xff0c;主从复制的原理 主从复制是为了保证数据的完整性&#xff0c…

界面控件DevExpress .NET应用安全 Web API v23.1亮点:支持Swagger模式

DevExpress拥有.NET开发需要的所有平台控件&#xff0c;包含600多个UI控件、报表平台、DevExpress Dashboard eXpressApp 框架、适用于 Visual Studio的CodeRush等一系列辅助工具。 DevExpress 今年第一个重要版本v23.1日前已正式发布了&#xff0c;该版本拥有众多新产品和数十…

javaee spring aop实现事务 项目结构

spring配置文件 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xmlns:context"http://www.springframewo…

最小生成树Kruskal、Prim算法C++

什么是最小生成树 连通图&#xff1a; 在无向图中&#xff0c;若从顶点v1到顶点v2有路径&#xff0c;则称顶点v1和顶点v2是连通的。如果图中任意一对顶点都是连通的&#xff0c;则称此图为连通图。 生成树&#xff1a; 一个连通图的最小连通子图称作为图的生成树。有n个顶点的…

ARM编程模型-寄存器组

Cortex A系列ARM处理器共有40个32位寄存器,其中33个为通用寄存器,7个为状态寄存器。usr模式和sys模式共用同一组寄存器。 通用寄存器包括R0~R15,可以分为3类: 未分组寄存器R0~R7分组寄存器R8~R14、R13(SP) 、R14(LR)程序计数器PC(R15)、R8_fiq-R12_fir为快中断独有 在不同模…

centos中得一些命令 记录

redis命令 链接redis数据库的命令 redis-cli如果 Redis 服务器在不同的主机或端口上运行&#xff0c;你需要提供相应的主机和端口信息。例如&#xff1a; redis-cli -h <hostname> -p <port>连接成功后&#xff0c;你将看到一个类似于以下的提示符&#xff0c;表…

手写Mybatis:第12章-完善ORM框架,增删改查操作

文章目录 一、目标&#xff1a;完善增删改查二、设计&#xff1a;完善增删改查三、实现&#xff1a;完善增删改查3.1 工程结构3.2 完善增删改查类图3.3 扩展解析元素3.4 新增执行方法3.4.1 执行器接口添加update3.4.2 执行器抽象基类3.4.3 简单执行器 3.5 语句处理器实现3.5.1 …

【Eclipse】Project interpreter not specified 新建项目时,错误提示,已解决

目录 0.环境 1&#xff09;问题截图&#xff1a; 2&#xff09;错误发生原因&#xff1a; 1.解决思路 2.具体步骤 0.环境 windows 11 64位&#xff0c;Eclipse 2021-06 1&#xff09;问题截图&#xff1a; 2&#xff09;错误发生原因&#xff1a; 由于我手欠&#xff0c;将…