MySQL实现全量同步和增量同步到SQL Server或其他关系型库

在将MySQL中的两张表同步到SQL Server的过程中,全量同步和增量同步各有其优缺点。全量同步简单直接但可能耗时较长且资源消耗大,而增量同步则更加高效但需要额外的逻辑来处理数据的变更。以下是对这两种同步方式的详细解释及代码示例的完善。

完整代码示例

以下是一个完整的示例,包括全量同步和增量同步,以及使用schedule库来设置定时任务。

import pymysql
import pyodbc
from datetime import datetime, timedelta
import schedule
import time# MySQL 数据库连接函数
def get_mysql_connection():return pymysql.connect(host='localhost',         # 替换为你的 MySQL 服务器地址user='root',              # 替换为你的 MySQL 用户名password='password123',   # 替换为你的 MySQL 密码database='test_db'        # 替换为你的 MySQL 数据库名)# SQL Server 数据库连接函数
def get_sqlserver_connection():return pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};''SERVER=your_sqlserver_host;'  # 替换为你的 SQL Server 服务器地址或 IP'DATABASE=test_sqlserver_db;'  # 替换为你的 SQL Server 数据库名'UID=sqlserver_user;'          # 替换为你的 SQL Server 用户名'PWD=sqlserver_password123'    # 替换为你的 SQL Server 密码)# 全量同步函数
def full_sync_tables():mysql_conn = get_mysql_connection()sqlserver_conn = get_sqlserver_connection()try:mysql_cursor = mysql_conn.cursor()sqlserver_cursor = sqlserver_conn.cursor()# 清空SQL Server中的表数据sqlserver_cursor.execute("TRUNCATE TABLE table1")sqlserver_cursor.execute("TRUNCATE TABLE table2")# 从MySQL表中查询所有数据并插入到SQL Server表中for table in ['table1', 'table2']:mysql_cursor.execute(f"SELECT * FROM {table}")mysql_data = mysql_cursor.fetchall()columns = len(mysql_cursor.description)placeholders = ', '.join(['?'] * columns)insert_query = f"INSERT INTO {table} VALUES ({placeholders})"for row in mysql_data:sqlserver_cursor.execute(insert_query, row)sqlserver_conn.commit()finally:mysql_cursor.close()mysql_conn.close()sqlserver_cursor.close()sqlserver_conn.close()# 增量同步函数
def incremental_sync_tables(last_sync_time):mysql_conn = get_mysql_connection()sqlserver_conn = get_sqlserver_connection()try:mysql_cursor = mysql_conn.cursor()sqlserver_cursor = sqlserver_conn.cursor()# 获取MySQL中自上次同步以来的增量数据for table in ['table1', 'table2']:mysql_cursor.execute(f"SELECT * FROM {table} WHERE update_time > %s", (last_sync_time,))mysql_data = mysql_cursor.fetchall()# 插入或更新SQL Server中的数据update_query = f"UPDATE {table} SET {} WHERE id = ?".format(', '.join([f"{col} = ?" for col in mysql_cursor.description[1:]]))insert_query = f"INSERT INTO {table} ({}) VALUES ({})".format(', '.join([col[0] for col in mysql_cursor.description]),', '.join(['?'] * len(mysql_cursor.description)))for row in mysql_data:sqlserver_cursor.execute(f"SELECT id FROM {table} WHERE id = ?", (row[0],))result = sqlserver_cursor.fetchone()if result:sqlserver_cursor.execute(update_query, row[1:] + (row[0],))else:sqlserver_cursor.execute(insert_query, row)# 处理删除操作(假设MySQL有逻辑删除标记字段is_deleted)for table in ['table1', 'table2']:sqlserver_cursor.execute(f"SELECT id FROM {table}")sqlserver_ids = [row[0] for row in sqlserver_cursor.fetchall()]mysql_cursor.execute(f"SELECT id FROM {table} WHERE is_deleted = 1 AND update_time > %s", (last_sync_time,))deleted_ids = [row[0] for row in mysql_cursor.fetchall()]for id_ in set(sqlserver_ids) - set(deleted_ids):sqlserver_cursor.execute(f"DELETE FROM {table} WHERE id = ?", (id_,))sqlserver_conn.commit()finally:mysql_cursor.close()mysql_conn.close()sqlserver_cursor.close()sqlserver_conn.close()# 定时任务函数
def schedule_sync_tasks():# 每天凌晨1点进行全量同步schedule.every().day.at("01:00").do(full_sync_tables)# 每5分钟进行增量同步last_sync_time = datetime.now() - timedelta(minutes=5)  # 初始化为5分钟前,之后每次调用都会更新def run_incremental_sync():nonlocal last_sync_timeincremental_sync_tables(last_sync_time)last_sync_time = datetime.now()  # 更新上次同步时间schedule.every(5).minutes.do(run_incremental_sync)# 运行调度器while True:schedule.run_pending()time.sleep(1)# 运行定时任务
if __name__ == "__main__":schedule_sync_tasks()

