【最后203篇系列】007 使用APS搭建本地定时任务

说明

最大的好处是方便。

其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。

总结一下:源头是定时轮询,中间过程是事件传递。

本次使用APS搭建本地定时任务的目的是为了简化实验性质的定时任务,通过在git项目下进行编辑任务脚本和执行任务清单,而运行容器本身会周期性的自动拉取代码,然后按照任务清单执行。

执行过程采用多线程方式,任务的负载通常都不高。整体设计上,复杂和繁重的任务会包在微服务中,定时任务主要是向这些微服务发起触发动作。通常,微服务收到触发元信息后进行自动的任务/数据拉取处理,处理完毕后通过webhook将结果持久化,或进一步发起其他的触发动作。

另外,具有共性的任务将会被提取出来,之后会交给celery以分布&协程方式执行,这些任务包括:

  • 1 数据库IO。例如从队列里取数,存到数据库中。
  • 2 网络数据获取IO。爬取网页、或者通过接口,获取数据。
  • 3 接口化标准操作。按url, json input这样的标准web请求,这种灵活性很强。表面上是一个IO动作,但背后可能触发密集计算,但是又不需要celery集群承担。(可能是ray集群、dask集群、基于显卡计算的集群)

内容

1 读取任务列表

主要为了简单的读入任务(脚本),同时可以方便的进行注释

# 用于将代表任务列表的数据读入
# 去掉换行和空格
# 如果以# 号开头表示注释
def read_all_lines_clean(fpath):with open(fpath, 'r') as f:lines = f.readlines()lines1 = [x.replace('\n','').strip() for x in lines]lines2 = [x for x in lines1 if len(x) and not x.startswith('#')]return lines2

任务文件如下task_list.txt

task_01_probably_git_pull.py
task_02_del_event_null_recs.py
# task_03_sync_xs_backup.py
#task_04_rotate_data.py
# task_05_sync_milvus.py
#task_06_rotate_mysql_time.py

读入后

In [4]: a = read_all_lines_clean('task_list.txt')In [5]: a
Out[5]: ['task_01_probably_git_pull.py', 'task_02_del_event_null_recs.py']

这些就是之后要定时调度的任务

2 并行执行

为了使得每一次定时任务都可以执行,且保证效率,需要用一些简单的调度(容错问题均在脚本内解决)。调度器可以保证每30秒起来一次。

线程的并行执行:

def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)

每一次执行os_system_python

import subprocessdef os_system_python(some_path=None, timeout=30):try:result = subprocess.run(['python3', some_path], timeout=timeout)return resultexcept subprocess.TimeoutExpired:print(f"Task {some_path} timed out after {timeout} seconds.")return None'''
代码说明
subprocess.run:这是 subprocess 模块的高级 API,用于运行命令并等待其完成。它支持 timeout 参数,如果命令在指定时间内未完成,会抛出 TimeoutExpired 异常。timeout 参数:你设置了默认超时时间为 30 秒,这是一个合理的默认值。如果任务在 30 秒内未完成,subprocess.run 会抛出 TimeoutExpired 异常。异常处理:捕获 TimeoutExpired 异常后,打印超时信息并返回 None。这样可以避免程序因超时而崩溃,同时提供清晰的日志信息。
'''

3 自动更新

更新git项目,作为一个任务脚本被周期执行。由于代码更新并不是高频事件,所以一般概率上保证5分钟会更新一次代码。

(base) root@76a14afa199b:/workspace/local_aps_v2/base# python3 task_01_probably_git_pull.py
2000-01-01 08:00:00
2000-01-01 08:00:00
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
task_01_probably_git_pull running
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
Git pull executed successfully for branch 'master':
Already up to date.2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
(base) root@76a14afa199b:/workspace/local_aps_v2/base#

4 定时调度

调度器在每分钟的0/30秒执行,我把30秒定为一拍(pace),一分钟定位一时隙(slot)。绝大部分任务都应该在30秒内完成。

# 执行本地脚本
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerfrom base_config import base_config
from Basefuncs import * 
def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &if __name__ == '__main__':# 创建调度器sche1 = BlockingScheduler()# 添加任务,使用 cron 表达式每分钟的第 0 秒和第 30 秒执行sche1.add_job(exe_tasks_threads,'cron',second='0,30',  # 每分钟的第 0 秒和第 30 秒kwargs={},coalesce=True,max_instances=1)print('[S] Starting scheduler with cron (0s and 30s of every minute)...')try:sche1.start()  # 启动调度器except (KeyboardInterrupt, SystemExit):print('[S] Scheduler stopped.')

5 Docker运行

为了保证执行的稳定性,使用docker执行

docker run -d --name=local_aps_v2 \--restart=always \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \-w /workspace/local_aps_v2/base \YOURIMAGE  \sh -c "git pull && python3 aps.py"

只有环境改变时才需要修改镜像重发布,大部分时候只要调试和修改代码,然后推送就可以了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/9718.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Springboot的智能学习平台系统【附源码】

基于Springboot的智能学习平台系统 效果如下: 系统登陆页面 系统主页面 课程详情页面 论坛页面 考试页面 试卷管理页面 考试记录页面 错题本页面 研究背景 随着互联网技术的普及,人们探索知识的方式逐渐转向数字化平台。传统的教学方法通常局限于固定…

shiro学习五:使用springboot整合shiro。在前面学习四的基础上,增加shiro的缓存机制,源码讲解:认证缓存、授权缓存。

文章目录 前言1. 直接上代码最后在讲解1.1 新增的pom依赖1.2 RedisCache.java1.3 RedisCacheManager.java1.4 jwt的三个类1.5 ShiroConfig.java新增Bean 2. 源码讲解。2.1 shiro 缓存的代码流程。2.2 缓存流程2.2.1 认证和授权简述2.2.2 AuthenticatingRealm.getAuthentication…

网关登录校验

网关登录校验 单体架构时我们只需要完成一次用户登录、身份校验,就可以在所有业务中获取到用户信息。而微服务拆分后,每个微服务都独立部署,不再共享数据。也就意味着每个微服务都需要做登录校验,这显然不可取。 鉴权思路分析 …

【单细胞第二节:单细胞示例数据分析-GSE218208】

GSE218208 1.创建Seurat对象 #untar(“GSE218208_RAW.tar”) rm(list ls()) a data.table::fread("GSM6736629_10x-PBMC-1_ds0.1974_CountMatrix.tsv.gz",data.table F) a[1:4,1:4] library(tidyverse) a$alias:gene str_split(a$alias:gene,":",si…

【已解决】黑马点评项目Redis版本替换过程的数据迁移

黑马点评项目Redis版本替换过程的数据迁移 【哭哭哭】附近商户中需要用到的GEO功能只在Redis 6.2以上版本生效 如果用的是老版本,美食/KTV的主页能正常返回,但无法显示内容 上次好不容易升到了5.0以上版本,现在又用不了了 Redis 6.2的windo…

本地部署deepseek模型步骤

文章目录 0.deepseek简介1.安装ollama软件2.配置合适的deepseek模型3.安装chatbox可视化 0.deepseek简介 DeepSeek 是一家专注于人工智能技术研发的公司,致力于打造高性能、低成本的 AI 模型,其目标是让 AI 技术更加普惠,让更多人能够用上强…

[论文总结] 深度学习在农业领域应用论文笔记14

当下,深度学习在农业领域的研究热度持续攀升,相关论文发表量呈现出迅猛增长的态势。但繁荣背后,质量却不尽人意。相当一部分论文内容空洞无物,缺乏能够落地转化的实际价值,“凑数” 的痕迹十分明显。在农业信息化领域的…

快速分析LabVIEW主要特征进行判断

在LabVIEW中,快速分析程序特征进行判断是提升开发效率和减少调试时间的重要技巧。本文将介绍如何高效地识别和分析程序的关键特征,从而帮助开发者在编写和优化程序时做出及时的判断,避免不必要的错误。 ​ 数据流和并行性分析 LabVIEW的图形…

