我们先来弄一个最基础的infer类:
class Infer{ public: bool load_model(const string &file){context_ = file;return true;} void forward(){if(context_.empty()){printf("加载模型异常\n");return;}printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();} private:string context_;};
正常的模型是很复杂的,但在这里我们就用string代替了。
那这个代码重点就在于异常逻辑,在正常工作代码中,异常逻辑如果没有写好,就会造成封装的不安全性,甚至会导致程序崩溃。
比如之前的load model:如果没有load过还好,但是如果load过了,那不久导致了两次load_model么?
bool load_model(const string &file){if (!context_.empty()){/* code */destroy();}context_ = file;return true;}
所以在load前要先看一眼,如果load过了就destroy
这个时候就能引出我们的RAII + 接口模式了
RAII
RAII -------> 资源获取即初始化
接口i模式 ------> 设计模式,是一种封装模式,实现类与接口分离的模式
那就是将原本的
Infer infer_;
infer.load_model("aa");
合为一步,即获取就可以初始化
shared_ptr <Infer> create_infer(const string& file){shared_ptr <Infer> instance(new Infer);if (!instance->load_model(file)){instance.reset();}return instance;}
所以这里是表示如果模型加载失败,则代表资源获取失败,返回空指针。
int main(){auto infer_ = create_infer("aa");if (infer_ == nullptr){/* code */printf("failed\n");return -1;}infer_->forward();return 0;
}
所以现在的结果非常简单,只要为空指针则表示失败。不为空则成功。
这样就避免了外部执行load_model(RAII制作到一部分没有完全做到)。
避免了load_model不会执行超过一次
获取的模型一定初始化成功,因此forward不用判断模型初始化成功。
load model中可以删除对于重复load的判断,forward可以删掉是否家在成功的判断。
接口模式封装:
那么既然我们已经能用create了,可基于Infer的public属性,还是可以调用load_model的,所以就要进入到接口模式封装了。
接口模式解决问题:
1、解决load_model还能被外部看到的问题。
2、解决成员变量对外可见的问题(比如成员函数是特殊类型的,比如cudaStream_t ,那么头文件必须包含cuda_runtime.h,就会造成命名空间污染,头文件污染的问题 , 不干净的结果就会造成程序错误等异常)
接口类是一个纯虚类
原则是只暴露调用者需要的函数,其他一概隐藏起来
比如说load_model,咱们通过RAII做了定义,因此load_model属于不需要的范畴
内部如果有启动线程等等,start , stop ,也不需要暴露 , 而是初始化的时候自动启动,都是RAII的定义
class InferInterface
{
private:/* data */
public:virtual void forward() = 0 ;};
forward 要做成纯虚函数。
class InferImpl : public InferInterface{
public: bool load_model(const string &file){context_ = file;return true;} virtual void forward()override{printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};
然后要让原来的类继承于接口类
shared_ptr <InferInterface> create_infer(const string& file){shared_ptr <InferImpl> instance(new InferImpl);if (!instance->load_model(file)){instance.reset();}return instance;}
在返回类型的时候返回的是Interface , 在new的时候选的是InferImpl。
这样作为一个使用者除了forward真的还没有其他的可选项。
之后可以再进一步设置Infer.hpp作为头文件
#ifndef INFER_HPP
#define INFER_HPP
#include<memory>
#include<string>
class InferInterface
{
private:/* data */
public:virtual void forward() = 0 ;};
std::shared_ptr <InferInterface> create_infer(const std::string& file);
#endif /. INFER_HPP
实现放到Infer.cpp中:
#include "infer.hpp"
using namespace std;
class InferImpl : public InferInterface{
public: bool load_model(const string &file){context_ = file;return !context_.empty();} virtual void forward()override{printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};
shared_ptr <InferInterface> create_infer(const string& file){shared_ptr <InferImpl> instance(new InferImpl);if (!instance->load_model(file)){instance.reset();}return instance;}
这样在main函数里就只需要
#include"infer.hpp"
using namespace std;int main(){auto infer_ = create_infer("aa");if (infer_ == nullptr){/* code */printf("failed\n");return -1;}infer_->forward();return 0;
}
这样就做到了
- 头文件只依赖最简单的
- 成员变量看不到
- 只能访问forward
RAII接口总结原则:
1、头文件应尽量简洁
2、不需要的东西都放到cpp当中,不要让接口类看到
比如:
#include <cudaruntime.h> class InferInterface { private:cudaStream_t stream; public:virtual void forward() = 0 ;};
这样就不符合尽量简洁的原则,因为这里用不到stream,应该尽可能放到cpp里。
3、尽量不要在头文件写using namespace,因为写了这个就代表命名空间被打开了,这样就使得所有包含了这个头文件的人的所有命名空间都被打开。但CPP里可以写
多batch实现:
在这里我们主要是进行一个推理的实现。
老规矩:弄一个线程worker_thread_,为了尽量使得资源尽量那里分配那里释放,这样可以保证程序足够简单。在worker内实现模型的家在,使用,与释放。
那样的话
bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单worker_thread_ = thread(&InferImpl::worker , this , file);return context_.empty();}
这肯定是不可以的,毕竟这个线程还没来的及进行worker,就要return了
这时候就要使用future
bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));context_ = file;return pro.get_future().get();}
对应的worker:
void worker(string file , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}while (true){/* code */}}
这样我们就完成了在局部内加载使用释放
仿照上一节完成一下消费者模式:
struct Job{shared_ptr<promise<string>>pro;string input;
};class InferImpl : public InferInterface{
public: bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));return pro.get_future().get();} virtual void forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);}//实际的推理阶段void worker(string file , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}while (true){if (!qjobs_.empty()){lock_guard<mutex> l(job_lock);auto job = qjobs_.front();qjobs_.pop();//inference:printf();}this_thread::yield();} }
private:thread worker_thread_;queue<Job> qjobs_;mutex job_lock;condition_variable cv_;
};
但这里每次都是一个一个地拿出来pop,我们要的是多batch,最好是一次能拿batchsize个出来。
这是就可以用vector:
int maxbatchsize = 5;vector<Job> jobs;while (true){if (!qjobs_.empty()){lock_guard<mutex> l(job_lock);while (jobs.size < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}//inference:printf();}this_thread::yield();}
通知模式
那我们这个消费者不能一直while(true)啊,于是就要对其进行一个通知,也就是有东西进来了,再进行消费。所以就要使用到condition_variable的wait()和notify_one()了。
virtual void forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();}
在传入的时候通知一下我。
void worker(string file , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}int maxbatchsize = 5;vector<Job> jobs;//i消费者:while (true){unique_lock<mutex> l(job_lock);cv_.wait(l , [&]{return !qjobs_.empty();});while (jobs.size() < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}//inference:printf();}}
在这里要进行一个等待。之后还可以去除qjobs_.empty()的判断。
这种通知的模式也要比我们之前主动检查模式要更好。
之后就要进行推理了:
for (int i = 0; i < jobs.size(); i++){auto& job = jobs[i];char result [100];sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(1000));
而这个时候set_value过后我们就要考虑一下返回的操作了。
返回future格式
如果forward是采用了
virtual string forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();return job.pro->get_future().get();}
这种形式进行返回的话,get() will wait until the whole inference , this not the meaning of batchs
virtual shared_future<string> forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();return job.pro->get_future();}
这样的话就可以直接返回future,让执行人有更多的选择。
int main(){auto infer_a = create_infer("aa");auto result_a = infer_a->forward("aa");auto result_b =infer_a->forward("bb");auto result_c =infer_a->forward("cc");printf("%s \n" , result_a.get().c_str());printf("%s \n" , result_b.get().c_str());printf("%s \n" , result_c.get().c_str());return 0;
}
执行过后:
可见是一个batch去实现的。
但是如果要是先get的话,
那就是分批次了,因为get就代表必须完成整个步骤之后返回。
退出模型
atomic<bool> running_{false};
设立私有变量,表示程序是否还在运行。
在loadmodel时设立running_为true
bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单running_ = true;promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));return pro.get_future().get();}
void worker(string file , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}int maxbatchsize = 5;vector<Job> jobs;int batch_id = 0;//i消费者:while (running_){unique_lock<mutex> l(job_lock);cv_.wait(l , [&]{return !qjobs_.empty() || !running_;});if(! running_){break;}while (jobs.size() < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}for (int i = 0; i < jobs.size(); i++){auto& job = jobs[i];char result [100];sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(1000));}printf("释放->%s \n", context_.c_str());context_.clear();printf("Worker Done\n");}
同时worker也要改变,while的判断机制也需要改变,cv_.wait也不能是只有qjobs为空,还要加上是否还在运行。如果不运行了要break出while循环。
在结束之后要释放模型。
析构函数
virtual ~InferImpl(){running_ = false;cv_.notify_one();if(worker_thread_.joinable()){worker_thread_.join();}}
重点在于析沟函数,在该类退出后设置线程同步,并且在running_设置为false的时候通知一次wait。