使用C++实现一个高效的线程池

在多线程编程中,线程池是一种常见且高效的设计模式。它通过预先创建一定数量的线程来处理任务,从而避免频繁创建和销毁线程带来的性能开销。本文将详细介绍如何使用C++实现一个线程池,并解析相关代码实现细节。

线程池简介

线程池(Thread Pool)是一种管理和复用线程的机制。它通过维护一个线程集合,当有任务需要执行时,从线程池中分配一个空闲线程来处理任务,任务完成后线程归还到池中。这样可以显著减少线程创建和销毁的开销,提高系统的整体性能和响应速度。

设计思路

本文实现的线程池主要包含两个核心类:

  1. Thread类:封装了单个线程的创建、启动和管理。
  2. ThreadPool类:管理多个线程,维护任务队列,并调度任务给线程执行。

线程池支持两种模式:

  • MODE_CACHED:缓存模式,根据任务量动态调整线程数量,适用于任务量不固定的场景。
  • MODE_FIXED:固定模式,线程数量固定,适用于任务量稳定的场景。

Thread类实现

Thread类负责封装单个线程的创建和管理。以下是Thread.hThread.cpp的实现。

Thread.h

#include <functional>
#include <atomic>
#include <cstdint>
#include <thread>class Thread {
public:using ThreadFunc = std::function<void(std::uint32_t)>;public:explicit Thread(ThreadFunc func);void join();~Thread();void start();[[nodiscard]] std::uint32_t getID() const;[[nodiscard]] static std::uint32_t getNumCreated();Thread(const Thread &) = delete;Thread &operator=(const Thread &) = delete;private:ThreadFunc m_func;uint32_t m_threadID;std::thread m_thread;static std::atomic<uint32_t> m_numCreateThread;
};

Thread.cpp

#include "Thread.h"std::atomic<uint32_t> Thread::m_numCreateThread(0);Thread::Thread(Thread::ThreadFunc func) : m_func(std::move(func)), m_threadID(m_numCreateThread.load()) {m_numCreateThread++;
}void Thread::start() {m_thread = std::thread([this]() {m_func(m_threadID);});m_thread.detach();
}uint32_t Thread::getID() const {return m_threadID;
}uint32_t Thread::getNumCreated() {return Thread::m_numCreateThread.load();
}Thread::~Thread() {join();
}void Thread::join() {if (m_thread.joinable()) {m_thread.join();}
}

解析

  1. 成员变量

    • m_func:线程执行的函数。
    • m_threadID:线程的唯一标识。
    • m_threadstd::thread对象。
    • m_numCreateThread:静态原子变量,用于记录已创建的线程数量。
  2. 构造函数

    • 接受一个函数作为参数,并分配一个唯一的线程ID。
  3. start方法

    • 启动线程,执行传入的函数,并将线程设为分离状态,以便在线程结束时自动回收资源。
  4. join方法和析构函数

    • 如果线程可连接,则执行join操作,确保线程资源的正确回收。

ThreadPool类实现

ThreadPool类负责管理多个线程,维护任务队列,并调度任务给线程执行。以下是ThreadPool.hThreadPool.cpp的实现。

ThreadPool.h

