场景:
需要对mysql进行定时备份,受限于硬盘空间的大小,需要对备份的数据需要定时清理
python代码实现:
# -*- coding:UTF-8 -*-
"""@ProjectName : HotelGo2DelonixPmx@FileName : fix_missing_rates@Description :使用mysqldump命令行工具进行备份:在命令行中执行以下命令:mysqldump -u 用户名 -p 密码 数据库名 > 备份文件.sql。这将生成一个包含所有SQL语句的备份文件,其中包括创建表、插入数据等操作。要还原备份,可以使用以下命令:mysql -u 用户名 -p 密码 新数据库名 < 备份文件.sql。@Time : 2023/7/4 15:48@Author : Qredsun"""import datetime
import functools
import os
import subprocess
import sys
import timeimport schedule
from loguru import logger# 数据库登录账户、密码
USERNAME = ''
PASSWORD = ''
# 指定备份存放路径
BACKUP_DIR = '/path/to/backup'
# 备份周期
RETENTION_PERIOD = 7logger.remove() # 删去import logger之后自动产生的handler,不删除的话会出现重复输出的现象
handler_id = logger.add(sys.stderr, level="INFO") # 添加一个可以修改控制的handlerinfo_log_path = os.path.join('log', datetime.datetime.now().strftime("database_backup_%y_%m_%d_debug.log"))
logger.add(info_log_path,rotation="23:00",retention="7 days",level="DEBUG")
info_log_path = os.path.join('log', datetime.datetime.now().strftime("database_backup_%y_%m_%d_error.log"))
logger.add(info_log_path,rotation="23:00",retention="7 days",level="ERROR")# 备份数据库
def backup_datebase():current_time = datetime.datetime.now()backup_file = current_time.strftime('backup_%y_%m_%d_%H_%M_%S.sql')backup_path = os.path.join(BACKUP_DIR, backup_file)# 使用 subprocess 模块执行数据备份命令 备份数据库所有数据back_command = ['mysqldump','-u',USERNAME,'-p',PASSWORD,'--all-database']with open(backup_path, 'w') as fp:subprocess.run(back_command, stdout=fp)logger.info(f'数据库备份已经完成,备份文件保存路径:{backup_path}')# 清理过期的备份文件
def cleanup_backup():current_time = datetime.datetime.now()expiration_time = current_time - datetime.timedelta(days=RETENTION_PERIOD)for file_name in os.listdir(BACKUP_DIR):file_path = os.path.join(BACKUP_DIR, file_name)if not os.path.isfile(file_path):continuefile_time = datetime.datetime.fromtimestamp(os.path.getatime(file_path))if file_time < expiration_time:os.remove(file_path)logger.info(f'删除过期文件:{file_path}')# 数据备份恢复
def restore_database(backup_file):backup_path = os.path.join(BACKUP_DIR, backup_file)# 使用 subprocess 模块执行数据恢复命令restore_command = ['mysqldump','-u',USERNAME,'-p',PASSWORD,'--all-database']with open(backup_path, 'r') as fp:subprocess.run(restore_command, stdin=fp)logger.info(f'使用 {backup_path} 完成数据库数据恢复')def catch_exceptions(cancel_on_failure = False):def catch_exceptions_decorator(job_func):@functools.wraps(job_func)def wrapper(*args, **kwargs):try:return job_func(*args, **kwargs)except:import tracebacktraceback.format_exc()if cancel_on_failure:return schedule.CancelJobreturn wrapperreturn catch_exceptions_decorator# 异常捕获方法的使用
@catch_exceptions(cancel_on_failure=True)
def task_job():# 定时任务backup_datebase()cleanup_backup()schedule.every().day.at("23:30").do(task_job)while True:schedule.run_pending() # 运行所有可以运行的任务time.sleep(60*30)