需求场景:
实现一个flask服务,通过接口控制一个定时任务任务(对酒店订房情况进行检查)的开启和停止。要求定时任务完成后,可以通过飞书机器人推送任务完成的消息。
展现效果:
-
启动定时任务
-
关闭定时任务
-
飞书推送消息
代码实现:
-
项目结构:
-
业务代码:
-
定时任务。先通过
schedule
模块实现基础的定时任务业务代码。# -*- coding:UTF-8 -*-"""@ProjectName : @FileName : schedule_monitor_task@Description : @Time : 2023/9/18 9:21@Author : Qredsun""" import os import time from datetime import datetime from datetime import timedeltaimport schedule import functools from utils.webhook import send_messagedef read_check_list_from_excel(file):try:logger.info("业务代码")except Exception as e:logger.error(f'程序异常:{e}')finally:prompt_message = f'**酒店订房检查已完成**\n' \f'检查时间:{begin_time} - {end_time} \n' \f'检查报告已生成: {save_path}'send_message(Environment.WEBHOOK_URL, Environment.WEBHOOK_SECRET, prompt_message)# todo 定时任务配置 def catch_exceptions(cancel_on_failure = False):def catch_exceptions_decorator(job_func):@functools.wraps(job_func)def wrapper(*args, **kwargs):try:return job_func(*args, **kwargs)except:import tracebacktraceback.format_exc()if cancel_on_failure:return schedule.CancelJobreturn wrapperreturn catch_exceptions_decorator# 异常捕获方法的使用 @catch_exceptions(cancel_on_failure=False) def task_job(file_path):# 定时任务read_check_list_from_excel(file_path)file_path = r'../data/订房检查任务.xlsx' schedule.every().day.at("09:00").do(task_job, file_path) schedule.run_all()while True:schedule.run_pending() # 运行所有可以运行的任务time.sleep(60 * 30)
-
飞书消息推送功能实现
# -*- coding:UTF-8 -*-"""@ProjectName : @FileName : webhook@Description : 飞书消息推送@Time : 2023/9/17 13:36@Author : Qredsun""" import base64 import hashlib import hmac import json from datetime import datetimeimport requestsfrom utils.env_manager import Environment, loggerWEBHOOK_URL = '机器人推送地址' WEBHOOK_SECRET = '机器人密码'def gen_sign(secret, timestamp):# 拼接时间戳以及签名校验string_to_sign = '{}\n{}'.format(timestamp, secret)# 使用 HMAC-SHA256 进行加密hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 对结果进行 base64 编码sign = base64.b64encode(hmac_code).decode('utf-8')return signdef send_message(WEBHOOK_URL, WEBHOOK_SECRET, MSG):timestamp = int(datetime.now().timestamp())sign = gen_sign(WEBHOOK_SECRET, timestamp)params = {"timestamp": timestamp,"sign" : sign,"msg_type" : "interactive","card" : {"config" : {"wide_screen_mode": True},"elements" : [{"tag" : "markdown","content": f"<at id=all></at> \n "f"{MSG}"},{"tag" : "action","actions": [{"tag" : "button","text" : {"tag" : "plain_text","content": "跳转至订房检查"},"type" : "primary","multi_url": {"url" : Environment.CALL_BACK_URL,"android_url": "","ios_url" : "","pc_url" : ""}}]}],"header" : {"template": "blue","title" : {"content": "订房检查异常提示","tag" : "plain_text"}},"card_link": {"url" : "","pc_url" : "","android_url": "","ios_url" : ""}},}resp = requests.post(WEBHOOK_URL, json=params)resp.raise_for_status()result = resp.json()if result.get("code") and result.get("code") != 0:logger.error(f'飞书机器人消息 : {MSG} 发送失败:{resp.text}')else:logger.debug(f'飞书机器人消息 : {MSG} 发送成功')
-
flask_apscheduler
替换schedule
app.schedule_job.add_job(id="check_room", func='s_app:read_check_list_from_excel', **{"args" : (Environment.SRC_FILE,),# 'trigger': 'interval', # 指定 定时任务的类型# 'seconds': 5 # 运行的间隔时间# 每天九点开始执行'trigger': 'cron','day' : '*','hour' : '09','minute' : '00','second' : '00' }) app.schedule_job.start() # 启动任务列表
-
flask服务中启动、停止接口实现
from flask import Flaskfrom utils.env_manager import Environment, logger from utils.schedule_monitor_task import read_check_list_from_excel from flask_apscheduler import APSchedulerapp = Flask(__name__) app.schedule_job = APScheduler()@app.route('/', methods=['get']) def hello_world():return '酒店订房检查服务!'@app.route('/stop', methods=['get']) def stop_job():if not app.schedule_job.get_jobs():return '没有正在执行的订房检查任务'else:app.schedule_job.remove_all_jobs()return '终止订房检查任务'class Config(object):DEBUG = True # flask 调试模式"""flask_apscheduler 配置"""SCHEDULER_API_ENABLED = True # 开放APISCHEDULER_TIMEZONE = 'Asia/Shanghai' # 使用上海时间@app.route('/start', methods=['get']) def start_job():if app.schedule_job.get_jobs():return '订房检查任务已启动'else:app.schedule_job.add_job(id="check_room", func='s_app:read_check_list_from_excel', **{"args" : (Environment.SRC_FILE,)# 每天九点开始执行'trigger': 'cron','day' : '*','hour' : '09','minute' : '00','second' : '00'})return '开始订房检查任务'app.config.from_object(Config) app.schedule_job.init_app(app) # 把任务列表放入 flask app.schedule_job.start() # 启动任务列表 app.run()
-
ps:
-
机器人推送接口的配置:
-
在flask的配置中将SCHEDULER_API_ENABLED设置为True,服务启动后自动加载
flask_apscheduler
提供的API接口:
1./scheduler
[GET] > 获取服务基本信息
2./scheduler/jobs
[POST json job data] > 添加新的任务
3./scheduler/jobs/<job_id>
[GET] > 根据job_id返回任务的详细信息
4./scheduler/jobs
[GET] > 返回所有任务的信息
5./scheduler/jobs/<job_id>
[DELETE] > 删除任务
6./scheduler/jobs/<job_id>
[PATCH json job data] > 更新一个已经存在的任务
7./scheduler/jobs/<job_id>/pause
[POST] > 暂停一个任务并返回任务的信息
8./scheduler/jobs/<job_id>/resume
[POST] > 重新启动一个任务并返回任务信息
9./scheduler/jobs/<job_id>/run
[POST] > 启动一个任务并返回任务的信息
-
在实现定时任务的启动和关闭时,并没有直接
flask_apscheduler
使用自带的接口,而是通过flask_apscheduler提供的定时任务管理方法实现。还有下面一些方法可参考使用:scheduler.start()
开始任务scheduler.shutdown()
停止任务scheduler.pause()
暂停所有任务scheduler.resume()
开启任务scheduler.add_listener(<callback function>,<event>)
添加监听事件scheduler.remove_listener(<callback function>)
去除监听事件scheduler.add_job(<id>,<function>, **kwargs)
添加jobscheduler.remove_job(<id>, **<jobstore>)
删除jobscheduler.remove_all_jobs(**<jobstore>)
删除所有定时任务scheduler.get_job(<id>,**<jobstore>)
获取job信息scheduler.modify_job(<id>,**<jobstore>, **kwargs)
修改jobscheduler.pause_job(<id>, **<jobstore>)
暂停jobscheduler.resume_job(<id>, **<jobstore>)
恢复jobscheduler.run_job(<id>, **<jobstore>)
启动jobscheduler.authenticate(<function>)
验证