Python Celery快速入门教程

Celery 是一个简单、灵活且可靠的分布式任务队列框架,用于处理大量的异步任务、定时任务等。它允许你将任务发送到消息队列,然后由后台的工作进程(worker)来执行这些任务,并且支持多种消息中间件,如 RabbitMQ、Redis 等。

Celery简介

Celery是一个简单、灵活、可靠的分布式系统,可以处理大量的消息,同时为操作提供维护这样一个系统所需的工具。
在这里插入图片描述

Celery有很多应用场景,典型示例如下:

  • 发送电子邮件:可以将发送电子邮件的任务交给Celery,并向用户显示一个感谢页面,而不是让用户在填写完注册表格后等待。你可能会说,执行电子邮件发送代码不需要花费时间,但是如果电子邮件服务器没有响应,如果将这部分设置为同步,站点访问者将不得不等待,直到超时发生。
  • 图片/其他文件上传任务:现在通过网页上传图片或其他类型的文档是非常常见的。假设希望提供一个工具来上传包含产品图像的产品信息,同时还需要根据要求调整图像大小,并增加与品牌相关的水印,用户在所有这些操作期间等待看起来不太好。他想要的只是看到他的过程已经完成的文本,然后继续前进。你可以创建多个celery任务来实现目标。
  • 计划任务:celery也可以作为一个调度程序,执行周期性任务。

Celery主要概念

Celery基本架构如下图所示:

在这里插入图片描述

生产者:这个应用程序负责推送消息与所有需要的信息。

Broker: 这个模块实际上是作为消息队列服务的,像Redis或RabbitMQ这样的应用程序可以在这里使用。

任务:任务是序列化后在代理中排队的Python函数或任务。然后,任务函数由负责反序列化并执行它的工作程序挑选。默认的序列化格式是JSON,您可以将其更改为msgpack, YAML或pickle。

后端:该组件负责存储函数产生的结果.

环境准备

  • 安装redis

这里为了快速演示,直接适用docker容器:

docker run -d -p 6379:6379 redis
  • 安装依赖

首先安装 Celery 和 Redis(python连接redis):

pip install celery redis

基本示例

  • 创建任务

创建项目(如使用poetry工具),以下是简单Celery任务模块示例:

# tasks.py 文件
from celery import Celery# 创建Celery实例,指定名称和消息中间件(这里是Redis)的URL
app = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y

