Celery 是一个简单、灵活且可靠的分布式任务队列框架,用于处理大量的异步任务、定时任务等。它允许你将任务发送到消息队列,然后由后台的工作进程(worker)来执行这些任务,并且支持多种消息中间件,如 RabbitMQ、Redis 等。
Celery简介
Celery是一个简单、灵活、可靠的分布式系统,可以处理大量的消息,同时为操作提供维护这样一个系统所需的工具。
Celery有很多应用场景,典型示例如下:
- 发送电子邮件:可以将发送电子邮件的任务交给Celery,并向用户显示一个感谢页面,而不是让用户在填写完注册表格后等待。你可能会说,执行电子邮件发送代码不需要花费时间,但是如果电子邮件服务器没有响应,如果将这部分设置为同步,站点访问者将不得不等待,直到超时发生。
- 图片/其他文件上传任务:现在通过网页上传图片或其他类型的文档是非常常见的。假设希望提供一个工具来上传包含产品图像的产品信息,同时还需要根据要求调整图像大小,并增加与品牌相关的水印,用户在所有这些操作期间等待看起来不太好。他想要的只是看到他的过程已经完成的文本,然后继续前进。你可以创建多个celery任务来实现目标。
- 计划任务:celery也可以作为一个调度程序,执行周期性任务。
Celery主要概念
Celery基本架构如下图所示:
生产者:这个应用程序负责推送消息与所有需要的信息。
Broker: 这个模块实际上是作为消息队列服务的,像Redis或RabbitMQ这样的应用程序可以在这里使用。
任务:任务是序列化后在代理中排队的Python函数或任务。然后,任务函数由负责反序列化并执行它的工作程序挑选。默认的序列化格式是JSON,您可以将其更改为msgpack, YAML或pickle。
后端:该组件负责存储函数产生的结果.
环境准备
- 安装redis
这里为了快速演示,直接适用docker容器:
docker run -d -p 6379:6379 redis
- 安装依赖
首先安装 Celery 和 Redis(python连接redis):
pip install celery redis
基本示例
- 创建任务
创建项目(如使用poetry工具),以下是简单Celery任务模块示例:
# tasks.py 文件
from celery import Celery# 创建Celery实例,指定名称和消息中间件(这里是Redis)的URL
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
首先导入Celery
类,然后创建一个Celery
实例,名称为tasks
,并指定broker
(消息中间件)为本地 Redis 服务器(redis://localhost:6379/0
)。
定义了一个名为add
的任务,它是一个被@app.task
装饰的函数。这个任务接收两个参数x
和y
,并返回它们的和。
- 启动Celery工作进程
在终端运行命令启动Celery工作进程:
celery -A tasks worker --loglevel=info
这里-A tasks
表示任务模块是tasks.py
,worker
表示启动工作进程,--loglevel=info
设置日志级别为info
,这样可以看到任务执行的相关信息。
- 调用任务
在另一个python脚本(如:main.py)中调用任务:
# main.py
from tasks import add# 异步调用任务
result = add.delay(66, 4)
print("任务已发送,等待结果...")
# 获取任务结果
print("结果:", result.get())
首先从tasks.py
模块中导入add
任务。
然后使用add.delay(66, 4)
异步调用add
任务,传递参数66
和4
。这会将任务发送到消息队列,由 Celery 工作进程来执行。接着打印出任务已发送的消息,等待任务执行结果。
最后,使用result.get()
获取任务的最终结果。当任务还没有执行完成时,get()
方法会阻塞,直到任务完成并返回结果。在这里,最终会打印出70
,即66 + 4
的结果。
定时任务示例
要定时执行任务,我们需要重构task模块,添加定时配置:
# tasks.py 文件
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# 每30秒执行一次add任务,参数为4和6sender.add_periodic_task(30.0, add.s(66, 4))
在这里,新增了一个setup_periodic_tasks
函数,它通过@app.on_after_configure.connect
装饰器连接到 Celery 的配置完成后的事件。在这个函数中,使用sender.add_periodic_task
来添加一个定时任务,每隔30
秒执行一次add
任务,参数为66
和4
。add_periodic_task函数表示增加周期性任务,第一个参数以秒为单位,5分钟可以设置为300。第二个参数表示任务,这里是add.s(66, 4)
,其中add
是之前定义好的 Celery 任务函数(被@app.task
装饰过的函数),而.s
是 Celery 的一种语法糖,用于对任务进行签名(signature)操作,它可以固定任务执行时的参数。
- 重新启动Celery工作进程
像之前一样,在终端中运行celery -A tasks worker --loglevel=info
来启动工作进程。同时,你还可以启动一个 Celery 调度器(beat)来管理定时任务。这样,Celery 就会按照配置的时间间隔定期执行add
任务,并且可以在工作进程的日志中看到任务执行的记录。
运行任务过程
首先在命令行输出任务ID:
e8460939-7bff-4541-8843-c22448ba81a6
在redis中有记录:
{"body": "W1s2NiwgNF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "Add two numbers", "id": "020e9130-e22a-4cea-8463-ba95c2a03103", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "020e9130-e22a-4cea-8463-ba95c2a03103", "parent_id": null, "argsrepr": "(66, 4)", "kwargsrepr": "{}", "origin": "gen17869@LAPTOP-F569632U", "ignore_result": false, "replaced_task_nesting": 0, "stamped_headers": null, "stamps": {}}, "properties": {"correlation_id": "020e9130-e22a-4cea-8463-ba95c2a03103", "reply_to": "d40ebae3-1285-35bf-a96f-bdde29d95121", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "d2cd8663-fba8-458f-aee7-c0342ad3253e"}}
body内容是base64编码,对W1s2NiwgNF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==
进行解码,内容如下:
[[66, 4], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]
注意值为66和4的列表。这些是你传递给函数的参数。其他信息,如任务名称、语言等,也可以在上面的JSON中看到。
到目前为止,你所做的就是把数据推入队列,然后序列化。它还没有被消费,因此你需要启动工人。你像下面这样调用工人(如果之前没有启动,现在启动):
celery -A tasks worker --loglevel=INFO
-A 指定应用程序名称,这里是tasks。该名称本身来自任务文件tasks.py的名称。然后告诉你希望调用worker和日志级别。
这时你可以注意到任务Add两个数字以及任务id,它首先被接收并成功执行。注意70这个数字,它是66和4的和。因此,推送到队列中的任务不一定要立即处理。
由于我们也使用后端,所以任务执行的结果存储如下:
[2024-12-30 20:55:56,562: INFO/MainProcess] Task Add two numbers[e8460939-7bff-4541-8843-c22448ba81a6] received
[2024-12-30 20:55:56,566: INFO/ForkPoolWorker-14] Task Add two numbers[e8460939-7bff-4541-8843-c22448ba81a6] succeeded in 0.003371259997948073s: 70
总结
对于那些可以推迟的任务,Celery是很好的选择。其灵活的架构使其可用于多种用途。我只是讨论了它的基本用法。后续我们继续分享,一起学习Celery。