线程池 2(第二部分--循环队列)

        在考虑如何去设计一个任务容器的时候,其实尝试了很多。最开始的时候直接用的是std::queue容器,主要是看了知乎上面的 “ 基于C++11实现线程池 - 知乎 ”这个帖子,去封装一个安全队列。但是这个操作每次都要上一次锁,实在是太浪费时间了等于每访问一次任务队列,都要所有线程停下来等任务队列操作完,才能继续执行。等于从多线程又回到单线程了。

        那么核心问题就是原版STL库不是线程安全的。那么现在的需求就是自己设计一个数据结构,无锁+线程安全。

        那么第一反应,c++11的原子操作。

        原子操作的原子锁是硬件级别的指令,比mutex快了很多。实际上再线程池中,我们需要保护的内存变量,很少,没有必要用mutex保护整个数据结构和逻辑。


        好。现在有了线程安全的解决方向,然后就是数据结构的问题了,用什么数据结构呢。好了,直接去问ChatGPT。

很好的一个思路,循环队列,内部数据类型为函数。


首先设计一个类,去模仿了标准库中vector的实现,三根指针,一个头,一个尾,一个最大容量。

template <typename T>
class BoundedQueue
{
public:using value_type = T;using size_type = uint64_t;public:// 禁用 拷贝赋值运算符 和拷贝构造函数BoundedQueue& operator= (const BoundedQueue& other) = delete;BoundedQueue(const BoundedQueue& other) = delete;BoundedQueue() {}~BoundedQueue();void BreakAllWait();// 初始化   设定队列大小    和   等待策略bool Init(uint64_t size);bool Init(uint64_t size, WaitStrategy* strategy);// 入队操作bool Enqueue(const T& element);// 等待入队操作,当队列满时等待bool WaitEnqueue(const T& elemen);// 出队bool Dequeue(T* element);// 等待出队,当队列为空时,等待出队bool WaitDequeue(T* element);private:// 索引,队里索引下标uint64_t GetIndex(uint64_t num);设定内存对齐方式alignas(64) std::atomic<uint64_t> head_ = { 0 };	// 头alignas(64) std::atomic<uint64_t> tail_ = { 1 };	// 尾alignas(64) std::atomic<uint64_t> commit_ = { 1 };	// 最大容量uint64_t pool_size_ = 0;		// 队列大小T* pool_ = nullptr;		// 当前任务队列的内存指针std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr;volatile bool break_all_wait_ = false;
};

1、首先是防止编译器自动生成默认的复制构造函数和赋值运算符重载函数。在设计一个类的时候必须要考虑到,编译器的默认行为是不是我们必须的。如果不需要就要禁止。

        这循环队列只能存在于线程池中,外部是禁止访问的,线程池内也不允许拷贝任务队列。所以该禁止得禁止。

// 禁用 拷贝赋值运算符 和拷贝构造函数BoundedQueue& operator= (const BoundedQueue& other) = delete;BoundedQueue(const BoundedQueue& other) = delete;

2、接下来就是一些需要用到的动作,初始化,入队,出队,任务队列满了等待入队,任务队列空了,等待出队。

public:// 禁用 拷贝赋值运算符 和拷贝构造函数BoundedQueue& operator= (const BoundedQueue& other) = delete;BoundedQueue(const BoundedQueue& other) = delete;BoundedQueue() {}~BoundedQueue();void BreakAllWait();// 初始化   设定队列大小    和   等待策略bool Init(uint64_t size);bool Init(uint64_t size, WaitStrategy* strategy);// 入队操作bool Enqueue(const T& element);// 等待入队操作,当队列满时等待bool WaitEnqueue(const T& elemen);// 出队bool Dequeue(T* element);// 等待出队,当队列为空时,等待出队bool WaitDequeue(T* element);

3、然后就是最关键的