注意事项

  1. 性能:对于大数据量的表,增量同步可能会更高效,但也要确保增量同步的逻辑不会成为瓶颈。
  2. 事务:在同步过程中,确保使用事务来保持数据的一致性。
  3. 错误处理:在实际应用中,需要更完善的错误处理和日志记录机制。
  4. 时间戳:确保MySQL中的update_time字段在每次数据更新时都被正确更新。
  5. 健壮性:增量同步依赖于时间戳或逻辑删除标记,因此需要确保这些字段在业务逻辑中被正确维护。
  6. 安全性:不要在代码中硬编码数据库密码,考虑使用环境变量或配置文件来管理敏感信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/37052.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在VMware上部署【Ubuntu】

镜像下载 国内各镜像站点均可下载Ubuntu镜像,下面例举清华网站 清华镜像站点:清华大学开源软件镜像站 | Tsinghua Open Source Mirror 具体下载步骤如下: 创建虚拟机 准备:在其他空间大的盘中创建存储虚拟机的目录&#xff0c…

2025年Postman的五大替代工具

虽然Postman是一个广泛使用的API测试工具,但许多用户在使用过程中会遇到各种限制和不便。因此,可能需要探索替代解决方案。本文介绍了10款强大的替代工具,它们能够有效替代Postman,成为你API测试工具箱的一部分。 什么是Postman&…

wow-rag—task5:流式部署

我们希望做一个流式输出的后端,然后让前端去捕获这个流式输出,并且在聊天界面中流式输出。 首先构造流式输出引擎。 # 构造流式输出引擎 query_engine index.as_query_engine(streamingTrue, similarity_top_k3,llmllm)然后生成response_stream&#x…

投资日记_道氏理论技术分析

主要用于我自己参考,我感觉我做事情的时候容易上头,忘掉很多事情。 技术分析有很多方法,但是我个人相信并实践的还是以道氏理论为根本的方法。方法千千万万只有适合自己价值观,习惯,情绪,性格的方法才是好的…

LangChain4j入门指南:Java开发者的AI应用新起点

什么是LangChain和LangChain4j? LangChain是⼀个⼤模型的开发框架,使⽤ LangChain 框架,程序员可以更好的利⽤⼤模型的能⼒,⼤⼤提⾼编 程效率。如果你是⼀个 Java 程序员,那么对 LangChain 最简单直观的理解就是&…

【实测闭坑】LazyGraphRAG利用本地ollama提供Embedding model服务和火山引擎的deepseek API构建本地知识库

LazyGraphRAG 2024年4月,为解决传统RAG在全局性的查询总结任务上表现不佳,微软多部门联合提出Project GraphRAG(大模型驱动的KG);2024年7月,微软正式开源GraphRAG项目,引起极大关注&#xff0c…

压力测试实战指南:JMeter 5.x深度解析与QPS/TPS性能优化

一、压力测试基础概念 1.1 什么是压力测试? 定义:模拟极端负载场景验证系统性能极限 目的:发现性能瓶颈、评估系统可靠性、验证容错能力 常见类型:负载测试、压力测试、稳定性测试、峰值测试 1.2 核心性能指标解析 1.2.1 QP…

嵌入式4-Modbus

1.Modbus Modbus 是一种广泛应用于工业自动化领域的通信协议,用于在不同设备(如传感器、PLC、变频器、仪表等)之间交换数据。它支持串行通信(如 RS232、RS485)和以太网通信(Modbus TCP)&#x…

机器学习-手搓KNN算法

一、简介 K最近邻(K-Nearest Neighbors, KNN)​ 是一种简单且直观的监督学习算法,适用于分类和回归任务。其核心思想是:​相似的数据点在特征空间中彼此接近。KNN通过计算新样本与训练数据中各个样本的距离,找到最近的…

