APScheduler定时器使用:django中使用apscheduler,使用mysql做存储后端

一、基本环境

python版本:3.8.5

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

二、django基本设置

2.1、新增一个app

该app用来写apscheduler相关的代码

python manage.py startapp gs_scheduler

2.2、修改配置文件settings.py

#使用pymysql做客户端
import pymysql
pymysql.install_as_MySQLdb()INSTALLED_APPS = ['rest_framework', #注册restful 应用'gs_scheduler', #注册新增的app
]#配置mysql
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'admin-root'
MYSQL_NAME = 'study_websocket'DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','HOST': MYSQL_HOST,'PORT': MYSQL_PORT,'USER': MYSQL_USER,'PASSWORD': MYSQL_PASSWORD,'NAME': MYSQL_NAME,}
}

2.3、gs_scheduler创建urls.py

1、gs_scheduler/urls.py

from django.urls import path
from . import views
urlpatterns = []

2、根路由urls.py

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置gs_scheduler应用

3.1、配置api接口

这些接口,用来展示定时任务的运行情况

1、urls.py

from django.urls import path
from . import views
urlpatterns = [path('run_next/', views.JobNextRunTimeAPIView.as_view()),#定时任务下次运行path('run_history/',views.JobRunTimeHistory.as_view()),#定时任务运行历史path('run_error/',views.JobRunErrorHistory.as_view()), #定时任务最近运行错误
]

2、models.py

from django.db import models# Create your models here.
import sqlite3
from gs_scheduler.management.commands.config import MYSQL_HOST,MYSQL_NAME,MYSQL_PORT,MYSQL_PASSWORD,MYSQL_USER,MYSQL_CHARSET
from datetime import datetime,timedelta
from django.conf import settingsimport pymysql
from concurrent.futures import ThreadPoolExecutor#时间戳转时间字符串
def timestamp_to_time_str(timestamp):# 使用 datetime 模块将时间戳转换为 datetime 对象dt = datetime.fromtimestamp(timestamp)# 将 datetime 对象格式化为时间字符串time_str = dt.strftime('%Y-%m-%d %H:%M:%S')return time_strclass MysqlDB:DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'# 创建数据库连接池def __init__(self):self.conn = pymysql.connect(host=MYSQL_HOST,port=MYSQL_PORT,user=MYSQL_USER,password=MYSQL_PASSWORD,db=MYSQL_NAME,charset=MYSQL_CHARSET,cursorclass=pymysql.cursors.DictCursor)# 执行数据库增删改查def _execute_sql(self,query):sql,args = querytry:with self.conn.cursor() as cursor:# 执行sql语句if args:if isinstance(args,(tuple,list)):# args = (value1,value2)cursor.execute(sql,args)else:# args = value1cursor.execute(sql,(args,))else:cursor.execute(sql)# 不同类型sql,设置不同返回值if sql.strip().lower()[:6] == 'select':#查询语句rows = cursor.fetchall() or []return rowselif sql.strip().lower()[:4] == 'show':#查看表是否存在result = cursor.fetchall()return resultelse:#增删改语句self.conn.commit()return Trueexcept Exception as e:print('sql执行失败',e)pass# 线程池执行sql语句def start_sql_with_pool(self, sql:str,args=None):with ThreadPoolExecutor() as executor:result = executor.submit(self._execute_sql, (sql,args)).result()return result#创建记录定时任务历史表(调度器使用)def create_apscheduler_history(self):#创建表语句:表名不能是占位符sql = """CREATE TABLE apscheduler_history (id INTEGER AUTO_INCREMENT PRIMARY KEY,job_id VARCHAR(128),run_time DATETIME,is_error TINYINT,error_msg VARCHAR(256) NULL)"""#判断表存在不,不存在再创建results = self.start_sql_with_pool('SHOW TABLES')is_exist = Falsefor dic in results:if dic['Tables_in_{}'.format(MYSQL_NAME,)] == 'apscheduler_history':is_exist = True#表不存在才创建if not is_exist:# print('表不存在,执行创建表')create = self.start_sql_with_pool(sql)# 创建索引self.start_sql_with_pool(db.conn.cursor().execute("CREATE INDEX run_time_index ON apscheduler_history (run_time)"))return True# 将任务运行的结果记录到数据库中(调度器使用)def insert_into_apscheduler_history(self, data_list: list):if len(data_list) == 4:  # 异常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error,error_msg) VALUES (%s,%s,%s,%s)"""else:  # 正常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error) VALUES (%s,%s,%s)"""#使用线程池执行插入语句self.start_sql_with_pool(sql,data_list)# 删除8小时前的历史记录(调度器使用)def delete_8hour_before_history(self):before_8hour = (datetime.now() - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')sql = '''DELETE FROM apscheduler_history WHERE run_time < %s'''self.start_sql_with_pool(sql,(before_8hour,))# 查询每个任务的下次运行时间(api展示)def fetch_all_next_run_time(self):sql = 'SELECT id,next_run_time FROM apscheduler_jobs'# 获取查询结果rows = self.start_sql_with_pool(sql)for row in rows:row['next_run_time'] = timestamp_to_time_str(row['next_run_time'])return rows# 查询所有任务最近10次运行记录(api展示)def fetch_all_run_history(self):''':param query:'SELECT id,next_run_time,job_state FROM apscheduler_jobs':return:'''#获取所有定时任务iddata_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)# 创建一个游标对象sql = """SELECT id,job_id,run_time FROM apscheduler_history WHERE is_error=0 AND job_id = %s ORDER BY run_time DESC LIMIT 10"""ret_list = []for id in id_list:# 执行查询rows = self.start_sql_with_pool(sql,(id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近10次运行时间'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timedic['last_run_time'].append(run_time)ret_list.append(dic)return ret_list# 获取任务最近运行失败情况(api展示)def fetch_all_error_history(self):data_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)sql = """SELECT run_time,error_msg FROM apscheduler_history WHERE is_error=1 AND job_id = %s ORDER BY id DESC LIMIT 5"""ret_list = []for id in id_list:# 获取查询结果rows = self.start_sql_with_pool(sql, (id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近5次执行失败'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timeerror = row.get('error_msg')dic['last_run_time'].append({'run_time': run_time, 'error': error})ret_list.append(dic)return ret_list# 关闭连接def close(self):self.conn.close()if __name__ == '__main__':db = MysqlDB()db.create_apscheduler_history()# for i in range(1,10):#     db.insert_into_apscheduler_history(['send_to_big_data','2024-05-01 13:{}:12'.format(str(i).zfill(2)),1,'执行失败了'])# db.delete_8hour_before_history()# ret = db.fetch_all_run_history()# print(ret,'history')# ret = db.fetch_all_next_run_time()# print(ret,'next')# ret = db.fetch_all_error_history()# print(ret,'error')db.close()

3、views.py

from django.shortcuts import render# Create your views here.
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import MysqlDB#任务下次运行时间
class JobNextRunTimeAPIView(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_next_run_time()db.close()ret = {'code':200,'status':'success','data':data,}return Response(ret)#任务最近运行历史
class JobRunTimeHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_run_history()db.close()ret = {'code':200,'status':'success','data':data}return Response(ret)#任务最近运行错误
class JobRunErrorHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_error_history()db.close()ret = {'code': 200,'status': 'success','data': data}return Response(ret)

3.2、在gs_scheduler创建

1、config.py 代码

该文件存放的是启动APScheduler调度器的一些配置数据

import os
from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore #mysql等做后端存储
# from django.conf import settings
from study_apscheduler import settings
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。}
#job的存储后端
JOB_STORE = {'default': SQLAlchemyJobStore(url=URL)
}#监听事件对应的情况
LISTENER={1:'调度程序启动',2:'调度程序关闭',4:'调度程序中任务处理暂停',64:'将任务存储添加到调度程序中',8192:'任务在执行期间引发异常',4096:'任务执行成功',
}

