Celery的任务流

在这里插入图片描述

Celery的任务流

在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西

celery的简单使用

signature的引入

signature官方文档

可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程

先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c# 减
@app.task
def subtract_num(a, b):print(f'{a}-{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a - breturn c# 乘
@app.task
def multiply_num(a, b):print(f'{a}*{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a * breturn c# 除
@app.task
def divide_num(a, b):print(f'{a}/{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a / breturn c@app.task
def test(args):print(args)return args

运行celery消费者

# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

在这里插入图片描述

用signature对上面的add_num包装

from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

chain的使用

chain官方链接

chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数

from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

在这里插入图片描述

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条

group的使用

group官方链接

group可以将signature包装的任务函数并行执行,返回一组返回值

from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')# 对某个数分别做加减乘除处理group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

在这里插入图片描述
可以看到相比于chain,group里的任务是同时执行

chord的使用

chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成

上代码

from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, testadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)#包装test异步任务函数
test_sign = signature(test, queue='test')c1 = chord(group1, test_sign)
c1.delay()

在这里插入图片描述
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数

PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到

最后附上celery关于任务工作流的官方链接
celery工作流

PS

有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值ret = add_num.delay(1,2)ret = ret.get()return ret
#启动消费者
celery -A tasks worker -l info

调用test异步函数

from tasks import test
ret = test.delay()

在这里插入图片描述

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():ret = add_num.delay(1, 2)ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=Falsereturn ret

再调用一次test方法

在这里插入图片描述
成功调用

详见celery官方链接
链接传送门

结语

写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/296516.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python文件处理:解析docx/word文件文字、图片、复选框

前言 因为一些项目原因,我需要提供解析docx内容功能。本来以为这是一件比较简单的工作,没想到在解析复选框选项上吃了亏,并且较长一段时间内通过各种渠道都没有真正解决这一问题,反而绕了远路。 终于,我在github pytho…

9.图像中值腐蚀膨胀滤波的实现

1 简介 在第七章介绍了基于三种卷积前的图像填充方式,并生成了3X3的图像卷积模板,第八章运用这种卷积模板进行了均值滤波的FPGA实现与MATLAB实现,验证了卷积模板生成的正确性和均值滤波算法的MATLAB算法实现。   由于均值滤波、中值滤波、腐…

v-text 和v-html

接下来&#xff0c;我讲介绍一下v-text和v-html的使用方式以及它们之间的区别。 使用方法 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-widt…

Redis的值有5种数据结构,不同数据结构的使用场景是什么?

文章目录 字符串缓存计数共享Session限速 哈希缓存 列表消息队列文章列表栈队列有限集合 集合标签抽奖社交需求 有序集合排行榜系统 字符串 缓存 &#xff08;1&#xff09;使用原生字符类型缓存 优点&#xff1a;简单直观&#xff0c;每个属性都支持更新操作 缺点&#xff1…

Ansible批量操作(上传文件、删除文件指定文件内容、执行sh文件等)

官方网站 https://www.ansible.com/ 一、Ansible 简介 1、Ansible是新出现的自动化运维工具&#xff0c;完全基于Python开发&#xff0c;集合了众多运维工具&#xff08;puppet、chef、func、fabric&#xff09;的优点&#xff0c;实现了批量系统配置、批量程序部署、批量运行…

4.3学习总结

[HNCTF 2022 WEEK2]Canyource&#xff08;无参数&#xff09; 通过这题又接触了一种无参数RCE的方法&#xff0c;前面学习的getallheaders只有在apache环境下才能使用&#xff0c;具有一定的局限性 这里是利用php函数来构造读取flag的方法 localeconv() – 函数返回一个包含本…

Acrel-1000DP光伏监控系统在尚雷仕(湖北)健康科技有限公司5.98MW分布式光伏10KV并网系统的应用

摘 要&#xff1a;分布式光伏发电特指在用户场地附近建设&#xff0c;运行方式多为自发自用&#xff0c;余电上网&#xff0c;部分项目采用全额上网模式。分布式光伏全额上网的优点是可以充分利用分布式光伏发电系统的发电量&#xff0c;提高分布式光伏发电系统的利用率。发展分…

第19次修改了可删除可持久保存的前端html备忘录:换了一个特别的倒计时时钟

第19次修改了可删除可持久保存的前端html备忘录:换了一个特别的倒计时时钟 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><met…

计算机视觉——图像金字塔理解与代码示例

图像金字塔 有时为了在图像中检测一个物体&#xff08;例如人脸、汽车或其他类似的物体&#xff09;&#xff0c;需要调整图像的大小或对图像进行子采样&#xff0c;并进行进一步的分析。在这种情况下&#xff0c;会保持一组具有不同分辨率的同一图像。称这种集合为图像金字塔…

基于OrangePi Zero2的智能家居项目(开发阶段)

智能家居项目的软件实现 紧接上文 基于OrangePi Zero2的智能家居项目&#xff08;准备阶段&#xff09;-CSDN博客 目录 一、项目整体设计 1.1项目整体设计 1.2具体划分 二、开发工作的前期准备 1、进行分类&#xff0c;并用Makefile文件进行管理 参考&#xff1a;自己创…

硬件了解 笔记 2

CPU 内存控制器&#xff1a;负责读写数据 代理系统和平台IO&#xff1a;与主板上的芯片组通信&#xff0c;并管理PC中其他组件之间的数据流 主板&#xff1a;巨大的印刷电路板 Chipset&#xff1a;芯片组&#xff0c;位于散热器下方&#xff0c;直接连接到CPU的系统代理部分 …

详解网络攻击的发生原因、类型及如何防范

网络攻击是访问计算机系统或者大小&#xff0c;修改或窃取数据的未经授权的企图。网络破坏分子可以使用多种攻击媒介&#xff0c;推出包括网络攻击的恶意软件&#xff0c;网络钓鱼&#xff0c;勒索&#xff0c;以及人在这方面的中间人攻击。固有风险和残余风险使这些攻击中的每…

【大数据存储】实验五:Mapreduce

实验Mapreduce实例——排序&#xff08;补充程序&#xff09; 实验环境 Linux Ubuntu 16.04 jdk-8u191-linux-x64 hadoop-3.0.0 hadoop-eclipse-plugin-2.7.3.jar eclipse-java-juno-SR2-linux-gtk-x86_64 实验内容 在电商网站上&#xff0c;当我们进入某电商页面里浏览…

鸿蒙实战开发:【实现应用悬浮窗】

如果你要做的是系统级别的悬浮窗&#xff0c;就需要判断是否具备悬浮窗权限。然而这又不是一个标准的动态权限&#xff0c;你需要兼容各种奇葩机型的悬浮窗权限判断。 fun checkPermission(context: Context): Boolean if (Build.VERSION.SDK_INT < Build.VERSION_CODES.M)…

[Arduino学习] ESP8266读取DHT11数字温湿度传感器数据

目录 1、传感器介绍 2、接线 3、DHT.h库 1、传感器介绍 DHT11数字温湿度传感器是一款含有已校准数字信号输出的温湿度复合传感器&#xff0c;是简单环境监测项目的理想选择。 温度分辨率为1C&#xff0c;相对湿度为1&#xff05;。温度范围在0C到50C之间&#xff0c;湿度的测…

java Web 健身管理系统idea开发mysql数据库LayUI框架java编程计算机网页源码maven项目

一、源码特点 java Web健身管理系统是一套完善的信息管理系统&#xff0c;结合java 开发技术和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 前段主要技术 layUI bootst…

PVE设置显卡直通(二:Linux显卡直通,以及Linux系统下安装cuda库)

PVE设置显卡直通(一:硬件设置) 本文仅记录PVE关于Linux下的显卡直通步骤 例程不过多阐述 ps: 无直通经验的同学,先参阅 PVE设置显卡直通(一:硬件设置),再参阅本博文 参阅完成 PVE设置显卡直通(一:硬件设置)后,直接在PVE面板中添加显卡硬件到自己的主机即可,此文中…

CTK插件框架学习-插件注册调用(03)

CTK插件框架学习-新建插件(02)https://mp.csdn.net/mp_blog/creation/editor/136923735 一、CTK插件组成 接口类&#xff1a;对外暴露的接口&#xff0c;供其他插件调用实现类&#xff1a;实现接口内的方法激活类&#xff1a;负责将插件注册到CTK框架中 二、接口、插件、服务…

ABC318 F - Octopus

解题思路 对于每个宝藏维护个区间&#xff0c;答案一定在这些区间中对于每个区间的端点由小到大排序对于每个点进行判断&#xff0c;若当前位置合法&#xff0c;则该点一定为一个右端点则该点到前一个端点之间均为合法点若前一个点不合法&#xff0c;则一定是某一个区间限制的…

C++万物起源:类与对象(三)拷贝构造、赋值重载

目录 一、拷贝构造函数 1.1拷贝构造函数的概念与特征 1.2拷贝构造的实现 1.3默认构造函数 1.4拷贝构造函数典型调用场景 二、赋值运算符重载 2.1赋值运算符重载的格式 一、拷贝构造函数 1.1拷贝构造函数的概念与特征 在c语言语法中&#xff0c;我们可以将一个变量赋值给…