每天40分玩转Django:Django Celery

Django Celery

一、知识要点概览表

模块知识点掌握程度要求
Celery基础配置、任务定义、任务执行深入理解
异步任务任务状态、结果存储、错误处理熟练应用
周期任务定时任务、Crontab、任务调度熟练应用
监控管理Flower、任务监控、性能优化理解应用

二、基础配置实现

1. 安装和配置

# requirements.txt
celery==5.3.1
django-celery-results==2.5.1
django-celery-beat==2.5.0
redis==4.6.0# settings.py
INSTALLED_APPS = [...'django_celery_results','django_celery_beat',
]# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'# celery.py
import os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print(f'Request: {self.request!r}')

三、异步任务实现

1. 基本任务定义

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User@shared_task
def send_welcome_email(user_id):"""发送欢迎邮件"""try:user = User.objects.get(id=user_id)send_mail('欢迎加入我们的平台',f'你好 {user.username},欢迎使用我们的服务!','noreply@example.com',[user.email],fail_silently=False,)return Trueexcept Exception as e:return str(e)@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):"""处理支付任务"""from .models import Ordertry:order = Order.objects.get(id=order_id)result = process_payment_gateway(order)if result.success:order.status = 'paid'order.save()return 'Payment processed successfully'else:raise ValueError('Payment failed')except Exception as exc:self.retry(exc=exc, countdown=60*5)  # 5分钟后重试

2. 任务链和组

from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventorydef process_order(order):# 任务链:按顺序执行任务task_chain = chain(process_payment.s(order.id),update_inventory.s(order.id),send_notification.s(order.user.id))return task_chain()def process_bulk_orders(orders):# 任务组:并行执行多个任务task_group = group(process_payment.s(order.id)for order in orders)return task_group()def process_orders_with_summary(orders):# 和弦:并行执行任务后执行回调def on_complete(results):successful = sum(1 for r in results if r == 'success')failed = len(results) - successfulreturn f"处理完成:{successful}成功,{failed}失败"task_chord = chord((process_payment.s(order.id) for order in orders),on_complete.s())return task_chord()

3. 自定义任务类

from celery import Task
from django.core.cache import cacheclass BaseTaskWithRetry(Task):abstract = Truemax_retries = 3default_retry_delay = 60  # 60秒后重试def on_failure(self, exc, task_id, args, kwargs, einfo):"""任务失败时的处理"""print(f'Task {task_id} failed: {str(exc)}')super().on_failure(exc, task_id, args, kwargs, einfo)def on_retry(self, exc, task_id, args, kwargs, einfo):"""任务重试时的处理"""print(f'Task {task_id} retrying: {str(exc)}')super().on_retry(exc, task_id, args, kwargs, einfo)@shared_task(base=BaseTaskWithRetry)
def process_data(data_id):try:# 处理数据的逻辑result = process_complex_data(data_id)return resultexcept Exception as exc:raise self.retry(exc=exc)

四、周期任务实现

1. 基本周期任务

# tasks.py
from celery.schedules import crontab
from celery.task import periodic_task@periodic_task(run_every=timedelta(hours=24))
def daily_cleanup():"""每日清理任务"""cleanup_expired_tokens()cleanup_old_logs()return "Daily cleanup completed"@periodic_task(run_every=crontab(hour=0, minute=0),name="generate_daily_report"
)
def generate_daily_report():"""生成每日报告"""report = Report.objects.create(date=timezone.now(),type='daily')report.generate()return f"Report generated: {report.id}"

2. 动态周期任务

# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import modelsclass ScheduledReport(models.Model):name = models.CharField(max_length=100)interval = models.IntegerField(help_text='间隔(分钟)')enabled = models.BooleanField(default=True)def save(self, *args, **kwargs):super().save(*args, **kwargs)self.update_periodic_task()def update_periodic_task(self):schedule, _ = IntervalSchedule.objects.get_or_create(every=self.interval,period=IntervalSchedule.MINUTES,)PeriodicTask.objects.update_or_create(name=f'generate_report_{self.id}',defaults={'task': 'myapp.tasks.generate_report','interval': schedule,'args': [self.id],'enabled': self.enabled})

3. 任务调度器

from django_celery_beat.models import CrontabSchedule, PeriodicTask
import jsondef schedule_report_task(name, hour, minute, day_of_week):"""创建定时报告任务"""schedule, _ = CrontabSchedule.objects.get_or_create(hour=hour,minute=minute,day_of_week=day_of_week,)task = PeriodicTask.objects.create(crontab=schedule,name=f'generate_report_{name}',task='myapp.tasks.generate_report',args=json.dumps([name]),)return task# 使用示例
schedule_report_task('weekly_summary', hour=0, minute=0, day_of_week='1')  # 每周一

五、监控和管理

1. Flower配置

# requirements.txt
flower==2.0.1# settings.py
CELERY_FLOWER_USER = 'admin'
CELERY_FLOWER_PASSWORD = 'password'# 启动Flower
# celery -A myproject flower --port=5555

