Linux高并发服务器开发(十)反应堆模型和线程池模型

文章目录

  • 1 epoll反应堆
  • 2 线程池
    • 流程
    • 代码
  • 3 复杂版本线程池
    • 代码


1 epoll反应堆

文件描述符 监听事件 回调函数 进行封装

  1. 创建socket
  2. 设置端口复用
  3. 绑定
  4. 监听
  5. 创建epoll树
  6. 将监听文件描述符lfd上epoll树,对应的事件节点包括:文件描述符,事件epollin,回调函数initAccept

initAccept函数
调用accpet函数接受新的连接,同时将cfd对应的事件节点上树,包括cfd,epollin和回调函数readData

readData函数
读数据:
若读数据异常或者客户端关闭了连接,则下树
若读完数据,则将时间节点中的对EPOLLIN修改为写 EPOLLOUT,同时设置写回调函数为sendData

sendData函数
发送数据给客户端,发送数据完毕后,将对应的数据节点由EPOLLOUT修改为EPOLLIN,回调函数设置为readData

在这里插入图片描述

在这里插入图片描述
创建事件驱动结构体,为事件总数 +1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
代码

//��Ӧ�Ѽ򵥰�
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include "wrap.h"#define _BUF_LEN_  1024
#define _EVENT_SIZE_ 1024//ȫ��epoll���ĸ�
int gepfd = 0;//�¼������ṹ��
typedef struct xx_event{int fd;int events;void (*call_back)(int fd,int events,void *arg);void *arg;char buf[1024];int buflen;int epfd;
}xevent;xevent myevents[_EVENT_SIZE_+1];void readData(int fd,int events,void *arg);//�����¼�
//eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_-1],&myevents[_EVENT_SIZE_-1]);
void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{ev->fd = fd;ev->events = events;//ev->arg = arg;//�����ṹ���Լ�,����ͨ��arg�õ��ṹ���������Ϣev->call_back = call_back;struct epoll_event epv;epv.events = events;epv.data.ptr = ev;//����˼��epoll_ctl(gepfd,EPOLL_CTL_ADD,fd,&epv);//����
}//�޸��¼�
//eventset(fd,EPOLLOUT,senddata,arg,ev);
void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{ev->fd = fd;ev->events = events;//ev->arg = arg;ev->call_back = call_back;struct epoll_event epv;epv.events = events;epv.data.ptr = ev;epoll_ctl(gepfd,EPOLL_CTL_MOD,fd,&epv);//�޸�
}//ɾ���¼�
void eventdel(xevent *ev,int fd,int events)
{printf("begin call %s\n",__FUNCTION__);ev->fd = 0;ev->events = 0;ev->call_back = NULL;memset(ev->buf,0x00,sizeof(ev->buf));ev->buflen = 0;struct epoll_event epv;epv.data.ptr = NULL;epv.events = events;epoll_ctl(gepfd,EPOLL_CTL_DEL,fd,&epv);//����
}//��������
void senddata(int fd,int events,void *arg)
{printf("begin call %s\n",__FUNCTION__);xevent *ev = arg;Write(fd,ev->buf,ev->buflen);eventset(fd,EPOLLIN,readData,arg,ev);
}//������
void readData(int fd,int events,void *arg)
{printf("begin call %s\n",__FUNCTION__);xevent *ev = arg;ev->buflen = Read(fd,ev->buf,sizeof(ev->buf));if(ev->buflen>0) //��������{	//void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)eventset(fd,EPOLLOUT,senddata,arg,ev);}else if(ev->buflen==0) //�Է��ر�����{Close(fd);eventdel(ev,fd,EPOLLIN);}}
//�����Ӵ���
void initAccept(int fd,int events,void *arg)
{printf("begin call %s,gepfd =%d\n",__FUNCTION__,gepfd);//__FUNCTION__ ������int i;struct sockaddr_in addr;socklen_t len = sizeof(addr);int cfd = Accept(fd,(struct sockaddr*)&addr,&len);//�Ƿ��������//����myevents�����п��õ�λ��for(i = 0 ; i < _EVENT_SIZE_; i ++){if(myevents[i].fd==0){break;}}//���ö��¼�eventadd(cfd,EPOLLIN,readData,&myevents[i],&myevents[i]);
}int main(int argc,char *argv[])
{//����socketint lfd = Socket(AF_INET,SOCK_STREAM,0);//�˿ڸ���int opt = 1;setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//��struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_port = htons(8888);servaddr.sin_addr.s_addr = htonl(INADDR_ANY);Bind(lfd,(struct sockaddr*)&servaddr,sizeof(servaddr));//����Listen(lfd,128);//����epoll�����ڵ�gepfd = epoll_create(1024);printf("gepfd === %d\n",gepfd);struct epoll_event events[1024];//�������ʼ�¼���������������������eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_],&myevents[_EVENT_SIZE_]);//void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)while(1){int nready = epoll_wait(gepfd,events,1024,-1);if(nready<0) //����epoll_waitʧ��{perr_exit("epoll_wait error");}else if(nready>0) //����epoll_wait�ɹ�,�������¼��������ļ��������ĸ���{int i = 0;for(i=0;i<nready; i++){xevent *xe = events[i].data.ptr;//ȡptrָ��ṹ���ַprintf("fd=%d\n",xe->fd);if(xe->events & events[i].events){xe->call_back(xe->fd,xe->events,xe);//�����¼���Ӧ�Ļص�}}}}//�رռ����ļ�������Close(lfd);return 0;
}

2 线程池

相关线程函数
pthread_create

pthread_detach 分离
pthread_attr_t attr;
pthread_attr_init
pthread_attr_setdetachstate

pthread_exit

涉及到共享资源(主线程和各个子线程共享任务池)
互斥锁
pthread_mutex_t
pthread_mutex_init
pthread_mutex_lock/unlock
pthread_mutex_destory
能够线程引起阻塞的函数
弱任务池已满,主线程应该阻塞等待子线程处理人物,此时主线程需要阻塞等待
弱任务池空了,子线程应该阻塞等待,等待主线程池往任务池中添加任务
pthread_cond_wait
pthread_cond_signal

在这里插入图片描述
子线程从任务池中获取任务,
任务池中有一个回调函数,子线程通过回调函数执行不同操作。

任务池中的任务结构体
在这里插入图片描述
线程池结构体
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

添加任务给子线程
在这里插入图片描述
任务完成后,通知生产者继续生产任务
在这里插入图片描述
退出

流程

1.初始化操作:线程数量、任务总数、malloc内存、互斥锁和条件变量初始化,创建指定数量的子线程

2.主线程:往线程池添加任务
先加锁 然后判断任务池中任务是否已满 若已满,则调用pthread_cond_wait阻塞等待,若未满,则往任务池中添加任务,添加完任务之后调用Pthread_cond_signal同志子线程去取任务
最后解锁

3.子线程:负责从任务池中获取任务并处理任务
先加锁,然后判断任务池中是否有任务,若任务池中没有任务,则调用pthread_cond_wait函数等待主线程添加任务。若任务池中有任务,则取任务并处理任务。
处理完任务后,通知主线程继续添加任务
如果shutdown为1,先解锁再自动退出:pthread_exit;
最后释放锁

在这里插入图片描述

代码

ThreadPool.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>typedef struct _PoolTask
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;typedef struct _ThreadPool
{int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组int job_push;//入队位置int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件}ThreadPool;void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数#endif

ThreadPool.c

//简易版线程池
#include "threadpoolsimple.h"int beginnum = 1000;void *thrRun(void *arg)
{//printf("begin call %s-----\n",__FUNCTION__);ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&thrPool->pool_lock);//无任务并且线程池不是要摧毁while(thrPool->job_num <= 0 && !thrPool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);}if(thrPool->job_num){//有任务需要处理taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));task->arg = task;thrPool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&thrPool->empty_task);//通知生产者}if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&thrPool->pool_lock);free(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&thrPool->pool_lock);task->task_func(task->arg);//执行回调函数}//printf("end call %s-----\n",__FUNCTION__);
}//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf("begin call %s-----\n",__FUNCTION__);thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));thrPool->thr_num = thrnum;thrPool->max_job_num = maxtasknum;thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁thrPool->job_push = 0;//任务队列添加的位置thrPool->job_pop = 0;//任务队列出队的位置thrPool->job_num = 0;//初始化的任务个数为0thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(&thrPool->pool_lock,NULL);pthread_cond_init(&thrPool->empty_task,NULL);pthread_cond_init(&thrPool->not_empty_task,NULL);int i = 0;thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 int i = 0;for(i = 0; i < pool->thr_num ; i++){pthread_join(pool->threads[i],NULL);}pthread_cond_destroy(&pool->not_empty_task);pthread_cond_destroy(&pool->empty_task);pthread_mutex_destroy(&pool->pool_lock);free(pool->tasks);free(pool->threads);free(pool);
}//添加任务到线程池
void addtask(ThreadPool *pool)
{//printf("begin call %s-----\n",__FUNCTION__);pthread_mutex_lock(&pool->pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool->max_job_num <= pool->job_num){pthread_cond_wait(&pool->empty_task,&pool->pool_lock);}int taskpos = (pool->job_push++)%pool->max_job_num;//printf("add task %d  tasknum===%d\n",taskpos,beginnum);pool->tasks[taskpos].tasknum = beginnum++;pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];pool->tasks[taskpos].task_func = taskRun;pool->job_num++;pthread_mutex_unlock(&pool->pool_lock);pthread_cond_signal(&pool->not_empty_task);//通知包身工//printf("end call %s-----\n",__FUNCTION__);
}//任务回调函数
void taskRun(void *arg)
{PoolTask *task = (PoolTask*)arg;int num = task->tasknum;printf("task %d is runing %lu\n",num,pthread_self());sleep(1);printf("task %d is done %lu\n",num,pthread_self());
}int main()
{create_threadpool(3,20);int i = 0;for(i = 0;i < 50 ; i++){addtask(thrPool);//模拟添加任务}sleep(20);destroy_threadpool(thrPool);return 0;
}

3 复杂版本线程池

线程池相关信息
在这里插入图片描述
在这里插入图片描述

代码

threadpool.h

#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_num  thread num* @param max_thr_num  max thread size* @param queue_max_size   size of the queue.* @return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param pool     Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param pool  Thread pool to destroy.* @return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif

threadpool.c

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10                 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
#define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct 
{void *(*function)(void *);          /* 函数指针,回调函数 */void *arg;                          /* 上面函数的参数 */
} threadpool_task_t;                    /* 各子线程任务结构体 *//* 描述线程池相关信息 */
struct threadpool_t 
{pthread_mutex_t lock;               /* 用于锁住本结构体 */    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid;               /* 存管理线程tid */threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */int min_thr_num;                    /* 线程池最小线程数 */int max_thr_num;                    /* 线程池最大线程数 */int live_thr_num;                   /* 当前存活线程个数 */int busy_thr_num;                   /* 忙状态线程个数 */int wait_exit_thr_num;              /* 要销毁的线程个数 */int queue_front;                    /* task_queue队头下标 */int queue_rear;                     /* task_queue队尾下标 */int queue_size;                     /* task_queue队中实际任务数 */int queue_max_size;                 /* task_queue队列可容纳任务数上限 */int shutdown;                       /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL;do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  printf("malloc threadpool fail");break;                                      /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0;                           /* 有0个产品 */pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;                         /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 队列开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail\n");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail\n");break;}//启动工作线程pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool);/*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}//创建管理者线程pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);return pool;} while (0);/* 前面代码调用失败时,释放poll存储空间 */threadpool_free(pool);return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) {  printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//暂停到这/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));//pthread_detach(pthread_self());pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());//pthread_detach(pthread_self());pthread_exit(NULL);     /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/pool->busy_thr_num++;                                                   /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg);                                           /*执行回调函数任务*///task.function(task.arg);                                              /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;                      /* 关注 任务数 */int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock));  int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*///pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}/*for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}*/threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1;pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1;pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num;pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1
/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),*(int *)arg);sleep(1);printf("task %d is end\n", *(int *)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i]=i;printf("add task %d\n",i);threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */}sleep(10);                                          /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/368219.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在Docker容器中,修改MySQL密码

