Go 异步任务

Go 异步任务

异步任务在开发中很常见,用来做解耦。本文介绍一下异步队列的实现的几个问题,并且结合三方库实现来分析。

有下面的几个关键点:

  1. 用户代码(任务)如何封装
  2. 数据的存放(数据存放在哪里?就是一个读取队列)
  3. worker的管理(worker的数量,worker执行是否支持超时时间,worker的异常恢复)

带着上面的问题,对比https://github.com/golang-queue/queue的实现,说明一下。

用户代码如何封装

对于任务来说,最重要的是 函数操作,也就是对应的代码逻辑。go中是可以将方法作为参数传递的,方法也是一种类型。所以我们定义下面的方法,方法签名如下:

type TaskFunc func(ctx context.Context) error

还可以配置方法的callback逻辑,比如重试次数,重试间隔,重试error的判断等等

抽象出一个结构体来表示

https://github.com/golang-queue/queue/blob/master/job/job.go#L15
在这里插入图片描述

数据的存放

这是很好拓展的地方,可以支持多种存储媒介和中间件,比如基于内存实现的循环队列,redis,rocketmq。

在实现上就是接口抽象功能,依赖倒转。接口有下面的两个功能

  1. 存数据
  2. 取数据

https://github.com/golang-queue/queue/blob/master/core/worker.go
在这里插入图片描述

解释一下QueuedMessage接口和Worker中的Run方法

  1. QueuedMessage

    用来做数据转换的。

  2. Run

    用来执行函数,表示执行的任务。

worker的管理

worker管理涉及到下面几个方面

  1. worker的数量限制
  2. worker执行时候的超时时间
  3. worker执行时候的异常panic
  4. workder从队列中获取需要处理的处理,并且支持请求超时操作
  5. 服务关闭之后worker也需要操作

我们来看golang-queue/queue中的实现是什么?

通过metric来记录queue在运行期间具体的情况

https://github.com/golang-queue/queue/blob/master/metric.go#L20

在这里插入图片描述

并且通过 channel 来做限制。

每次在goroutine启动和停止的时候通过metric来计数。并且会调用schedule来发信号,给ready发送信号。

goroutine在启动的时候会select ready。

在这里插入图片描述

work的异常情况,在调用task的处理函数的时候,肯定要用到defer来做error恢复,并且通过channel来通信,context来实现超时控制。

具体的原理,我们从下面的代码开始来分析。

https://github.com/golang-queue/queue/blob/master/queue.go#L285

// 对于start来说,是一个死循环,会启动一个goroutine从work中获取数据,当前goroutine等待结果,并且启动goroutine来执行,此Goroutine叫做worker。
func (q *Queue) start() {// QueuedMessage 表示messagetasks := make(chan core.QueuedMessage, 1)// 启动一个goroutine来处理任务// 从work中获取任务,并且启动一个goroutine来处理任务for {// check worker number// 做调度的,就是检查work的数量q.schedule()// 数量不够,需要堵塞select {// wait worker readycase <-q.ready:case <-q.quit:return}// 启动一个goRoutine从 work中获取数据q.routineGroup.Run(func() {for {// 从队列中获取一个请求t, err := q.worker.Request()// 没有消息,或者有错误if t == nil || err != nil {// 有错误if err != nil {select {// 队列退出,关闭掉task,case <-q.quit:if !errors.Is(err, ErrNoTaskInQueue) {close(tasks)return}// 等待一秒再次从work中抓取新数据case <-time.After(time.Second):// sleep 1 second to fetch new task}}}if t != nil { // 说明取到了消息tasks <- treturn}// 说明t为nil但是没有错误select {case <-q.quit:if !errors.Is(err, ErrNoTaskInQueue) {close(tasks)return}default:}}})// 这就是从queue中获取一个task,之后将此task提交给work来实现task, ok := <-tasksif !ok {return}// 所以,这里并没有维护所谓的goroutine池,因为go的编程是不需要这些玩意的。goroutine已经很轻量级的了,直接提交运行就好了// start new taskq.metric.IncBusyWorker()q.routineGroup.Run(func() {q.work(task)})}
}func (q *Queue) work(task core.QueuedMessage) {var err error// 来处理一些内部的错误,在这里会减去worker的数量,并且重新scheduledefer func() {q.metric.DecBusyWorker()e := recover()if e != nil {q.logger.Errorf("panic error: %v", e)}q.schedule()// increase success or failure numberif err == nil && e == nil {q.metric.IncSuccessTask()} else {q.metric.IncFailureTask()}}()// 运行任务,可以看到这里的代码就是为了包装一下if err = q.run(task); err != nil {q.logger.Errorf("runtime error: %s", err.Error())}
}func (q *Queue) run(task core.QueuedMessage) error {data := task.(*job.Message)if data.Task == nil {data = job.Decode(task.Bytes())data.Data = data.Payload}return q.handle(data)
}func (q *Queue) handle(m *job.Message) error {// create channel with buffer size 1 to avoid goroutine leak// 这是go中很创建的做法,一个channel中有数据,但并没有被其他的任何的goroutine操作的话,也是会被gc掉的done := make(chan error, 1) // 完成的信号channelpanicChan := make(chan interface{}, 1) // panic的channelstartTime := time.Now() ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)defer func() {cancel()}()// run the job 启动goroutine来运行一个jobgo func() {// handle panic issuedefer func() {if p := recover(); p != nil {panicChan <- p}}()// run custom process functionvar err error// 做重试逻辑,这里的重试逻辑还可以指定重试的错误,比如as,基于那种类型的错误来做重试操作等。b := &backoff.Backoff{Min:    m.RetryMin,Max:    m.RetryMax,Factor: m.RetryFactor,Jitter: m.Jitter,}delay := m.RetryDelay// backoff都是通过for循环来做的loop:for {// 两种形式,一种是直接function,一直是通过messageif m.Task != nil {err = m.Task(ctx)} else {err = q.worker.Run(ctx, m)}// 不需要重试就直接返回,如果有错误就开始重试,并且利用time来做重试时间的控制if err == nil || m.RetryCount == 0 {break}m.RetryCount--if m.RetryDelay == 0 {delay = b.Duration()}// 这里用select来做操作select {case <-time.After(delay): // retry delayq.logger.Infof("retry remaining times: %d, delay time: %s", m.RetryCount, delay)case <-ctx.Done(): // timeout reached // ctx完成就直接返回err = ctx.Err()break loop}}done <- err}()// 当前的goroutine在等待结果,select {case p := <-panicChan:panic(p)case <-ctx.Done(): // timeout reachedreturn ctx.Err()case <-q.quit: // shutdown service// cancel jobcancel()leftTime := m.Timeout - time.Since(startTime)// wait jobselect {case <-time.After(leftTime):return context.DeadlineExceededcase err := <-done: // job finishreturn errcase p := <-panicChan:panic(p)}case err := <-done: // job finishreturn err}
}

