Linux下使用C语言实现线程池---代码及分析

线程池

相关文章
协议
Socket编程
高并发服务器实现
线程池

如果一个客户端建立连接使用创建一个线程用于处理这一个线程, 处理结束的时候把这一个线程删除, 这个时候会导致线程的创建以及销毁会消耗大量的时间

这时候可以一次性创建多个线程, 这几个线程统称线程池, 如果客户端建立一个连接, 线程池分配一个线程处理客户发过来的数据, 不处理的时候这几个线程阻塞

可以使用条件变量进行阻塞

线程的数量可以随着连接的个数, 时间等条件进行变换, 但是要有一个上限, 连接分个数增加的时候加线程, 如果忙的线程数量比较少的时候释放线程, 这一些处理使用一个adjust线程进行处理

img

实际实现

主函数

  • 创建一个线程池
  • 在里面添加任务
  • 等待
  • 关闭线程池

线程池

  • 初始化各种数据

  • 获取基础线程, 让他们阻塞等待任务

  • 获取任务, 唤醒线程处理

  • 处理结束接着阻塞

    后台的控制线程(adjust_thread)根据线程的实际使用情况添加或者释放线程

/*************************************************************************> File Name: thread_pool.c> Author: XvSenfeng> Mail: 1458612070@qq.com > Created Time: Wed 10 Apr 2024 08:23:27 PM CST************************************************************************/#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>

使用的头文件


//多长时间判断一下是否需要增加获取减少线程个数
//这一个值小一点看现象
#define DEFAULT_TIME	3
//最小的执行的线程的个数
#define MIN_WAIT_TASK_NUM	10
//增加减少的时候步长
#define DEFAULT_THREAD_VARY	10
#define true 1
#define false 0

使用的宏定义

