python中实现定时任务的几种方案

目录

  • while True: + sleep()
  • Timeloop库
  • threading.Timer
  • sched模块
  • schedule模块
  • APScheduler框架
  • Celery框架
  • 数据流工具Apache Airflow
    • 概述
    • Airflow 核心概念
    • Airflow 的架构

总结以下几种方案实现定时任务,可根据不同需求去使用不同方案。

while True: + sleep()

利用while True的死循环,加上 sleep()函数让其暂停一段时间,达到每隔一段时间执行特定任务的目的。

比较简单,例子如下:

import datetime
import timedef time_printer():now = datetime.datetime.now()ts = now.strftime('%Y-%m-%d %H:%M:%S')print('do func time :', ts)def loop_monitor():while True:time_printer()time.sleep(5)if __name__ == "__main__":loop_monitor()

主要缺点:

  • 只能设定间隔,不能指定具体的时间
  • sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。

Timeloop库

Timeloop是一个库,可用于运行多周期任务。

Timeloop内部维护了一个任务列表jobs,用来管理所有任务。

可以使用装饰器标记任务,这样就会把任务加到任务列表jobs中,使用start方法启动任务列表重的所有任务。

示例如下:

import time
from timeloop import Timeloop
from datetime import timedeltatl = Timeloop()@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():print("2s job current time : {}".format(time.ctime()))if __name__ == "__main__":tl.start(block=True)

运行后打印如下:

[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop..
[2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job <function sample_job_every_2s at 0x7fc3d022d0d0>
[2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set
2s job current time : Mon Oct  2 09:48:43 2023
2s job current time : Mon Oct  2 09:48:45 2023
2s job current time : Mon Oct  2 09:48:47 2023

同时Timeloop还有个stop方法,可以用来暂停所有任务。

threading.Timer

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

主要有如下方法:

方法说明
Timer(interval, function, args=None, kwargs=None)创建定时器
cancel()取消定时器
start()使用线程方式执行
join(self, timeout=None)主线程等待线程执行结束

示例:

import datetimefrom threading import Timerdef time_printer():now = datetime.datetime.now()ts = now.strftime('%Y-%m-%d %H:%M:%S')print('do func time :', ts)# 注意 Timer 只能执行一次,这里需要循环调用,否则只能执行一次loop_monitor()def loop_monitor():t = Timer(5, time_printer)t.start()if __name__ == "__main__":loop_monitor()

sched模块

sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。

class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。

import datetime
import time
import scheddef time_printer():now = datetime.datetime.now()ts = now.strftime('%Y-%m-%d %H:%M:%S')print('do func time :', ts)loop_monitor()def loop_monitor():s = sched.scheduler(time.time, time.sleep)  # 生成调度器s.enter(5, 1, time_printer, ())s.run()if __name__ == "__main__":loop_monitor()

scheduler对象主要方法:

enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
比threading.Timer更好,不需要循环调用。

schedule模块

schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

示例:

import schedule
import timedef job():print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)while True:schedule.run_pending()time.sleep(1)

也可以通过 @repeat() 装饰静态方法:

import time
from schedule import every, repeat, run_pending@repeat(every().second)
def job():print('working...')while True:run_pending()time.sleep(1)

传递参数:

import scheduledef greet(name):print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')while True:schedule.run_pending()

装饰器同样能传递参数:

from schedule import every, repeat, run_pending@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):print('Hello', planet)while True:run_pending()

取消任务:

import schedulei = 0
def some_task():global ii += 1print(i)if i == 10:schedule.cancel_job(job)print('cancel job')exit(0)
job = schedule.every().second.do(some_task)while True:schedule.run_pending()

运行一次任务:

import time
import scheduledef job_that_executes_once():print('Hello')return schedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)
while True:schedule.run_pending()time.sleep(1)

根据标签检索任务:

# 检索所有任务:schedule.get_jobs()
import scheduledef greet(name):print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')
print(friends)

根据标签取消任务:

# 取消所有任务:schedule.clear()
import scheduledef greet(name):print('Hello {}'.format(name))if name == 'Cancel':schedule.clear('second-tasks')print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True:schedule.run_pending()

运行任务到某时间:

import schedule
from datetime import datetime, timedelta, timedef job():print('working...')schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止while True:schedule.run_pending()

马上运行所有任务(主要用于测试):

import scheduledef job():print('working...')def job1():print('Hello...')schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟3秒

并行运行:使用 Python 内置队列实现:

import threading
import time
import scheduledef job1():print("I'm running on thread %s" % threading.current_thread())def job2():print("I'm running on thread %s" % threading.current_thread())def job3():print("I'm running on thread %s" % threading.current_thread())def run_threaded(job_func):job_thread = threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)while True:schedule.run_pending()time.sleep(1)