有个问题,如何保证程序退出的时候这些work可以执行结束呢?利用waitGroup实现。

https://github.com/golang-queue/queue/blob/master/thread.go

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/88844.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[数据集][目标检测]骑电动车摩托车不戴头盔数据集VOC格式1385张

数据集格式&#xff1a;Pascal VOC格式(不包含分割路径的txt文件和yolo格式的txt文件&#xff0c;仅仅包含jpg图片和对应的xml) 图片数量(jpg文件个数)&#xff1a;1385 标注数量(xml文件个数)&#xff1a;1385 标注类别数&#xff1a;2 标注类别名称:["y","n&q…

“Can‘t open perl script configure : No such file or directory”的解决办法

编译OpenSSL的时候执行到 perl configure 时提示找不到configure&#xff0c; 然后在网上搜了搜&#xff0c;大家给的解决办法一般都是说设置环境变量或者指定configure路径再执行&#xff1b;我试了都不行&#xff0c; 最后我把perl卸了重装就正常了&#xff1b; 然后我换了…

服务器数据恢复-RAID5上层Hyper-V虚拟机数据恢复案例

服务器数据恢复环境&#xff1a; 一台Windows Server服务器&#xff0c;部署Hyper-V虚拟化环境&#xff0c;虚拟机的硬盘文件和配置文件存放在一台DELL存储中。该存储中有一组由4块硬盘组建的RAID5阵列&#xff0c;用来存放虚拟机的数据文件&#xff0c;另外还有一块大容量硬盘…

Pyqt5使QTextEdit或QLabel等框框背景透明

设置&#xff1a;textEdit->setStyleSheet(“background-color: rgb(255, 255, 255, 60);”);

如何使用Mac终端给树莓派pico构建C/C++程序进行开发,以及遇到各种问题该怎么处理,不使用任何IDE或编辑器(例如VS Code)

写本文的原因是官方的教程已经过时了&#xff0c;如果你现在按照官方教程来在 Mac 上进行配置&#xff0c;那么会遇到一堆问题&#xff0c;比如我几乎把能踩的“雷”都踩了。所以这里记录了完整过程&#xff0c;以及各种错误的原因和处理方法&#xff0c;不然以后换 Mac 了或者…

2023上半年京东奶粉行业品牌销售排行榜(京东数据分析平台)

近年来&#xff0c;受新生儿人口数量下降的影响&#xff0c;婴幼儿奶粉市场的需求量萎缩&#xff0c;市场由增量竞争转为存量竞争。根据鲸参谋电商数据分析平台的数据显示&#xff0c;今年上半年&#xff0c;京东婴幼儿奶粉市场的销量将近4400万&#xff0c;环比下降约19%&…

电气测试相关

项目&#xff1a; 长期过电压 瞬态过电压 瞬态欠压 跳跃启动 卸载 纹波电压 电源电压缓慢下降和上升 电源电压缓慢下降、快速上升 复位行为 短暂中断 启动脉冲 带电气系统控制的电压曲线 引脚中断 连接器中断 反极性 信号线和负载电路短路 启动行为 对分流不…