如果MySQL运行在Docker容器中&#xff0c;修改MySQL密码的方法稍有不同。以下是如何在Docker中修改MySQL密码的步骤&#xff1a; 方法1&#xff1a;使用MySQL命令行工具 1. 找到MySQL容器的ID或者名字&#xff1a; docker ps 2. 进入MySQL容器&#xff1a; docker exec -i…

2024年小米SU7维修手册和电路图线路图接线图资料更新

此次更新了2024年小米SU7维修手册和电路图资料&#xff0c;覆盖市面上99%车型&#xff0c;包括维修手册、电路图、新车特征、车身钣金维修数据、全车拆装、扭力、发动机大修、发动机正时、保养、电路图、针脚定义、模块传感器、保险丝盒图解对照表位置等等&#xff01; 汽修帮…

QT加载安装外围依赖库的翻译文件后翻译失败的现象分析:依赖库以饿汉式的形式暴露单例接口导致该现象的产生

1、前提说明 VS2019 QtClassLibaryDll是动态库,QtWidgetsApplication4是应用程序。 首先明确:动态库以饿汉式的形式进行单例接口暴露; 然后,应用程序加载动态库的翻译文件并进行全局安装; // ...QTranslator* trans = new QTranslator();//qDebug() << trans->…

无人机云台类型及作用