APScheduler框架

APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。

具体使用参考文章:APScheduler框架使用

Celery框架

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常使用它来实现异步任务(async task)和定时任务(crontab)。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。

具体使用参考:

Celery使用:优秀的python异步任务框架

Django(21):使用Celery任务框架

数据流工具Apache Airflow

概述

Apache Airflow 是Airbnb开源的一款数据流程工具,目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程,同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式,支持Master-Slave模式,支持Mesos等资源调度,有非常好的扩展性。被大量公司采用。

Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2和T3才能开始执行,T2和T3都执行完成,T4才能开始执行。

在这里插入图片描述
Airflow提供了各种Operator实现,可以完成各种任务实现:

  • BashOperator – 执行 bash 命令或脚本。
  • SSHOperator – 执行远程 bash 命令或脚本(原理同 paramiko 模块)。
  • PythonOperator – 执行 Python 函数。
  • EmailOperator – 发送 Email。
  • HTTPOperator – 发送一个 HTTP 请求。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行 SQL 任务。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。

一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:

在这里插入图片描述

这种需求可以使用BranchPythonOperator来实现。

Airflow 核心概念

  1. DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
  2. Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
  3. Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
  4. Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
  5. Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了。通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)。

Airflow 的架构

在一个可扩展的生产环境中,Airflow 含有以下组件:

  1. 元数据库:这个数据库存储有关任务状态的信息。
  2. 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
  3. 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
  4. Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。

在这里插入图片描述
Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:

  1. SequentialExecutor: 单进程顺序执行,一般只用来测试
  2. LocalExecutor: 本地多进程执行
  3. CeleryExecutor: 使用Celery进行分布式任务调度
  4. DaskExecutor:使用Dask进行分布式任务调度
  5. KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务

生产环境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架构如图:

在这里插入图片描述
使用KubernetesExecutor的架构如图:

在这里插入图片描述

参考:

https://mp.weixin.qq.com/s/dzA9xGoho50WK_-80hzelg

https://zhuanlan.zhihu.com/p/448847300

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/147898.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++进阶(七)】仿函数深度剖析模板进阶讲解

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; 模板进阶 1. 前言2. 仿函数的概念3. 仿函数的实…

【GO 编程语言】面向对象

指针与结构体 文章目录 指针与结构体一、OOP 思想二、继承三、方法 一、OOP 思想 Go语言不是面向对象的语言&#xff0c;这里只是通过一些方法来模拟面向对象&#xff0c;从而更好的来理解面向对象思想 面向过程的思维模式 1.面向过程的思维模式是简单的线性思维&#xff0c;…

C#学生选课及成绩查询系统

一、项目背景 学生选课及成绩查询系统是一个学校不可缺少的部分&#xff0c;传统的人工管理档案的方式存在着很多的缺点&#xff0c;如&#xff1a;效率低、保密性差等&#xff0c;所以开发一套综合教务系统管理软件很有必要&#xff0c;它应该具有传统的手工管理所无法比拟的…

visual studio禁用qt-vsaddin插件更新

visual studio里qt-vsaddin插件默认是自动更新的&#xff0c;由于qt-vsaddin插件新版本的操作方式与老版本相差较大&#xff0c;且新版本不稳定&#xff0c;容易出Bug&#xff0c;所以需要禁用其自动更新&#xff0c;步骤如下&#xff1a;     点击VS2019菜单栏上的【扩展】–…

ChatGPT启蒙之旅:弟弟妹妹的关键概念入门

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

Docker Alist 在线网盘部署

文章目录 拉取镜像创建并运行查看容器自动生成的密码在浏览器中进行访问 挂载本地磁盘 拉取镜像 docker pull xhofe/alist-aria2创建并运行 # -v /data/alist:/opt/alist/data 挂载本地目录 docker run -d --restartalways -v /data/alist:/opt/alist/data -p 5244:5244 -e P…

Linux系统常用指令篇---(一)

Linux系统常用指令篇—(一) 1.cd指令 Linux系统中&#xff0c;磁盘上的文件和目录被组成一棵目录树&#xff0c;每个节点都是目录或文件。 语法:cd 目录名 功能&#xff1a;改变工作目录。将当前工作目录改变到指定的目录下。 (简单理解为进入指定目录下) 举例: cd .. : 返…

vscode 打开后 默认terminal power shell 报错 名为“.C”的驱动器不存在。