#include <mutex>
#include <unordered_map>
#include <memory>
#include <functional>
#include <queue>
#include <iostream>
#include <condition_variable>
#include <future>
#include <cstdint>
#include "Thread.h"enum class THREAD_MODE {MODE_CACHED,MODE_FIXED,
};class ThreadPool {
public:explicit ThreadPool(THREAD_MODE mode = THREAD_MODE::MODE_CACHED, std::uint32_t maxThreadSize = 1024,std::uint32_t initThreadSize = 4, std::uint32_t maxTaskSize = 1024);~ThreadPool();void setThreadMaxSize(uint32_t maxSize);void setMode(THREAD_MODE mode);void setTaskMaxSize(uint32_t maxSize);void start(uint32_t taskSize = std::thread::hardware_concurrency());ThreadPool(const ThreadPool &) = delete;ThreadPool &operator=(const ThreadPool &) = delete;template<typename Func, typename ...Args>auto submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type>;protected:[[nodiscard]] bool checkState() const;void ThreadFun(uint32_t threadID);private:using Task = std::function<void()>;std::unordered_map<uint32_t, std::unique_ptr<Thread>> m_threads;uint32_t m_initThreadSize; // 初始线程数量std::atomic<std::uint32_t> m_spareThreadSize; // 空闲线程数量uint32_t m_maxThreadSize; // 最大线程数量std::atomic<bool> m_isRunning; // 线程池运行标志THREAD_MODE m_mode; // 线程池运行模式std::deque<Task> m_tasks;std::atomic<uint32_t> m_taskSize;uint32_t m_maxTaskSize;uint32_t m_thread_maxSpareTime;mutable std::mutex m_mutex; // 线程池互斥量std::condition_variable m_notEmpty;std::condition_variable m_notFull;std::condition_variable m_isExit;
};

ThreadPool.cpp

#include "ThreadPool.hpp"
#include <thread>ThreadPool::ThreadPool(THREAD_MODE mode, uint32_t maxThreadSize, uint32_t initThreadSize,uint32_t maxTaskSize) : m_initThreadSize(initThreadSize), m_spareThreadSize(0),m_maxThreadSize(maxThreadSize), m_isRunning(false), m_mode(mode), m_taskSize(0), m_maxTaskSize(maxTaskSize), m_thread_maxSpareTime(60) {
}bool ThreadPool::checkState() const {return m_isRunning;
}void ThreadPool::setThreadMaxSize(uint32_t maxSize) {if (checkState())std::cerr << "threadPool is running, cannot change!" << std::endl;elsethis->m_maxThreadSize = maxSize;
}void ThreadPool::setMode(THREAD_MODE mode) {if (checkState())std::cerr << "threadPool is running, cannot change!" << std::endl;elsethis->m_mode = mode;
}void ThreadPool::setTaskMaxSize(uint32_t maxSize) {if (checkState())std::cerr << "threadPool is running, cannot change!" << std::endl;elsethis->m_maxTaskSize = maxSize;
}void ThreadPool::ThreadFun(uint32_t threadID) {auto last_time = std::chrono::high_resolution_clock::now();for (;;) {Task task;{std::unique_lock<std::mutex> lock(m_mutex);std::cout << "threadID: " << threadID << " trying to get a task" << std::endl;while (m_tasks.empty() && m_isRunning) {if (m_mode == THREAD_MODE::MODE_CACHED && m_threads.size() > m_initThreadSize) {if (m_notEmpty.wait_for(lock, std::chrono::seconds(3)) == std::cv_status::timeout) {auto now_time = std::chrono::high_resolution_clock::now();auto dur_time = std::chrono::duration_cast<std::chrono::seconds>(now_time - last_time);if (dur_time.count() > m_thread_maxSpareTime && m_threads.size() > m_initThreadSize) {m_threads.erase(threadID);m_spareThreadSize--;std::cout << "threadID: " << threadID << " exiting due to inactivity!" << std::endl;return;}}} else {m_notEmpty.wait(lock);}}if (!m_isRunning && m_tasks.empty()) {m_threads.erase(threadID);std::cout << "threadID: " << threadID << " exiting!" << std::endl;m_isExit.notify_all();return;}if (!m_tasks.empty()) {m_spareThreadSize--;task = std::move(m_tasks.front());m_tasks.pop_front();std::cout << "threadID: " << threadID << " retrieved a task!" << std::endl;if (!m_tasks.empty())m_notEmpty.notify_all();m_notFull.notify_all();}}if (task) {try {task();} catch (const std::exception &e) {std::cerr << "Exception in task: " << e.what() << std::endl;} catch (...) {std::cerr << "Unknown exception in task." << std::endl;}std::cout << "threadID: " << threadID << " completed a task." << std::endl;m_spareThreadSize++;last_time = std::chrono::high_resolution_clock::now();}}
}void ThreadPool::start(std::uint32_t taskSize) {m_isRunning = true;for (std::uint32_t i = 0; i < taskSize; ++i) {auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));auto threadID = ptr->getID();m_threads.emplace(threadID, std::move(ptr));}for (auto &it: m_threads) {it.second->start();m_spareThreadSize++;}
}ThreadPool::~ThreadPool() {m_isRunning = false;std::unique_lock<std::mutex> lock(m_mutex);m_notEmpty.notify_all();m_notFull.notify_all();m_isExit.wait(lock, [&]() -> bool { return m_threads.empty(); });
}

