django-celery-beat自动调度异步任务

        Celery是一个简单、灵活且可靠的分布式系统,专门用于处理大量消息的实时任务调度。它支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。Celery不仅支持异步任务(如发送邮件、文件上传、图像处理等耗时操作),还支持定时任务,即需要在特定时间执行的任务。Celery本身不提供消息服务,需要借助RabbitMQ、Redis等消息中间件,本案例使用的是Redis。

        Celery Beat则是Celery的一个组件,专门用于处理定时任务调度。它包含一个调度器,负责根据配置的时间表计划任务的执行。这些任务通常是Celery任务,即异步执行的函数或方法。Celery Beat将计划的任务发送到Celery任务队列,由Celery Worker处理并执行队列中的任务。此外,Celery Beat还支持任务的持久性,即使在系统重启后也能够保持已计划的周期性任务。

开发环境:Python3 + MySQL + Redis  + PyCharm专业版

一、创建Django项目

参考 Python框架Django入门教程-CSDN博客 前三步,

二、安装celery、mysql、redis等依赖包

eventlet 是一个python协程模板,celery 4版本以上在windows环境进行测试需要安装此依赖

django_celery_results 是任务执行结果的依赖

mysqlclient
redis
celery
eventlet
django-celery-beat
django_celery_results

三、初始化Celery数据库

打开PyCharm的终端,执行以下命令

python manage.py makemigrations
python manage.py migrate

打开数据库查看,执行命令后自动创建了一些表

django_celery_beat_clockedschedule  # 以指定时间执行任务,例如:2024-05-22 09:22:10
django_celery_beat_crontabschedule  # 以crontab格式时间执行任务,某月某天星期几某时某分
django_celery_beat_intervalschedule  # 以间隔时间执行任务,例如:每5秒、每2小时
django_celery_beat_periodictask  # 存储要执行的任务。
django_celery_beat_periodictasks  # 索引和跟踪任务更改状态
django_celery_beat_solarschedule  # 以天文时间执行任务,例如:日出、日落
django_celery_results_chordcounter  # 存储Celery的chord任务的状态
django_celery_results_groupresult  # 存储Celery的group任务的结果
django_celery_results_taskresult  # 存储Celery任务的执行结果

四、配置Celery

修改(注意是修改,不是添加!!!)settings.py,找到 INSTALLED_APPS变量(约31行),将celery注册到django的应用管理中

在settings.py末尾添加celery配置:

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # 结果存储也使用Redis
# 配置 celery 定时任务使用的调度器,使用django_celery_beat插件用来动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 配置celery自动存储任务执行结果
CELERY_RESULT_BACKEND = 'django_celery_results.backends:DatabaseBackend'
CELERY_TIMEZONE = 'Asia/Shanghai'  # 设置时区
# 是否启用UTC
CELERY_ENABLE_UTC = False
# 是否开启时间感知
DJANGO_CELERY_BEAT_TZ_AWARE = False

在settings.py同级目录下创建celery.py,然后添加以下内容:

import osfrom celery import Celery
from django.conf import settings# djangoDemo是项目名,大家根据自己的情况进行替换!!!
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoDemo.settings')
# djangoDemo是项目名,大家根据自己的情况进行替换!!!
app = Celery('djangoDemo')
# 从django的设置中读取配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现app下的任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)@app.task(bind=True)
def debug_task(self):print(f"Request: {self.request!r}")

五、创建应用模块,进行测试

打开PyCharm终端执行命令,创建应用模块,这里命名为celeryapp,将新的应用模块注册到django的应用管理中

python manage.py startapp celeryapp

 

在新的应用模块中创建tasks.py,添加异步函数,文件名必须是tasks.py,否则后面启动Celery的时候监听不到

@shared_task
def task_one():print("------------------------- 000 <<<")# 业务逻辑...print("------------------------- 111 <<<")return "222"    

