本项目是基于C++11的线程池。使用了许多C++的新特性,包含不限于模板函数泛型编程、std::future、std::packaged_task、std::bind、std::forward完美转发、std::make_shared智能指针、decltype类型推断、std::unique_lock锁等C++11新特性功能。
本项目有一定的上手难度。推荐参考系列文章
C++11实用技术(一)auto与decltype的使用
C++11实用技术(二)std::function和bind绑定器
C++11实用技术(三)std::future、std::promise、std::packaged_task、async
C++ operator关键字的使用(重载运算符、仿函数、类型转换操作符)
右值引用以及move移动语义和forward 完美转发
C++标准库中的锁lock_guard、unique_lock、shared_lock、scoped_lock、recursive_mutex
代码结构
本项目线程池功能分以下几个函数去实现:
threadpool.init(isize_t num);设置线程的数量
threadpool::get(TaskFuncPtr& task);读取任务队列中的任务
threadpool::run();通过get()读取任务并执行
threadpool.start(); 启动线程池,并通过run()执行任务
threadpool.exec();封装任务到任务队列中
threadpool.waitForAllDone();等待所有任务执行完成
threadpool.stop();分离线程,释放内存
threadpool.init
init的功能是初始化线程池,主要是设置线程的数量到类的成员变量中。
bool ZERO_ThreadPool::init(size_t num)
{std::unique_lock<std::mutex> lock(mutex_);if (!threads_.empty()){return false;}threadNum_ = num;return true;
}
threadNum_:保存线程的数量,在init函数中被赋值
此处使用unique_lock或lock_guard的加锁方式都能实现自动加锁和解锁。但是unique_lock可以进行临时解锁和再上锁,而lock_guard不行,特殊情况下还是必须使用unique_lock(用到条件变量的情况)。(lock_guard比较简单,相对来说性能要好一点)
threadpool::get
从任务队列中获取获取任务,这里其实就是我们的消费者模块。
bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{std::unique_lock<std::mutex> lock(mutex_);if (tasks_.empty()) //判断任务是否存在{//要终止线程池 bTerminate_设置为true,任务队列不为空condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });}if (bTerminate_)return false;if (!tasks_.empty()){task = std::move(tasks_.front()); // 使用了移动语义tasks_.pop(); //释放资源,释放一个任务return true;}return false;
}
条件变量condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });是需要一直等待条件完成才退出。即任务终止,或者任务队列不为空时,就会退出条件变量的阻塞状态,继续执行下面的逻辑。
task = std::move(tasks_.front()); 使用了移动语义,将 tasks_.front() 的内容移动到了 task 中。可以减少内容拷贝。移动完之后tasks_.front() 的内容会变为未指定的状态,所以直接pop掉就好了。
threadpool::run
这里是运行我们的任务部分。包括调用get在任务队列中获取任务,以及执行任务。
void ZERO_ThreadPool::run() // 执行任务的线程
{//调用处理部分while (!isTerminate()) // 判断是不是要停止{TaskFuncPtr task;bool ok = get(task); // 读取任务if (ok){++atomic_;try{if (task->_expireTime != 0 && task->_expireTime < TNOWMS){//如果设置了超时,并且超时了,就需要执行本逻辑//超时任务,本代码未实现,有需要可实现在此处}else{task->_func(); // 执行任务}}catch (...){}--atomic_;}}}
}
atomic_:运行一个任务,该参数+1;执行完毕,该参数-1。这里是为了待会停止线程池时判断是否还有运行中的任务(未完成的线程)。
threadpool.start
创建线程,并把线程池存入vector中,后面释放线程池时,好一一释放线程。
bool ZERO_ThreadPool::start()
{std::unique_lock<std::mutex> lock(mutex_);if (!threads_.empty()){return false;}for (size_t i = 0; i < threadNum_; i++){threads_.push_back(new thread(&ZERO_ThreadPool::run, this));}return true;
}
threads_.push_back(new thread(&ZERO_ThreadPool::run, this));创建线程,线程的回调函数为run。
threadpool.exec
exec是将我们的任务存入任务队列中,这段代码是本项目最难的,用了很多C++的新特性。
/*template <class F, class... Args>它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值返回值后置*/template <class F, class... Args>auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>//接受一个超时时间 `timeoutMs`,一个可调用对象 `f` 和其它参数 `args...`,并返回一个 `std::future` 对象,该对象可以用于获取任务执行的结果。{int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 根据超时时间计算任务的过期时间 `expireTime`,如果超时时间为 0,则任务不会过期。//定义返回值类型using RetType = decltype(f(args...)); // 使用 `decltype` 推导出 `f(args...)` 的返回值类型,并将其定义为 `RetType`(这里的using和typedef功能一样,就是为一个类型起一个别名)。// 封装任务 使用 `std::packaged_task` 将可调用对象 `f` 和其它参数 `args...` 封装成一个可执行的函数,并将其存储在一个 `std::shared_ptr` 对象 `task` 中。auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime); // 封装任务指针,设置过期时间 创建一个 `TaskFunc` 对象,并将任务的过期时间 `expireTime` 传递给它。fPtr->_func = [task]() { // 具体执行的函数 将封装好的任务函数存储在 `TaskFunc` 对象的 `_func` 成员中,该函数会在任务执行时被调用。(*task)();};std::unique_lock<std::mutex> lock(mutex_);tasks_.push(fPtr); // 将任务插入任务队列中condition_.notify_one(); // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notifyreturn task->get_future();; //返回一个 `std::future` 对象,该对象可以用于获取任务执行的结果。}
使用了可变参数模板函数。
tasks_:保存任务的队列
condition_.notify_one():保存一个任务唤醒一个条件变量
std::future : 异步指向某个任务,然后通过future特性去获取任务函数的返回结果。
std::bind:将参数列表和函数绑定,生成一个新的可调用对象
std::packaged_task:将任务和feature绑定在一起的模板,是一种封装对任务的封装。
本函数用到了泛型编程模板函数,输入参数有3个:一个超时时间 timeoutMs
,一个可调用对象 f
和参数 args...
。采用返回值后置的方式返回一个std::future对象。这里采用返回值后置是为了方便使用decltype(f(args…)推导数据类型。
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward(f), std::forward(args)…));是将我们传进来的任务函数和参数bind成一个对象,这个对象可以看作是一整个函数,其返回值就是RetType 类型,并且没有输入参数。所以用std::packaged_task<RetType()>这样的格式来打包封装。封装好的对象用智能指针(std::make_shared)来管理。
同时还要创建一个TaskFunc的对象,同样用智能指针管理,这个对象包括两项内容,一个就是超时时间,一个就是我们封装好的task对象。
通过TaskFuncPtr fPtr = std::make_shared(expireTime);和fPtr->_func = task {(*task)();};两条代码将这两项传进去。
最后会通过task->get_future()返回我们任务函数执行的结果返回值。
threadpool.waitForAllDone
等待所有任务执行完成。
bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{std::unique_lock<std::mutex> lock(mutex_);if (tasks_.empty() && atomic_ == 0)return true;if (millsecond < 0){condition_.wait(lock, [this] { return tasks_.empty() && atomic_ == 0; });return true;}else{return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty() && atomic_ == 0; });}
}
使用条件变量来等待任务执行完成。支持超时执行功能。
此处unique_lock的使用是必须的: 条件变量condition_在wait时会进行unlock再进入休眠, lock_guard并无该操作接口
threadpool.stop
终止线程池。会调用waitForAllDone等待所有任务执行完成再终止。
void ZERO_ThreadPool::stop()
{{std::unique_lock<std::mutex> lock(mutex_);bTerminate_ = true;condition_.notify_all();}waitForAllDone();for (size_t i = 0; i < threads_.size(); i++){if (threads_[i]->joinable()){threads_[i]->join();}delete threads_[i];threads_[i] = NULL;}std::unique_lock<std::mutex> lock(mutex_);threads_.clear();
}
通过join等线程执行完成后才返回。
主函数调用
class Test
{
public:int test(int i) {cout << _name << ", i = " << i << endl;Sleep(1000);return i;}void setName(string name) {_name = name;}string _name;
};
void test3() // 测试类对象函数的绑定
{ZERO_ThreadPool threadpool;threadpool.init(2);threadpool.start(); // 启动线程池Test t1;Test t2;t1.setName("Test1");t2.setName("Test2");auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);cout << "t1 " << f1.get() << endl;cout << "t2 " << f2.get() << endl;threadpool.stop();
}
int main()
{test3(); // 测试类对象函数的绑定cout << "main finish!" << endl;return 0;
}
运行结果:
本项目完整代码下载地址基于C++11的线程池