无人机云台主要分为三种类型&#xff1a; 单轴云台&#xff1a;仅支持单向旋转&#xff0c;适合拍摄平滑的延时摄影和全景照片。 双轴云台&#xff1a;支持水平和垂直旋转&#xff0c;可用于拍摄流畅的视频和运动物体。 三轴云台&#xff1a;全面支持所有旋转轴&#xff0c;…

CASS中按指定距离和方向移动图形

1、绘制一个图形 打开软件&#xff0c;随意绘制一个矩形&#xff0c;并量取左下角点的坐标值&#xff0c;具体如下&#xff1a; 2、按距离移动原理讲解 例1&#xff1a;将图形沿着y轴负方向移动100米&#xff0c;如何实现&#xff1f; 如上图所示&#xff0c;测绘中的坐标系…

办公软件WPS与Office的区别

临近计算机考试很多同学在纠结我是报wps好&#xff1f;还是ms office好&#xff1f;下面就来详细说说。 1、wps属于国内金山公司的办公软件&#xff0c;里面包含word、Excel和PPT。考试是2021年开始的&#xff01; 2、MS&#xff08;Microsoft 微软&#xff09; office属于美…

了解MySQL【事务】的功能:确保数据完整性的关键技术

在数据库管理中&#xff0c;事务是确保数据完整性和一致性的核心机制。特别是对MySQL这样广泛应用的开源数据库系统&#xff0c;掌握事务的使用至关重要。在这篇文章中&#xff0c;我们将全面探讨MySQL事务的工作原理、ACID属性、隔离级别以及最佳实践&#xff0c;从而帮助开发…