private:// 获取索引,队里索引下标uint64_t GetIndex(uint64_t num);alignas(64) std::atomic<uint64_t> head_ = { 0 };	// 头alignas(64) std::atomic<uint64_t> tail_ = { 1 };	// 尾alignas(64) std::atomic<uint64_t> commit_ = { 1 };	// 最大容量uint64_t pool_size_ = 0;		// 队列大小T* pool_ = nullptr;		// 当前线程池的内存指针std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr;// volatile 关键字告诉编译器在访问该变量时不进行优化,每次都从内存中读取变量的值。// 是否中断所有的等待操作volatile bool break_all_wait_ = false;

        这里的这三根指针都是用的原子类去实现的,而且进行了64位的对齐。其实一开始写的时候实际上并没有写内存对齐的逻辑,就直接用的原子类。但是在对整个项目其进行压力测试,然后看看看有什么地方能优化一下,减少一下时间,就发现,整个线程池最耗时间的居然是从任务队列中取出任务这个地方。

        然后我们在操作任务队列的时候加了定时器,然后打印出来。发现前几十个任务的访问时间是0.038s,后面就变成了0.121s然后一直都是这个水平,突然这个时间的涨幅3倍多很不正常。然后就看汇编,就一个mov指令,也不应该这么慢啊。源操作数是内存地址,目标操作数是寄存器,从内存地址拷贝数据到一个寄存器,而这个内存数据应该被cache住了,为什么会这么慢呢?

        查了一堆资料发现不对劲,是不是出现了内存伪共享了。

        当线程A去取出一个任务时,修改了这3个指针的值,同时也让线程B中这3个值也被修改了。虽然当时线程B不关心这3个指针的值,cpu还是重新去内存加载了一遍这个更新后的数据。

        发生了线程伪共享!!

        线程伪共享:

        如果​​cpu​​只有一个核,在多线程编程时,每个线程进行切换时,都需要将当前线程的上下文进行保存,然后加载下次要运行的线程的上下文,这就叫做上下文切换。

        现代​​cpu​​一般都会有多个核,因此实际运行时会有多个线程并行运行,每个核都有独立的缓存,正常情况下并行运行的​​2​​个线程如果没有访问或者修改相同内存是不会相互影响。但由于​​cache line​​的存在,如果一个线程修改了运行在另外一个核上线程​​cache line​​上的某一数据,则此时​​cpu​​需要重新加载该​​cache line​​上的数据。

        为了解决这个问题,使用了内存对齐。 

        (这个问题我们几个人查了快一个礼拜.....真的是没经验.....)

alignas(64) std::atomic<uint64_t> head_ = { 0 };	// 头
alignas(64) std::atomic<uint64_t> tail_ = { 1 };	// 尾
alignas(64) std::atomic<uint64_t> commit_ = { 1 };	// 最大容量

ps: 不过说实话,要不是压测还真不见得这个问题能被找出来。不过这个地方还真让整个流程,快了差不多0.3s - 0.7s。 也算是优化成功了


然后就是每个动作的实现。   一组一组的说吧。

1、出队