form中表单切换,导致 relus 中的事件无法触发,原因:页面切换不要一直切换DOM,会导致问题,需要都显示出来

修改前&#xff0c;因为重复渲染DOM导致绑定rules失效 修改前代码使用 computed 计算出渲染的DOM&#xff0c;影响rules事件<el-formref"form"inline:model"billDetailCopy":rules"rules"size"small"label-position"right&quo…

YOLOv5可视化界面

Pyside6可视化界面 安装Pyside6 激活之前的虚拟环境yolov5 在该环境的终端输入以下命令 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyside6输入where python找到当前使用的Python的路径 找到该路径下的designer.exe文件&#xff08;/Lib/site-packages/PySi…

Java IO流——【从零构建信息管理系统】

Java I/O流——【从零构建信息管理系统】 文章目录 什么是Java I/O流介绍理解字节流和字符流的区别 Java I/O流的作用Java I/O流方法InputStream方法Reader方法OutputStream方法Writer方法Java I/O体系的全体类 使用示例Java I/O流在实际应用中使用效果 什么是Java I/O流 介绍…

idea集成chatGPT,免费使用的bito神器

什么是Bito&#xff1f; Bito是一款在IntelliJ IDEA编辑器中的插件&#xff0c;Bito插件是由ChatGPT团队开发的&#xff0c;它是ChatGPT团队为了提高开发效率而开发的一款工具。ChatGPT团队是一支专注于自然语言处理技术的团队&#xff0c;他们开发了一款基于GPT的自然语言处理…

Azure创建第一个虚拟机

首先&#xff0c;登录到 Azure 门户 (https://portal.azure.com/)。在 Azure 门户右上角&#xff0c;点击“虚拟机”按钮&#xff0c;并点击创建&#xff0c;创建Azure虚拟机。 在虚拟机创建页面中&#xff0c;选择所需的基本配置&#xff0c;包括虚拟机名称、操作系统类型和版…

Pytorch个人学习记录总结 10

目录 优化器 优化器 官方文档地址&#xff1a;torch.optimhttps://pytorch.org/docs/stable/optim.html Debug过程中查看的grad所在的位置&#xff1a; model --> Protected Atributes --> _modules --> ‘model’ --> Protected Atributes --> _modules -…

期刊和会议缩写查询网站

1.https://pubmed.ncbi.nlm.nih.gov/?termMedicalImageComputingandComputer-AssistedIntervention 2. http://www.letpub.com.cn/index.php?pagejournalapp&viewsearch 3. https://blog.csdn.net/weixin_44557349/article/details/120825927 https://blog.csdn.net/ret…

Vue中如何更好地封装组件?

子组件接受父组件传递的事件 1.子组件使用事件名"$emit(父组件中传递的事件名,想给父组件传递的参数(可选))" click"$emit(click)" 2.子组件使用 v-on"$listeners" 父组件&#xff1a; <template><div id"app"><myCo…

第 358 场LeetCode周赛题解

A 数组中的最大数对和 数据范围小&#xff0c;直接暴力枚举数对 class Solution { public:int mx(int x) {//返回10进制表示的数的最大数字int res 0;for (; x; x / 10)res max(res, x % 10);return res;}int maxSum(vector<int> &nums) {int n nums.size();int r…

3.2 Tomcat基础

1. Tomcat概述 Tomcat 服务器是一个免费的开放源代码的Web 应用服务器&#xff0c;属于轻量级应用服务器。 Tomcat版本&#xff1a;apache-tomcat-8.5.76。 2.IDEA集成Tomcat 第一步 第二步 第三步 ​ 编辑切换为居中 添加图片注释&#xff0c;不超过 140 字&#xff0…

PyQt5组件之QLabel显示图像和视频

目录 一、显示图像和视频 1、显示图像 2、显示视频 二、QtDesigner 窗口简单介绍 三、相关函数 1、打开本地图片 2、保存图片到本地 3、打开文件夹 4、打开本地文本文件并显示 5、保存文本到本地 6、关联函数 7、图片 “.png” | “.jpn” Label 自适应显示 一、显…

ModaHub魔搭社区:从OpenAI实践看分工必要性,核心关注工作流相关的基础软件工具栈

从OpenAI实践看分工必要性,核心关注工作流相关的基础软件工具栈 参考海外OpenAI的率先尝试,工作流分工、点工具加持助力成功。一方面,OpenAI在《GPT-4 Technical Report》论文中[1]中披露了参与GPT 4开发的人员分工,共249人,角色分工明确,预训练、强化学习和对齐、部署等…

ORCA优化器浅析——CXform base class for all transformations

CXform CXforml类作为所有transformation的基础类&#xff0c;其包含了pattern成员m_pexpr。主要是在exploration和implementation expression流程中使用&#xff0c;主要调用Transform函数。其还包含返回相关xforms的集合函数&#xff0c;比如PbsIndexJoinXforms等。 class …