Docker安装PostgreSQL详细教程

本章教程,使用Docker安装PostgreSQL具体步骤。 一、拉取镜像 docker pull postgres二、启动容器 docker run -it --name postgres --restart always -e POSTGRES_PASSWORD=123456 -e

网安小贴士(7)网络加密

一、前言 网络加密的历史是一个长期发展的过程&#xff0c;其起源可以追溯到古代文明&#xff0c;主要用于战争时期的通信保密&#xff0c;其目的始终是为了保护信息的安全和保密。 二、定义 网络加密是一种安全措施&#xff0c;它通过使用编码算法对通过网络&#xff08;例…

Arthas实战(2)- OOM问题排查

一、 准备测试应用 新建一个 SpringBoot应用&#xff0c;写一段有 OOM bug 的代码&#xff1a; RestController RequestMapping public class JvmThreadController {List<TestWrapper> memoryList new ArrayList<>();GetMapping("/test")public Strin…

BME688 环境传感器

型号简介 BME688是博世&#xff08;bosch-sensortec&#xff09;的一款首款具有人工智能 (AI) 和集成高线性度和高精度压力、湿度和温度传感器的气体传感器。它采用坚固而紧凑的 3.0 x 3.0 x 0.9 mm 封装&#xff0c;专为移动和联网应用而开发&#xff0c;在这些应用中&#xf…