//记录一个回调函数, 由于线程执行
typedef struct {void *(*function)(void *);void *arg;
} threadpool_task_t;//线程池的控制结构体
typedef struct{pthread_mutex_t lock;				//这一个结构体使用pthread_mutex_t thread_counter;		//记录忙状态的线程的个数pthread_cond_t queue_not_full;		//队列,没有满的时候添加使用pthread_cond_t queue_not_empty;pthread_t *threads;					//线程数组, 记录现在使用的线程的pidpthread_t adjust_tid;				//管理者的idthreadpool_task_t *task_queue;		//线程执行任务的时候使用的环形队列int min_thr_num;			//最小的时候保留的线程个数int max_thr_num;			//最大可以支持的个数int live_thr_num;			//存活的线程的个数int busy_thr_num;			//执行任务的线程的个数int wait_exit_thr_num;		//需要释放的线程的个数//队列使用int queue_front;int queue_rear;int queue_size;int queue_max_size;int shutdown;			//记录这一个线程池有没有使用}threadpool_t; 

线程池的控制结构体, 以及任务信息记录的结构体

void *adjust_thread(void *threadpool);
void threadpool_free(threadpool_t *pool);
//普通的线程处理函数
void *threadpool_thread(void *threadpool){threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while(true){pthread_mutex_lock(&(pool->lock));//判断一下现在没有可以执行的任务, 并且这一个线程组还没有销毁while((pool->queue_size == 0) && (!pool->shutdown)){printf("thread %#x is waiting\n", (unsigned int)pthread_self());//等待任务, 获取清除唤醒pthread_cond_wait(&pool->queue_not_empty, &pool->lock);//需要清除一部分线程if(pool->wait_exit_thr_num > 0){pool->wait_exit_thr_num--;//记录需要清除的任务的个数if(pool->live_thr_num > pool->min_thr_num){printf("thread %#x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&pool->lock);pthread_exit(NULL);}}}//这时候是有待处理的任务, 或者这一个线程池被销毁了if(pool->shutdown){//销毁线程池pthread_mutex_unlock(&pool->lock);printf("thread %#x will exit\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL);}//处理一个待处理的任务//获取一下这一个任务的处理函数task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;//调整一下环形缓冲区的指针pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;pool->queue_size--;//发送一个任务队列现在有位置的信号, 这一个的阻塞在添加一个待处理任务的位置pthread_cond_broadcast(&pool->queue_not_full);pthread_mutex_unlock(&pool->lock);printf("thread %#x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&pool->thread_counter);pool->busy_thr_num++;pthread_mutex_unlock(&pool->thread_counter);(*(task.function))(task.arg);//调用则一个任务的处理函数printf("thread %#x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&pool->thread_counter);pool->busy_thr_num--;pthread_mutex_unlock(&pool->thread_counter);}pthread_exit(NULL);}
//创建一个线程池, 返回这一个线程池的操控结构体
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){int i;threadpool_t *pool = NULL;do{printf("malloc thread pool\n");if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL){printf("malloc threadpool fail");break;}//记录一下初始信息pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;pool->wait_exit_thr_num = 0;pool->queue_size = 0;pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;//获取线程数组pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);if(pool->threads == NULL){printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);//获取任务队列printf("malloc task queue\n");pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);printf("success\n");if(pool->task_queue == NULL){printf("malloc queue fail");break;}memset(pool->task_queue, 0, sizeof(threadpool_task_t)*queue_max_size);//初始化一下各种锁if(pthread_mutex_init(&(pool->thread_counter), NULL)!= 0	||pthread_mutex_init(&(pool->lock), NULL)	!= 0			||pthread_cond_init(&(pool->queue_not_empty), NULL) != 0	||pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init lock fail");break;}//开启一定数量的线程for(i = 0;i<min_thr_num ;i++){pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);printf("start thread %#x...\n", (unsigned int)pool->threads[i]);}//开启一个控制线程pthread_create(&pool->adjust_tid, NULL, adjust_thread, (void *)pool);return pool;}while(0);threadpool_free(pool);return NULL;
}
//释放这一个线程池的空间
void threadpool_free(threadpool_t *pool){if(pool == NULL){return;}if(pool->task_queue){free(pool->task_queue);}if(pool->threads){free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));free(pool);}pool = NULL;
}
//销毁这一个线程池
void threadpool_destory(threadpool_t *pool){if(pool == NULL){return;}pool->shutdown = true;//等待服务线程的结束pthread_join(pool->adjust_tid, NULL);int i;for(i = 0; i < pool->live_thr_num; i++){pthread_cond_broadcast(&(pool->queue_not_empty));}for(i = 0; i < pool->live_thr_num; i++){pthread_join(pool->threads[i], NULL);}threadpool_free(pool);
}
//判断一个线程是不是还活着
int is_thread_alive(pthread_t tid){int kill_rc = pthread_kill(tid, 0);if(kill_rc == ESRCH){return false;}return true;
}
//控制线程
void *adjust_thread(void *threadpool){int i;threadpool_t *pool = (threadpool_t *)threadpool;//只要这一个线程池还在, 这一个线程就在while(!pool->shutdown){sleep(DEFAULT_TIME);pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;int live_thr_num = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;		//获取忙线程的个数pthread_mutex_unlock(&(pool->thread_counter));//创建线程, 这时候等待队列中的任务数大于等待阈值,且存活的线程数小于最大线程数if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){//需要创建的线程数pthread_mutex_lock(&(pool->lock));int add = 0;//获取可以使用的数组位置for(i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++){if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){pthread_create(&pool->threads[i], NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;printf("new thread %#x is created\n", (unsigned int)pool->threads[i]);}}pthread_mutex_unlock(&(pool->lock));}//销毁线程,如果忙线程*2 < 存活线程数且存活线程数大于最小线程数if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num){pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;//需要销毁的线程数pthread_mutex_unlock(&(pool->lock));//唤醒空闲线程,让其自杀for(i = 0; i < DEFAULT_THREAD_VARY; i++){pthread_cond_signal(&(pool->queue_not_empty));}}}
}
//给线程池里面加一个待处理的任务
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg){pthread_mutex_lock(&(pool->lock));//如果队列满了,调用wait阻塞while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}//线程池已经关闭if(pool->shutdown){pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}//清空工作线程的回调函数的参数if(pool->task_queue[pool->queue_rear].arg != NULL){pool->task_queue[pool->queue_rear].arg = NULL;}//添加任务到任务队列中pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;pool->queue_size++;//添加任务后,队列不为空,唤醒线程池中的线程pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}
//模拟处理业务
void * process(void *arg){printf("thread %x working on task %d\n", (int)pthread_self(), *(int *)arg);sleep(1);printf("task %d is end\n", *(int *)arg);return NULL;
}

线程池的代码实现, 建议跟随main函数的流程分析

int main(void){threadpool_t *thp = threadpool_create(3, 100, 100);printf("pool inited\n");int num[20];//模拟添加任务for(int i = 0; i < 20 ; i++){num[i] = i;printf("add task %d\n", i);threadpool_add(thp, process, (void *)&num[i]); //添加任务}sleep(10);//等待处理结束threadpool_destory(thp);
}

测试这一个线程池

执行分析

2024-4-7-MPIO ./a.out  
start thread 0x2b192700...			#开启线程池的基础线程
thread 0x2b192700 is waiting
start thread 0x2a991700...
thread 0x2a991700 is waiting
start thread 0x2a190700...
thread 0x2a190700 is waiting
pool inited							#初始化结束
add task 0							#加入任务
add task 1
add task 2
thread 0x2b192700 start working		  #任务开始处理
thread 723068672 working on task 0
thread 0x2a991700 start working
thread 714675968 working on task 1
thread 0x2a190700 start working
add task 3
add task 4
add task 5
add task 6
add task 7
add task 8
add task 9
add task 10
add task 11
add task 12
add task 13
add task 14
add task 15
add task 16
add task 17
add task 18
add task 19
thread 706283264 working on task 2		#任务处理结束
task 0 is end
thread 0x2b192700 end working
thread 0x2b192700 start working
thread 723068672 working on task 3
task 1 is end
thread 0x2a991700 end working
thread 0x2a991700 start working
thread 714675968 working on task 4
task 2 is end
thread 0x2a190700 end working
thread 0x2a190700 start working
thread 706283264 working on task 5
task 3 is end
thread 0x2b192700 end working
thread 0x2b192700 start working
thread 723068672 working on task 6
task 4 is end
thread 0x2a991700 end working
thread 0x2a991700 start working
thread 714675968 working on task 7
task 5 is end
thread 0x2a190700 end working
thread 0x2a190700 start working
thread 706283264 working on task 8
new thread 0x2918e700 is created	#由于线程数量比较少, 开始加入线程
new thread 0x2898d700 is created
new thread 0x23fff700 is created
new thread 0x237fe700 is created
new thread 0x22ffd700 is created
new thread 0x227fc700 is created
new thread 0x21ffb700 is created
new thread 0x217fa700 is created
new thread 0x20ff9700 is created
new thread 0x207f8700 is created
thread 0x207f8700 start working
thread 0x2898d700 start working
thread 681105152 working on task 11
thread 0x23fff700 start working
thread 603977472 working on task 12
thread 545228544 working on task 9
thread 0x237fe700 start working
thread 595584768 working on task 13
thread 0x2918e700 start working
thread 689497856 working on task 10
thread 0x22ffd700 start working
thread 587192064 working on task 14
thread 0x227fc700 start working
thread 578799360 working on task 15
thread 0x21ffb700 start working
thread 570406656 working on task 16
thread 0x217fa700 start working
thread 562013952 working on task 17
thread 0x20ff9700 start working
thread 553621248 working on task 18
task 6 is end
task 8 is end
task 7 is end
thread 0x2a190700 end working
thread 0x2b192700 end working
thread 0x2b192700 is waiting
thread 0x2a991700 end working
thread 0x2a991700 is waiting
thread 0x2a190700 start working
thread 706283264 working on task 19
task 11 is end
thread 0x2898d700 end working
task 10 is end
thread 0x2918e700 end working
task 16 is end
thread 0x21ffb700 end working
task 12 is end
thread 0x23fff700 end working
task 15 is end
thread 0x227fc700 end working
task 18 is end
thread 0x20ff9700 end working
task 17 is end
thread 0x217fa700 end working
task 13 is end
thread 0x237fe700 end working
thread 0x2898d700 is waiting		#线程开始空闲
thread 0x2918e700 is waiting
task 9 is end
thread 0x207f8700 end working
task 19 is end
thread 0x2a190700 end working
task 14 is end
thread 0x22ffd700 end working
thread 0x21ffb700 is waiting
thread 0x23fff700 is waiting
thread 0x227fc700 is waiting
thread 0x20ff9700 is waiting
thread 0x217fa700 is waiting
thread 0x237fe700 is waiting
thread 0x207f8700 is waiting
thread 0x2a190700 is waiting
thread 0x22ffd700 is waiting
thread 0x2918e700 is exiting			#空闲任务不需要了
thread 0x2b192700 is exiting
thread 0x2a991700 is exiting
thread 0x2898d700 is exiting
thread 0x21ffb700 is exiting
thread 0x23fff700 is exiting
thread 0x227fc700 is exiting
thread 0x217fa700 is exiting
thread 0x237fe700 is exiting
thread 0x20ff9700 is exiting
thread 0x207f8700 will exit				#销毁线程池的时候还有三个线程
thread 0x2a190700 will exit
thread 0x22ffd700 will exit

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/305741.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac下用adb命令安装apk到android设备笔记

查询了些资料记录备用。以下是在Mac上使用命令行安装APK文件的步骤&#xff1a; 1. 下载并安装ADB&#xff1a; 如果您的Mac上没有安装ADB&#xff0c;请从官方的Android开发者网站下载Android SDK Platform Tools&#xff1a;Android SDK Platform Tools。将下载的ZIP文件解…

Maven创建项目

目录 1.创建项目 2.从Maven Repository: Search/Browse/Explore (mvnrepository.com)链接&#xff0c;下载API 3.1.0 3.在main文件内创建webapp文件夹&#xff0c;再webapp文件夹内创建WEB-INF文件夹&#xff0c;在WEB-INF文件夹内创建web.xml 4.网络编程 5.打包 6.部署 …

前端服务请求跨域被拦截,Java后端Springboot服务解决办法

跨域问题 跨域前端遇到的问题&#xff1a; Access to XMLHttpRequest at ‘http://www.xxx.xxxx/api/x/d/xc’ from origin ‘http://127.0.0.1:3000’ has been blocked by cors policy: No ‘Access-Contorl-Allow-Origin’ header is present on the requested resource. …

雪亮工程视频联网综合管理/视频智能分析系统建设方案(二)

一、我国雪亮工程当前建设需求 1&#xff09;加强社会治安防控感知网络建设 加强社会治安防控智能感知网络建设&#xff0c;针对城中村、背街小巷、城乡结合部等重点区域建设安装视频监控设备&#xff0c;减少死角和盲区&#xff0c;与已有感知系统结合&#xff0c;形成高低搭…

Transformer模型-decoder解码器,target mask目标掩码的简明介绍

今天介绍transformer模型的decoder解码器&#xff0c;target mask目标掩码 背景 解码器层是对前面文章中提到的子层的包装器。它接受位置嵌入的目标序列&#xff0c;并将它们通过带掩码的多头注意力机制传递。使用掩码是为了防止解码器查看序列中的下一个标记。它迫使模型仅使用…

Unity 中画线

前言&#xff1a; 在Unity项目中&#xff0c;调试和可视化是开发过程中不可或缺的部分。其中&#xff0c;绘制线条是一种常见的手段&#xff0c;可以用于在Scene场景和Game视图中进行调试和展示。本篇博客将为你介绍多种不同的绘制线条方法&#xff0c;帮助你轻松应对各种调试…

新手尝试硬件买单片机还是树莓派?

新手尝试硬件买单片机还是树莓派&#xff1f; 新手的话&#xff0c;先学单片机吧&#xff0c;51&#xff0c;stm32&#xff0c;都可以&#xff0c;很多学习平台给的例子比较多&#xff0c;程序相对都比较简单&#xff0c;更贴近硬件&#xff0c;玩起来比较容易做出小东西&…

SI案例分享--实用的单端口Delta-L测试方法

目录 0 引言 1 单端口Delta-L技术 2 基于单端口Delta-L方法的反射灵敏度分析 3 用充分表征的材料系统验证该方法 4 在单端口法中提取总损耗 5 总结 0 引言 Intel Delta-L方法已被公认为一种常规方法&#xff0c;通过对测试线进行2端口测量来提取层压板材料的Dk和插入损耗…

机器学习——模型融合:Stacking算法

机器学习——模型融合&#xff1a;Stacking算法 在机器学习中&#xff0c;模型融合是一种常用的方法&#xff0c;它可以提高模型的泛化能力和预测性能。Stacking算法&#xff08;又称为堆叠泛化&#xff09;是一种强大的模型融合技术&#xff0c;它通过组合多个基本分类器的预…

ActiveMQ入门案例(queue模式和topic模式)

目录 前言&#xff1a;为什么使用消息中间件&#xff1f; 异步通信 缓冲 解耦 前提&#xff1a;安装并启动activemq 一、点对点&#xff08;point to point&#xff0c; queue&#xff09; 1.1 创建maven项目 1.2 Pom依赖 1.2 JmsProduce 消息生产者 1.3 JmsConsumer…

案例三 BeautifulSoup之链家二手房

本案例用到列表&#xff0c;函数&#xff0c;字符串等知识点&#xff0c;知识点参考链接如下&#xff1a; python基础知识&#xff08;一&#xff09;&输入输出函数 python基础知识&#xff08;二&#xff09;&基本命令 python基础知识&#xff08;三&#xff09;&…

绝地求生:AUG爆裂弹球黑货箱:街机动漫风格大家会喜欢吗?

大好&#xff0c;我闲游盒&#xff01; 4.10更新后&#xff0c;AUG的新成长型也出来了&#xff0c;更新后我觉得AUG变好用了一点&#xff0c;不知道大家有没有感觉出来&#xff1f; 宝箱概率 本期主角 AUG-爆裂弹球&#xff08;紫色配粉红色&#xff09; 本次的AUG我才升到5级…

计算两个时间段的差值

计算两个时间段的差值 运行效果&#xff1a; 代码实现&#xff1a; #include<stdio.h>typedef struct {int h; // 时int m; // 分int s; // 秒 }Time;void fun(Time T[2], Time& diff) {int sum_s[2] { 0 }; for (int i 0; i < 1; i) { // 统一为秒数sum_s[…

程序员如何搞副业?

文章目录 每日一句正能量前言写博客开付费专栏制作教程卖相关的技术知识自己做个人网站卖技术和程序1.软件开发和定制:2.移动应用开发:3.独立软件产品:4.网络服务和咨询: 写自媒体获取收益开发小程序或网站插件出书卖教程后记 每日一句正能量 努力的人&#xff0c;生活不会迷茫…

嵌入式单片机入职第二天-EEPROM与IIC

上午&#xff1a; 1.安装Jlink驱动&#xff0c;死活没反应&#xff0c;因为昨天才装完系统&#xff0c;领导让我装电脑主板驱动 领导方法进惠普官网通过查询电脑型号&#xff0c;里面几十个驱动搞得我眼花&#xff0c;领导告诉我进官网就去开会了&#xff0c;可能因为是外网&…

计算机网络——抓取icmp包

前言 本博客是博主用于记录计算机网络实验的博客&#xff0c;如果疏忽出现错误&#xff0c;还望各位指正。 抓包 我们是用Wireshark工具来进行抓包的。 ​在安装时候一路打勾安装即可&#xff0c;不过最后那个因为是英文&#xff0c;一定要看清&#xff0c;点了立即重启&am…

sky光遇加速器推荐 steam光遇低延迟稳定的加速器推荐

在光遇游戏中&#xff0c;子民指的就是游戏中的人影&#xff0c;玩家在游戏里面需要找到蓝色人影并触碰它&#xff0c;然后跟随光点&#xff0c;这样的话我们就可以看到一个深灰色的石像&#xff0c;点燃石像上的火苗&#xff0c;它就会教我们一个新的互动姿势。玩家找到黄色人…

安装 Kali NetHunter (完整版、精简版、非root版)、实战指南、ARM设备武器化指南

From&#xff1a;https://www.kali.org/docs/nethunter/ NetHunter 实战指南&#xff1a;https://www.vuln.cn/6430 乌云 存档&#xff1a;https://www.vuln.cn/wooyundrops 1、Kali NetHunter Kali NetHunter 简介 Net&#xff08;网络&#xff09;&#xff0c;hunter&#x…

【C语言基础】:文件操作详解(后篇)

文章目录 一、文件的顺序读写1.1 顺序函数读写函数介绍1.2 fgetc函数和fputc函数1.3 fputs函数和fgets函数1.4 fprintf函数和fscanf函数1.5 fwrite函数和fread函数 二、文件的随机读写2.1 fseek函数2.2 ftell函数2.3 rewind函数 三、文件读取结束的判定3.1 feof函数 四、文件缓…

解决idea种maven依赖时明明有包,但是一直提示 Cannot resolve com.grandtech:gny-common:0.0.7

1、先看提示问题 &#xff0c;Cannot resolve com.grandtech:gny-common:0.0.7&#xff0c; 2、依赖我也是是没有问题 3、在maven库中的包也是要来的新的别人能运行的。但是放进去就是无法解析。 解决办法&#xff1a;在idea中直接&#xff0c;用mvn命令装载&#xff1a; ①…