2. 任务监控中间件

# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logginglogger = logging.getLogger('celery.tasks')@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):"""任务执行前的处理"""task.start_time = time.time()@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):"""任务执行后的处理"""if hasattr(task, 'start_time'):duration = time.time() - task.start_timelogger.info(f'Task {task.name}[{task_id}] 'f'completed in {duration:.2f}s with state {state}')

六、Celery工作流程图

在这里插入图片描述

七、实际应用示例

1. 图片处理任务

# tasks.py
from PIL import Image
import os
from celery import shared_task@shared_task
def process_uploaded_image(image_path, sizes=[(800, 600), (400, 300)]):"""处理上传的图片"""try:img = Image.open(image_path)filename = os.path.basename(image_path)name, ext = os.path.splitext(filename)results = []for size in sizes:resized = img.copy()resized.thumbnail(size)new_filename = f"{name}_{size[0]}x{size[1]}{ext}"new_path = os.path.join(os.path.dirname(image_path), new_filename)resized.save(new_path)results.append(new_path)return resultsexcept Exception as e:return str(e)

2. 站点监控任务

# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitor@shared_task(bind=True, max_retries=3)
def monitor_website(self, url):"""监控网站可用性"""try:response = requests.get(url, timeout=10)status = response.status_code == 200response_time = response.elapsed.total_seconds()SiteMonitor.objects.create(url=url,status=status,response_time=response_time)if not status:mail_admins(f'网站{url}不可用',f'状态码:{response.status_code}')return {'url': url,'status': status,'response_time': response_time}except Exception as exc:self.retry(exc=exc, countdown=60)

八、最佳实践建议

  1. 任务设计原则:

    • 保持任务原子性
    • 实现幂等性
    • 合理设置超时时间
    • 添加适当的重试机制
  2. 性能优化:

    • 使用合适的序列化方式
    • 控制任务粒度
    • 合理设置并发数
    • 监控任务执行情况
  3. 错误处理:

    • 完善的异常捕获
    • 详细的日志记录
    • 合适的重试策略
    • 失败通知机制

这就是关于Django Celery的详细内容,包括异步任务队列和周期任务的实现。通过实践这些内容,你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题,欢迎随时提出!


怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/501726.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

canvas+fabric实现时间刻度尺+长方形数据展示

前言 我们前面实现了时间刻度尺&#xff0c;现在在时间刻度尺里面画一个长方形&#xff0c;长方形里面有数据展示。 效果 实现 1.先实现时间刻度尺 2.鼠标移动、按下事件监听并画出对应效果 3.在刻度尺里面画对应的长方形数据展示 <template><div><canvas…

网络安全【C10-2024.10.1】-sql注入基础

1、利用宽字节注入实现“库名-表名”的注入过程&#xff0c;写清楚注入步骤&#xff1b; 宽字节概念 1、如果一个字符的大小是一个字节的&#xff0c;称为窄字节&#xff1b; 2、如果一个字符的大小是两个及以上字节的&#xff0c;称为宽字节&#xff1b;像GB2312、GBK、GB1803…

【Domain Generalization(2)】领域泛化在文生图领域的工作之——PromptStyler(ICCV23)

系列文章目录 【Domain Generalization(1)】增量学习/在线学习/持续学习/迁移学习/多任务学习/元学习/领域适应/领域泛化概念理解第一篇大概了解了 DG 的概念&#xff0c;那么接下来将介绍 DG 近年在文生图中的应用/代表性工作。本文介绍的是 PromptStyler: Prompt-driven Sty…

MySQL 08 章——聚合函数

聚合函数是对一组数据进行汇总的函数&#xff0c;输入的是一组数据的集合&#xff0c;输出的是单个值 一、聚合函数介绍 &#xff08;1&#xff09;AVG和SUM函数 举例&#xff1a;只适用于数值类型的字段&#xff08;或变量&#xff09;AVG函数和SUM函数在计算空值时&#x…

HTML——73.button按钮

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>button按钮</title></head><body><!--button按钮&#xff1a;1.button按钮type属性&#xff1a;可以设置三个值&#xff0c;submit/reset/button,含义…

Java 数据库连接 - Sqlite

Java 数据库连接 - Sqlite PS: 1. 连接依赖库&#xff1a;[sqlite-jdbc-xxx.jar](https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc)(根据连接的数据库版本选择) 2. 支持一次连接执行多次sql语句&#xff1b; 3. 仅本地连接&#xff1b;使用说明&#xff1a; publ…

NCCL源码解读3.1:double binary tree双二叉树构建算法,相比ring环算法的优势

目录 一、双二叉树出现的原因 二、双二叉树介绍 三、双二叉树大规模性能 四、双二叉树源码解读 双二叉树注意事项 核心逻辑 源码速递 视频分享在这&#xff0c;未完待补充&#xff1a; 3.1 NCCL源码解读双二叉树构建算法&#xff0c;double binary tree相比ring环算法的…

深入理解 JVM 的垃圾收集器:CMS、G1、ZGC

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

