DAG计算框架:实现业务编排

文章目录

  • DAG
  • 如何实现DAG计算框架
    • Node的实现
    • Engine的实现
    • Graph的实现
    • 具体某个节点
    • 如何使用

在工作几年之后,大部分人如果还在继续做着 CRUD的简单重复工作,即使领导不提出对你更高的期望,自身也会感到焦虑吧。学如逆水行舟不进则退,年龄在增,技术深度也需要不断精进,否则就很可能面临淘汰。因此找个时间静下心来,为自己做一个技术规划是非常有必要的。

在工作中想要做好技术规划,就必须抓住一个软件系统的演进见律:

函数->类->组件->脚本->服务->系统->分栈/层->配置化/标准化->自动化->平台化->产品化->规模化

软件工程的本质就是应对规模化所带来的复杂性。

因此如何将复杂的东西变简单,以便于承接更大的规模化发展,这本身就是技术的本质,因此是极其有技术含量的事。

DAG

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG Directed Acyclic Graph),在工作中,大部分规则引擎都会用到DAG
在这里插入图片描述

设计时,一般只需要在请求到来时,根据变化的配置信息对DAG进行初始化,根据上下文中的信息(一般用自定义的ctx携带信息),每个node决定是否真的执行具体的task(或者跳过),将业务组件最大程度的复用和内聚。

如何实现DAG计算框架

Node的实现

通过继承扩展实现,业务开发需要实现两个函数EnableRun,其中所有参数检查逻辑在Enable中完成, Enable返回false代表不启用此NodeRun函数是真正执行
业务逻辑的函数实现,这样对于一个具体Node的所有业务逻辑都被高度内聚在了一个文件中实现。

type Node struct {Ctx *BizCtx // 自定义的上下文Name string //节点名字,一般表示业务单元的标识(一个业务流程是一个Node)g *Graph  //整个DAG的控制对象ID int64 // 保证唯一的IDDeps []string // 所有的父节点Nexts []*Node // 所有的子节点 Mask int64 // 表示依赖的演码,用于标识当前节点是否可达(可执行)
}//根据相关参数构建节点
func NewNode(...)*Node{....}
// 获取上下文
func (n *Node)GetCtx()*BizCtx{return n.BizCtx}
// 参数校验
func (n *Node)Enable()bool{...}
// 节点实际运行
func (n *Node)Run(){}
// 获取节点名字
func (n *Node)Name()string{return n.Name}
// 节点依赖,可理解成当前节点的父节点,本示例中,父节点都执行成功后,才可执行当前节点
func (n *Node)Deps()[]string{return n.Deps}
// 所有的子节点
func (n *Node)Nexts()[]*Node{return n.Nexts}
// 节点的配置,不同业务,可能有自己的一些配置参数,如分流参数,奖励参数
func (n *Node)Conf()*NodeConf{}

Engine的实现

提供DAG计算框架的运行时资源管理,协程池管理计算资源原,对象池管理内存资源。

type Engine struct { // engine的生命周期是进程级的
ctxPool *sync.Pool
gPool *sync.Pool
runPool *goPool // 某种协程池实现,接受两个函数,一个函数执行和一个回调函数
graph *Grpah // graph代表着一个真正的DAG,是请求级的生命周期。
}// 函数式选项模式获得一个Engine对象
func NewEngine(opt Options)*Engine{...}
// 初始化一个图
func (e *Engine)BuildGraph()*Graph{...}

Graph的实现

真正实现DAG调度的组件,请求范围内的生命周期

var RootNode = &Node{ID:-1}
var EndNode = &Node{ID:-2}
type Graph struct{e *Engine // Graph 和Engine互相包含id int64taskChannel chan int64 // 需要执行的节点的IDackChanel   chan int64   // 异步回调的确认chandoneChannel chan struct{}  // 执行完成或者异常时,终止DAG的通道NameTable   map[string]*Node // 节点名与节点的映射IDTable     map[int64]*Node // 节点id与节点的映射
}// 添加节点,拼接实际的图
// 这里默认添加顺序遵循了添加当前节点时,已经添加完了当前节点依赖的所有父节点
func (g *Graph)ADD(node *Node)*Graph{g.NameTable[node.Name()] = node// Mask在构建Node时会设置和Node的ID字段相同,然后与所有父节点的ID异或,得到新的Mask值// 后面执行时候,当前节点执行成功,就与其所有子节点异或,更新了子节点的Mask值// 当子节点的Mask值和其ID值又相等时,说明当前节点的所有父节点都执行成功了,可以执行当前节点了for nodeName <- node.Deps(){preNode := g.NameTable[nodeName]node.Mask ^= preNode.IDpreNode.Nexts = append(preNode.Nexts, node)}
}func (g *Graph)Run() err {
//在遍历过程中对第一层没有依赖的node添加一个 rootNode,其 ID== -1
//在遍历的过程中对最后没有出度的节点添加上特殊的 终止Nodle,其ID=== -2
// 即默认让-1和-2分别作为根节点和终止节点
g.taskChannel <- -1
for{select{case taskID <-g.taskChannel:// 遇到了终止节点,当前图可以终止执行了if taskID == -2 {close(g.doneChannel)}node := g.IDTable[taskID]if node.Enable(){// 使用Engine管理协程执行g.e.runPool(func(){node.Run()}, func(id int64){g.ackChannel<-id})}//这里也可以基于协程池做异步控制,ackcase taskID <- g.ackChanel:node := g.IDTable[taskID]// 当前节点致辞哪个成功后,通知所有子节点for nextNode <- node.Nexts(){nextNode.Mask ^= node.ID//利用相同数字异或结果为0的特性维护任务依赖状态// 该子节点可以放入可执行的channel中了if nextNode.Mask == nextNode.ID { gg.taskChannel <- nextNode.ID}case <-g.doneChannel:g.Close()return

具体某个节点

这里以一个RecoveryNode为例,其可能放于文件:/nodes/recovery.go一个单独文件中,其他具体的节点也都放于单独的文件中,但都共同放在nodes文件夹中。

type RecoveryNode struct{*Node // 继承节点的能力.... // 其他与当前节点相关的业务自定义字段
}
//注册名字
func (n *Node)Name()string{return "recovery"}//注册依赖,假如当前节点依赖了分流和奖励节点
func (n *Node)Deps()[]string{return []string{"shunt", "reward"}//可以把Node的name定义为常量进行传递会更好,避免出错
}func NewRecoveryNode(/**这里也可以传参数**/)*Node{return &RecoveryNode{Node:NewNode()}
}func (n *RecoveryNode)Enable()bool{// 利用了ctx的WithValue能力,如下shunt.path就是一个key,其中shunt可以理解成是命名空间,表示是shunt节点中设置的path key,取其值// 这里表示分流节点中的通过路径不是1时,可以执行当前节点return n.Node.GetCtx().GetString("shunt.path","") != "1"
}func (n *Node)Run(){count := n.Node.GetCtx().GetInt64("reward.count", 20)list:=Recovery(count)//示意而已,不用在乎业务具体逻辑n.Node.GetCtx().SetInt64List("recovery.success"", true)
}// ... 其他一些方法的实现

如何使用

var e *Engine
func init(){e = NewEngine{}
}func handler(){g := e.BuildGraph().ADD(NewShuntNode()).ADD(NewRewardNode()).ADD(NewRecoveryNode()).ADD(NewPackDataNode())err := g.Run()print(err)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/407471.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎么整合spring security和JWT

什么是spring security spring security是一个安全框架,它里面有过滤器链,可以多次过滤,其实他可以给前端的cookie传入一个jsessionid,都可以不使用jwt也能完成校验 第一步:导入依赖 <!-- springboot security --> <dependency><groupId>org.springframew…

git错误fatal: Unpack error, check server log

git错误fatal: Unpack error, check server log fatal: Unpack error, check server log error: remote unpack failed: error Missing tree xxxxxxxxxxxxxxxxxx 先执行 git fetch 命令&#xff0c;再push。 git拉取远程所有分支/添加远程仓库_git pull所有分支代码-CSDN博客…

OpenGL-ES 学习(8) ---- FBO

目录 FBO OverViewFBO 优点使用FBO的步骤 FBO OverView FBO(FrameBuffer Object) 指的是帧缓冲对象&#xff0c;实际上是一个可以添加缓冲区容器&#xff0c;可以为其添加纹理或者渲染缓冲区对象(RBO) FBO(FrameBuffer Object) 本身不能用于渲染&#xff0c;只有添加了纹理或者…

【C++ Primer Plus习题】3.6

问题: 解答: #include <iostream> using namespace std;int main() {float miles 0;float gallons 0;float gallon 0;cout << "请输入驱车里程(单位为英里):";cin >> miles;cout << "请输入使用的汽油量(单位为加仑):";cin &g…

【Three.js基础学习】19.Custom models with Blender

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 前言 blender模型资源:【blender】一个汉堡包-CSDN博客 一、代码 import ./style.css import * as THREE from three import { OrbitControls } from three/examples/jsm/co…

CentOS服务器三级等保加固

1.密码周期: vim /etc/login.defs max_days:90 mindays:2 minlen:8 warnage:72.密码复杂度: vim /etc/pam.d/system-auth &#xff1a; password requisite pam_cracklib.so retry3 difok3 minlen8 lcredit-1 dcredit-1 ucredit-1 ocredit-1 【Ubuntu系统->vim /etc/pam.d/c…

【STM32】C语言基础补充

学习过程中发现自己好些需要用到的C语言语法、特征都不太熟练了&#xff0c;特意记录一下&#xff0c;免得忘记了&#xff0c;以后遇到了新的也会继续更新 目录 1 全局变量 2 结构体 3 静态变量 4 memset()函数 5 使用8位的存储器存16位的数 1 全局变量…

C++学习笔记----4、用C++进行程序设计(四)---- 复合关系与继承关系之间的细线

在现实世界只是很容易区分对象之间是复合关系还是继承关系。没有人会说桔子有一个水果--而只能是桔子是一种水果。但是&#xff0c;在代码中&#xff0c;有时候就不是那么清晰了。 设想有一个代表关联数组的假想类&#xff0c;将一个键影射到一个值的数据结构。例如&#xff0c…

【Canvas与艺术】环形橄榄枝纹饰

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>环形橄榄枝(draft2)</title><style type"text/css&quo…

MySql 高阶二(SQL 性能分析)

SQL 性能分析&#xff1a; 查看当前数据库的 增删改查的使用情况 show global status like Com_______;慢查询日志&#xff1a; -- 查看状态 show variables like slow_query_log目前是开启状态。如何开启&#xff0c;编辑my.cnf 文件 添加下面的语句&#xff0c;编辑完成后…

Ansible远程自动化运维

目录 概念 安装ansible modules模块和语法 命令行语法 模块 1. command 基础模块 常用的参数 2. shell模块 3. cron定时任务模块 4. user用户管理模块 参数 5. copy复制模块 参数 6. file模块 设置文件属性 参数 实验&#xff1a;批量创建目录 7…

设备实时数据采集:开启制造业智能化、自动化的新篇章

传统制造业在进行生产过程中&#xff0c;会涉及到设备实时数据采集需求&#xff0c;这些数据对于监控生产流程、优化生产效率、保证产品质量以及降低成本等方面至关重要。以下是一些常见的数据采集需求&#xff1a; 1.生产数据&#xff1a;包括生产数量、生产批次、生产速度等&…

如何禁止编辑PDF文件?推荐两种方法!

在日常工作中&#xff0c;我们经常会遇到需要分享重要的PDF文件的情况&#xff0c;但又希望文件内容不被随意更改。为此&#xff0c;设置PDF文件的修改限制是一个非常有效的措施。今天分享两种常见的禁止修改PDF的方法&#xff0c;一起来看看如何设置。 方法一&#xff1a;使用…

FPGA开发——DS18B20读取温度并且在数码管上显示

一、简介 在上一篇文章中我们对于DS18B20的相关理论进行了详细的解释&#xff0c;同时也对怎样使用DS18B20进行了一个简单的叙述。在这篇文章我们通过工程来实现DS18B20的温度读取并且实现在数码管伤显示。 1、基本实现思路 根据不同时刻的操作&#xff0c;我们可以使用一个状…

TCP与UDP传输的学习

void *memset(void *s, int c, size_t n); 功能&#xff1a;将一块内存空间的每个字节都设置为指定的值&#xff1b;这个函数通常用于初始化一个内存空间&#xff0c;或者清空一个空间&#xff1b; 参数&#xff1a;viod * s 空类型指针&#xff0c;指向要填充内存块&#xf…

代码随想录训练营 Day37打卡 动态规划 part05 完全背包理论基础 518. 零钱兑换II 377. 组合总和 Ⅳ 卡码70. 爬楼梯(进阶版)

代码随想录训练营 Day37打卡 动态规划 part05 一、完全背包理论基础 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09;&#xff0c;求解将哪些物品装…

Arco Design,字节跳动出品的UI库

Arco Design是字节跳动出品的UI库&#xff0c;支持Vue和React。还是比较美观的。并且Arco Design还提供了中后台模版。但是通过提供的arco-cli连接了github&#xff0c;正常情况下无法构建。但效果还是挺好的&#xff0c;下面是效果图&#xff1a; 更新&#xff1a; 传送门可…

【C++ Primer Plus习题】2.3

问题: 解答: #include <iostream> using namespace std;void print01() {cout << "三只眼瞎的老鼠" << endl; }void print02() {cout << "看他们怎么跑" << endl; }int main() {print01();print01();print02();print02();r…

Linux:Linux多线程

目录 线程概念 什么是线程 二级页表 线程的优点 线程的缺点 线程异常 线程用途 Linux进程VS线程 进程和线程 进程的多个线程共享 进程和线程的关系 Linux线程控制 POSIX线程库 线程创建 线程等待 线程终止 分离线程 线程ID及进程地址空间布局 线程概念 什么…

VS Code 远程连接SSH服务

随着技术的不断迭代更新&#xff0c;在 Linux 系统中使用 Vim、nano 等基于 Shell 终端的编辑器&#xff08;我曾经也是个 vimer&#xff0c;但是 VS Code 实在太香了&#xff09;&#xff0c;已经很难适应当下的开发效率。因此大多数开发者开始使用 VS Code 远程连接 Linux 系…