一个简单实用的线程池及线程池组的实现!

1.线程池简介

线程池,顾名思义,就是一个“池子”里面放有多个线程。为什么要使用线程池呢?当我们编写的代码需要并发异步处理很多任务时候,一般的处理办法是一个任务开启一个线程去处理,处理结束后释放线程。可是这样频繁的申请释放线程,系统的开销很大,为了解决这个问题,线程池就产生了。线程池实现原理是事先申请一定数量的线程存放在程序中,当外部有任务需要线程处理时,把这个任务放到这个“池子”里面,“池子”里面空闲的线程就去取这个任务进行处理,这样既能实现多线程并发处理任务,又能减少系统频繁创建删除线程的开销,这种技术叫做池化技术,相应的池化技术还有内存池、连接池等。

为了更形象理解线程池,举一个例子:线程池就像鸡圈里面鸡,每一个鸡比作一个线程,你手里有一把玉米,每一个颗玉米比作一个任务,鸡吃玉米比作处理任务。当你把一把玉米撒入鸡圈,一群鸡就围过来抢玉米,但是一次一只鸡只能吃一颗玉米,吃完一颗继续吃下一颗,直到鸡圈里面的玉米吃完才停止。线程池处理任务也是一样的,当任务链表上有任务时,通过条件变量通知线程池里面的线程,一群线程就围过来了,但是一个线程一次只能取一个任务进行处理,处理完又去取下一个任务,池子里面每一个线程都是这样处理,直到链表上面的任务全部处理完了,池子中的线程又空闲起来了。

2.线程池-设计实现

实现思路:通过向系统申请多个线程,创建一个任务链表,链表上面存放待处理的任务,当有新的任务加入时候,通过条件变量通知池子里面的线程从链表上面取任务,然后处理任务,一直这样循环下去。

​首先定义结构体,定义如下:

/*** 定义的回调函数
*/
typedef void (*task_func_t)(void *args);/*** 定义的任务节点结构体
*/
typedef struct task_t
{void             *args; //任务参数task_func_t      func;  //任务函数指针struct list_head node;  //链表节点
}task_t;/*** 线程池信息
*/
typedef struct threadpool_t
{struct list_head hlist;       //任务链表int              thread_num;  //线程池数量int              max_ts_num;  //最大任务数量volatile int     curr_ts_num; //当前线程池存在的任务数volatile int     is_exit;     //是否退出线程池标志pthread_mutex_t  mutex;       //互斥锁pthread_cond_t   cond;        //条件变量pthread_t        *ths;        //线程id数组
}threadpool_t;

这是线程池实现的程序:

/*** @brief:线程处理任务函数* @args: 传入的参数* @return: NULL
*/
static void* _process_task_thread(void *args)
{threadpool_t* tp = (threadpool_t*)args;struct list_head *pos = NULL;task_t *task=NULL;if(!args) return NULL;while(1){pthread_mutex_lock(&tp->mutex);while(list_empty(&tp->hlist) && !tp->is_exit){pthread_cond_wait(&tp->cond, &tp->mutex);}if(tp->is_exit){        //判断释放退出线程池pthread_mutex_unlock(&tp->mutex);break;}pos = tp->hlist.next;   //从任务链表取出头节点list_del(pos);          //从链表中删除节点--tp->curr_ts_num;      //更新任务数pthread_mutex_unlock(&tp->mutex);task = list_entry(pos, task_t, node); //从链表节点推出任务节点task->func(task->args); //执行任务free(task);             //释放任务内存}return NULL;
}/*** @brief:创建一个线程池* @thread_nums: 线程数量* @max_ts_num:线程池中最大的任务数量* @return: 线程池句柄
*/
threadpool_t* create_threadpool(int thread_nums, int max_ts_num)
{if(thread_nums <= 0) return NULL;threadpool_t* tp = (threadpool_t*)malloc(sizeof(threadpool_t));memset(tp, 0, sizeof(threadpool_t));INIT_LIST_HEAD(&tp->hlist);tp->is_exit = 0;tp->curr_ts_num = 0;tp->thread_num = thread_nums;tp->max_ts_num = max_ts_num;tp->ths = (pthread_t*)malloc(sizeof(pthread_t)*thread_nums);pthread_mutex_init(&tp->mutex, NULL);pthread_cond_init(&tp->cond, NULL);for(int i=0; i<tp->thread_num; ++i){pthread_create(&(tp->ths[i]), NULL, _process_task_thread, tp);}return tp;
}   /*** @brief:往线程池中添加任务* @tp: 线程池句柄* @func:任务处理函数指针* @args:传入任务参数* @priority: 优先级 1:优先处理 其他:添加到尾部* @return: 返回状态 0:ok
*/
int add_task_threadpool(threadpool_t* tp, task_func_t func, void *args, int priority)
{if(!tp) return -1;if(!func) return -2;if(tp->curr_ts_num > tp->max_ts_num) return -3;task_t *task = (task_t*)malloc(sizeof(task_t)); //申请任务节点内存task->func = func; //给函数指针赋值task->args = args; //保持参数指针pthread_mutex_lock(&tp->mutex);if(priority==1)    //高优先级,添加到头部list_add(&task->node, &tp->hlist);else               //添加到尾部list_add_tail(&task->node, &tp->hlist);++tp->curr_ts_num; //更新任务数pthread_mutex_unlock(&tp->mutex);pthread_cond_signal(&tp->cond); //通知线程取任务return 0;
}/*** @brief:获取线程池中当前存在的任务数量* @tp: 线程池句柄* @return: 当前任务数量
*/
int get_ts_num_threadpool(threadpool_t* tp)
{return tp ? tp->curr_ts_num : -1;
}/*** @brief:释放线程池资源* @tp:线程池句柄* @return: 0:ok
*/
int destory_threadpool(threadpool_t* tp)
{if(!tp) return -1;while(!list_empty(&tp->hlist)){  //等待线程池执行完链表中的任务continue;   }tp->is_exit = 1;                  //更新标志,退出线程池pthread_cond_broadcast(&tp->cond);//通知所有线程函数for(int i=0; i<tp->thread_num; ++i){//等待所有线程函数结束pthread_join(tp->ths[i], NULL);}pthread_mutex_destroy(&tp->mutex); //释放资源pthread_cond_destroy(&tp->cond);free(tp->ths);free(tp);tp = NULL;return 0;
}

相关视频推荐

手把手实现线程池(120行),实现异步操作,解决项目性能问题 

手撕高性能线程池,准备好linux开发环境

线程池在3个开源框架的应用(redis、skynet、workflow)

免费学习地址:c/c++ linux服务器开发/后台架构师

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

3.线程池组-设计实现

有了线程池处理并发任务,为什么还要线程池组呢?原因在于线程池中,所有的线程都使用一个互斥锁阻塞,当你创建的线程池中线程的个数比较多的情况下,存在很多线程对同一个线程抢占,这样会影响线程池取任务处理的效率。因此由"小颗粒"(即每一个线程池中线程个数少)的线程池组成一个"线程池组"这样就能减轻多个线程对同一个锁抢占造成效率低的问题。

设计实现:将多个线程池封装组合到一起,当外部有任务需要处理时,找到线程池组中线程池任务最少的池子,把任务给放进去。

​定义一个线程池组管理结构体:

typedef struct manange_thpool_t
{int thpool_nums;                            //线程池个数threadpool_t *thpools[MAX_THREADPOOL_NUMS]; //线程池结构体
}manange_thpool_t;

代码实现:

/*** @brief:创建线程池组管理句柄* @tp_nums:线程池组中线程池个数* @thread_num:单个线程池中线程个数* @max_ts_n:单个线程池中最大的任务数量
*/
manange_thpool_t* create_group_threadpool(int tp_nums, int thread_num, int max_ts_n)
{manange_thpool_t* mtp = (manange_thpool_t*)malloc(sizeof(manange_thpool_t));if(!mtp) return NULL;memset(mtp, 0, sizeof(manange_thpool_t));mtp->thpool_nums = tp_nums;for(int i=0; i<tp_nums; ++i){mtp->thpools[i] = create_threadpool(thread_num, max_ts_n);}return mtp;
}/*** @brief:往线程池组中添加任务* @mtp:线程池组句柄* @func:任务函数* @args:任务函数的参数* @priority: 优先级 1:优先处理 其他:依次处理* @return: 0:ok 其他:err
*/
int add_task_group_threadpool(manange_thpool_t* mtp, task_func_t func, void *args, int priority)\
{int ts_num= INT_MAX;threadpool_t *tp=NULL;int index=0;for(register int i=0; i<mtp->thpool_nums; ++i){if(mtp->thpools[i]->curr_ts_num < ts_num){ts_num = mtp->thpools[i]->curr_ts_num;tp = mtp->thpools[i];index=i;}}if(!tp){tp = mtp->thpools[0];}return add_task_threadpool(tp, func, args, priority);
}/*** @brief:释放线程池组函数* @tp: 线程池组句柄* @return:none
*/
void destory_group_threadpool(manange_thpool_t* tp)
{if(!tp) return;for(int i=0; i<tp->thpool_nums; ++i){if(tp->thpools[i]) destory_threadpool(tp->thpools[i]);}
}

4.测试

测试程序如下:

#include <stdio.h>
#include <unistd.h>
#include "list.h"
#include "threadpool.h"
#include "manange_threadpool.h"//任务传递的参数
typedef struct info_t
{int times;char buffer[32];
}info_t;void task1(void *args)
{info_t *info = (info_t*)args;printf("handle task1 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);free(args);
}void task2(void *args)
{info_t *info = (info_t*)args;printf("handle task2 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);free(args);
}void task3(void *args)
{info_t *info = (info_t*)args;printf("handle task3 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);free(args);
}//------------split-----------------void test_threadpool(void)
{threadpool_t* tp = create_threadpool(4, 128);info_t *info;for(int t=0; t<10; ++t){for(int i=0; i<32; ++i){info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test ThreadPool task1 info...");add_task_threadpool(tp, task1, info, 1); //往线程池组添加任务info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test ThreadPool task2 info...");add_task_threadpool(tp, task2, info, 0);info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test ThreadPool task3 info...");add_task_threadpool(tp, task3, info, 0);}sleep(1);}destory_threadpool(tp);printf("Test ThreadPool Finish...\n");
}void test_manange_threadpool(void)
{//创建线程池组句柄,有4个线程池,每个线程池使用4线程,每个线程池最大的任务数是32manange_thpool_t* mtp = create_group_threadpool(4, 4, 128);info_t *info;for(int t=0; t<10; ++t){for(int i=0; i<32; ++i){info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test task1 info...");add_task_group_threadpool(mtp, task1, info, 1); //往线程池组添加任务info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test task2 info...");add_task_group_threadpool(mtp, task2, info, 0);info = (info_t *)malloc(sizeof(info_t));info->times=i;sprintf(info->buffer, "Test task3 info...");add_task_group_threadpool(mtp, task3, info, 0);}sleep(1);}//释放线程池组资源destory_group_threadpool(mtp);printf("Test Manage ThreadPool Finish...\n");
}int main(void)
{#if 1 //测试单个的线程池功能test_threadpool();
#else //测试线程池组功能test_manange_threadpool();
#endifreturn 0;
}

通过修改宏定义,决定使用线程池还是线程池组

  1. 测试线程池结果

​2.测试线程池组结果

5.总结

使用线程池情况:一般程序中有并发处理任务,但是处理的任务并发量不高时候采用线程池。

使用线程池组情况:程序中任务并发量很大情况下使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/85912.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第四十八周周报

学习目标&#xff1a; 修改ViTGAN 学习内容&#xff1a; 位置编码和多尺度 学习时间&#xff1a; 8.5-8。12 学习产出&#xff1a; 这两周主要工作在修改ViTGAN的结构和代码&#xff0c;将相对位置编码加入ViTGAN并将生成器变为多尺度&#xff0c;由于匹配维度很困难&am…

maven的入门使用

maven的入门使用 1.Maven&#xff08;Maven Apache&#xff09;是一个流行的项目构建和管理工具&#xff0c;2.项目结构和POM文件&#xff1a;3.POM文件&#xff08;Project Object Model&#xff09;4.依赖管理&#xff1a; 在POM文件中5.生命周期和构建过程1.前言2.插件系统3…

Windows 编译CEF源码详细记录

背景 默认的CEF不支持音视频功能&#xff0c;需要下载源码将ffmpeg开关打开&#xff0c;再进行编译。 Linux编译参考&#xff1a; 《Linux CEF源码下载编译详细记录》 创建目录结构 code/automate/automate-git.py <-- CEF build scriptchromium_git/cef/ …

湘大 XTU OJ 1148 三角形 题解(非常详细):根据题意朴素模拟+观察样例分析需要计算几轮 具体到一般

一、链接 1148 三角形 二、题目 题目描述 给一个序列&#xff0c;按下面的方式进行三角形累加&#xff0c;求其和值。 比如序列为 1,2,3,4,5 1 2 3 4 53 5 7 98 12 1620 2848输入 有多组样例。每个样例的第一行是一个整数N(1≤N≤100),表示序列的大小&…

27.Netty源码之FastThreadLocal

highlight: arduino-light FastThreadLocal FastThreadLocal 的实现与 ThreadLocal 非常类似&#xff0c;Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。下面我们看下这两个类是如何实现的。 FastThreadLocalThread 是对…

0基础学习VR全景平台篇 第80篇:Insta360 影石如何直播推流

一、下载Insta360 Pro APP 1、手机进入Insta360官网Insta360 | Action Cameras | 360 Cameras | VR Cameras&#xff0c;页面往下滑动到Insta360 Pro2相机处&#xff0c;点击相机图片进入详情页。详情页继续下滑到到手机APP处&#xff0c;根据自己的手机系统选择对应的客户端进…

PhotoShop2023 Beta AI版安装教程

从 Photoshop 开始&#xff0c;惊艳随之而来​ 从社交媒体贴子到修饰相片&#xff0c;设计横幅到精美网站&#xff0c;日常影像编辑到重新创造 – 无论什么创作&#xff0c;Photoshop 都可以让它变得更好。​ Photoshop2023 Beta版本安装教程和软件下载 地址&#xff1a;点击…

dubbo之高可用

负载均衡 概述 负载均衡是指在集群中&#xff0c;将多个数据请求分散到不同的单元上执行&#xff0c;主要是为了提高系统的容错能力和对数据的处理能力。 Dubbo 负载均衡机制是决定一次服务调用使用哪个提供者的服务。 策略 在Dubbo中提供了7中负载均衡策略&#xff0c;默…

Vue生命周期函数(详解)

目录 生命周期图 生命周期函数 beforeCreate和created的区别 beforeCreate创建前应用场景 created创建后应用场景 beforeMount和mounted的区别 beforeMount挂载前应用场景 mounted挂载后应用场景 beforeUpdate和updated的区别 beforeUpdate更新前应用场景 updated更新后应用…

Linux:Shell编辑之文本处理器(awk)

目录 绪论 1、用法 1.1 格式选项 1.2 awk 常用内置变量 1.3 awk的打印功能 1.4 奇偶打印 1.5 awk运算 1.6 awk的内置函数&#xff1a;getline 1.7 文本过滤打印 1.8 awk条件判断打印 1.9 三元表达式&#xff0c;类似于java 1.10 awk的精确筛选 1.11 awk和tr比较改变…

RabbitMQ 79b5ad38df29400fa52ef0085a14b02f

RabbitMQ 一、什么是消息队列 消息队列可以看作是一个存放消息的容器&#xff0c;其中&#xff0c;生产者负责生产数据到消息队列中&#xff0c;而消费者负责消费数据。消息队列是分布式系统中重要的组件&#xff0c;目前使用较多的消息队列有ActiveMQ&#xff0c;RabbitMQ&am…

【雕爷学编程】Arduino动手做(202)---热释电效应、热释电元件与HC-SR505运动传感器模块

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

谈谈Spring与字节码生成技术

Java程序员几乎都了解Spring。 它的IoC&#xff08;依赖反转&#xff09;和AOP&#xff08;面向切面编程&#xff09;功能非常强大、易用。而它背后的字节码生成技术&#xff08;在运行时&#xff0c;根据需要修改和生成Java字节码的技术&#xff09;就是一项重要的支撑技术。 …

PK Nounique CASCADE DROP INDEX keep index

Explicit Control Over Indexes when Creating, Disabling, or Dropping PK/Unique Constraints (Doc ID 139666.1)​编辑To Bottom PURPOSEIn Oracle 9i, the DBA has an explicit control over how indexes are affectedwhile creating, disabling, or dropping Primary Ke…

自动切换HTTP爬虫ip助力Python数据采集

在Python的爬虫世界里&#xff0c;你是否也被网站的IP封锁问题困扰过&#xff1f;别担心&#xff0c;我来教你一个终极方案&#xff0c;让你的爬虫自动切换爬虫ip&#xff0c;轻松应对各种封锁和限制&#xff01;快来跟我学&#xff0c;让你的Python爬虫如虎添翼&#xff01; 首…

SpringBoot 项目使用 Redis 对用户 IP 进行接口限流

一、思路 使用接口限流的主要目的在于提高系统的稳定性&#xff0c;防止接口被恶意打击&#xff08;短时间内大量请求&#xff09;。 比如要求某接口在1分钟内请求次数不超过1000次&#xff0c;那么应该如何设计代码呢&#xff1f; 下面讲两种思路&#xff0c;如果想看代码可…

【redis 3.2 集群】

目录 一、Redis主从复制 1.概念 2.作用 2.1 数据冗余 2.2 故障恢复 2.3 负载均衡 2.4 高可用 3.缺点 4.流程 4.1 第一步 4.2 第二步 4.3 第三步 4.4 第四步 5.搭建 5.1 主 5.2 从 6.验证 二、Reids哨兵模式 1.概念 2.作用 2.1 监控 2.2 自动故障转移 2.…

ArcGIS Pro基础:【按顺序编号】工具实现属性字段的编号自动赋值

本次介绍一个字段的自动排序编号赋值工具&#xff0c;基于arcgis 的字段计算器工具也可以实现类似功能&#xff0c;但是需要自己写一段代码实现&#xff0c; 相对而言不是很方便。 如下所示&#xff0c;该工具就是【编辑】下的【属性】下的【按顺序编号】工具。 其操作方法是…

redis基础

目录 前言 一、概述 1.NoSQL 2.Redis 二、安装 1.编译安装 2.RPM安装 三、目录结构 四、命令解析 五、redis登录更改 六、数据库操作 &#xff08;一&#xff09;、登录数据库 1.本地 2.远程登录 &#xff08;二&#xff09;、数据操作 1.数据库操作 2.数据操作 …

C++笔记之将定时器加入向量并设置定时器的ID为i

C笔记之将定时器加入向量并设置定时器的ID为i code review! 文章目录 C笔记之将定时器加入向量并设置定时器的ID为i关于代码中的void operator()() 运行 代码 #include <chrono> #include <iostream> #include <thread> #include <vector>// 定义定时…