基础篇:多线程所需知识:RAII接口模式对生产者和消费者封装以及多batch实现

我们先来弄一个最基础的infer类:


class Infer{
public: bool load_model(const string &file){context_ = file;return true;} void forward(){if(context_.empty()){printf("加载模型异常\n");return;}printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};

正常的模型是很复杂的,但在这里我们就用string代替了。

那这个代码重点就在于异常逻辑,在正常工作代码中,异常逻辑如果没有写好,就会造成封装的不安全性,甚至会导致程序崩溃。

比如之前的load model:如果没有load过还好,但是如果load过了,那不久导致了两次load_model么?

bool load_model(const string &file){if (!context_.empty()){/* code */destroy();}context_ = file;return true;} 

所以在load前要先看一眼,如果load过了就destroy

这个时候就能引出我们的RAII + 接口模式

RAII

RAII -------> 资源获取即初始化

接口i模式 ------> 设计模式,是一种封装模式,实现类与接口分离的模式

那就是将原本的

Infer infer_;
infer.load_model("aa");

合为一步,即获取就可以初始化

shared_ptr <Infer> create_infer(const string& file){shared_ptr <Infer> instance(new Infer);if (!instance->load_model(file)){instance.reset();}return instance;}

所以这里是表示如果模型加载失败,则代表资源获取失败,返回空指针。


int main(){auto infer_ = create_infer("aa");if (infer_ == nullptr){/* code */printf("failed\n");return -1;}infer_->forward();return 0;
}

所以现在的结果非常简单,只要为空指针则表示失败。不为空则成功。

这样就避免了外部执行load_model(RAII制作到一部分没有完全做到)。

避免了load_model不会执行超过一次

获取的模型一定初始化成功,因此forward不用判断模型初始化成功。

load model中可以删除对于重复load的判断,forward可以删掉是否家在成功的判断。

接口模式封装:

那么既然我们已经能用create了,可基于Infer的public属性,还是可以调用load_model的,所以就要进入到接口模式封装了。

接口模式解决问题:

1、解决load_model还能被外部看到的问题。

2、解决成员变量对外可见的问题(比如成员函数是特殊类型的,比如cudaStream_t ,那么头文件必须包含cuda_runtime.h,就会造成命名空间污染,头文件污染的问题 , 不干净的结果就会造成程序错误等异常)

接口类是一个纯虚类

原则是只暴露调用者需要的函数,其他一概隐藏起来

比如说load_model,咱们通过RAII做了定义,因此load_model属于不需要的范畴

内部如果有启动线程等等,start , stop ,也不需要暴露 , 而是初始化的时候自动启动,都是RAII的定义

class InferInterface
{
private:/* data */
public:virtual void forward() = 0 ;};

forward 要做成纯虚函数。

class InferImpl : public InferInterface{
public: bool load_model(const string &file){context_ = file;return true;} virtual void  forward()override{printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};

然后要让原来的类继承于接口类

shared_ptr <InferInterface> create_infer(const string& file){shared_ptr <InferImpl> instance(new InferImpl);if (!instance->load_model(file)){instance.reset();}return instance;}

在返回类型的时候返回的是Interface , 在new的时候选的是InferImpl。

 这样作为一个使用者除了forward真的还没有其他的可选项。

之后可以再进一步设置Infer.hpp作为头文件

#ifndef INFER_HPP
#define INFER_HPP
#include<memory>
#include<string>
class InferInterface
{
private:/* data */
public:virtual void forward() = 0 ;};
std::shared_ptr <InferInterface> create_infer(const std::string& file);
#endif  /. INFER_HPP

实现放到Infer.cpp中:

#include "infer.hpp"
using namespace std;
class InferImpl : public InferInterface{
public: bool load_model(const string &file){context_ = file;return !context_.empty();} virtual void  forward()override{printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};
shared_ptr <InferInterface> create_infer(const string& file){shared_ptr <InferImpl> instance(new InferImpl);if (!instance->load_model(file)){instance.reset();}return instance;}

这样在main函数里就只需要


#include"infer.hpp"
using namespace std;int main(){auto infer_ = create_infer("aa");if (infer_ == nullptr){/* code */printf("failed\n");return -1;}infer_->forward();return 0;
}

这样就做到了