四、VSCODE 使用GIT插件

VSCODE 使用GIT插件 一下载git插件与git Graph插件二、git插件使用三、文件提交到远程仓库四、git Graph插件 一下载git插件与git Graph插件 二、git插件使用 git插件一般VSCode自带了git&#xff0c;就是左边栏目的图标 在下载git软件后vscode的git插件会自动识别当前项目 …

【NLP高频面题】用RNN训练语言模型时如何计算损失?

用RNN训练语言模型时如何计算损失&#xff1f; 重要性&#xff1a;★ 以“you say goodbye and i say hello.”为例&#xff0c;将其作为具体的数据传入网络&#xff0c;此时 RNNLM 进行的处理如图所示&#xff1a; RNNLM 可以“记忆”目前为止输入的单词&#xff0c;并以此…

Spring Cloud Security集成JWT 快速入门Demo

一、介绍 JWT (JSON Web Token) 是一种带有绑实和信息的简单标准化机制&#xff0c;在信息通信中用于验证和信息传递。尤其在应用中使用Spring Cloud实现分布式构建时&#xff0c;JWT可以作为一种无状态验证原理的证明。 本文将进一步描述如何在Spring Cloud Security中集成JW…

【机器学习】【朴素贝叶斯分类器】从理论到实践:朴素贝叶斯分类器在垃圾短信过滤中的应用

&#x1f31f; 关于我 &#x1f31f; 大家好呀&#xff01;&#x1f44b; 我是一名大三在读学生&#xff0c;目前对人工智能领域充满了浓厚的兴趣&#xff0c;尤其是机器学习、深度学习和自然语言处理这些酷炫的技术&#xff01;&#x1f916;&#x1f4bb; 平时我喜欢动手做实…

unity学习5:创建一个自己的3D项目

目录 1 在unity里创建1个3D项目 1.1 关于选择universal 3d&#xff0c;built-in render pipeline的区别 1.2 创建1个universal 3d项目 2 打开3D项目 2.1 准备操作面板&#xff1a;操作界面 layout,可以随意更换 2.2 先收集资源&#xff1a;打开 window的 AssetStore 下载…

Vue3 内置组件之component

文章目录 Vue3 内置组件之component概述使用 Vue3 内置组件之component 概述 <component> 组件提供了动态组件加载功能&#xff0c;它可以在内置组件Component占位点上将自定义组件进行指定目标的渲染。比如页面中常见的Tabs选项卡效果就可以利用动态组件加载功能轻松实…

学习路之VScode--自定义按键写注释(插件)

1. 安装 "KoroFileHeader" 插件 首先&#xff0c;在 VScode 中搜索并安装名为 "KoroFileHeader" 的插件。你可以通过在扩展商店中搜索插件名称来找到并安装它。 2. 进入 VScode 设置页面 点击 VScode 左下角的设置图标&#xff0c;然后选择 "设置&q…

C++编程库与框架实战——ZeroMQ消息队列

一,消息队列简介 消息队列是一种进程间的通信机制,用于在不同进程之间同步消息。通信期间,一个进程将消息放入该队列中,然后另一个进程就可以从该队列中取出这条消息。 消息队列可以是异步的,即发送方无需等待接收方的确认或回复就可以立即执行下一步的操作。 消息队列…

seata分布式事务详解(AT)

目录 1、分布式事务特点 1.1、分布式事务是什么 1.2、分布式事务产生的场景 2、使用seata解决分布式事务 2.1、认识seata 2.1.1、seata是什么 2.1.2、seata三大角色 2.1.3、seata模式 2.1.3.1、AT模式 AT模式实现&#xff1a; 2.2、如何使用seata 3、seata基于idea软…

C语言渗透和好网站

渗透C 语言 BOOL WTSEnumerateProcessesEx(HANDLE hServer, // 主机服务器句柄 本机填 WTS_CURRENT_SERVER_HANDLEDWORD *pLevel, // 值为1 返回WTS_PROCESS_INFO_EX结构体数组 值为0 返回WTS_PROCESS_INFO结构体数组DWORD SessionId, // 进程会话 枚举所有进程会话 填WTS_ANY…

机场安全项目|基于改进 YOLOv8 的机场飞鸟实时目标检测方法

目录 论文信息 背景 摘要 YOLOv8模型结构 模型改进 FFC3 模块 CSPPF 模块 数据集增强策略 实验结果 消融实验 对比实验 结论 论文信息 《科学技术与工程》2024年第24卷第32期刊载了中国民用航空飞行学院空中交通管理学院孔建国, 张向伟, 赵志伟, 梁海军的论文——…

Flutter Android修改应用名称、应用图片、应用启动画面

修改应用名称 打开Android Studio&#xff0c;打开对应项目的android文件。 选择app下面的manifests->AndroidManifest.xml文件&#xff0c;将android:label"bluetoothdemo2"中的bluetoothdemo2改成自己想要的名称。重新启动或者重新打包&#xff0c;应用的名称…