submitTask模板方法实现

template<typename Func, typename ...Args>
auto ThreadPool::submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type> {using Rtype = typename std::invoke_result<Func, Args...>::type;auto task = std::make_shared<std::packaged_task<Rtype()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<Rtype> result = task->get_future();std::unique_lock lock(m_mutex);if (!m_notFull.wait_for(lock, std::chrono::seconds(3),[&]() -> bool { return m_tasks.size() < m_maxTaskSize; })) {std::cerr << "Task queue is full, submit task failed!" << std::endl;throw std::runtime_error("Task queue is full");}m_tasks.emplace_back([task] { (*task)(); });m_notEmpty.notify_all();if (m_mode == THREAD_MODE::MODE_CACHED && m_tasks.size() > m_spareThreadSize) {if (m_threads.size() >= m_maxThreadSize) {std::cerr << "Thread pool has reached max size, cannot create new thread!" << std::endl;} else {std::cout << "Creating a new thread!" << std::endl;auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));u_int64_t threadID = ptr->getID();m_threads.emplace(threadID, std::move(ptr));m_threads[threadID]->start();++m_spareThreadSize;}}return result;
}

解析

  1. 成员变量

    • m_threads:存储所有线程的集合。
    • m_tasks:任务队列,存储待执行的任务。
    • m_mutexm_notEmptym_notFullm_isExit:用于线程同步和任务调度的互斥量和条件变量。
    • 其他变量用于控制线程池的状态,如最大线程数、初始线程数、任务队列最大长度等。
  2. 构造函数

    • 初始化线程池的各项参数,如模式、最大线程数、初始线程数、最大任务数等。
  3. start方法

    • 启动线程池,创建初始数量的线程,并将其启动。
  4. submitTask模板方法

    • 提交任务到线程池,支持任意可调用对象。
    • 使用std::packaged_taskstd::future实现任务的异步执行和结果获取。
    • 如果任务队列已满,则在指定时间内等待,若仍满则抛出异常。
    • 在缓存模式下,根据任务量动态创建新线程。
  5. ThreadFun方法

    • 线程的工作函数,从任务队列中获取任务并执行。
    • 在缓存模式下,线程在空闲一定时间后会自动退出,降低资源占用。
  6. 析构函数

    • 关闭线程池,通知所有线程退出,并等待所有线程结束。

线程池的使用

以下是一个简单的示例,展示如何使用上述实现的线程池。

#include "ThreadPool.h"
#include <iostream>
#include <chrono>// 示例任务函数
void exampleTask(int n) {std::cout << "Task " << n << " is starting." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "Task " << n << " is completed." << std::endl;
}int main() {// 创建线程池,使用缓存模式,最大线程数为8,初始线程数为4,最大任务数为16ThreadPool pool(THREAD_MODE::MODE_CACHED, 8, 4, 16);pool.start();// 提交多个任务std::vector<std::future<void>> futures;for (int i = 0; i < 10; ++i) {futures.emplace_back(pool.submitTask(exampleTask, i));}// 等待所有任务完成for (auto &fut : futures) {fut.get();}std::cout << "All tasks have been completed." << std::endl;return 0;
}

运行结果

threadID: 0 trying to get a task
threadID: 1 trying to get a task
threadID: 2 trying to get a task
threadID: 3 trying to get a task
Task 0 is starting.
Task 1 is starting.
Task 2 is starting.
Task 3 is starting.
threadID: 0 completed a task.
threadID: 0 trying to get a task
Task 4 is starting.
threadID: 1 completed a task.
threadID: 1 trying to get a task
Task 5 is starting.
...
All tasks have been completed.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/498226.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024.12.30(多点通信)

作业&#xff1a; 1、将广播发送和接收端实现一遍&#xff0c;完成一个发送端发送信息&#xff0c;对应多个接收端接收信息实验。 发送端 #include <myhead.h>#define PORT 8888 #define IP "192.168.124.255"int main(int argc, const char *argv[]) {//1、…

Python爬虫 - 豆瓣电影排行榜数据爬取、处理与存储

文章目录 前言一、使用版本二、需求分析1. 分析要爬取的内容1.1 分析要爬取的分类1.2 分析要爬取的单个电影的数据1.3 分析如何获取单个电影数据1.3.1 预览数据1.3.2 查看请求网址、方法及请求头信息1.3.3 查看请求参数 2. 数据用途2.1 统计分析2.2 探索性数据分析 (EDA)2.3 高…

爬虫后的数据处理与使用(处理篇)

紧接上文爬虫&#xff0c;我们获取到了一些数据&#xff0c;接下来就是使用和分析了~爬虫阶段式教学——从数据获取到格式化存储&#xff08;附代码与效果图&#xff09;_爬虫网页数据格式化-CSDN博客 为保证数据的正确性和有效性需要对数据进行筛选&#xff0c;保存有效信息&a…

模电面试——设计题及综合分析题0x01(含答案)

1、已知某温控系统的部分电路如下图&#xff08;EDP070252&#xff09;&#xff0c;晶体管VT导通时&#xff0c;继电器J吸合&#xff0c;压缩机M运转制冷&#xff0c;VT截止时&#xff0c;J释放&#xff0c;M停止运转。 &#xff08;1&#xff09;电源刚接通时&#xff0c;晶体…

基于FPGA的2ASK+帧同步系统verilog开发,包含testbench,高斯信道,误码统计,可设置SNR

目录 1.算法仿真效果 2.算法涉及理论知识概要 2.1 2ASK调制解调 2.2 帧同步 3.Verilog核心程序 4.完整算法代码文件获得 1.算法仿真效果 vivado2019.2仿真结果如下&#xff08;完整代码运行后无水印&#xff09;&#xff1a; 设置SNR8db 设置SNR20db 整体波形效果&…

学习笔记:使用 pandas 和 Seaborn 绘制柱状图

学习笔记&#xff1a;使用 pandas 和 Seaborn 绘制柱状图 前言 今天在使用 pandas 对数据进行处理并在 Python 中绘制可视化图表时&#xff0c;遇到了一些关于字体设置和 Seaborn 主题覆盖的小问题。这里将学习到的方法和注意事项做个总结&#xff0c;以便之后的项目中可以快…

【算法day27】动态规划:基础2

题目引用 不同路径不同路径II整数拆分不同的二叉搜索树 1. 不同路径 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Fin…

大数据技术-Hadoop(四)Yarn的介绍与使用

目录 一、Yarn 基本结构 1、Yarn基本结构 2、Yarn的工作机制 二、Yarn常用的命令 三、调度器 1、Capacity Scheduler&#xff08;容量调度器&#xff09; 1.1、特点 1.2、配置 1.2.1、yarn-site.xml 1.2.2、capacity-scheduler.xml 1.3、重启yarn、刷新队列 测试 向hi…

Vscode左大括号不另起一行、注释自动换行

参考大佬的博客VSCode 格式化 cpp 文件时配置左大括号不换行_vscode大括号不换行-CSDN博客 Clang_format_style {BasedOnStyle: Chromium, IndentWidth: 4}

