Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置

一、概述

1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

注意:

在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

二、项目结构

依赖环境:

celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2

目录结构:

flask-project

        |--apps

                |-- user

                        |-- models

                        |--views.py

                        |--urls.py

                |--__init__.py

        |--ext

                |--__init__.py

                |--config.py

        |--celery_task

                |--__init__.py

                |--async_task.py

                |--celery.py

                |--celeryconfig.py

                |--check_task.py

                |--scheduler_task.py

        app.py

三、flask工厂模式下各模块功能

1、apps/user/models.py : 写了一个user表

2、apps/user/views.py:写了测试调用celery异步任务的接口

3、apps/user/urls.py: 注册路由的

4、ext/__init__.py:cache、db、cors的拓展

5、ext/config.py : cache和cors使用到的配置

6、apps/__init__.py: 一个函数create_app,生成flask应用对象

7、app.py: 启动flask应用对象的模块

本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

在视图中在执行异步任务,并获取异步任务的id:

from celery_task.async_task import send_email_task,cache_user_task
#用户资源:get\put\delete, 对单个进行操作
class UserOneResource(ResourceBase):def put(self,id):#测试异步发邮件email = request.args.get('email')code = request.args.get('code')res = send_email_task.delay(email,code)print(res.id)return NewResponse(msg='put',data={'task_id':res.id})def patch(self,id):#测试异步操作flask的orm和cachep = request.args.get('p')if p=='set':res = cache_user_task.delay()print(res,type(res))return NewResponse(msg='patch',data={'task_id':res.id})else:from ext import cachedata = cache.get('all-user-data')return NewResponse(msg='patch',data=data)

res = 异步函数.delay(函数需要的参数)

task_id = res.id

注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

四、celery项目的配置

1、celery的配置

将celery的配置都放到一个py文件中,方便后期的维护和使用

celeryconfig.py

from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = ['celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]#celery的配置
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}

2、创建celery对象

celery.py

from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)'''
2、创建celery应用对象'task'可以任务是该celery对象名字,用于区分celery对象broker是指定消息中间件backend是指定任务结果存储位置include是手动指定异步任务所在的模块的位置
'''
#创建celery异步对象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#导入一些基本配置
celery.conf.update(**config)'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
class ContextTask(celery.Task):def __call__(self, *args, **kwargs):from apps import create_appapp = create_app()with app.app_context():return self.run(*args, **kwargs)
celery.Task = ContextTask

注意:

1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

2、第二步是创建celery通用的方法了,没什么好说的。

3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

3、异步任务模块

将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

async_task.py