Element中的选择器组件Select (一级选择组件el-select)

简述&#xff1a;在 Element UI 中&#xff0c;ElSelect&#xff08;或简称为 Select&#xff09;是一个非常常用的选择器组件&#xff0c;它提供了丰富的功能来帮助用户从一组预定义的选项中选择一个或多个值。这里来简单记录一下 一. 组件和属性配置 <el-selectv-model&q…

普通集群与镜像集群配置

目录 一. 环境准备 二. 开始配置集群 三. RabbitMQ镜像集群配置 四. 安装并配置负载均衡器HA 一. 环境准备 关闭防火墙和selinux&#xff0c;进行时间同步 主机名系统IP服务rabbitmq-1 Rocky_linux9.4 192.168.226.22RabbitMQ&#xff0c;MySQLrabbitmq-2Rocky_linux9.41…

SpringBoot 多数据源配置

目录 一. 引入maven依赖包 二. 配置yml 三、创建 xml 分组文件 四、切换数据源 一. 引入maven依赖包 <dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1&…

提升学生职务执行力的智慧校园学工管理策略

智慧校园的学工管理系统匠心独运地融入了“学生职务”这一创新模块&#xff0c;它紧密贴合学生的实际需求&#xff0c;致力于在校期间的实践经验积累和个人能力的全面提升。这个模块化身为一个便捷的综合平台&#xff0c;让学生们能够轻松发掘并参与到丰富多彩的校内职务中去&a…

Spring容器生命周期中如前置运行程序和后置运行程序

在Spring容器加入一个实现了BeanPostProcessor接口bean实例&#xff0c;重写postProcessBeforeInitialization、postProcessAfterInitialization方法&#xff0c;在方法里面写具体的实现&#xff0c;从而达到Spring容器在初如化前或销毁时执行预定的程序&#xff0c;方法如下&a…

成都欣丰洪泰文化传媒有限公司开网店可靠吗?

在数字化浪潮席卷全球的今天&#xff0c;电商行业无疑是这场浪潮中的佼佼者。而在这波汹涌的电商大潮中&#xff0c;成都欣丰洪泰文化传媒有限公司凭借其专业的电商服务能力和前瞻的市场洞察力&#xff0c;成为了业界的一匹黑马。今天&#xff0c;就让我们一起揭开这家专注于电…

深圳技术大学oj C : 生成r子集

Description 输出给定序列按字典序的 &#xfffd; 组合&#xff0c;按照所有 &#xfffd; 个元素出现与否的 01 标记串 &#xfffd;&#xfffd;&#xfffd;&#xfffd;−1,...,&#xfffd;1 的字典序输出. 此处01串的字典序指&#xff1a;先输入的数字对应低位&#x…

【大数据】StarRocks的系统架构

StarRocks 架构简洁&#xff0c;整个系统的核心只有 FE&#xff08;Frontend&#xff09;、BE (Backend) 或 CN (Compute Node) 两类进程&#xff0c;方便部署与维护&#xff0c;节点可以在线水平扩展&#xff0c;元数据和业务数据都有副本机制&#xff0c;确保整个系统无单点。…

【C++】string基本用法(常用接口介绍)

文章目录 一、string介绍二、string类对象的创建&#xff08;常见构造&#xff09;三、string类对象的容量操作1.size()和length()2.capacity()3.empty()4.clear()5.reserve()6.resize() 四、string类对象的遍历与访问1.operator[ ]2.正向迭代器begin()和end()3.反向迭代器rbeg…