// 出队
template<typename T>
bool BoundedQueue<T>::Dequeue(T* element) {uint64_t new_head = 0;uint64_t old_head = head_.load(std::memory_order_acquire);do{// 计算新值new_head = old_head + 1;// 如果头位置等于了最大容量  出队失败   队列中没有元素可以取出 new_head == commit_if (new_head == commit_.load(std::memory_order_acquire))return false;// 将要取出的元素复制到 element指向的内存中*element = pool_[GetIndex(new_head)];} while (!head_.compare_exchange_weak(old_head, new_head,std::memory_order_acq_rel,  std::memory_order_relaxed));return true;
}// 等待出队,当队列为空时,等待出队
template<typename T>
bool BoundedQueue<T>::WaitDequeue(T* element) {while (!break_all_wait_) {if (Dequeue(element))return true;if (wait_strategy_->EmptyWait()) {continue;}break;}return false;
}

出队采用的是头出法。利用原子类的特性,设计了一种无锁算法,在多线程的环境下保证线程安全。所有的取值都是原子操作。

等待出队的逻辑是:

        如果正常出队失败,则进入等待逻辑,等到等待逻辑结束,重复尝试。


2、入队。

/*入队
*/
template<typename T>
bool BoundedQueue<T>::Enqueue(const T& element) {uint64_t new_tail = 0;	//	更新后的队尾uint64_t old_commit = 0;	//	原最大容量// 获取旧值uint64_t old_tail = tail_.load(std::memory_order_acquire);do{// 计算新值new_tail = old_tail + 1;// 若新值等于了原头head_位置,则表示队列已满,插入失败if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {return false;}// 进行tail_值 比较和替换,如果tail_ == old_tail,则使用new_tail作为新值,并且重新作为尾结点// 就是{ if (head_ == old_head) head_ = new_head; } 的原子操作。} while (!tail_.compare_exchange_weak(old_tail, new_tail,std::memory_order_acq_rel, std::memory_order_relaxed));// 将新元素插入到队尾,移动到相应为止pool_[GetIndex(old_tail)] = element;do{// 保证最大容量commit_和tail_的值一样old_commit = old_tail;// 交换成功结束循环   } while (cyber_unlikely(!commit_.compare_exchange_weak(old_commit, new_tail,std::memory_order_acq_rel, std::memory_order_relaxed)));// 通知有可能正在等待的线程获取任务wait_strategy_->Notifyone();return true;}/*等待入队
*/
template <typename T>
bool BoundedQueue<T>::WaitEnqueue(const T& element) {while (!break_all_wait_) {// 尝试插入		插入失败则进行等待if (Enqueue(element)) {return true;}/*队列已满,开始等待策略*/if (wait_strategy_->EmptyWait()) {continue;}break;}return false;
}

尾插法。

基本上就是入队逻辑的翻转。


3、返回下标

template<typename T>
inline uint64_t BoundedQueue<T>::GetIndex(uint64_t num) {return num - (num / pool_size_) * pool_size_;  
}

这里的下标计算公式:任务序号 -  ( 任务序号 - 任务池大小) ×任务池大小

其实后面可以直接用 % 取模来代替,但是 这个返回下标函数,也是高频函数,除法比模运算更快,所有用除法代替取模。

(其实这里在解决线程伪共享时顺带解决的。。。。当时以为是这里的问题。改了稍微好了一点。)


4、初始化

template<typename T>
inline bool BoundedQueue<T>::Init(uint64_t size) {return Init(size, new SleepWaitStratrgy());   // 若不指定等待策略,默认使用睡眠等待策略
}
/*
std::calloc 分配内存来存储元素并且进行初始化
*/
template<typename T>
bool BoundedQueue<T>::Init(uint64_t size, WaitStrategy* strategy){// 保证队列两端各留有一个空闲位置pool_size_ = size + 2;// 在动态内存中分配一片连续的内存空间,将分配的内存空间的起始位置存储在 pool_ 变量中,// 以便后续在线程池中使用。pool_ = reinterpret_cast<T*>(std::calloc(pool_size_, sizeof(T)));if (pool_ == nullptr)return false;// 遍历线程池的每个元素,并通过 new 在对应的内存位置上构造一个类型为 T 的对象for (uint64_t i = 0; i < pool_size_; ++i){new(&(pool_[i])) T();}// 设定等待策略wait_strategy_.reset(strategy);return true;} 

设计了两种初始化方式,如果不指定等待策略,默认使用线程睡眠策略。


5、线程唤醒

template<typename T>
inline void BoundedQueue<T>::BreakAllWait() {break_all_wait_ = true;wait_strategy_->BreakAllwait();
}

就直接调用的等待策略里的线程唤醒。


6、析构

// 析构
template<typename T>
BoundedQueue<T>::~BoundedQueue() {if (wait_strategy_) {BreakAllWait();}if (pool_) {for (uint64_t i = 0; i < pool_size_; i++){pool_[i].~T();}std::free(pool_);}
}

到此为止,任务队列算是全部弄完了,接下来就是线程池的设计了。线程池的设计其实不多,最难的就是任务提交,因为要保证多参数的解析,就得用到c++的自动推导类。代码不多但是理解起来稍微有点绕。下一文再总结吧。

 参考:基于C++11实现线程池 - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/20917.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu20.04 硬盘挂载、显卡驱动安装

前几天ubuntu系统莫名出问题了&#xff0c;修不好只能重装&#xff0c;在此记录安装ubuntu系统后的硬盘挂载和显卡驱动安装。 注意&#xff0c;本文并非教程&#xff0c;只是个人安装过程的记录&#xff0c;仅供参考 ubuntu系统&#xff1a;Ubuntu 20.04.6 LTS 硬件设备&…

一文带你搞清 ChatGPT 与 Azure OpenAI 的区别

这两周是我从2017年开始全职涉入 NLP 领域后最忙的两周&#xff0c;无数的同事和客户都在向我提出一个询问&#xff1a;ChatGPT 可以帮到我们什么&#xff1f; 特别是在2023年3月31日我做了一场微软 Azure OpenAI [布局助力企业]拥抱新智能时代的演讲之后&#xff0c;这几天我…

ChatGPT的真相:强泛化的秘密以及众多关键问题

进NLP群—>加入NLP交流群 本文转载自AI科技评论&#xff0c;作者韩庐山。 本文从ChatGPT带来的即时学习能力&#xff08;in-context learning&#xff09;入手&#xff0c;逐步深入地探讨了ChatGPT目前众多的关键性问题&#xff0c;包括&#xff1a; ChatGPT带来了从未有过的…

chatgpt赋能python:用Python向手机发送信息是如何实现的?

用Python向手机发送信息是如何实现的&#xff1f; 在今天的信息时代&#xff0c;随时随地保持联系已经成为生活不可或缺的一部分。随着技术的发展&#xff0c;我们可以使用各种方式发送和接收信息&#xff0c;而使用Python向手机发送短信是其中一种非常方便的方式。 Python的…

chatgpt赋能python:Python自动认证上网教程

Python自动认证上网教程 随着互联网的普及&#xff0c;越来越多的人需要通过手机、电脑等设备上网&#xff0c;而许多场所都要求进行认证才能使用网络。每次都手动操作认证费时费力&#xff0c;这时Python就可以派上用场了。Python是一种高级的编程语言&#xff0c;具有可读性…

双因素认证(2FA)教程

所谓认证&#xff08;authentication&#xff09;就是确认用户的身份&#xff0c;是网站登录必不可少的步骤。 密码是最常见的认证方法&#xff0c;但是不安全&#xff0c;容易泄露和冒充。 越来越多的地方&#xff0c;要求启用双因素认证&#xff08;Two-factor authenticatio…

如何实现双因素认证?

增强数字安全的愿望引起了世界各国政府的关注&#xff0c;所有政府都希望保护消费者和企业。因此&#xff0c;许多人提出了立法&#xff0c;将两因素身份验证 (2FA) 作为 IT 系统的强制性要求。其实&#xff0c;在我国等级保护制度中等级保护第三级以上都要求完成双因素认证的&…

网络安全合规-Tisax(汽车安全评估讯息交换平台)一

**TISAX&#xff08;汽车安全评估讯息交换平台&#xff08;可信信息安全评估交换平台&#xff09;&#xff09;**是2017年由德国汽车工业联合会(VDA) 联合欧洲网络交换所(ENX) 所推出的资讯交换平台&#xff0c;通过应用欧洲网络交换协会&#xff08;ENX&#xff09;和德国汽车…

从医疗保健攻击到HIPAA 合规性

医疗机构无疑是网络攻击的热门目标。攻击者因在暗网上出售一条健康记录而获取高额 佣金&#xff0c;在各行业网络安全报告中医疗保健行业的攻击事件占比居高不下&#xff0c;这有什么奇怪的吗&#xff1f; 根据2022 年 SonicWall 网络威胁报告&#xff0c;医疗保健行业&#x…

漫话:如何给女朋友解释鸿蒙OS是怎样实现跨平台的?

周末在家休息&#xff0c;女朋友在刷朋友圈&#xff0c;突然她问我&#xff1a; 鸿蒙OS回顾 2019年8月9日华为开发者大会上&#xff0c;华为消费者业务CEO余承东正式宣布发布自有操作系统鸿蒙&#xff0c;内核为Linux内核、鸿蒙微内核和LiteOS。未来将摆脱Linux内核和LiteOS&am…

腾讯研发动画组件,以后动画制作用PAG

你好&#xff0c;我是tiantian。 我们知道&#xff0c;动画特效可以辅助视觉制作焦点&#xff0c;引导注意力的方向&#xff0c;越来越为广大视觉设计师青睐&#xff0c;并广泛应用于各类场景开发。 关于动画设计工具&#xff0c;既有 Framer.js、Origami&#xff0c; 也有交互…

能直接修复代码 BUG,比 ChatGPT 还厉害

【公众号回复 “1024”&#xff0c;免费领取程序员赚钱实操经验】 大家好&#xff0c;又见面了&#xff0c;我是章鱼猫&#xff01; 最近 ChatGPT 非常的火&#xff0c;而且是火出圈的那种&#xff0c;各个领域的人都知道。但是不得不说程序员做的工具&#xff0c;对程序员还是…

chatgpt赋能Python-ipv4地址python

IPv4地址 Python编程介绍 IPv4地址在互联网中扮演着非常重要的角色&#xff0c;英文名称为 Internet Protocol Version 4 Address。每一个连接到互联网上的设备都会被分配一个唯一的IPv4地址&#xff0c;它由32位二进制数以点分十进制的形式呈现出来。在Python编程中&#xff…

chatgpt赋能Python-pythonip地址是否合法

Python中如何判断IP地址是否合法 在网络中&#xff0c;IP地址是非常重要的概念。它用来标识网络中每个设备的唯一地址。IP地址通常分为IPv4和IPv6两种类型。在Python中&#xff0c;有多种方法可以判断IP地址是否合法。在本文中&#xff0c;我们将介绍如何使用Python编程语言来…

可喜可贺,暴雪即将收购第一家工作室Proletariat,魔法吃鸡停运

暴雪娱乐在超过15年的时间里收购了第一家工作室。在VentureBeat的一份报告中&#xff0c;该公司收购了总部位于波士顿的工作室Proletariat。 “经过四年多的元素魔法和咒语组合&#xff0c;我们决定结束Spellbreak的研发&#xff0c;”该公司在其网站上写道。“这些服务器将于2…

修改战网昵称服务器错误,暴雪又改了游戏平台名字 暴雪战网回来了

暴雪一定是个纠结的处女座&#xff0c;距离上一次更改游戏平台名称之后&#xff0c;8月15日早上6点&#xff0c;暴雪中国又一次在微博上发表公告称“暴雪战网品牌名称更新”&#xff0c;名字从上一次的暴雪游戏平台改成了暴雪战网。 按暴雪的意思来看&#xff0c;之所以玩这么一…

暴雪战网服务器维护,炉石无法通过暴雪战网服务进行登录

有很多玩家常常遇到战网无法登陆、炉石传说无法登陆至战网服务等问题。那么下面就告诉大家这种解决办法&#xff0c;希望对你有帮助&#xff01; 1、关闭游戏或安装程序&#xff0c;打开任务管理器&#xff0c;终止以下进程&#xff1a;Agent.exe&#xff0c;Blizzard Launcher…

【吴恩达deeplearning.ai】基于ChatGPT API打造应用系统(上)

以下内容均整理来自deeplearning.ai的同名课程 Location 课程访问地址 DLAI - Learning Platform Beta (deeplearning.ai) 一、大语言模型基础知识 本篇内容将围绕api接口的调用、token的介绍、定义角色场景 调用api接口 import os import openai import tiktoken from dote…

ChatGPT讲故事,DALLE-2负责画出来!两大AI合作出绘本!

点击下方卡片&#xff0c;关注“CVer”公众号 AI/CV重磅干货&#xff0c;第一时间送达 点击进入—>CV微信技术交流群 转载自&#xff1a;机器之心 | 编辑&#xff1a;张倩、袁铭怿 生成式 AI 正在变革内容的生产方式。 在过去的一周&#xff0c;相信大家都被 ChatGPT 刷了屏…

ChatGPT绘本故事,引领孩子探索神奇世界!

现在很多家长忙于工作&#xff0c;无暇陪伴孩子&#xff0c;老人或者身边的带小孩的家人不会给孩子读绘本故事怎么办&#xff1f; 这时ChatGPT的出现就派上大用场了&#xff0c;只要有手机&#xff0c;不会读绘本的大人们及孩子们都可以轻轻松松地进入童话世界&#xff0c;同时…