在新的应用模块中修改views.py,添加一个测试接口。这里说一个坑:在settings.py中配置了TIME_ZONE = 'Asia/Shanghai' 和  USE_TZ = True 之后,通过datetime.now()获取的是亚洲上海时间,但是把这个时间存到数据库,就会自动减少8小时,变成了UTC时间,就很无语,于是我把USE_TZ的值改为False,发现存到数据库中的时间变成正常的亚洲上海时间。然而Celery定时执行任务的时区是UTC,经过多次测试,配置了CELERY_TIMEZONE等时区相关的配置,发现好像并没什么用,Celery定时执行任务的时区依然是UTC,无奈只能把任务执行的时间减少8小时

import json
from datetime import datetime, timedeltafrom django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django_celery_beat.models import ClockedSchedule, PeriodicTaskdef celery_test(request):# 任务名task_name = 'celery_test'# 任务执行的时间,设置为下一分钟的10秒,celery执行时间的时区是UTC,要保证数据库中的时间是UTC时间,这里获取的是亚洲上海的时间,所以减了8小时time = (datetime.now() - timedelta(hours=8) + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:10")# 创建任务的执行时间clock = ClockedSchedule.objects.get_or_create(clocked_time=time)# 创建指定时间执行的celery任务PeriodicTask.objects.update_or_create(name=task_name,  # 任务名,尽量保证唯一性,若该任务已存在,则更新该任务task='celeryapp.tasks.task_one',  # 要执行的异步函数的全路径defaults={'clocked': clock[0],  # 使用clocked在指定时间执行该任务'one_off': True,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),  # 参数列表,必须是json格式的数组})return JsonResponse({'message': '200'})

修改urls.py,在urlpatterns中添加路由

from celeryapp import viewsurlpatterns = [# ......path('celery/test/', views.celery_test),
]

六、启动项目进行测试

先启动Django项目,然后在分别两个终端中执行命令启动Celery和Celery Beat,命令中的djangoDemo是项目名,大家根据自己的情况进行替换

celery -A djangoDemo worker -P eventlet -l info  # 启动worker监听异步任务
celery -A djangoDemo beat -l info  # 启动beat任务调度器

这里的woker已经监听到我们创建的celeryapp.tasks.task_one异步函数了

这里说明beat启动成功了:

浏览器输入请求地址:http://127.0.0.1:8000/celery/test/ ,创建Celery任务

接口请求成功后查看数据库,django_celery_beat_periodictask表新增了一条异步任务,其中的clocked_id字段指向django_celery_beat_clockedschedule表,该表新增了一条任务的执行时间。celery.backend_cleanup是Celery自动创建的,不用管

在到达任务执行时间后观察woker和beat的终端日志

查看beat终端日志,红框第一行是我们发送请求成功创建异步任务之后,CeleryBeat已经检测到数据库中有任务发生变化(CeleryBeat每5秒检测一次,使用debug级日志可查看到);第二行CeleryBeat将一个名为celery_test的任务发送给worker,让woker执行celeryapp.tasks.task_one异步函数,消费该任务

在woker终端的日志中可以看到任务执行的结果:

七、 Celery Beat常用的三种时间控制器

clockedSchedule:指定某个时间执行任务,例如:2024-05-22 09:22:10,对应的表是django_celery_beat_clockedschedule,该表仅有id和clocked_time两个字段

crontabSchedule:指定crontab格式的时间执行任务,某月某天星期几某时某分,与linux的定时任务规则一致,可参考Linux定时任务-CSDN博客,对应的表是django_celery_beat_crontabschedule,该表有7个字段

  `id` `minute` 分钟`hour` 小时`day_of_week`  星期几`day_of_month`  每月的哪些天`month_of_year`  每年的哪些月份`timezone`  时区

intervalSchedule:间隔指定时间执行任务,对应的表是django_celery_beat_intervalschedule,该表有3个字段id、every(间隔时长)、period(时间单位,可选时、分、秒、微秒、天)

代码示例,clockedSchedule上面已经演示过了,这里只演示另外两种:

# crontabSchedule
def celery_test2(request):task_name = 'celery_test2'crontab = CrontabSchedule.objects.get_or_create(minute='*/1',  # 每1分钟hour='*',  # 每小时day_of_week='*',  # 一周中的哪几天,*表示每天day_of_month='*',  # 月份中的哪一天,*表示每一天month_of_year='*',  # 年中的哪一月,*表示每个月timezone='Asia/Shanghai')PeriodicTask.objects.update_or_create(name=task_name,task='celeryapp.tasks.task_one',  # Celery任务的全路径defaults={'crontab': crontab[0],  # 使用crontab格式时间执行该任务'one_off': False,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),})return JsonResponse({'message': '200'})# intervalSchedule
def celery_test3(request):task_name = 'celery_test3'interval = IntervalSchedule.objects.get_or_create(every=1,  # 间隔时间period=IntervalSchedule.MINUTES,  # 周期单位,这里是分钟)PeriodicTask.objects.update_or_create(name=task_name,task='celeryapp.tasks.task_one',  # Celery任务的全路径defaults={'interval': interval[0],  # 使用interval间隔指定时间执行该任务'one_off': False,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),})return JsonResponse({'message': '200'})

其实只要创建不同的时间控制器,然后在创建任务的时候作为参数放进去即可,注意一个任务只能使用一种时间控制器

参考文献:

https://blog.csdn.net/wuwei_201/article/details/129650089

https://blog.51cto.com/u_15703497/6252757

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/330139.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习之基于Tensorflow卷积神经网络(CNN)实现猫狗识别

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景与意义 在人工智能和深度学习的热潮中&#xff0c;图像识别是一个备受关注的领域。猫狗识别作为图像识…

记录Python低代码开发框架zdppy_amcrud的开发过程

实现新增接口 基础代码 import env import mcrud import api import snowflakeenv.load(".env") db mcrud.new_env()table "user" columns ["name", "age"]async def add_user(req):data await api.req.get_json(req)values [d…

跟TED演讲学英文:Do schools kill creativity by Sir Ken Robinson

Do schools kill creativity? Link: https://www.ted.com/talks/sir_ken_robinson_do_schools_kill_creativity Speaker: Sir Ken Robinson Date: February 2006 文章目录 Do schools kill creativity?IntroductionVocabularySummaryTranscriptAfterword Introduction Sir…

[AI Google] 10个即将到来的Android生态系统更新

新的体验带来了更强的防盗保护、手表电池寿命优化&#xff0c;以及对电视、汽车等的娱乐功能改进。 昨天&#xff0c;我们分享了Android如何以人工智能为核心重新构想智能手机。今天&#xff0c;我们推出了Android 15的第二个测试版&#xff0c;并分享了更多我们改进操作系统的…

webSocket+Node+Js实现在线聊天(包含所有代码)

这篇文章主要介绍了如何使用 webSocket、Node 和 Js 实现在线聊天功能。 重要亮点 &#x1f4bb; 技术选型&#xff1a;使用 Node.js 搭建服务器&#xff0c;利用 Express 框架和 Socket.io 库实现 WebSocket 通信。 &#x1f4c4; 实现思路&#xff1a;通过建立数组存储聊天…

Linux 软件包管理器 yum的下载、功能介绍及使用

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;青果大战linux 总有光环在陨落&#xff0c;总有新星在闪烁 Linux下的三种软件安装方…

项目如何有效做资源管理?易趋项目管理软件让资源管理可视化

在项目管理的过程中&#xff0c;有效的资源管理能够确保资源得到合理的分配和使用&#xff0c;避免资源的浪费和冗余&#xff0c;进而提高整体工作效率、确保项目的成功&#xff1b;同时降低组织的运营成本。 但在项目推进过程中&#xff0c;项目经理总会面临各种资源管理的难…

MyBatis从入门到“入土“

&#x1f495;喜欢的朋友可以关注一下&#xff0c;下次更新不迷路&#xff01;&#x1f495;(●◡●) 目录 一、Mybatis为何物&#xff1f;&#x1f44c; 二、快速入门&#x1f923; 1、新建项目&#x1f60a; 2、数据库建表&#x1f60a; 3、导入依赖的jar包&#x1f60a;…

vue3 路由跳转 携带参数

实现功能&#xff1a;页面A 跳转到 页面B&#xff0c;携带参数 路由router.ts import { createRouter, createWebHistory } from "vue-router";const routes: RouteRecordRaw[] [{path: "/demo/a",name: "aa",component: () > import(&quo…

【软件设计师】2018年的上午题总结

2018 2018上半年2018下半年 2018上半年 1.小阶向大阶对齐 2.吞吐率是最长流水段操作时间的倒数 3.ssh的端口号是22 4.s所发送的信息使用s的私钥进行数字签名&#xff0c;t收到后使用s的公钥验证消息的真实性 5.数据流分析是被动攻击方式 6.《计算机软件保护条例》是国务院颁布…

226.翻转二叉树

翻转一棵二叉树。 思路&#xff1a; 指针做交换 用递归&#xff08;前序or后序&#xff0c;中序不行&#xff09; 前序&#xff1a;中左右 遍历到“中”的时候&#xff0c;交换它的左右孩子 然后分别对它的左孩子和右孩子使用“交换函数”&#xff08;定义的&#xff09;&a…

【论文阅读】使用深度学习及格子玻尔兹曼模拟对SEM图像表征粘土结构及其对储层的影响

文章目录 0、论文基本信息1、深度学习2、可运行程序—Matlab3、深度切片3、LBM模拟4、局限性 0、论文基本信息 论文标题&#xff1a;Characterizing clay textures and their impact on the reservoir using deep learning and Lattice-Boltzmann simulation applied to SEM i…

Python-温故知新

1快速打开.ipynb文件 安装好anaconda后&#xff0c;在需要打开notebook的文件夹中&#xff0c; shift键右键——打开powershell窗口——输入jupyter notebook 即可在该文件夹中打开notebook的页面&#xff1a; 2 快速查看函数用法 光标放在函数上——shift键tab 3...

CGAN|生成手势图像|可控制生成

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f366; 参考文章&#xff1a;TensorFlow入门实战&#xff5c;第3周&#xff1a;天气识别&#x1f356; 原作者&#xff1a;K同学啊|接辅导、项目定制 CGAN&#xff08;条件生成对抗网络&#xf…

【Crypto】Rabbit

文章目录 一、Rabbit解题感悟 一、Rabbit 题目提示很明显是Rabbit加密&#xff0c;直接解 小小flag&#xff0c;拿下&#xff01; 解题感悟 提示的太明显了

二分查找

题目链接 题目: 分析: 如果按照从头到尾的顺序一次比较, 每次只能舍弃一个元素, 效率是非常低的, 而且没有用到题目的要求, 数组是有序的因为数组是有序的, 所以如果我们随便找到一个位置, 和目标元素进行比较, 如果大于目标元素, 说明该位置的右侧元素都比目标元素大, 都可…

一键恢复安卓手机数据:3个快速简便的解决方案!

安卓手机作为我们不可或缺的数字伙伴&#xff0c;承载着大量珍贵的个人和工作数据。然而&#xff0c;随着我们在手机上进行各种操作&#xff0c;不可避免地会遇到一些令人头痛的问题&#xff0c;比如意外删除文件、系统故障或其他不可预见的情况&#xff0c;导致重要数据的丢失…

汽车生产线中的工业机器人应用HT3S-PNS-ECS(EtherCAT/Profinet)协议转换通讯方案案例分析

汽车生产线中的工业机器人应用HT3S-PNS-ECS(EtherCAT/Profinet)协议转换通讯方案案例分析 ——北京中科易联科技有限公司供稿—— 一、摘要 随着工业自动化的快速发展&#xff0c;汽车生产线对工业机器人的依赖日益增加。HT3S-PNS-ECS作为工业机器人中的关键组件&#xff0c;其…

OpenFeign快速入门 替代RestTemplate

1.引入依赖 <!--openFeign--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><!--负载均衡器--><dependency><groupId>org.spr…

重学java 38.创建线程的方式⭐

It is during our darkest moments that we must focus to see the light —— 24.5.24 一、第一种方式_继承extends Thread方法 1.定义一个类,继承Thread 2.重写run方法,在run方法中设置线程任务(所谓的线程任务指的是此线程要干的具体的事儿,具体执行的代码) 3.创建自定义线程…