首先导入Celery类,然后创建一个Celery实例,名称为tasks,并指定broker(消息中间件)为本地 Redis 服务器(redis://localhost:6379/0)。

定义了一个名为add的任务,它是一个被@app.task装饰的函数。这个任务接收两个参数xy,并返回它们的和。

  • 启动Celery工作进程

在终端运行命令启动Celery工作进程:

celery -A tasks worker --loglevel=info

这里-A tasks表示任务模块是tasks.pyworker表示启动工作进程,--loglevel=info设置日志级别为info,这样可以看到任务执行的相关信息。

  • 调用任务

在另一个python脚本(如:main.py)中调用任务:

# main.py
from tasks import add# 异步调用任务
result = add.delay(66, 4)
print("任务已发送,等待结果...")
# 获取任务结果
print("结果:", result.get())

首先从tasks.py模块中导入add任务。

然后使用add.delay(66, 4)异步调用add任务,传递参数664。这会将任务发送到消息队列,由 Celery 工作进程来执行。接着打印出任务已发送的消息,等待任务执行结果。

最后,使用result.get()获取任务的最终结果。当任务还没有执行完成时,get()方法会阻塞,直到任务完成并返回结果。在这里,最终会打印出70,即66 + 4的结果。

定时任务示例

要定时执行任务,我们需要重构task模块,添加定时配置:

# tasks.py 文件
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# 每30秒执行一次add任务,参数为4和6sender.add_periodic_task(30.0, add.s(66, 4))

在这里,新增了一个setup_periodic_tasks函数,它通过@app.on_after_configure.connect装饰器连接到 Celery 的配置完成后的事件。在这个函数中,使用sender.add_periodic_task来添加一个定时任务,每隔30秒执行一次add任务,参数为664。add_periodic_task函数表示增加周期性任务,第一个参数以秒为单位,5分钟可以设置为300。第二个参数表示任务,这里是add.s(66, 4),其中add是之前定义好的 Celery 任务函数(被@app.task装饰过的函数),而.s是 Celery 的一种语法糖,用于对任务进行签名(signature)操作,它可以固定任务执行时的参数。

  • 重新启动Celery工作进程

像之前一样,在终端中运行celery -A tasks worker --loglevel=info来启动工作进程。同时,你还可以启动一个 Celery 调度器(beat)来管理定时任务。这样,Celery 就会按照配置的时间间隔定期执行add任务,并且可以在工作进程的日志中看到任务执行的记录。

运行任务过程

首先在命令行输出任务ID:

e8460939-7bff-4541-8843-c22448ba81a6

在redis中有记录:

{"body": "W1s2NiwgNF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "Add two numbers", "id": "020e9130-e22a-4cea-8463-ba95c2a03103", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "020e9130-e22a-4cea-8463-ba95c2a03103", "parent_id": null, "argsrepr": "(66, 4)", "kwargsrepr": "{}", "origin": "gen17869@LAPTOP-F569632U", "ignore_result": false, "replaced_task_nesting": 0, "stamped_headers": null, "stamps": {}}, "properties": {"correlation_id": "020e9130-e22a-4cea-8463-ba95c2a03103", "reply_to": "d40ebae3-1285-35bf-a96f-bdde29d95121", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "d2cd8663-fba8-458f-aee7-c0342ad3253e"}}

body内容是base64编码,对W1s2NiwgNF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==进行解码,内容如下:

[[66, 4], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]

注意值为66和4的列表。这些是你传递给函数的参数。其他信息,如任务名称、语言等,也可以在上面的JSON中看到。

到目前为止,你所做的就是把数据推入队列,然后序列化。它还没有被消费,因此你需要启动工人。你像下面这样调用工人(如果之前没有启动,现在启动):

celery -A tasks worker --loglevel=INFO

-A 指定应用程序名称,这里是tasks。该名称本身来自任务文件tasks.py的名称。然后告诉你希望调用worker和日志级别。

这时你可以注意到任务Add两个数字以及任务id,它首先被接收并成功执行。注意70这个数字,它是66和4的和。因此,推送到队列中的任务不一定要立即处理。

由于我们也使用后端,所以任务执行的结果存储如下:

[2024-12-30 20:55:56,562: INFO/MainProcess] Task Add two numbers[e8460939-7bff-4541-8843-c22448ba81a6] received
[2024-12-30 20:55:56,566: INFO/ForkPoolWorker-14] Task Add two numbers[e8460939-7bff-4541-8843-c22448ba81a6] succeeded in 0.003371259997948073s: 70

总结

对于那些可以推迟的任务,Celery是很好的选择。其灵活的架构使其可用于多种用途。我只是讨论了它的基本用法。后续我们继续分享,一起学习Celery。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/498184.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity WebGL 部署IIS

Unity WebGL 部署IIS iis添加网站WebGL配置文件WebGL Gzip模式浏览器加载速度优化iis添加网站 第一步在配置好IIS并且添加网站 WebGL配置文件 在web包Build文件夹同级创建web.config文件 web.config文件内容 <?xml version="1.0" encoding="UTF-8"?…

基于西湖大学强化学习课程的笔记

放在前面 课程链接 2024年12月30日 前言&#xff1a;强化学习有原理部分的学习&#xff0c;也有与实践相关的编程部分。我认为实践部分应该是更适合我的&#xff0c;不过原理部分也很重要&#xff0c;我目前是准备先过一过原理。 应该花多少时间学习这部分呢&#xff1f; 但是这…

CannotRetrieveUpdates alert in disconnected OCP 4 cluster解决

环境&#xff1a; Red Hat OpenShift Container Platform (RHOCP) 4 问题&#xff1a; Cluster Version Operator 不断发送警报&#xff0c;表示在受限网络/断开连接的 OCP 4 集群中无法接收更新。 在隔离的 OpenShift 4 集群中看到 CannotRetrieveUpdates 警报&#xff1a; …

Redis--持久化策略(AOF与RDB)

持久化策略&#xff08;AOF与RDB&#xff09; 持久化Redis如何实现数据不丢失&#xff1f;RDB 快照是如何实现的呢&#xff1f;执行时机RDB原理执行快照时&#xff0c;数据能被修改吗&#xff1f; AOF持久化是怎么实现的&#xff1f;AOF原理三种写回策略AOF重写机制 RDB和AOF合…

【数据结构】链表(1):单向链表和单向循环链表

链表 链表是一种经典的数据结构&#xff0c;它通过节点的指针将数据元素有序地链接在一起&#xff0c;在链表中&#xff0c;每个节点存储数据以及指向其他节点的指针&#xff08;或引用&#xff09;。链表具有动态性和灵活性的特点&#xff0c;适用于频繁插入、删除操作的场景…

开源电子书转有声书整合包ebook2audiobookV2.0.0

ebook2audiobook&#xff1a;将电子书转换为有声书的开源项目 项目地址 GitHub - DrewThomasson/ebook2audiobook 整合包下载 更新至v2.0.0 https://pan.quark.cn/s/22956c5559d6 修改:页面已转为中文 项目简介 ebook2audiobook 是一个开源项目&#xff0c;它能够将电子…

NSSCTFpwn刷题

[SWPUCTF 2021 新生赛]nc签到 打开附件里面内容 import osart (( "####!!$$ ))#####!$$ ))(( ####!!$:(( ,####!!$: )).###!!$:##!$:#!!$!# #!$: #$#$ #!$: !!!$:\ "!$: /\ !: /"\ : /"-."-/\\\-."//.-"…

java里classpath都包含哪些范围?

什么是 classpath &#xff1f; classpath 等价于 main/java main/resources 第三方jar包的根目录 「引」SpringBoot中的classpath都包含啥

Docker+Portainer 离线安装

1. Docker安装 步骤一&#xff1a;官网下载 docker 安装包 步骤二&#xff1a;解压安装包; tar -zxvf docker-24.0.6.tgz 步骤三&#xff1a;将解压之后的docker文件移到 /usr/bin目录下; cp docker/* /usr/bin/ 步骤四&#xff1a;将docker注册成系统服务; vim /etc/sy…

#渗透测试#红蓝攻防#红队打点web服务突破口总结01

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

Java:190 基于SSM的药品管理系统

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统的用户分管理员和销售两个角色的权限子模块。 管理员统计药品销售量&#xff0c;可以导出药品出入库记录&#xff0c;管理药品以及报损信息。 销…

Quo Vadis, Anomaly Detection? LLMs and VLMs in the Spotlight 论文阅读

文章信息&#xff1a; 原文链接&#xff1a;https://arxiv.org/abs/2412.18298 Abstract 视频异常检测&#xff08;VAD&#xff09;通过整合大语言模型&#xff08;LLMs&#xff09;和视觉语言模型&#xff08;VLMs&#xff09;取得了显著进展&#xff0c;解决了动态开放世界…

VUE echarts 教程二 折线堆叠图

VUE echarts 教程一 折线图 import * as echarts from echarts;var chartDom document.getElementById(main); var myChart echarts.init(chartDom); var option {title: {text: Stacked Line},tooltip: {trigger: axis},legend: {data: [Email, Union Ads, Video Ads, Dir…

001__VMware软件和ubuntu系统安装(镜像)

[ 基本难度系数 ]:★☆☆☆☆ 一、Vmware软件和Ubuntu系统说明&#xff1a; a、Vmware软件的说明&#xff1a; 官网&#xff1a; 历史版本&#xff1a; 如何下载&#xff1f; b、Ubuntu系统的说明&#xff1a; 4、linux系统的其他版本&#xff1a;红旗(redhat)、dibian、cent…

Flutter中添加全局防护水印的实现

随着版权意识的加强&#xff0c;越来越多的应用开始在应用内部增加各种各样的水印信息&#xff0c;防止核心信息泄露&#xff0c;便于朔源。 效果如下&#xff1a; 在Flutter中增加全局水印的方式&#xff0c;目前有两种实现。 方案一&#xff0c;在native层添加一个遮罩层&a…

FOC控制原理-ADC采样时机

0、文章推荐 SimpleFOC移植STM32&#xff08;五&#xff09;—— 电流采样及其变换_极对数对电流采样的影响-CSDN博客 FOC 电流采样方案对比&#xff08;单电阻/双电阻/三电阻&#xff09; - 知乎 (zhihu.com) FOC中的三种电流采样方式&#xff0c;你真的会选择吗&#xff1f;…

git clone 和 conda 换源

文章目录 git clone 通过 sshconda 创建虚拟环境通过 env.yml 文件conda 换源 git clone 通过 ssh git clone ssh://用户名IP地址:/仓库名字.gitconda 创建虚拟环境通过 env.yml 文件 conda env create -f environment.ymlconda 换源 Step 1 生成 .bashrc 文件在家目录下。…

联邦协作训练大模型的一些研究进展

联邦协作训练大模型的一些研究进展: 架构与框架创新 凝聚联邦学习框架:中科院计算所等团队提出的凝聚联邦学习框架,借助端边云协同,通过桥接样本在线蒸馏协议,组织树状拓扑的算力网,实现不同层级节点间模型无关的协同训练,使各层级可依本地算力训练合适模型,云端最终集…

深度学习blog- 数学基础(全是数学)

矩阵‌&#xff1a;矩阵是一个二维数组&#xff0c;通常由行和列组成&#xff0c;每个元素可以通过行索引和列索引进行访问。 张量‌&#xff1a;张量是一个多维数组的抽象概念&#xff0c;可以具有任意数量的维度。除了标量&#xff08;0D张量&#xff09;、向量&#xff08;…

ARM200~500部署

前提&#xff1a;数据库已经安装好&#xff0c;并且正常运行 1.修改hostname,将里面的AR-A 改为hzx vi /etc/hostname 2.重启网络服务 sudo systemctl restart NetworkManager 3.修改community-admin.service 文件&#xff0c;更改小区名称和IP&#xff0c;并将文件上传到/…