往期教程
如果觉得写的可以,请给一个点赞+关注支持一下
观看之前请先看,往期的博客教程,否则这篇博客没办法看懂
-
workFlow c++异步网络库编译教程与简介
-
C++异步网络库workflow入门教程(1)HTTP任务
-
C++异步网络库workflow系列教程(2)redis任务
-
workflow系列教程(3)Series串联任务流
简介
上一篇博客中讲了串行任务流,有了串行,那必然也有并行,本篇博客讲解任务流并行执行
创建一个任务流序列
SubTask
是所有任务的基类first
参数是这个任务流序列执行的首个任务
callback
是这个序列执行完毕后执行的回调函数
using series_callback_t = std::function<void (const SeriesWork *)>;inline SeriesWork *
Workflow::create_series_work(SubTask *first, series_callback_t callback)
{return new SeriesWork(first, std::move(callback));
}
创建并行任务流序列
- 参数
callback
是设置并行任务流中所有任务执行完毕之后调用的回调函数 - 函数返回
ParallelWork *
并行任务指针
using parallel_callback_t = std::function<void (const ParallelWork *)>;inline ParallelWork *
Workflow::create_parallel_work(parallel_callback_t callback)
{return new ParallelWork(std::move(callback));
}
向并行任务流中添加一个任物流序列
class ParallelWork{
public:void add_series(SeriesWork *series);
}
代码示例
图示流程
- 首先创建一个并行任务流序列,随后创建n个任务流序列,每个任务流序列添加一个http任务,
- 先并行执行每个序列的http任务基本工作,随后调用设置的
httpCallback
异步回调函数,httpCallback
执行完毕后调用所在序列的序列回调函数,当所有的序列回调函数执行完毕之后在执行并行任务流的parallelCallback
回调函数
#include <vector>
#include <workflow/WFFacilities.h>
#include <workflow/Workflow.h>
#include <workflow/HttpUtil.h>
struct SeriesContext{std::string url;int state;int error;protocol::HttpResponse resp;//响应报文的完整内容
};
void parallelCallback(const ParallelWork *pwork){fprintf(stderr,"pwork callback!\n");SeriesContext *context;for(size_t i = 0; i != pwork->size(); ++i){context = static_cast<SeriesContext *>(pwork->series_at(i)->get_context());fprintf(stderr,"url = %s\n", context->url.c_str());if(context->state == WFT_STATE_SUCCESS){const void *body;size_t size;context->resp.get_parsed_body(&body,&size);fwrite(body,1,size,stderr); fprintf(stderr,"\n");}else{fprintf(stderr,"Error, state = %d, error = %d\n", context->state, context->error);}delete context;}
}
void httpCallback(WFHttpTask *httpTask){SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());fprintf(stderr,"httpTask callback, url = %s\n", context->url.c_str());context->state = httpTask->get_state();context->error = httpTask->get_error();context->resp = std::move(*httpTask->get_resp());
}
int main(){//使用工厂函数,创建一个并行任务ParallelWork *pwork = Workflow::create_parallel_work(parallelCallback);//Workflow::create_parallel_workstd::vector<std::string> urlVec ={"http://192.168.135.129:81", "http://192.168.135.129","http://47.94.147.94"};for(size_t i = 0; i != urlVec.size() ; ++i){//创建若干个任务// WFTaskFactory::create_http_taskstd::string url = urlVec[i];auto httpTask = WFTaskFactory::create_http_task(url,0,5,httpCallback);// 修改任务的属性auto req = httpTask->get_req();req->add_header_pair("Accept","*/*");req->add_header_pair("User-Agent","myHttpTask");req->set_header_pair("Connection", "Close");//为响应的内容申请一片堆空间SeriesContext *context = new SeriesContext;context->url = std::move(url);// 为每个任务创建一个序列auto series = Workflow::create_series_work(httpTask,nullptr);// 把存储响应内容的指针 拷贝到序列的context当中。series->set_context(context);//把序列加入到并行任务中// add_seriespwork->add_series(series);}pwork->start();//启动并行任务
}