  • 头文件只依赖最简单的
  • 成员变量看不到
  • 只能访问forward

RAII接口总结原则:

1、头文件应尽量简洁

2、不需要的东西都放到cpp当中,不要让接口类看到

比如:

#include <cudaruntime.h>
class InferInterface
{
private:cudaStream_t stream;
public:virtual void forward() = 0 ;};

这样就不符合尽量简洁的原则,因为这里用不到stream,应该尽可能放到cpp里。

3、尽量不要在头文件写using  namespace,因为写了这个就代表命名空间被打开了,这样就使得所有包含了这个头文件的人的所有命名空间都被打开。但CPP里可以写

多batch实现:

在这里我们主要是进行一个推理的实现。

老规矩:弄一个线程worker_thread_,为了尽量使得资源尽量那里分配那里释放,这样可以保证程序足够简单。在worker内实现模型的家在,使用,与释放。

那样的话

  
bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单worker_thread_ = thread(&InferImpl::worker , this , file);return context_.empty();} 

这肯定是不可以的,毕竟这个线程还没来的及进行worker,就要return了

这时候就要使用future

bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));context_ = file;return pro.get_future().get();} 

对应的worker:

void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}while (true){/* code */}}

这样我们就完成了在局部内加载使用释放

仿照上一节完成一下消费者模式:


struct Job{shared_ptr<promise<string>>pro;string input; 
};class InferImpl : public InferInterface{
public: bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));return pro.get_future().get();} virtual void  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);}//实际的推理阶段void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}while (true){if (!qjobs_.empty()){lock_guard<mutex> l(job_lock);auto job = qjobs_.front();qjobs_.pop();//inference:printf();}this_thread::yield();}  }
private:thread worker_thread_;queue<Job> qjobs_;mutex  job_lock;condition_variable cv_;
};

但这里每次都是一个一个地拿出来pop,我们要的是多batch,最好是一次能拿batchsize个出来。

这是就可以用vector:

  int maxbatchsize  =  5;vector<Job> jobs;while (true){if (!qjobs_.empty()){lock_guard<mutex> l(job_lock);while (jobs.size < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}//inference:printf();}this_thread::yield();}  

通知模式

那我们这个消费者不能一直while(true)啊,于是就要对其进行一个通知,也就是有东西进来了,再进行消费。所以就要使用到condition_variable的wait()和notify_one()了。

    virtual void  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();}

在传入的时候通知一下我。

    void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}int maxbatchsize  =  5;vector<Job> jobs;//i消费者:while (true){unique_lock<mutex> l(job_lock);cv_.wait(l , [&]{return !qjobs_.empty();});while (jobs.size() < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}//inference:printf();}}

在这里要进行一个等待。之后还可以去除qjobs_.empty()的判断。

这种通知的模式也要比我们之前主动检查模式要更好。

之后就要进行推理了:

for (int  i = 0; i < jobs.size(); i++){auto& job = jobs[i];char result [100];sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(1000));

 而这个时候set_value过后我们就要考虑一下返回的操作了。

返回future格式

如果forward是采用了

    virtual string  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();return job.pro->get_future().get();}

这种形式进行返回的话,get() will wait until the whole inference , this not the meaning of batchs

    virtual shared_future<string>  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();return job.pro->get_future();}

这样的话就可以直接返回future,让执行人有更多的选择。

int main(){auto infer_a = create_infer("aa");auto result_a =  infer_a->forward("aa");auto result_b =infer_a->forward("bb");auto result_c =infer_a->forward("cc");printf("%s   \n" , result_a.get().c_str());printf("%s   \n" , result_b.get().c_str());printf("%s   \n" , result_c.get().c_str());return 0;
}

执行过后:

 可见是一个batch去实现的。

但是如果要是先get的话,

 那就是分批次了,因为get就代表必须完成整个步骤之后返回。

退出模型

    atomic<bool> running_{false};

设立私有变量,表示程序是否还在运行。

在loadmodel时设立running_为true

bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单running_ = true;promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));return pro.get_future().get();} 
 void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}int maxbatchsize  =  5;vector<Job> jobs;int batch_id = 0;//i消费者:while (running_){unique_lock<mutex> l(job_lock);cv_.wait(l , [&]{return !qjobs_.empty() ||  !running_;});if(! running_){break;}while (jobs.size() < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}for (int  i = 0; i < jobs.size(); i++){auto& job = jobs[i];char result [100];sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(1000));}printf("释放->%s \n", context_.c_str());context_.clear();printf("Worker Done\n");}