展示统计信息收集情况

看看最近是否收集失败 SET LINES 200 PAGES 0 SET LONG 100000 longc 100000 COLUMN REPORT FORMAT A200VARIABLE stat_report CLOB; BEGIN:stat_report : DBMS_STATS.REPORT_STATS_OPERATIONS (since > SYSDATE-3 , until > SYSDATE , detail_lev…

STM32 TIM输入捕获 测量频率

输入捕获简介: IC(Input Capture)输入捕获 输入捕获模式下,当通道输入引脚出现指定电平跳变时,当前CNT的值将被锁存到CCR中,可用于测量PWM波形的频率、占空比、脉冲间隔、电平持续时间等参数 每个高级定时器…

如何将 Windows 上的文件传递到 Mac 上

文章目录 效果需求Windows 上设置共享磁盘【可选】新建一个带有密码的账户查看 Windows 的 IP 地址Mac 上链接 Windows 共享的磁盘 效果 需求 Windows 上有一个有密码的账户 Windows 上设置共享磁盘 windows 这边需要用 Administrator 权限的账号,把要共享的磁盘设…

NLP模型大对比:Transformer > RNN > n-gram

结论 Transformer 大于 RNN 大于 传统的n-gram n-gram VS Transformer 我们可以用一个 图书馆查询 的类比来解释它们的差异: 一、核心差异对比 维度n-gram 模型Transformer工作方式固定窗口的"近视观察员"全局关联的"侦探"依赖距离只能看前…

ODP(OBProxy)路由初探

OBProxy路由策略 Primary Zone 路由 官方声明默认情况,会将租户请求发送到租户的 primary zone 所在的机器上,通过 Primary Zone 路由可以尽量发往主副本,方便快速寻找 Leader 副本。另外,设置primary zone 也会在一定成都上减少…

Python NumPy(7):连接数组、分割数组、数组元素的添加与删除

1 连接数组 函数描述concatenate连接沿现有轴的数组序列stack沿着新的轴加入一系列数组。hstack水平堆叠序列中的数组(列方向)vstack竖直堆叠序列中的数组(行方向) 1.1 numpy.concatenate numpy.concatenate 函数用于沿指定轴连…

在线课堂小程序设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…

生成模型:扩散模型(DDPM, DDIM, 条件生成)

扩散模型的理论较为复杂,论文公式与开源代码都难以理解。现有的教程大多侧重推导公式。为此,本文通过精简代码(约300行),从代码运行角度讲解扩散模型。 本文包括扩散模型的3项技术复现: 1.DDPM (Denoising…

DeepSeek大模型技术解析:从架构到应用的全面探索

一、引言 在人工智能领域,大模型的发展日新月异,其中DeepSeek大模型凭借其卓越的性能和广泛的应用场景,迅速成为业界的焦点。本文旨在深入剖析DeepSeek大模型的技术细节,从架构到应用进行全面探索,以期为读者提供一个…

[权限提升] 常见提权的环境介绍

关注这个框架的其他相关笔记:[内网安全] 内网渗透 - 学习手册-CSDN博客 通过前期的渗透测试,我们大概率会拿到目标的一个 Shell,比如 WebShell 或者 MSF Shell 等等,不同的 Shell 对应提权的姿势也不同,比如有的 Shell…

SQL注入漏洞之高阶手法 宽字节注入以及编码解释 以及堆叠注入原理说明

目录 宽字节注入 编码区分 原理 函数 转译符号解释 注意 绕过方式详解 堆叠【Stack】注入攻击 注入语句 宽字节注入 在说宽字节注入之前 我们需要知道编码相关的知识点,这个有助于搞定什么是宽字节注入 分清楚是ascii码是什么宽字节注入代码里面加入了adds…

Spring Boot - 数据库集成05 - 集成MongoDB

Spring Boot集成MongoDB 文章目录 Spring Boot集成MongoDB一:使用前的准备1:依赖导入 & 配置2:实体类创建 二:核心 - MongoRepository三:核心 - MongoTemplate1:集合操作2:文档操作(重点)3&…