2、task.py

所有的定时任务都存放在这里

import os
from datetime import datetime, timedelta, date
from gs_scheduler.models import MysqlDB
from utils.log_util import info_log
from utils.send_monitor_data import SendData#推送到数据仓的告警信息:5分钟执行一次
send_to_big_data = SendData().send#清除定时任务历史运行记录
def delete_apscheduler_history():db = MysqlDB()db.delete_8hour_before_history()if __name__ == '__main__':print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

3、crontab.py

实例化调度器

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from django.core.management.base import BaseCommand
from apscheduler import events
from pytz import timezone
from threading import RLock
from datetime import datetime, timedelta
from gs_scheduler.models import MysqlDB
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
#日志
from utils.log_util import info_log
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE#脚本运行:python manage.py crontab
class Command(BaseCommand):TIME_FORMAT = '%Y-%m-%d %H:%M:%S'#初始化调度器def _scheduler_obj(self):scheduler = BlockingScheduler()scheduler.configure(timezone = TIME_ZONE, #时区job_defaults=JOB_DEFAULTS, #job的默认配置jobstores=JOB_STORE, #job的存储后端)return scheduler#添加任务def _add_job(self,scheduler:BlockingScheduler):#每5分钟执行一次推送告警到大数据仓scheduler.add_job(send_to_big_data,trigger=IntervalTrigger(minutes=1),id='send_to_big_data',replace_existing=True,coalesce=True,)#每隔8个小时,清除历史的记录scheduler.add_job(delete_apscheduler_history,trigger=IntervalTrigger(hours=8),id='delete_apscheduler_history',replace_existing=True,coalesce=True,)#添加监听器def _listener(self,event:events):code = event.coderun_time = datetime.now().strftime(self.TIME_FORMAT)msg = LISTENER.get(code)db = MysqlDB()if msg:if code == 4096:#成功运行job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,0])elif code == 8192:#运行异常了job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,1,msg])else:info_log(msg)db.close()def start(self):scheduler = self._scheduler_obj()# 创建记录运行记录db = MysqlDB()db.create_apscheduler_history()db.close()# 设置监听器scheduler.add_listener(self._listener)# 设置定时任务self._add_job(scheduler)try:# print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))scheduler.start()except KeyboardInterrupt:scheduler.shutdown()# python manage.py crontab运行 就是调用该方法def handle(self, *args, **options):self.start()#伴随django,在后台运行的。 在wsgi.py 文件中,调用BackRunScheduler().start()
class BackRunScheduler(Command):# 初始化调度器def _scheduler_obj(self):scheduler = BackgroundScheduler()scheduler.configure(timezone=TIME_ZONE,  # 时区job_defaults=JOB_DEFAULTS,  # job的默认配置jobstores=JOB_STORE,  # job的存储后端)return scheduler

3.3、启动方式:

方式一: python manage.py  crontab    (生产环境,推荐使用此方式,单独运行)

方式二:在settings.py中,调用BackRunScheduler().start()  , 后台运行

四、测试

4.1、启动

启动django项目:python manage.py runserver 8080

启动定时器:python manage.py crontab

4.2、等待一段时间

1、查询任务下次执行时间

请求:http://127.0.0.1:8080/api/scheduler/run_next/

2、查询任务最近运行情况

请求:http://127.0.0.1:8080/api/scheduler/run_next/

3、查询任务最近异常情况

请求:http://127.0.0.1:8080/api/scheduler/run_error/

五、源代码下载

码云地址:

django应用定时器: django下使用定时器的方法icon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-application-timer/

目前有两套代码:一个以mysql存储任务,一个用sqlite存储任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/317553.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

8. Django 表单与模型

8. 表单与模型 表单是搜集用户数据信息的各种表单元素的集合, 其作用是实现网页上的数据交互, 比如用户在网站输入数据信息, 然后提交到网站服务器端进行处理(如数据录入和用户登录注册等).网页表单是Web开发的一项基本功能, Django的表单功能由Form类实现, 主要分为两种: dj…

翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习三

合集 ChatGPT 通过图形化的方式来理解 Transformer 架构 翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习一翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习二翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深…

GoLang Gin实际使用

所有代码同步到Admin/gitDemo - Gitee.comhttps://gitee.com/mec-deployment-team_0/git-demo/tree/dev/ 1.创建Gin框架 一般设计一个常规的web项目&#xff0c;都需要以下几个模块 runApp 主函数&#xff0c;运行整个项目routes 路由控制&#xff0c;管理跳转以及路由分组co…

微软如何打造数字零售力航母系列科普06 - 如何使用微软的Copilot人工智能

如何使用微软的Copilot人工智能&#xff1f; Copilot和ChatGPT有很多相似之处&#xff0c;但微软的聊天机器人本身就有一定的优势。以下是如何对其进行旋转&#xff0c;并查看其最引人注目的功能。 ​​​​​​​ &#xff08;资料来源&#xff1a;Lance Whitney/微软&…

ZooKeeper 搭建详细步骤之一(单机模式)

ZooKeeper 搭建详细步骤之三&#xff08;真集群&#xff09; ZooKeeper 搭建详细步骤之二&#xff08;伪集群模式&#xff09; ZooKeeper 搭建详细步骤之一&#xff08;单机模式&#xff09; ZooKeeper 及相关概念简介 搭建模式简述 ZooKeeper 的搭建模式包括单机模式、集群模…

微软如何打造数字零售力航母系列科普05 - Azure中计算机视觉的视觉指南

Azure中计算机视觉的视觉指南 什么是计算机视觉&#xff1f;如何使用Microsoft Azure将计算机视觉功能集成到应用程序和工作流中&#xff1f; 作者&#xff1a;Nitya Narasimhan 编辑&#xff1a;数字化营销工兵 •11分钟阅读 什么是计算机视觉&#xff1f;如何使用Microso…

开发一个语音聊天社交app小程序H5需要多少钱?

社交&#xff0c;即时通讯APP系统。如何开发一个社交App||开发一个即时通信应用是一项复杂而充满挑战的任务&#xff0c;需要考虑多个技术、开发时间和功能方面的因素。以下是一个概要&#xff0c;描述了从技术、开发时间和功能角度如何开发这样的应用&#xff1a; 1. 技术要点…

C++11 设计模式5. 原型模式

什么是原型模式&#xff1f; 原型模式⼀种创建型设计模式&#xff0c;该模式的核⼼思想是基于现有的对象创建新的对象&#xff0c;⽽不是从头开始创建。在原型模式中&#xff0c;通常有⼀个原型对象&#xff0c;它被⽤作创建新对象的模板。新对象通过复制原型对象的属性和状态来…

uniapp 之 开发微信小程序入门详细指南

目录 配置运行设置&#xff08;编辑器的设置&#xff09;项目目录文件配置基础配置中的uniapp应用标识&#xff08;AppID&#xff09;配置微信小程序的AppID 总结 配置运行设置&#xff08;编辑器的设置&#xff09; 点击编辑器上方菜单栏 - 运行 - 运行到小程序模拟器 - 运行…

面试经典150题——求根节点到叶节点数字之和

​ 1. 题目描述 2. 题目分析与解析 2.1 思路一——DFS 理解问题&#xff1a; 首先要理解题目的要求&#xff0c;即对于给定的二叉树&#xff0c;我们需要找出从根节点到所有叶子节点的所有路径&#xff0c;然后将每一条路径上的数字组成一个整数&#xff0c;最后求出这些整数…

ios微信小程序禁用下拉上拉

第一步&#xff1a; page.json配置页面的"navigationStyle":"custom"属性&#xff0c;禁止页面滑动 "navigationStyle":"custom" 第二步&#xff1a; 页面里面使用scroll-view包裹内容&#xff0c;内容可以内部滑动 <view class&…

AI赋能不应贵气:深度解读AI助力企业渡过经济寒冬以及如何落地AI的路径

