workflow源码解析:ThreadTask

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/242416.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CISSP认证计算机化自适应考试(CAT)及常见问题解答

每位参加CISSP CAT考试的考生均会由一个远低于及格标准的考题开始。在考生对某道题作答之后&#xff0c;评分算法会根据所有题目的难度和作答情况重新估算考生的能力。每多作答一道题&#xff0c;计算机对考生能力就会估算更加精确——与传统的线性考试相比&#xff0c;它能更有…

23号资源——电力系统程序集合已提供下载资源

23号资源&#xff1a;程序集合包含9个程序&#xff08;经典电力系统经济调度程序&#xff1b;2解决带储&#xff1b;3智能微电网PSO优化算法&#xff1b;微电网调度等等&#xff0c;见资源描述&#xff09;资源-CSDN文库https://download.csdn.net/download/LIANG674027206/887…

【PICO】【Unity】【VR】如何对打包后的PICO项目有效Debug

【背景】 PICO项目打包后再运行就看不到Console了。当然,会有各类专业的Debug工具。 有一类Debug的工具是Preview形式下展示Debug信息,但是发现Preview成功不见得打包也成功。 打包后也会有一些Debug工具,不过这里我给出自己的简单解决办法。 【解决方案】 Unity Console…

第一篇【传奇开心果系列】WeUI开发原生微信小程序:汽车租赁小程序示例

传奇开心果博文系列目录 WeUI开发原生微信小程序示例系列博文目录博文目录一、项目目标二、编程思路三、初步实现汽车租赁微信小程序示例代码四、实现汽车租赁微信小程序的登录注册示例代码五、实现汽车租赁微信小程序的订单管理示例代码六、整合实现比较完整的汽车租赁微信小程…

算法笔记(动态规划入门题)

1.找零钱 int coinChange(int* coins, int coinsSize, int amount) {int dp[amount 1];memset(dp,-1,sizeof(dp));dp[0] 0;for (int i 1; i < amount; i)for (int j 0; j < coinsSize; j)if (coins[j] < i && dp[i - coins[j]] ! -1)if (dp[i] -1 || dp[…

Spark读取kafka(流式和批数据)

spark读取kafka&#xff08;批数据处理&#xff09; # 按照偏移量读取kafka数据 from pyspark.sql import SparkSessionss SparkSession.builder.getOrCreate()# spark读取kafka options {# 写kafka配置信息# 指定kafka的连接的broker服务节点信息kafka.bootstrap.servers: n…

C语言练习day8

变种水仙花 变种水仙花_牛客题霸_牛客网 题目&#xff1a; 思路&#xff1a;我们拿到题目的第一步可以先看一看题目给的例子&#xff0c;1461这个数被从中间拆成了两部分&#xff1a;1和461&#xff0c;14和61&#xff0c;146和1&#xff0c;不知道看到这大家有没有觉得很熟…

适合多种语言的BPE(Byte-Pair Encoding)编码

文章目录 前言BPE参考 前言 因为最近在看T5&#xff0c;里面讲到一些分词的方法如BEP&#xff0c;因为现在都是在玩大模型&#xff0c;那么语料也就都很大&#xff0c;而且还需要适配不同的语言&#xff0c;而不同的语言又不一定像英文那样按空格切分就行&#xff0c;例如咱们…

小程序学习-19

Vant Weapp - 轻量、可靠的小程序 UI 组件库 ​​​​​ Vant Weapp - 轻量、可靠的小程序 UI 组件库 安装出现问题&#xff1a;rollbackFailedOptional: verb npm-session 53699a8e64f465b9 解决办法&#xff1a;http://t.csdnimg.cn/rGUbe Vant Weapp - 轻量、可靠的小程序…

【C++干货铺】C++11新特性——lambda表达式 | 包装器

个人主页点击直达&#xff1a;小白不是程序媛 C系列专栏&#xff1a;C干货铺 代码仓库&#xff1a;Gitee 目录 C98中的排序 lambda表达式 lambda表达式语法 表达式中的各部分说明 lambda表达式的使用 基本的使用 [var]值传递捕捉变量var ​编辑 [&var]引用传递捕…

VC++中使用OpenCV进行颜色检测

VC中使用OpenCV进行颜色检测 在VC中使用OpenCV进行颜色检测非常简单&#xff0c;首选读取一张彩色图像&#xff0c;并调用函数cvtColor(img, imgHSV, COLOR_BGR2HSV);函数将原图img转换成HSV图像imgHSV&#xff0c;再设置好HSV三个分量的上限和下限值&#xff0c;调用inRange函…

在WIN从零开始在QMUE上添加一块自己的开发板(二)

文章目录 一、前言往期回顾 二、CPU虚拟化&#xff08;一&#xff09;相关源码&#xff08;二&#xff09;举个例子&#xff08;三&#xff09;测试 三、内存虚拟化&#xff08;一&#xff09;相关源码&#xff08;二&#xff09;举个例子测试 参考资料 一、前言 笔者这篇博客…

雷盛红酒LEESON分享葡萄酒也有“社会责任感”?

葡萄酒文化从来都不仅仅是感官体验&#xff0c;一瓶佳酿的背后不但蕴含着风土人情、历史传承和文化交流&#xff0c;更反映了时代社会的变迁以及体现的社会责任意识。 目前葡萄酒生产商追求酒瓶越来越轻就是葡萄酒市场上的一个趋势&#xff0c;因为任何一个行业都在追求与世界共…

c语言算法——大数相加

C数据类型 类型与描述1基本数据类型 它们是算术类型&#xff0c;包括整型&#xff08;int&#xff09;、字符型&#xff08;char&#xff09;、浮点型&#xff08;float&#xff09;和双精度浮点型&#xff08;double&#xff09;。2枚举类型&#xff1a; 它们也是算术类型&am…

Vue2的双向数据绑定

Vue2的双向数据绑定 Observer&#xff1a;观察者&#xff0c;这里的主要工作是递归地监听对象上的所有属性&#xff0c;在属性值改变的时候&#xff0c;触发相应的watcher。 Watcher&#xff1a;订阅者&#xff0c;当监听的数据值修改时&#xff0c;执行响应的回调函数&#x…

Spring:StopWatch

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 一、输出总耗时 二、输出所有任务的耗时和占比 总结 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、输出总耗时 public void stopWatc…

ERP进出库+办公用品管理系统

系统架构 简介系统架构部分页面结构图UML逻辑图办公用品入出库 简介 本系统适用于ERP企业公司职员关于系统化的申请相关办公用品&#xff0c;提高整体系统整合行&#xff0c;加大上下级之间的联系&#xff0c;规避因人员过多&#xff0c;而浪费人力在简单重复的工作中&#xf…

conda国内加速

1、配置国内源 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ 2、显示源地址 conda config --set show_channel_urls yes

【MongoDB】下载安装、指令操作

目录 1.下载安装 2.指令 2.1.基础操作指令 2.2.增加 2.3.查询 2.4.修改 2.5.删除 前言&#xff1a; 关于MongoDB的核心概念请移步&#xff1a; 【文档数据库】ES和MongoDB的对比-CSDN博客 1.下载安装 本文以安装Windows版本的mongodb为例&#xff0c;Linux版本的其实…

30岁的路口,这些90后选择离开大城市

#第一批90后今年34岁了#【30岁的路口&#xff0c;这些90后选择离开大城市】#第一批90后现状如何# 据惊蛰研究所&#xff1a;第一批90后今年34岁了。假如从2012年踏入职场&#xff0c;第一批90后如今已在职场摸爬滚打十年。十年之前&#xff0c;他们意气风发来到大城市&#xff…