同时worker也要改变,while的判断机制也需要改变,cv_.wait也不能是只有qjobs为空,还要加上是否还在运行。如果不运行了要break出while循环。

在结束之后要释放模型。

析构函数

    virtual ~InferImpl(){running_ =  false;cv_.notify_one();if(worker_thread_.joinable()){worker_thread_.join();}} 

重点在于析沟函数,在该类退出后设置线程同步,并且在running_设置为false的时候通知一次wait。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/76533.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LNMP】LNMP

LNMP&#xff1a;是目前成熟的企业网站的应用模式之一&#xff0c;指的是一套协同工作的系统和相关软件&#xff1b;能够提供静态页面服务&#xff0c;也可以提供动态web服务 L Linux系统&#xff0c;操作系统N Nginx网站服务&#xff0c;前端&#xff0c;提供前端的静态…

Redis入门

0目录 1.Redis入门 2.Redis定义&#xff1b;特点及数据类型 3.Value为List类型 4.Value值类型为Set 5.Value值类型为Hash 6.Value值类型为Zset 1.Redis入门 Redis入门 解压包&#xff0c;运行redis-server.exe 安装可视化软件测试链接 命名测试链接 点击确定 2.Redis…

docker的使用

docker安装 https://docs.docker.com/engine/install/debian/ 设置国内镜像 创建或修改 /etc/docker/daemon.json 文件&#xff0c;修改为如下形式 {"registry-mirrors": ["https://registry.hub.docker.com","http://hub-mirror.c.163.com"…

【linux--->数据链路层协议】

文章目录 [TOC](文章目录) 一、数据链路层协议概念二、以太网帧格式1.字段分析 三、局域网通信原理四、ARP协议1.结构2.作用3.ARP通信过程4.ARP协议相关命令 五、局域网内中间人原理六、DNS系统(域名系统)1.域名概念2.DNS系统组成3.DNS协议3.浏览器输入域名后的通信过程4.dig工…

用html+javascript打造公文一键排版系统11:改进单一附件说明排版

一、用htmljavascript打造公文一键排版系统10中的一个bug 在 用htmljavascript打造公文一键排版系统10&#xff1a;单一附件说明排版 中&#xff0c;我们对附件说明的排版函数是&#xff1a; function setAtttDescFmt(p) {var t p;var a ;if (-1 ! t.indexOf(:))//是半角冒…

Leetcode-每日一题【剑指 Offer 10- I. 斐波那契数列】

题目 写一个函数&#xff0c;输入 n &#xff0c;求斐波那契&#xff08;Fibonacci&#xff09;数列的第 n 项&#xff08;即 F(N)&#xff09;。斐波那契数列的定义如下&#xff1a; F(0) 0, F(1) 1 F(N) F(N - 1) F(N - 2), 其中 N > 1. 斐波那契数列由 0 和 1 开…

【前端】对前端小白极为友好的JS DOM入门文章

在现代web开发中&#xff0c;JavaScript (JS) 是不可或缺的一部分&#xff0c;它使我们能够为网页赋予交互性和动态性。其中&#xff0c;DOM&#xff08;文档对象模型&#xff09;技术在前端开发中起着至关重要的作用。本篇博客将带领前端初学者深入理解JavaScript DOM技术&…

Django学习记录:使用ORM操作MySQL数据库并完成数据的增删改查

Django学习记录&#xff1a;使用ORM操作MySQL数据库并完成数据的增删改查 数据库操作 MySQL数据库pymysql Django开发操作数据库更简单&#xff0c;内部提供了ORM框架。 安装第三方模块 pip install mysqlclientORM可以做的事&#xff1a; 1、创建、修改、删除数据库中的…

R语言地理加权回归、主成份分析、判别分析等空间异质性数据分析

在自然和社会科学领域有大量与地理或空间有关的数据&#xff0c;这一类数据一般具有严重的空间异质性&#xff0c;而通常的统计学方法并不能处理空间异质性&#xff0c;因而对此类型的数据无能为力。以地理加权回归为基础的一系列方法&#xff1a;经典地理加权回归&#xff0c;…

SHELL——备份脚本

