Django框架-使用celery(一):django使用celery的通用配置,不受版本影响

目录

一、依赖包情况

二、项目目录结构

   2.1、怎么将django的应用创建到apps包

三、celery的配置

2.1、celery_task/celery.py

2.2、celery_task/async_task.py

2.3、celery_task/scheduler_task.py

2.4、utils/check_task.py

四、apps/user中配置相关处理视图

4.1、基本配置

4.2、user的models

4.3、user的视图函数

五、调用函数测试

5.1、启动项目

5.2、测试项目:Postman接口工具

六、报错


一、依赖包情况

python==3.9.0

django==3.2.0

celery==5.3.1

django-redis==5.3.0

eventlet==0.33.3  #windows系统需要使用到

注意:还需要在系统中安装好redis数据库,不然无法使用。

二、项目目录结构

                 

   2.1、怎么将django的应用创建到apps包

·1、创建user应用

cd apps
python ../manage.py startapp user

   2、修改user包下的apps.py模块

from django.apps import AppConfigclass UserConfig(AppConfig):default_auto_field = 'django.db.models.BigAutoField'#原来是 name = 'user',改成下面的name = 'apps.user'

3、注册到settings.py文件中

INSTALLED_APPS = ['django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','apps.user.apps.UserConfig', #注册user应用,from apps.user.apps import UserConfig 
]

三、celery的配置

概述:celery应用程序和任务都放到celery_task包中,其中celery.py是创建Celery的实例对象的,async_task.py用来写异步任务,scheduler_task.py用来写定时任务的。utils/check_task.py用来检测任务id是否结束并获取任务的返回值的。

2.1、celery_task/celery.py

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0'  # 无密码# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'#包含任务的所有模块的导入路径:
task_module = ['celery_task.async_task', #写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task', #写任务模块导入路径,该模块主要写定时任务的方法
]# 生成celery对象,'task'相当于key,用于区分celery对象
# broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=task_module)# 配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务配置
app.conf.beat_schedule = {# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},#名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',# 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  #定时任务需要的参数},#缓存用户数据到cache中'cache-user-func':{'task':'celery_task.scheduler_task.cache_user_func',#导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule':timedelta(minutes=1),#每1分钟执行一次,将用户消息缓存到cache中}
}'''
配置:也可以使用下面这种方式:
app.conf.update(task_serializer='json',accept_content=['json'],  # Ignore other contentresult_serializer='json',timezone='Asia/Shanghai',enable_utc=False,
)
'''

2.2、celery_task/async_task.py

# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task
'''#没有返回值,禁用掉结果后端
@app.task
def send_email_task(email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 启用线程发送邮件,此处最好加线程池t = threading.Thread(target=send_mail,args=("登录前获取的验证码",  # 邮件标题'点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址[email],  # 接收邮件的邮件地址,可以写多个),# html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中kwargs={'html_message': f"<p></p> <p>验证码:{code}</p>"})t.start()return {'email':email,'code':code}

2.3、celery_task/scheduler_task.py

# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict#有返回值,返回值可以从结果后端中获取
@app.task
def add_func(a,b):print('执行了加法函数')cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})return a+b#不需要返回值,禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():user = user_models.UserModel.objects.all()user_dict = {}for obj in user:user_dict[obj.account] = model_to_dict(obj)cache.set('all-user-data',user_dict,timeout=35*60)

2.4、utils/check_task.py

from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY   :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=app)dic = {'type':result.status,'msg':'','data':'','code':400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg']='任务重新尝试执行'elif result.status =='FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic

四、apps/user中配置相关处理视图

4.1、基本配置

1、study_celery/settings.py

#cache缓存
CACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://127.0.0.1:6379","OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient","CONNECTION_POOL_KWARGS": {"max_connections": 100}# "PASSWORD": "123",},'TIMEOUT':30*60 #缓存过期时间}
}#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '2414155342@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'qq邮箱的授权码'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '<'xxxxx@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True

2、study_celery/urls.py

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('user/',include('apps.user.urls'))
]

3、apps/user/urls.py

from django.urls import path
from . import viewsurlpatterns = []

4.2、user的models

apps/user/models.py

from django.db import models
# Create your models here.class UserModel(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=32)account = models.CharField(max_length=64)password = models.CharField(max_length=256)email = models.EmailField()

执行数据库迁移命令

python manage.py makemigrations

python manage.py migrate

4.3、user的视图函数

1、apps/user/views.py

from django.contrib.auth.hashers import make_password, check_password
from django.views import View
from django.http import JsonResponse
from . import models
from django.core.cache import cache
import time
from celery_task.async_task import send_email_task
# Create your views here.class ResgiterView(View):#注册用户def post(self,request):name = request.POST.get('name')account = request.POST.get('account')password = request.POST.get('password')email = request.POST.get('email')obj = models.UserModel.objects.filter(account=account).first()if obj:return JsonResponse({'code':400,'msg':'账户已经存在了'})password = make_password(password)instance = models.UserModel.objects.create(name=name,account=account,password=password,email=email)return JsonResponse({'code':200,'msg':'注册用户成功'})class LoginView(View):#用户登录def post(self,request):account = request.POST.get('account')password = request.POST.get('password')code = request.POST.get('code') #验证码email_code = cache.get(f'email_{account}')#发给邮箱的验证码.get(f'email_{account})print(code,email_code)if code and email_code:if code != email_code:return JsonResponse({'code':400,'msg':'验证码错误'})else:return JsonResponse({'code':400,'msg':'请先点击发送邮件获取验证码'})obj = models.UserModel.objects.filter(account=account).first()if not obj:return JsonResponse({'code':400,'msg':'当前用户不存在'})pwd_true = check_password(password,obj.password)response = JsonResponse({'code':200,'msg':'登录成功'})if pwd_true:return responseelse:return JsonResponse({'code':400,'msg':'用户名或密码错误'})class LoginSendEmailView(View):#用户登录前,需要验证码,发送验证码给用户的邮箱def post(self,request):account = request.POST.get('account')email = request.POST.get('email')code = str(time.time())[-5:]cache.set(f'email_{account}',code)print(cache.get(f'email_{account}'))res = send_email_task.delay(email,code)task_id = res.idprint('验证码是',code)return JsonResponse({'code':200,'msg':f'请查看{email}邮箱中是否收到邮件','task_id':task_id})class AllUserDataView(View):#查询cache_user_func定时任务执行时存到cache中的用户数据def get(self,request):key = 'all-user-data'data = cache.get(key)if data:return JsonResponse({'code':200,'data':data})else:return JsonResponse({'code':400,'msg':'没有相关数据'})class AddFuncDataView(View):#查询add_func 定时任务执行时存到cache中的数据def get(self, request):data = cache.get('add_ret')print(data, type(data))return JsonResponse({'code': 200, 'data': data})class UserTaskIdGetDataView(View):def get(self,request):from utils.check_task import check_task_status#检查任务是否成功了,获取任务的返回值task_id = request.GET.get('task_id')if not task_id:return JsonResponse({'code':400,'msg':'没有携带任务id'})ret = check_task_status(task_id)return JsonResponse(ret)

2、apps/user/urls.py

from django.urls import path
from . import viewsurlpatterns = [path('login/',views.LoginView.as_view(),name='user-login'),#登录path('register/', views.ResgiterView.as_view(), name='user-register'),#注册path('login/code/',views.LoginSendEmailView.as_view(),name='user-login-send-email'),#登录验证码path('all/user/data/',views.AllUserDataView.as_view(),name='user-all-user-data'),#获取定时任务cache_user_func缓存到cache中的用户数据path('add/result/',views.AddFuncDataView.as_view(),name='user-add-result'),#获取定时任务add_func缓存到cache的计算结果path('task-id/result/',views.UserTaskIdGetDataView.as_view(),name='user-task-id-data'),#通过任务的task-id获取到任务返回值
]

五、调用函数测试

5.1、启动项目

1、启动django项目

python manage.py runserver

2、启动celery异步

#windows系统

celery -A celery_task worker -l info  -P  eventlet

#linux系统

celery -A celery_task worker -l info 

3、启动celery定时(定时任务也是提交给异步的)

celery -A celery_task beat -l info

5.2、测试项目:Postman接口工具

1、注册:url= /user/register/

2、登录前点击获取验证码:返回任务id, url=/user/login/code/

 把task_id=17ee8389-14ab-4da1-b88a-56446afb4493 复制下来,4中可以用到

3、登录:code是发送步骤2中发送给邮箱的验证码 ,url=/user/login/

4、通过任务id,获取到发送邮箱异步任务的返回值,url=/user/task-id/result/

复制2返回值中的task_id,获取该任务的返回值

5、获取定时任务中,add_func缓存在cache中的计算数据

6、获取定时任务中,cache_user_func缓存到cache中的用户数据

 总结:

1、异步任务,一般是保存文件、发送邮件、耗时操作等

2、定时任务,定时处理某些数据。

六、报错

1、先去celery.py中查看定时任务的配置,模块的路径是不是有问题,千万不要多了一个空格啥的

2、如果通过任务id获取任务的返回值不成功,看看是不是添加@app.task(ignore_result=True),如果是就去掉这个参数配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/85163.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue [Day7] 综合案例

核心概念回顾 state&#xff1a;提供数据 getters&#xff1a;提供与state相关的计算属性 mutations&#xff1a;提供方法&#xff0c;用于修改state actions&#xff1a;存放异步操作 modules&#xff1a;存模块 功能分析 https://www.npmjs.com/package/json-server#ge…

3.UE基本操作及数字人工程模块组成(UE数字人系统教程)

1.Fay-UE5数字人工程导入 2.UE数字人语音交互 3.UE基本操作及数字人工程模块组成&#xff08;UE数字人系统教程&#xff09; 一、ue5基本操作 1、项目文件管理 2、关卡素材编辑 在关卡上&#xff1a;w、s、a、d移动&#xff0c;鼠标右键拖动换视角。 二、数字人工程模…

开源力量再现,国产操作系统商业化的全新探索

文章目录 1. 开源运动的兴起2. 开源力量的推动3. 国产操作系统的崭露头角3.1 国产操作系统有哪些 4.国产操作系统的商业化探索5.开源力量对国产操作系统商业化的推动 操作系统作为连接硬件、中间件、数据库、应用软件的纽带&#xff0c;被认为是软件技术体系中最核心的基础软件…

火爆全网,HttpRunner自动化测试框架-parameters参数化(超细整理)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 在使用HttpRunner…

Tomcat的一些配置问题(server.xml/catalina.sh)

在同一机器中运行多个Tomcat时&#xff0c;如果不修改server.xml的端口参数&#xff0c;会出现端口冲突使得Tomcat异常&#xff1b;Tomcat默认配置中&#xff0c;JAVA_OPTS不会设置太大&#xff0c;一般需要在catalina.sh中增加一行配置来加大该参数值。 目录 1.Server.xml配置…

有人真的会去分析代码吗

很早之前使用 webpack 的时候&#xff0c;也有类似的插件&#xff0c;分析打包出来之后的代码&#xff0c;分别是哪些模块比较庞大&#xff0c;针对打包的内容进行优化。说实话&#xff0c;知道归知道&#xff0c;但是没有哪个项目使用分析过。最近刚好看见了两个插件&#xff…

DOM的节点操作+事件高级+DOM事件流+事件对象

一.节点操作 1.父节点: node.parentNode 得到的是离元素最近的父级节点 2.子节点: parentNode.childNodes 所有的子节点 包含元素节点 文本节点等等parentNode.children (非标准) 获取所有的子元素节点,实际开发常用 parentNode.firstChild 获取…

clion run qt 问题汇总

一、Error copying file “D:/soft/QT/5.15.2/mingw81_64/bin/Qt5Cored.dll” to “D:/work/Ccode/qtproject/cmake-build-debug-qtmingw”.报错 查看路径下确实没有Qt5Cored.dll&#xff0c;只有Qt5Core.dll 注释掉cmakelist中的这三行 重新执行后成功 二、使用CLion编辑u…

前端探索之旅

目录 简介:内容大纲:第一章 前端开发简介1.1 前端开发的定义和作用1.2 前端开发的职责1.3 前端开发的技能要求1.4 前端开发的发展前景总结&#xff1a; 第二章 HTML基础2.1 HTML基本结构2.2 常见HTML标签和元素 第三章 CSS基础3.1 CSS基本语法3.2 常见CSS选择器3.3 常见CSS属性…

MySQL之 show profile 相关总结

MySQL之 show profile 相关总结 MySQL官网show profile介绍&#xff1a;https://dev.mysql.com/doc/refman/8.0/en/show-profile.html 1. 简介 show profile 和 show profiles 命令用于展示SQL语句的资源使用情况&#xff0c;包括CPU的使用&#xff0c;CPU上下文切换&#xf…

nvm下载node导致npm报错无法使用

有个依赖库需要更新下node&#xff0c;用nvm下载后项目跑不起来了&#xff0c;npm -v 还报错 其实一开始是npm下载不来&#xff0c;然后换了淘宝镜像后还是报错 然后就只能手动下载下了 进入node.js官网 https://nodejs.org/en/download 下载后注意要安装在你nvm目录中&#x…

windows安装apache-jmeter-5.6.2教程

目录 一、下载安装包&#xff08;推荐第二种&#xff09; 二、安装jmeter 三、启动jmeter 一、下载安装包&#xff08;推荐第二种&#xff09; 1.官网下载&#xff1a;Apache JMeter - Download Apache JMeter 2.百度云下载&#xff1a;链接&#xff1a;https://pan.baidu.…

【算法挨揍日记】day01——双指针算法_移动零、 复写零

283.移动零 283. 移动零https://leetcode.cn/problems/move-zeroes/ 题目&#xff1a; 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 …

Xcode升级导致关联库报错

想办法找到对应的库 然后到 Build Phases -- LinkBinary With Libraries中点击&#xff0c;选择对应的framework即可&#xff0c;就像我工程的报错 Undefined symbol: _OBJC_CLASS_$_ADClient _OBJC_CLASS_$_ASIdentifierManager 缺失的库是AdSupport.framework 添加后再次编…

【服务平台】Rancher运行和管理Docker和Kubernetes,提供管理生产中的容器所需的整个软件堆栈

Rancher是一个开源软件平台&#xff0c;使组织能够在生产中运行和管理Docker和Kubernetes。使用Rancher&#xff0c;组织不再需要使用一套独特的开源技术从头开始构建容器服务平台。Rancher提供了管理生产中的容器所需的整个软件堆栈。  完整软件堆栈 Rancher是供采用容器的团…

Git全栈体系(五)

第八章 IDEA 集成 GitHub 一、设置 GitHub 账号 如果出现 401 等情况连接不上的&#xff0c;是因为网络原因&#xff0c;可以使用以下方式连接&#xff1a; 然后去 GitHub 账户上设置 token。 点击生成 token。 复制红框中的字符串到 idea 中。 点击登录。 二、分享工程…

数据结构顺序表

今天主要讲解顺序表&#xff0c;实现顺序表的尾插&#xff0c;头插&#xff0c;头删&#xff0c;还有尾删等操作&#xff0c;和我们之前写的通讯录的增删查改有类似的功能。接下来让我们开始我们的学习吧。 1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特…

allegro中不可选时,如何对find进行可选操作

allegro出现不可选时&#xff0c;只能尝试其他单一的操作&#xff0c;但这样效率不高&#xff1b;可以通过菜单栏Display下拉菜单点击Element&#xff0c;即可实现FIND下选择需要调整的选项。

022 - STM32学习笔记 - 扩展外部SDRAM(一) - 初识SDRAM和FMC

022 - STM32学习笔记 - 扩展外部SDRAM&#xff08;一&#xff09; - 初识SDRAM和FMC 之前学习了I2C读写EEPROM和SPI读写FLASH&#xff0c;学完之后在学习一种新的存储介质–SDRAM。 一、初识SDRAM 我们知道在stm32内部是有一定大小的SRAM&#xff08;256Kb&#xff09;和FLA…

【Cartopy】库的安装和瓦片加载(天地图、高德等)

原文作者&#xff1a;我辈李想 版权声明&#xff1a;文章原创&#xff0c;转载时请务必加上原文超链接、作者信息和本声明。 Cartopy基础入门 【Cartopy】库的安装和天地图瓦片加载 【Cartopy】【Cartopy】如何更好的确定边界显示 【Cartopy】【Cartopy】如何丝滑的加载Geojso…