# 导入celery对象app
from celery_task.celery import celery
from ext import cache
import time'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task 默认就是(ignore_result=False)
'''# 没有返回值,禁用掉结果后端
@celery.task
def send_email_task(receiver_email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 模拟邮件发送验证码time.sleep(5)return {'result':'邮件已经发送',receiver_email:'2356'}@celery.task
def cache_user_task():#orm查询数据,放到cache中from apps.user.models import UserModeluser = UserModel.query.all()lis = []for u in user:id = u.idname = u.namedic = {'id':id,'name':name}lis.append(dic)print(dic)cache.set('all-user-data',lis)return {'code':200,'msg':'查询数据成功'}

4、定时任务模块

将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

schedulser_task.py

from celery_task.celery import celery
import time# 有返回值,返回值可以从结果后端中获取
@celery.task
def add_func(a, b):print('执行了加法函数',a+b)return a + b# 不需要返回值,禁用掉结果后端
@celery.task(ignore_result=True)
def cache_user_func():print('all')

5、检测任务id获取任务状态和返回值

check_task.py:

from celery.result import AsyncResult
from celery_task.celery import celery'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY   :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=celery)dic = {'type': result.status,'msg': '','data': None,'code': 400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg'] = '任务重新尝试执行'elif result.status == 'FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic

在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

五、测试

1、运行项目

flask项目(在项目根目录下执行):

        flask run --host 0.0.0.0 --port 5000

celery项目(在项目根目录下执行):

启动celery进程:

windows系统:

        celery -A celery_task.celery worker -l info  -P  eventlet

linux系统:

        celery -A celery_task.celery worker -l info 

启动定时任务(先启动celery进程在启动定时任务):

celery -A celery_task.celery beat -l info

2、运行结果

1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项

1、在系统中要先安装好redis和mysql,并都启动了

2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

3、当前的配置下,celery的目录必须是在flask根目录下

七、拓展-改变celery_task的位置

如果你想将celery_task包移动到apps包下,此时你需要修改什么?

1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变

'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化

'1、加上apps.'
task_module = ['apps.celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'apps.celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]'2、task参数对应的字符串,加上apps.'
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'apps.celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}

3、在视图函数导入异步任务的路径也变了

#异步任务
from apps.celery_task.async_task import send_email_task,cache_user_task

4、启动celery和定时任务的命令变量【在项目根目录下执行命令】

启动celery:

windows启动命令: celery  -A  apps.celery_task.celery worker -l info  -P  eventlet

linux启动命令: celery  -A  apps.celery_task.celery worker -l info 

启动定时任务:

celery -A apps.celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/157370.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【【萌新的SOC学习之基于BRAM的PS和PL数据交互实验】】

萌新的SOC学习之基于BRAM的PS和PL数据交互实验 基于BRAM的PS和PL的数据交互实验 先介绍 AXI BRAM IP核控制器的简介 AXI BRAM ip核 是xilinx提供的一个软核 这个ip核被设计成 AXI的一个从机接口 用于AXI互联的集成 系统的主设备和本地的RAM进行通信 (我们可以通过这…

TensorFlow学习:在web前端如何使用Keras 模型

前言 在上篇文章 TensorFlow学习:使用官方模型进行图像分类、使用自己的数据对模型进行微调中我们学习了如何使用官方模型,以及使用自己的数据微调模型。 但是吧,代码一直是跑在Python里,而我本身是做前端开发的。我是很想让它在…

Idea报错 java: 程序包org.springframework.boot不存在 解决方法

发现我的是因为更改了maven的主路径和本地仓库路径,但是新建了一个工程后,设置就恢复默认了。需要重新设置正确路径。 应用后会重新下载依赖项 之后虽然还会报错,但是已经不影响项目运行,配置成功

用 docker 创建 jmeter 容器, 实现性能测试,该如何下手?

用 docker 创建 jmeter 容器, 实现性能测试 我们都知道,jmeter可以做接口测试,也可以用于性能测试,现在企业中性能测试也大多使用jmeter。docker是最近这些年流行起来的容器部署工具,可以创建一个容器,然后把项目放到…

Mybatis 实现简单增删改查

目录 前言 一、Mybatis是什么 二、配置Mybatis环境 三、创建数据库和表 四、添加业务代码 4.1、添加实体类 4.2、添加mapper接口 4.3、添加实现接口方法的xml文件 五、简单的增删改查操作及单元测试 5.1、单元测试 单元测试具体步骤: 单元测试如何才能不污…

day27--AJAX(bootstrap之modal,toast;接口文档的一些用法;AJAX原理)

目录 Bootstrap之Modal: 显示和隐藏方法 通过自定义属性: 使用JS来控制弹框: Bootstrap之Toast: 接口文档一些用法: 删除图书: 图片上传: 图片上传步骤: 修改头像&#xf…

【单片机】19-TFT彩屏

一、背景知识--显示器 1.什么是TFT (1)LCD显示器的构成:液晶面板驱动器【电压驱动】控制器【逻辑控制】 (2)液晶面板大致分为:TN,TFT,IPS等 (3)驱动器是跟随…

MS5611的ZYNQ驱动试验之一 分析

0,MS5611框图 1,原理图 项目需要用到MS5611气压计模块,原理图很简单明了,如下: 这里PS接GND是SPI接口模式,PS接VDD是I2C接口模式。我在设计原理图时候直接设置成了SPI模式,当然这个SPI不是纯粹意…

203、RabbitMQ 之 使用 direct 类型的 Exchange 实现 消息路由 (RoutingKey)

目录 ★ 使用direct实现消息路由代码演示这个情况二ConstantUtil 常量工具类ConnectionUtil 连接RabbitMQ的工具类Publisher 消息生产者测试消息生产者 Consumer01 消息消费者01测试消费者结果: Consumer02 消息消费者02测试消费者结果: 完整代码&#x…

代理和多级代理

文章目录 代理使用场景代理过程实验演示多级代理 代理使用场景 1、拿下远程 web 服务器 2、webshell 链接不稳定,需要使用稳定的木马程序 3、远程服务器无法直接链接攻击者电脑 4、需要借助公网vps转发来自失陷服务器的木马流量 5、借助frp服务端(vps)和客户端(内网…

实现Element Select选择器滚动加载

<template><el-selectpopper-class"more-tag-data"v-model"tagId"filterableplaceholder"请选择"focus"focusTag"><el-optionv-for"(item, index) in taskTagLists":key"index":label"item.n…

如何在小程序中设置导航栏文字颜色和背景颜色

不同商家有不同的颜色风格&#xff0c;例如有些做设计的公司&#xff0c;主要是黑色风格&#xff1b;有些卖珠宝的商家&#xff0c;主要是金色风格&#xff1b;他们的小程序&#xff0c;也需要进行同样的风格设定。下面具体介绍怎么在小程序中进行整个风格设定。 1. 在小程序管…

破局「二次创业」:合思的新解法

在新的水温下&#xff0c;寻找更为良性的发展正在成为企业的必答题。对此&#xff0c;合思给出的不仅是一份更“省”的答题方法。也更是从认知层到行动层&#xff0c;最后到工具层的一张授人以渔的“渔网”。 作者|思杭 编辑|皮爷 出品|产业家 今年4月初&#xff0c;广州…

lvgl 界面管理器

lv_scr_mgr lvgl 界面管理器 适配 lvgl 8.3 降低界面之间的耦合使用较小的内存&#xff0c;界面切换后会自动释放内存内存泄漏检测 使用方法 在lv_scr_mgr_port.h 中创建一个枚举&#xff0c;用于界面ID为每个界面创建一个页面管理器句柄将界面句柄添加到 lv_scr_mgr_por…

【基础篇】三、Flink集群角色、系统架构以及作业提交流程

文章目录 1、集群角色2、部署模式3、Flink系统架构3.1 作业管理器&#xff08;JobManager&#xff09;3.2 任务管理器&#xff08;TaskManager&#xff09; 4、独立部署会话模式下的作业提交流程5、Yarn部署的应用模式下作业提交流程 1、集群角色 Flink提交作业和执行任务&…

EXCEL——根据单元格值设置不同色阶

方法&#xff1a;开始—>条件格式—>色阶&#xff08;默认色阶或复杂色阶&#xff09;。 一、默认色阶 如图&#xff0c;可选择自定义的色阶模式。 二、复杂色阶 1、如图&#xff0c;点击"其他规则" 2、选择复杂格式 此时可以看到&#xff0c;支持多种格式…

有 AI,无障碍,AIoT 设备为视障人群提供便利

据世界卫生组织统计&#xff0c;全球共 22 亿人视力受损&#xff0c;包含 2.85 亿视障人群和 3,900 万全盲人群。而且&#xff0c;这一数字将随老龄化加剧不断增加。 虽然视障人群面临着诸多不便&#xff0c;但是针对视障人群的辅助设备却存在成本高、维护困难、操作复杂等问题…

给你一个项目,你将如何开展性能测试工作?

一、性能三连问 1、何时进行性能测试&#xff1f; 性能测试的工作是基于系统功能已经完备或者已经趋于完备之上的&#xff0c;在功能还不够完备的情况下没有多大的意义。因为后期功能完善上会对系统的性能有影响&#xff0c;过早进入性能测试会出现测试结果不准确、浪费测试资…

MQTT C库下载

方法一、从Eclipse paho下载 https://eclipse.dev/paho/index.php?pagedownloads.php 方法二&#xff0c;从MQTT官网下载 https://mqtt.org/software/ https://os.mbed.com/teams/mqtt/code/MQTTPacket/ MQTTPacket源码和paho下载的差不多 方法三、从Keil5 包管理工具…

centos7安装db2 version11.1

centos7安装DB2 操作系统 linux centos7 DB2版本 11.1 1、取包 IBM MRS Tool 将安装包放在 /home/software 下面 mkdir -p /home/software cd /home/software wget https://iwm.dhe.ibm.com/sdfdl/v2/regs2/db2pmopn/Express-C/DB2ExpressC11/Xa.2/Xb.aA_60_-i7wWKFMFpbW1xl1…