编写脚本&#xff0c;使用mysqldump实现分库分表备份。 1、获取分库备份的库名列表 [rootweb01 scripts]# mysql -uroot -p123456 -e "show databases;" | egrep -v "Database|information_schema|mysql|performance_schema|sys" mysql: [Warning] Using …

关于综合能源智慧管理系统的架构及模式规划的研究

安科瑞 华楠 摘 要&#xff1a;探讨了国内外能源互联网的研究发展&#xff0c;分析了有关综合智慧能源管理系统的定位&#xff0c;以及系统的主要特点&#xff0c;研究了综合智慧能源管理系统的构架以及模式规划。 关键词&#xff1a;综合能源&#xff1b;智慧管理系统&#…

8月3日上课内容 LNMP精讲

LNMP&#xff1a;目前成熟的企业网站的应用模式之一&#xff0c;指的是一套协作工作的系统和相关文件 能够提供静态页面服务&#xff0c;也可以提供动态web服务。 这是一个缩写 L linux系统&#xff0c;操作系统。 N nginx网站服务&#xff0c;前端&#xff0c;提供前端的静…

升级到mybatis-plus,系统启动的一些问题

在分表后mybatis-plus删除操作失效等问题处理 mybatis-plus 旧系统重构遇到的种种问题 在这三篇文章中&#xff0c;我花了近1个月时间重构了28个微服务&#xff0c;当中遇到的一些问题&#xff0c;但是发布到pretest环境&#xff0c;却还有启动问题&#xff0c;看来系统重构不是…

【微信小程序创作之路】- 小程序远程数据请求、获取个人信息

【微信小程序创作之路】- 小程序远程数据请求、获取个人信息 第七章 小程序远程数据请求、获取个人信息 文章目录 【微信小程序创作之路】- 小程序远程数据请求、获取个人信息前言一、远程数据请求1.本地环境2.正式域名 二、获取用户个人信息1.展示当前用户的身份信息2.获取用…

价值 1k 嵌入式面试题-计算机网络 OSI

开门见山 请讲下 OSI 各层协议的主要功能&#xff1f; 常见问题 回答不系统回答不确切无法和实际网络协议做关联对应 答题思路 OSI 代表了开放互联系统中信息从一台计算机的一个软件应用流到另一个计算机的另一个软件应用的参考模型 OSI 包含 7 层&#xff0c;每一层负责特…

Java-day05(面向对象-1)

面向对象 面向对象与面向过程的区别&#xff1a; 面向过程&#xff0c;强调功能行为&#xff1b;面向对象&#xff0c;强调功能的对象。 Java类及类成员 类&#xff1a;对一类事物描述&#xff0c;是抽象的&#xff0c;概念上的定义对象&#xff1a;实际存在的该类事物的每…

踩坑(5)整合kafka 报错 java.net.UnknownHostException: 不知道这样的主机

java.net.UnknownHostException: 不知道这样的主机。 (5c0c3c629db9)at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:na]at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933) ~[na:na]at java.ba…

【Spring Cloud一】微服务基本知识

系列文章目录 微服务基本知识 系列文章目录前言一、系统架构的演变1.1单体架构1.2分层架构1.3分布式架构1.4微服务架构1.5分布式、SOA、微服务的异同点 二、CAP原则三、RESTfulRESTful的核心概念&#xff1a; 四、共识算法 前言 在实际项目开发过程中&#xff0c;目前负责开发…

正点原子HAL库入门1~GPIO

探索者F407ZGT6(V3) 理论基础 IO端口基本结构 F4/F7/H7系列的IO端口 F1在输出模式&#xff0c;禁止使用内部上下拉 F4/F7/H7在输出模式&#xff0c;可以使用内部上下拉不同系列IO翻转速度不同 F1系列的IO端口 施密特触发器&#xff1a;将非标准方波&#xff0c;整形为方波 当…

哈工大计算机网络课程网络安全基本原理详解之:密钥分发中心与公钥认证中心

哈工大计算机网络课程网络安全基本原理详解之&#xff1a;密钥分发中心与公钥认证中心 在介绍密钥分发中心的概念前&#xff0c;先来回顾一下之前介绍的身份认证协议AP4.0&#xff1a;利用随机数R来避免“回放攻击”&#xff0c;并借助于对称加密算法来保证R的加密传输和解密&…