Linux|fork命令及其使用的写时拷贝技术

fork复制进程 fork通过以下步骤来复制进程: 分配新的进程控制块:内核为新进程分配一个新的进程控制块(PCB),用于存储进程的相关信息,如进程 ID、状态、寄存器值、内存指针等。复制进程地址空间&#xff1…

Hoppscotch 开源API 开发工具

Hoppscotch 是一个开源的 API 开发工具,旨在为开发者提供一个轻量级、快速且功能丰富的 API 开发和调试平台。以下是对其主要特性和功能的详细介绍: 1. 轻量级与高效 Hoppscotch 采用简约的 UI 设计,注重易用性和高效性。它支持实时发送请求…

Datawhale大语言模型-Transformer以及模型详细配置

Datawhale大语言模型-Transformer以及模型详细配置 Transformer模型位置编码前馈层网络注意力机制多头自注意力编码器解码器 大语言模型的参数配置归一化激活函数位置编码旋转位置编码代码内容实现 注意力机制 参考资料 Transformer模型 当前主流的大语言模型都基于 Transform…

iPhone 16怎么编辑图片?图片编辑技巧、软件分享

在当今这个视觉信息爆炸的时代,一张经过精心编辑的图片往往能够瞬间抓住观众的眼球,而 iPhone 16凭借其卓越的硬件性能和丰富的软件生态,在图片编辑领域展现出了非凡的实力,成为众多摄影爱好者和创意工作者的得力助手。 一、编辑效…

代码随想录_动态规划

代码随想录 动态规划 509.斐波那契数 509. 斐波那契数 斐波那契数 (通常用 F(n) 表示)形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始,后面的每一项数字都是前面两项数字的和。也就是: F(0) 0,F(1) 1 F(n…

【虚幻引擎UE5】SpawnActor生成Character实例不执行AI Move To,未初始化AIController的原因和解决方法

虚幻引擎版本:5.5.4 问题描述 刚创建的Third Person项目里,定义一个BP_Enemy蓝图,拖拽到场景中产生的实例会追随玩家,但SpawnActor产生的实例会固定不动。BP_Enemy蓝图具体设计如下: BP_Enemy的Event Graph ​​ 又定义…

论文笔记(七十三)Gemini Robotics: Bringing AI into the Physical World

Gemini Robotics: Bringing AI into the Physical World 文章概括1. 引言2. Gemini 2.0的具身推理2.1. 具身推理问答(ERQA)基准测试2.2. Gemini 2.0的具身推理能力2.3. Gemini 2.0支持零样本和少样本机器人控制 3. 使用 Gemini Robotics 执行机器人动作3…

汇能感知高品质的多光谱相机VSC02UA

VSC02UA概要 VSC02UA是一款高品质的200万像素的光谱相机,适用于工业检测、农业、医疗等领域。VSC02UA 包含 1600 行1200 列有源像素阵列、片上 10 位 ADC 和图像信号处理器。它带有 USB2.0 接口,配合专门的电脑上位机软件使用,可进行图像采集…

VSCode创建VUE项目(三)使用axios调用后台服务

1. 安装axios,执行命令 npm install axios 2. 在 main.ts 中引入并全局挂载 Axios 实例 修改后的 代码(也可以单独建一个页面处理Axios相关信息等,然后全局进行挂载) import { createApp } from vue import App from ./App.vue import rou…

信号处理抽取多项滤波的数学推导与仿真

昨天的《信号处理之插值、抽取与多项滤波》,已经介绍了插值抽取的多项滤率,今天详细介绍多项滤波的数学推导,并附上实战仿真代码。 一、数学变换推导 1. 多相分解的核心思想 将FIR滤波器的系数 h ( n ) h(n) h(n)按相位分组,每…

基于Rockylinux9.5(LTS-SP4)安装MySQL Community Server 9.2.0

目录 一、安装环境及准备 1、linux操作系统环境 2、MYSQL安装包准备 二、执行安装 1、解压软件包 2、按顺序执行软件包的安装 3、启动MYSQL服务 4.配置MYSQL 一、安装环境及准备 1、linux操作系统环境 Rocky linux9.5安装在VMware虚拟机上完成Rocky linux9.5安装&am…