目录
一,池化技术
二,线程池概念
三,线程池实现
3.1 线程封装
3.2 预备头文件实现
3.3 线程池类的简单实现
3.4 主函数实现
3.5 效果展示
一,池化技术
池化技术是计算机编程领域非常常用的一种技术,该技术可以在编程过程中提前准备一些资源,在需要时可以重复使用这些预先准备的资源
最直观的两个例子:
- 我们要申请堆空间,要使用malloc去申请。我们都知道申请内存是有代价的,在频繁malloc的情况下,执行时间会大幅度上升,造成整体效率降低。
- malloc使用完后,还要free掉避免内存泄漏。而释放和申请一样,都是有代价的。
在系统化的开发过程中,我们经常会用到池化技术:把一些资源预先申请并分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。即对象池对资源(线程、连接和内存)进行管理,这样做带来几个明显的好处:
- 资源能重复利用:减少了资源分配和释放过程中的系统消耗。比如上面的malloc和free,减少申请次数和释放次数。再比如,在IO密集型的服务器上,并发处理过程中的子线程或子进程的创建和销毁过程,带来的系统开销将是难以接受的。所以在业务实现上,通常把一些资源预先分配好,如线程池,数据库连接池,Redis连接池,HTTP连接池等,来减少系统消耗,提升系统性能。
- 可以对资源的整体使用做限制:相关资源预分配且只在预分配是生成,后续不再动态添加,从而限制了整个系统对资源的使用上限。类似一个令牌桶的功能。
- 池化技术分配对象池:通常会集中分配,这样有效避免了碎片化的问题。
二,线程池概念
线程池是一种线程使用模式,池化技术的一种具体作用和优点前面也符合池化技术的用途和优点
- 线程池避免了系统在处理短时间任务时反复创建和销毁线程时要付出的代价
- 线程池保证了内核资源充分利用,也能防止过度调度
线程池常见的应用场景如下:
- 任务时间短,但是有很多个,需要大量线程来完成任务
- 需要较为苛刻的性能要求,比如要求服务器能迅速响应客户需求
- 接受突发性的大量请求,但不至于使服务器立马创建大量线程的应用
三,线程池实现
3.1 线程封装
我们可以把线程搞成一个类,并一齐封装好线程的常用接口,能在线程池代码中方便使用,并且在以后的网络套接字代码编程也可以方便使用
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>// typedef std::function<void* (void*)> fun_t; // 定义一个函数对象,这个函数返回值是void,参数类型是void*
// 但是要考虑兼容性问题,操作系统的接口是C的,所以用函数对象的话下面的创建线程可能无法识别函数对象
typedef void *(*fun_t)(void *); // 这是一个参数为void*,返回值为void*的一个函数指针,该指针类型命名为fun_tclass ThreadData
{
public:void *_args;std::string _name;
};class Thread
{
public:Thread(int num, fun_t callback, void *args): _func(callback){char nameBuffer[64];snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num); // 这个函数表示往特定数组写字符串,第一个表示要写的对象,第二个表示要写的数量或大小,第三个表示写的内容_name = nameBuffer;_tdata._args = args;_tdata._name = _name; // 方便后面使用线程名字}void start(){pthread_create(&_tid, nullptr, _func, (void *)&_tdata);}void join(){pthread_join(_tid, nullptr);}std::string GetName(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;ThreadData _tdata;fun_t _func; // 表示线程要执行的函数
};
3.2 预备头文件实现
makefile:
thread_pool:testMain.ccg++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:rm -f thread_pool
RAII加锁方式,lockGuard.hpp:
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *mtx) : pmtx_(mtx){}void lock(){// std::cout << "要进行加锁" << std::endl;pthread_mutex_lock(pmtx_);}void unlock(){// std::cout << "要进行解锁" << std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx) : mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}private:Mutex mtx_;
};
任务头文件,Task.hpp:
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"typedef std::function<int(int, int)> func_t;class Task
{
public:Task() {}Task(int x, int y, func_t func) : _x(x), _y(y), _func(func){}void operator()(const std::string &name){// 不规范// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",name.c_str(), _x, _y, _func(_x, _y), __FILE__, __LINE__); // 预处理符,方便定位文件名和位置}public:int _x;int _y;// int type;func_t _func;
};
日志头文件,复杂打印程序运作信息到屏幕上或者将信息写进日志文件
log.hpp:
#pragma once#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"};#define LOGFILE "./threadpool.log"// 完整的日志功能至少包含:日志等级,时间,日志内容,并且支持用户自定义
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW // 在makefile中加-DDEBUG_SHOW表示我想调试,就打印调试信息,如果我们#注释掉宏定义,再次make后就不打印调试信息了if (level == DEBUG)return;
#endif// 提取可变参数 -- 麻烦// va_list ap; //是一个char*类型// va_start(ap, format); //用一个参数来初始化它// while(true)// int x = va_arg(ap, int); //用ap制作提取一个整数,没有int就返回NULL// va_end(ap); //类似ap=nullptrchar stdBuffer[1024]; // 日志的标准部分time_t timestamp = time(nullptr); // 获取时间戳// struct tm *localtime = localtime(×tamp);// localtime->tm_hour;snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp); // 把日志的标准部分搞到字符数组里char logBuffer[1024];va_list args;va_start(args, format); // 可变参数列表的起始地址// vprintf(format, args); //格式化打印可变参数列表到屏幕vsnprintf(logBuffer, sizeof logBuffer, format, args); // 将参数列表搞到logBuffer中va_end(args);// 把日志信息往文件里写入// FILE *fp = fopen(LOGFILE, "a");printf("%s%s\n", stdBuffer, logBuffer);// fprintf(fp, "%s%s\n", stdBuffer, logBuffer);// fclose(fp);
}
3.3 线程池类的简单实现
线程池主要包含两个数据结构:一个数组,负责存放线程类的指针;一个队列,负责存放任务。
- 线程池中的多个线程只负责处理执行任务,并不参与制作任务,因为到后面任务从网络中来
- 线程池只对外提供Push接口,用于外部线程将任务扔进队列里
代码如下,threadPool.hpp:
#pragma once#include <vector>
#include <queue>
#include <unistd.h>#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"const int g_thread_num = 3; // 表示默认创建线程个数// 线程池本质是一个生产消费模型
template <class T>
class ThreadPool
{
public:pthread_mutex_t *getMutex() // 获取锁的地址{return &_lock;}bool isEmpty() // 判断队列是否为空{return task_queue_.empty();}void waitCond(){pthread_cond_wait(&_cond, &_lock); // 等待的时候释放锁,唤醒时再申请锁}T getTask(){T t = task_queue_.front();task_queue_.pop();return t;}public:ThreadPool(int thread_num = g_thread_num): _num(thread_num){pthread_mutex_init(&_lock, nullptr); // 初始化锁pthread_cond_init(&_cond, nullptr); // 初始化条件变量for (int i = 1; i <= _num; i++){_threads.push_back(new Thread(i, routine, this)); // 传this指针,让回调方法能够访问类}}void run() // 线程池启动{for (auto &iter : _threads){iter->start(); // 执行thread_create函数,创建线程,创建的数量由数组大小来定,而数组大小在构造函数定义好了,// std::cout << iter->GetName() << "启动成功" << std::endl;logMessage(NORMAL, "%s%s", iter->GetName().c_str(), "启动成功");}}// 取任务// 如果定义在类内,会有隐藏this指针从而影响使用,所以加上static// 如果一个类内部成员用static,那么它只能使用静态成员再调用静态方法,无法使用类内的成员属性和方法// 如果这个静态的routine是所谓的消费线程,那么要pop队列,但是编译时会报错,这就坑了// 所以为了能让routine拿到类内属性,我们再上面push_back的插入Thread对象时,可以把this指针传过来,通过函数来进行访问(与其让它拿到task_queue,不如让它拿static void *routine(void *args){ThreadData *td = static_cast<ThreadData *>(args); // 该操作形象点说就是改文件后缀,这里的后缀是args指针ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(td->_args); // 然后这一步相当于解压操作,拿到指针指向对象的线程池指针// 消费逻辑// 先加锁,while(task_queue_.empty()) wait(); 如果任务队列为空就等待// 不为空就获取任务,然后处理,处理完就解锁while (true){T task;{lockGuard lockguard(tp->getMutex()); // 通过this指针调用getMutex获得锁的地址,实现加锁,保证该代码块是安全的代码块while (tp->isEmpty())tp->waitCond(); // 判断队列是否为空,为空就等待// 读取任务task = tp->getTask(); // 任务队列是共享的,这句话就是将任务从共享,拿到自己的私有空间}task(td->_name); // 执行任务,task是队列里的数据,也就是Task类,改类重载了operator(),所以可以直接使用圆括号执行任务// 测试能否传入this指针// tp->show();// sleep(1);}}// 往队列里塞任务void pushTask(const T &task){lockGuard lockguard(&_lock); // 只单纯加锁,加了任务后还应该要唤醒对应的消费线程来消费task_queue_.push(task);pthread_cond_signal(&_cond);}// 测试接口void joins(){for (auto &iter : _threads){iter->join();}}// 测试构造函数是否可以传入this指针void show(){std::cout << "可以传入this,可以让静态方法的线程,访问到线程池内的方法" << std::endl;}~ThreadPool(){for (auto &iter : _threads){iter->join(); // 在释放前join下delete iter;}pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);}private:std::vector<Thread *> _threads; // 这个数组存的是将来要创建的线程int _num;std::queue<T> task_queue_; // 别人发任务来放到队列里,然后派发给指定线程去执行,所以只要添加到队列里,就自动叫醒一个线程来处理pthread_mutex_t _lock;pthread_cond_t _cond;// 另一种方案:// 我们一开始定义两个队列queue1,queue2// 然后再定义两个制作std::queue<T> *p_queue, *c_queue// 然后p_queue->queue1, c_queue->queue2// 当生产一批任务后,我们放到queue1里,然后swap(p_queue, c)queue);// 然后消费者处理完毕后再swap(p_queue, c_queue);// 所以因为我们生产和消费用的是不同的队列,未来我们进行资源任务处理的时候,仅仅只需要交换制作,而且也只要把这个交换这一句加锁即可
};
①关于线程池里的互斥锁和条件变量
- 线程池中的任务队列会被多个执行流共同访问,因此需要加锁保护任务队列
- 队列里有任务线程才能拿任务,所以需要有条件变量,当队列为空时,线程就阻塞等待,当使用Push往队列放任务后,通知线程来执行
- 并且我们是把任务执行的语句放在解锁后的,因为当线程从任务队列拿到任务后,该任务就已经属于该线程了,与其它线程没有关系了,不需要保护了;并且如果是在加锁解锁中间执行的任务,那么一个线程只有执行完任务释放锁后其它线程才能申请到锁执行任务,就没有发挥线程“并行”的优势了
②线程执行函数为什么要设置成静态
- 线程执行的函数为routine,该函数设置为静态
- 因为线程接口的要求是参数为void*,返回值为void*的一个函数,而routine函数在类内实现,所以会默认带一个this指针,从而导致编译失败,所以要设置为静态,不让他默认带this指针
- 但是由于routine没有this指针,导致它不能访问类内的成员属性和其它成员函数,所以我们在构造函数中创建线程时,将this指针作为参数传递给线程执行函数routine,这样routine就可以通过它的参数指针访问类内的属性了
3.4 主函数实现
#include "threadPool.hpp"
#include "Task.hpp"
#include <iostream>
#include <sys/types.h>
#include <unistd.h>int main()
{// logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);srand((unsigned long)time(nullptr) ^ getpid());// 线程池启动ThreadPool<Task> *tp = new ThreadPool<Task>();tp->run();sleep(3);// 构建任务while (true){// 生产任务的过程,制作任务的时候,要花时间int x = rand() % 100 + 1;usleep(7721);int y = rand() % 30 + 1;Task t(x, y, [](int x, int y) -> int{ return x + y; });// std::cout << "任务制作完成" << x << " + " << y << " = ?" << std::endl;logMessage(DEBUG, "制作任务完成:%d+%d=?", x, y);// 推送任务到线程池中tp->pushTask(t);sleep(1);}return 0;
}
主函数代码很简单,首先创建线程池对象,然后while循环,创建任务对象,用lamdba表达式构建的加法函数初始化任务对象,然后将任务对象push进线程池的任务队列,然后线程池里面的线程自动执行任务代码