AI很棒可是给人感觉“很贵”因此我不敢用 继GPT4后Dalle3、Sora、GPT4.5、GPT5的消息以及前天突然出现的GPT 2.0&#xff08;GPT二代&#xff0c;有人说这就是OPEN AI的新产品&#xff1a;Q*&#xff09;但凡涉及到AI的一系列新闻给人予很震撼的感觉。放眼望去AI正在欣欣向荣。…

【bug已解决】发生错误,导致虚拟 CPU 进入关闭状态。如果虚拟机外部发生此错误,则可能已导致物理计算机重新启动......

本bug报错已找到原因,并成功解决。 项目场景: vmware安装ubuntu报错。 如下: 发生错误,导致虚拟 CPU 进入关闭状态。如果虚拟机外部发生此错误,则可能已导致物理计算机重新启动。错误配置虚拟机、客户机操作系统中的错误或 VMware Workstation 中的问题都可以导致关闭状…

uniapp微信小程序开发踩坑日记:由于图表数据渲染不出来,我第一次在项目中用watch函数监听数据变化

一、发现问题 在我们团队自己开发的微信小程序中&#xff0c;引入了Echarts图表库 然后突然有一天&#xff0c;后端队友反应图表渲染有问题。后面我去试了一下&#xff0c;确实20次里面必有一次数据渲染不出来 断定代码没问题&#xff0c;于是我们将其鉴定为玄学 二、问题原因…

GPU 架构与 CUDA 关系 并行计算平台和编程模型 CUDA 线程层次结构 GPU 的算力是如何计算的 算力峰值

GPU 架构与 CUDA 关系 本文主要包含 NVIDIA GPU 硬件的基础概念、CUDA(Compute Unified Device Architecture)并行计算平台和编程模型,详细讲解 CUDA 线程层次结构,最后将讲解 GPU 的算力是如何计算的,这将有助于计算大模型的算力峰值和算力利用率。 GPU 硬件基础概念GP…

ClickHouse安装(成功安装)

1.下载安装包 下面通过阿里镜像&#xff08;https://mirrors.aliyun.com/clickhouse/rpm/lts/&#xff09;进行下载&#xff0c;下载哪里&#xff0c;自行指定。 # deb包下载使用如下4行 wget https://mirrors.aliyun.com/clickhouse/deb/pool/stable/clickhouse-client_22.8…

AnomalyGPT——使用大型视觉语言模型进行工业异常检测的算法解析与应用

1.概述 工业缺陷检测是工业自动化和质量控制中的一个重要环节&#xff0c;其目的是在生产过程中识别和分类产品或组件中的缺陷&#xff0c;以确保最终产品的质量满足既定标准。这项技术的应用可以显著提高生产效率&#xff0c;降低成本&#xff0c;并减少由于缺陷产品导致的潜…

网络原理(qq消息发送原理)

1.网络初识 IP地址 概念&#xff1a; IP地址主要⽤于标识⽹络主机、其他⽹络设备&#xff08;如路由器&#xff09;的⽹络地址。简单说&#xff0c;IP地址⽤于定位主机的⽹络地址。 就像我们发送快递⼀样&#xff0c;需要知道对⽅的收货地址&#xff0c;快递员才能将包裹送到…

智能家居—ESP32开发环境搭建

相关文章 毕业设计——基于ESP32的智能家居系统(语音识别、APP控制) 智能家居—ESP32开发环境搭建 一、下载安装二、验证三、资料获取 一、下载安装 下载安装 vscode 安装插件 创建工程 二、验证 写一个简单的函数来验证一下功能 void setup() {// put your setup c…

进一步了解android studio 里 AGP,gradle等关系

目录 &#xff08;1&#xff09; gradle是什么 &#xff08;2&#xff09; 工程的jdk版本&#xff0c;及引用包的编译版本的关系 实践 问题与解决 编译成功与运行成功 编译成功 运行成功 &#xff08;1&#xff09; gradle是什么 Gradle是一个构建工具&#xff0c;它是…