这是 默认terminal power shell 打开报的错 Test-Path : 找不到驱动器。名为“.C”的驱动器不存在。 所在位置 C:\Users\HUAWEI\Documents\WindowsPowerShell\profile.ps1:4 字符: 5 If (Test-Path "C:\Users\HUAWEI\AppData\Local\Temp\_MEI319962\Scripts\ ... …

python二次开发CATIA:文字轮廓草图

CATIA V5 版本的草图中&#xff0c;并没有文字轮廓的创建命令。通常的做法是&#xff0c;再Drawing 文件中创建所需文本-->将 Drawing 文件另存为 dwg / dxf 格式-->打开另存的文件&#xff0c;文字已转为轮廓线条-->复制线条并粘贴到草图中。 本例中&#xff0c;基于…

Hono——一个小型,简单且超快的Edges Web框架

Hono - [炎]在日语中的意思是火焰&#x1f525; - 是一个小型&#xff0c;简单且超快的Edges Web框架。它适用于任何JavaScript运行时&#xff1a;Cloudflare Workers&#xff0c;Fastly ComputeEdge&#xff0c;Deno&#xff0c;Bun&#xff0c;Vercel&#xff0c;Netlify&…

【单片机】14-I2C通信之EEPROM

1.EEPROM概念 1.EEPROM 1.1 一些概念 &#xff08;1&#xff09;一些概念&#xff1a;ROM【只读存储器---硬盘】&#xff0c;RAM【随机访问存储器--内存】&#xff0c;PROM【可编程的ROM】&#xff0c;EPROM【可擦除ROM】&#xff0c;EEPROM【电可擦除ROM】 1.2 为什么需要EE…

接口测试入门实践

简单接口搭建(表单/REST) 五步教会你写接口 首先要安装flask包: pip install flask 从flask中导入Flask类和request对象: from flask import Flask, request从当前模块实例化出一个Flask实例:appFlask(__name__)编写一个函数来处理请求 从请求对象中获取数据:arequest.values.…

LeetCode【121. 买卖股票的最佳时机】

你才不是什么小人物&#xff0c;你在我这里&#xff0c;是所有的天气和心情。 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日子 卖出该股票。设计一…

【Leetcode】滑动窗口合集

这里写目录标题 209.长度最小的子数组题目思路代码 3. 无重复字符的最长子串&#xff08;medium&#xff09;题目思路 11. 最大连续 1 的个数 III题目思路 1658. 将 x 减到 0 的最⼩操作数题目思路代码 904. 水果成篮题目思路代码 438.找到字符串中所有字母的异位词题目思路代码…

二十八、高级IO与多路转接之select

文章目录 一、五种IO模型&#xff08;一&#xff09;阻塞IO:&#xff08;二&#xff09;非阻塞IO:&#xff08;三&#xff09;信号驱动IO:&#xff08;四&#xff09;IO多路转接:&#xff08;五&#xff09;异步IO: 二、高级IO重要概念&#xff08;一&#xff09;同步通信 vs 异…

【STL】用哈希表(桶)封装出unordered_set和unordered_map

⭐博客主页&#xff1a;️CS semi主页 ⭐欢迎关注&#xff1a;点赞收藏留言 ⭐系列专栏&#xff1a;C进阶 ⭐代码仓库&#xff1a;C进阶 家人们更新不易&#xff0c;你们的点赞和关注对我而言十分重要&#xff0c;友友们麻烦多多点赞&#xff0b;关注&#xff0c;你们的支持是我…

pytorch_神经网络构建1

文章目录 pytorch简介神经网络基础分类问题分析:逻辑回归模型逻辑回归实现多层神经网络多层网络搭建保存模型 pytorch简介 为什么神经网络要自定义数据类型torch.tensor? tensor可以放在gpu上训练,支持自动求导,方便快速训练,同时支持numpy的运算,是加强版,numpy不支持这些 为…

python二次开发CATIA:导出Excel格式BOM

BOM是英文Bill of Material的缩写&#xff0c;中文翻译为“物料清单”&#xff0c;也称为产品结构表或产品结构树。它是计算机可以识别的产品结构数据文件&#xff0c;也是ERP的主导文件。BOM使系统识别产品结构&#xff0c;也是联系与沟通企业各项业务的纽带。ERP系统中的BOM的…

多线程经典代码案例及手动实现

目录 一.线程和多线程 二. 多线程的经典的代码案例 1.单例模式 2.阻塞队列 (1)概念介绍 (2)生产者消费者模型 (3)手动实现阻塞队列 (4)代码解释及问题分析 3.定时器 (1)概念介绍 (2)思路分析 (3)手动实现定时器 (4)代码解释及问题分析 问题一:优先级 问题二 :忙等…

基于SSM的电动车上牌管理系统(有报告)。Javaee项目。

演示视频&#xff1a; 基于SSM的电动车上牌管理系统&#xff08;有报告&#xff09;。Javaee项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringM…