celery是一个强大的分布式任务队列,在这里我们介绍一下它最基本的处理异步任务的功能,包含以下几个:
-
创建Celery实例
-
创建一个异步任务
-
查询异步任务的信息
-
取消异步任务
使用的环境是flask3.0+Celery5.4
1. 创建Celery实例
celery通过中间件获取任务,再将任务结果写入backend,所以创建实例时需要传递这两个的url,其支持多种协议(比如redis、rabbitmq等),这里我们以redis为例。最简单的一种是创建对象时传递:
from celery import Celerycelery_app = Celery(__name__, broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
另外一种是使用配置文件,按官方文档的例子,在同级目录创建一个celeryconfig.py文件,写入配置信息,然后更新celery对象的配置:
celery_app = Celery(__name__)
apcelery_appp.config_from_object('celeryconfig')
但可能是由于Celery版本问题,配置字段改变了(大小写改变),这种方法连接backend一直报错,暂时还没找到原因
2. 创建一个异步任务
类似于flask,celery也使用装饰器来创建任务处理函数,比如官方例子:
@celery_app.task
def add(x, y):return x + y
但这种方式无法访问任务实例的信息,需要修改一下装饰器的参数,将任务实例作为第一个参数引入:
@celery_app.task(bind=True)
def add(self, x, y):print(self.request.id) # 打印任务return x + y
返回值会写到backend,存储在result字段。结合flask的例子如下:
from flask import Flask# 其它代码
# ...app = Flask(__name__)@app.route('/task', methods=['POST'])
def add_task():id = add.delay(1, 2) # 返回的是异步任务的return {'code': 0, 'data': id}
注意:Celery是要单独启动的,然后再启动flask,启动命令类似于:
Celery -A app worker --loglevel=info
这个地方也有比较多的坑,经常报模块不存在这样的错
3. 查询异步任务的信息
大部分异步任务的耗时都比较长,过程中用户可能想知道任务执行的情况,比如进度等,这时前面引入的第一个参数(self)就能发挥作用了,我们修改一下demo, 每隔1秒计算一次,并将进度更新到backend:
import time@celery_app.task(bind=True)
def add(self, x, y):for i in range(1, 100):print(x + y)self.update_state(state="PROGRESS", meta={'index': i})time.sleep(1)return x + y
而对应的flask查询代码如下:
from celery.result import AsyncResult@app.route('/task/<string:id>', methods=['GET'])
def get_task(id):return {'code': 0, 'data': celery_app.AsyncResult(id)}
4. 取消异步任务
取消任务要借助Celery的Control类:
@app.route('/task/<string:id>', methods=['DELETE'])
def remove_task(id):ctrl = Control(celery_app) ctrl.revoke(id)return {'code': 0}