12.30 Redis网络模型基础 IO NIO多路复用

图片引用自黑马程序员redis 网络模型 上图引用自java guide javaguide NIO

基于Qt事件机制中的定时器事件的闹钟设计

目标 代码 pro文件 QT core gui texttospeechgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # depend on …

PawSQL性能巡检平台 (3) - 慢查询采集和优化

在数据库运维管理中&#xff0c;慢查询一直是影响系统性能的重要因素。本文将详细介绍PawSQL数据库性能巡检平台在慢查询管理和优化方面的功能特性&#xff0c;帮助数据库管理员更好地应对性能挑战。 一、PawSQL巡检平台慢查询管理概述 PawSQL平台提供了全面的慢查询管理功能&…

检索增强生成(RAG)的全面综述:演进、当前格局与未来方向

摘要 https://arxiv.org/pdf/2410.12837 本文全面研究了检索增强生成&#xff08;RAG&#xff09;&#xff0c;追溯了其从基础概念到当前最先进技术的演变历程。RAG将检索机制与生成式语言模型相结合&#xff0c;以提高输出的准确性&#xff0c;从而解决了大型语言模型&#…

关于无线AP信道调整的优化(锐捷)

目录 一、信道优化的基本原则二、2.4G频段信道优化三、5G频段信道优化四、信道优化代码具体示例五、其他优化措施 一、信道优化的基本原则 信道优化旨在减少信道间的干扰&#xff0c;提高网络覆盖范围和信号质量。基本原则包括&#xff1a; 1. 选择合适的信道&#xff1a;根据…

拓展C盘内存的方法(C盘旁边不一定是D盘)

问题&#xff1a; 比如&#xff1a;windows现在C盘200GB&#xff0c;D盘600GB&#xff0c;准备额外拓展一个新的盘2TB&#xff0c;如何把新的盘中500GB拓展到C盘中 总结&#xff1a; 通过磁盘管理&#xff1a;如果C盘旁边有未分配空间&#xff0c;可以直接使用“扩展卷”功能…

基于springboot的膳食问答系统的设计与实现

摘 要 本文介绍了一个基于SpringBoot框架的膳食问答系统&#xff0c;该系统融合了文章查看、膳食问答、用户管理、文章管理、知识点管理、系统日志查看、在线用户查看以及办公管理等多项功能。系统采用主流界面设计风格&#xff0c;前端使用HTML构建用户界面&#xff0c;后端则…

如何在LabVIEW中更好地使用ActiveX控件?

在LabVIEW中&#xff0c;ActiveX控件可以帮助实现与其他应用程序或第三方组件的集成&#xff08;例如Microsoft Excel、Word、Internet Explorer等&#xff09;。以下是一些建议&#xff0c;帮助您更好地在LabVIEW中使用ActiveX控件&#xff1a; ​ 1. 理解ActiveX控件的基本原…

使用套接字创建一个服务端,创建一个客户端然后相互通讯

以下是对上述代码的详细解释&#xff1a; #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h>#include <stdio.h> #include <stdlib.h> #include <string.h&…

17.3、网络安全应急响应技术与常见的工具

目录 应急响应常用技术分类信息系统容灾恢复入侵取证过程网络安全应急响应参考案例——阿里云安全应急响应服务阿里云应急响应服务网络安全应急响应参考案例—永恒之蓝Wannacry 应急响应常用技术分类 一共五个类别&#xff0c;访问控制、安全评估系统&#xff0c;恢复、安全监测…

MySQL系列之数据类型(String)

导览 前言一、字符串类型知多少 1. 类型说明2. 字符和字节的转换 二、字符串类型的异同 1. CHAR & VARCHAR2. BINARY & VARBINARY3. BLOB & TEXT4. ENUM & SET 结语精彩回放 前言 MySQL数据类型第三弹闪亮登场&#xff0c;欢迎关注O。 本篇博主开始谈谈MySQ…