C#实现SqlServer数据库同步

实现效果:

设计思路:
1. 开启数据库及表的cdc,定时查询cdc表数据,封装sql语句(通过执行类型,主键;修改类型的cdc数据只取最后更新的记录),添加到离线数据表;
2. 线程定时查询离线数据表,更新远程库数据;
3. 远程库数据被更改又会产生cdc数据,对此数据进行拦截;

配置文件说明:

{
"AsyncInterval": 30000,
"Drivers": [
{
"RefreshTime": 5000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "dbsync2|student,table1,table2,table3", "dbsync3|*" ],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
}
]
}

{
"AsyncInterval": 25000,
"Drivers": [
{
"RefreshTime": 10000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "testsync1|*"],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
},
{
"RefreshTime": 10000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "testsync1|*" ],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
}
]
}

1. 设置同步间隔时间
2. 根据不同的配置文件,加载不同的模式,多驱动(Drivers 1主1备-单向同步,1主1主-双向同步,2主1备-多库汇总),多机同步(DstConnect),多库同多表同步(SrcMap,dbsync2|*表示监听该数据库下的所有表),设置刷新时间(RefreshTime),是否启用(Enable),是否重置cdc数据(SrcUpdateCDC)

数据表说明:

async_data 离线数据表
id 主键自增 INTEGER
connect_str 连接字符串 NVARCHAR(255)
excute_sql 需要同步的sql语句 NVARCHAR(255)
cdc_time cdc时间 DATETIME
event_time event时间 DATETIME
db_name 数据库名 NVARCHAR(255)
table_name 表名 NVARCHAR(255)
table_pk 表主键 NVARCHAR(255)
excute_type 执行类型(I/U/D) NVARCHAR(255)

sqlserver cdc表(日志表)中如果一条id多次更新,取最新一条数据
sqlite asy_data表(离线数据表),入库时,查dbname + table + pk,无记录则添加,有记录比较cdc记录时间,如果时间更新则更新sql语句

特殊数据处理:
uniqueidentifier类型的数据转为NULL,数据中含有'的替换''

核心代码:

复制代码

using SqlServerAsync.Util.config;
using SqlServerAsync.Util.sqlite;
using SqlServerAsync.Util.sqlite.model;
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;namespace SqlServerAsync.Util
{public class SqlServerCDC{public void Listen(Driver driver){var update_cdc = driver.SrcUpdateCDC == 1 ? true : false;var enable = driver.Enable == 1 ? true : false;foreach (var map in driver.SrcMap){StartCDC(driver.SrcConnect, driver.DstConnect, driver.RefreshTime, enable, update_cdc, map);}}void StartCDC(string srcconnect, List<string> dstconnect, int refreshTime, bool enable, bool update_cdc, string map){try{var freeSql = new FreeSql.FreeSqlBuilder().UseConnectionString(FreeSql.DataType.SqlServer, srcconnect).UseNoneCommandParameter(true)// 不使用参数化.UseAutoSyncStructure(false)// 不同步表结构.Build();var arrayMap = map.Split('|');var db = arrayMap[0];var tbs = arrayMap[1];string[] arrayTB = null;var dstStr = string.Join("#", dstconnect);if (!enable){Program.AddLog($"禁用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}");return;}string sql = string.Empty;Dictionary<string, Table> dicTable = new Dictionary<string, Table>();Program.AddLog($"启用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}");if ("*" == tbs){// 查询db下所有表名sql = $"use {db};select TABLE_NAME from {db}.information_schema.tables where TABLE_SCHEMA='dbo' and TABLE_NAME not in('systranschemas','sysdiagrams')";DataTable dtAll = freeSql.Ado.ExecuteDataTable(sql); var rowCount = dtAll.Rows.Count; if (rowCount > 0) arrayTB = new string[rowCount]; for (int i = 0; i < rowCount; i++){arrayTB[i] = dtAll.Rows[i]["TABLE_NAME"].ToString();}}else{arrayTB = tbs.Split(',');}if (null == arrayTB || 0 == arrayTB.Length){Program.AddLog($"数据库{db},查无数据表 ×");return;}// 开启SQL Server数据库CDCsql = $"use {db};if exists(select 1 from {db}.sys.databases where name='{db}' and is_cdc_enabled=0)\n" +"begin\n" +$"exec {db}.sys.sp_cdc_enable_db\n" +"end";freeSql.Ado.ExecuteNonQuery(sql);// 查询库cdc是否开启成功sql = $"use {db};select is_cdc_enabled from {db}.sys.databases where name='{db}'";DataTable dtCDC_DB = freeSql.Ado.ExecuteDataTable(sql);if (dtCDC_DB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_DB.Rows[0]["is_cdc_enabled"])){Program.AddLog($"数据库CDC开启失败({db}) ×");return;}Program.AddLog($"数据库CDC开启成功({db}) √");foreach (var table in arrayTB){if (string.IsNullOrEmpty(table)) continue;if (update_cdc){// 关闭单张表的CDC功能sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=1)\n" +"begin\n" +$"exec {db}.sys.sp_cdc_disable_table @source_schema='dbo',@source_name='{table}',@capture_instance='dbo_{table}'" +"end";freeSql.Ado.ExecuteNonQuery(sql);}// 开启单张表的CDC功能sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=0)\n" +"begin\n" +$"exec {db}.sys.sp_cdc_enable_table\n" +"@source_schema='dbo',\n" +$"@source_name='{table}',\n" +"@capture_instance=NULL,\n" +"@supports_net_changes=1,\n" +"@role_name=NULL\n" +"end";freeSql.Ado.ExecuteNonQuery(sql);// 查询表cdc是否开启成功sql = $"use {db};select is_tracked_by_cdc from {db}.sys.tables WHERE name='{table}'";DataTable dtCDC_TB = freeSql.Ado.ExecuteDataTable(sql);if (dtCDC_TB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_TB.Rows[0]["is_tracked_by_cdc"])){Program.AddLog($"数据表CDC开启失败({table}) ×");continue;}Program.AddLog($"数据表CDC开启成功({table}) √");Table tb = new Table() { Name = table };// 获取字段名,是否主键,字段类型sql = $"use {db};SELECT distinct col.name AS 'Name', idx.is_primary_key as 'IsPK',TYPE_NAME(system_type_id) as 'Type'\n" +$"FROM sys.columns col\n" +$"LEFT JOIN sys.index_columns idxcol ON col.object_id=idxcol.object_id AND col.column_id=idxcol.column_id\n" +$"LEFT JOIN sys.indexes idx ON idxcol.object_id=idx.object_id AND idxcol.index_id=idx.index_id\n" +$"WHERE col.object_id=OBJECT_ID('{table}')";List<Field> lstField = freeSql.Ado.Query<Field>(sql);foreach (var field in lstField){var ispk = Convert.ToBoolean(field.IsPK);if (ispk){tb.LstPKField.Add(field);// 主键,用于更新删除}else{tb.LstDataField.Add(field);}}dicTable.Add(table, tb);}Program.AddLog($"监听成功,{db}");// 定时轮询ThreadPool.QueueUserWorkItem(delegate{Dictionary<string, string> dicTBUpdatePK = new Dictionary<string, string>();while (true){try{ foreach (var item in dicTable){dicTBUpdatePK.Clear();var table_name = item.Key;var tableEntity = item.Value;// cdc表查询//__$start_lsn :与相应更改的提交事务关联的日志序列号(LSN)//__$end_lsn : (在 SQL Server 2008中,此列始终为 NULL)//__$seqval :对事务内的行更改顺序//__$operation :源表DML操作var cdc_table_name = $"{db}.cdc.dbo_{table_name}_CT";sql = $"use {db};select sys.fn_cdc_map_lsn_to_time(__$start_lsn) as cdctime,* from {cdc_table_name}";// 查询cdc时间var dt = freeSql.Ado.ExecuteDataTable(sql);table_name = $"{db}.dbo." + table_name;for (int i = 0; i < dt.Rows.Count; i++){var row = dt.Rows[i];var lstPKField = tableEntity.LstPKField;var lstDataField = tableEntity.LstDataField;var cdctime = Convert.ToDateTime(row["cdctime"]);var operation = Convert.ToInt32(row["__$operation"]);var seqval = (byte[])(row["__$seqval"]);// __$start_lsn代表事件时间,并发时,会有相同的情况,改用__$seqvalvar str_seqval = BitConverter.ToString(seqval, 0).Replace("-", string.Empty);if (3 == operation){continue;}var sql_cdc_execute = string.Empty;string table_pk = string.Empty;foreach (var field1 in lstPKField){table_pk += field1.Name + "='" + row[field1.Name] + "' and ";}table_pk = table_pk.Substring(0, table_pk.Length - 5);string cdc_dic_pk = table_name + ";" + table_pk;// cdc表中过滤多条表中一条记录多次更新,取最新一条数据(查询过的数据利用字典存储)string str_seqval1 = string.Empty;if (4 == operation){if (dicTBUpdatePK.ContainsKey(cdc_dic_pk)){str_seqval1 = dicTBUpdatePK[cdc_dic_pk];}else{// 查询多次更新后的最新值sql = $"use {db};select top 1 __$seqval from {cdc_table_name} where {table_pk} and __$operation=4 order by __$seqval desc";var dtlsn = freeSql.Ado.ExecuteDataTable(sql);var seqval1 = (byte[])(dtlsn.Rows[0]["__$seqval"]);str_seqval1 = BitConverter.ToString(seqval1, 0).Replace("-", string.Empty);dicTBUpdatePK.Add(cdc_dic_pk, str_seqval1);}}// 删除cdc表数据sql = $"use {db};delete from {cdc_table_name} where __$seqval=CONVERT(BINARY(10), '{str_seqval}', 2)";freeSql.Ado.ExecuteNonQuery(sql);string excute_type = string.Empty;switch (operation){case 1:// 删除excute_type = BaseEnum.Delete;sql_cdc_execute = $"delete from {table_name} where {table_pk}";break;case 2:// 插入excute_type = BaseEnum.Insert;string insertField = string.Empty;string insertValue = string.Empty;foreach (var field1 in lstPKField){ insertField += field1.Name + ",";insertValue += HandleSpecialData(field1.Type, row[field1.Name]) + ",";}foreach (var field2 in lstDataField){insertField += field2.Name + ",";insertValue += HandleSpecialData(field2.Type, row[field2.Name]) + ",";} insertField = insertField.Substring(0, insertField.Length - 1);insertValue = insertValue.Substring(0, insertValue.Length - 1);sql_cdc_execute = $"insert into {table_name} ({insertField}) values({insertValue})";break;case 3:break;case 4:// 修改 if (str_seqval == str_seqval1)// 最新的数据{excute_type = BaseEnum.Update;string updateData = string.Empty; foreach (var field2 in lstDataField){updateData += field2.Name + "=" + HandleSpecialData(field2.Type, row[field2.Name]) + ",";}updateData = updateData.Substring(0, updateData.Length - 1);sql_cdc_execute = $"update {table_name} set {updateData} where {table_pk}";}break;}if (!string.IsNullOrEmpty(sql_cdc_execute)){foreach (var dst in dstconnect){bool add = true; string key1 = srcconnect + "_" + table_name + "_" + table_pk; // A同步B,B更新后,CDC日志返回A,这边做截取if (Program.DicExecuted.ContainsKey(key1)){add = false;string removedValue;Program.DicExecuted.TryRemove(key1, out removedValue);}else{// 修改以最后时间的数据为准var entity = SqliteHelper.GetUpdateAsyncData(db, table_name, table_pk);if (null == entity){var asyncdata = new AsyncData() { ConnectStr = dst, ExcuteSQL = sql_cdc_execute, CDCTime = cdctime, EventTime = DateTime.Now, DBName = db, TableName = table_name, TablePK = table_pk, ExcuteType = excute_type };SqliteHelper.InsertAsyncData(asyncdata);}else{// 比较时间if (DateTime.Compare(entity.CDCTime, cdctime) < 0){SqliteHelper.UpdateAsyncData(dst, sql_cdc_execute, entity.Id);}else{add = false;}}if (add){if (dst.Contains("192.168.8.81")){Console.WriteLine("111");}Program.AddLog($"添加,dst:{dst},sql:{sql_cdc_execute}");}}}} }}}catch (Exception ex){Program.AddLog($"Listen Error,ex:{ex.Message}");}Thread.Sleep(refreshTime);}});}catch (Exception ex){Program.AddLog($"[Error] 初始化CDC异常,errmsg:{ex.Message}");}}/// <summary>/// 特殊数据类型处理 1. uniqueidentifier为空时,设置为NULL;2. 单引号,转成双号/// </summary>/// <param name="val"></param>/// <returns></returns>public string HandleSpecialData(string type, object val){if (null == val) return string.Empty;string ret = val.ToString(); bool special = false;if ("uniqueidentifier" == type.ToLower())// 特殊数据类型处理{if (string.IsNullOrEmpty(ret)){special = true;ret = "NULL";}}if (!special){if (ret.Contains("'")){ret = ret.Replace("'", "''");// 把单引号转成双引号}ret = $"'{ret}'";}return ret;} }public class Table{public string Name { get; set; }public List<Field> LstPKField { get; set; } = new List<Field>();public List<Field> LstDataField { get; set; } = new List<Field>();}public class Field{public string Name { get; set; }public string IsPK { get; set; } public string Type { get; set; }// GUID,uniqueidentifier为空时,改为NULL}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/79358.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

httpd+Tomcat(jk)的Web动静分离搭建

动静分离是指将动态请求和静态请求分别交给不同的服务器来处理&#xff0c;可以提高服务器的效率和性能。在Java Web开发中&#xff0c;常见的动态请求处理方式是通过Tomcat来处理&#xff0c;而静态请求则可以通过Apache服务器来处理。本文将详细讲解如何结合Apache和Tomcat来…

c++之STL详解

c之STL详解 泛型编程什么是STLSTL发展STL组件容器类型成员适配器STL迭代器STL算法顺序容器向量vector双端队列双端队列实现列表listc关联容器cmapmultimapsetmultiset迭代器函数对象集成函数对象自定义函数对象标准c库中算法STL算法头文件标准函数泛型算法例子自定函数作为算法…

SNAT与DNAT原理

SNAT和DNAT &#xff08;源地址转换和目标地址转换&#xff09; SNAT&#xff1a;源地址转换。内网到外网转换的是源地址。 DNAT&#xff1a;目标地址转换&#xff1a;外网到内网转换的是目的地址 &#xff08;把内部服务器的ip地址转换成一个所有人都可以访问的地址&#xff0…

【Spring】创建一个Spring项目与Bean对象的存储

目录 一、创建Spring项目 1、创建Maven项目 2、配置maven国内源 3、引入spring依赖 4、添加启动类 二、将Bean对象存储到Spring&#xff08;IoC容器&#xff09; 1、创建Bean对象 2、将Bean存储到spring&#xff08;容器&#xff09;中 3、获取Bean对象 3.1、Applicatio…

详解PHP反射API

PHP中的反射API就像Java中的java.lang.reflect包一样。它由一系列可以分析属性、方法和类的内置类组成。它在某些方面和对象函数相似&#xff0c;比如get_class_vars()&#xff0c;但是更加灵活&#xff0c;而且可以提供更多信息。反射API也可与PHP最新的面向对象特性一起工作&…

信息安全技术工业控制系统安全控制应用指南学习笔记

工业控制系统安全控制基线 根据工业控制系统在国家安全、经济建设、社会生活中的重要程度&#xff0c;遭到破坏后对国家安全、社会秩序、公共利益以及公民、法人和其他组织的合法权益的危害程度等&#xff0c;结合信息安全等级保护标准划分及实施效果分析&#xff0c;结合工业…

ElasticSearch详细操作

ElasticSearch搜索引擎详细操作以及概念 文章目录 ElasticSearch搜索引擎详细操作以及概念 1、_cat节点操作1.1、GET/_cat/nodes&#xff1a;查看所有节点1.2、GET/_cat/health&#xff1a;查看es健康状况1.3_、_GET/_cat/master&#xff1a;查看主节点1.4、GET/_cat/indices&a…

Linux初识网络基础

目录 网络发展 认识“协议 ” 网络协议 OSI七层模型&#xff1a; TCP/IP五层&#xff08;或四层&#xff09;模型 网络传输基本流程 网络传输流程图&#xff1a; 数据包封装和封用 网络中的地址 认识IP地址&#xff1a; 认识MAC地址&#xff1a; 网络发展 1.独立…

2023华数杯数学建模C题思路 - 母亲身心健康对婴儿成长的影响

# 1 赛题 C 题 母亲身心健康对婴儿成长的影响 母亲是婴儿生命中最重要的人之一&#xff0c;她不仅为婴儿提供营养物质和身体保护&#xff0c; 还为婴儿提供情感支持和安全感。母亲心理健康状态的不良状况&#xff0c;如抑郁、焦虑、 压力等&#xff0c;可能会对婴儿的认知、情…

解决树莓派“由于没有公钥,无法验证下列签名“

目录 简介&#xff1a;在换完国内源后&#xff0c;树莓派尝试更新同步/etc/apt/sources.list和/etc/apt/sources.list.d中列出的软件源的软件包版本也就是&#xff08;apt-get update&#xff09;和更新已安装的所有或者指定软件包&#xff08;也即是apt-get upgrade&#xff0…

输入框长度在XSS测试中如何绕过字符长度限制

大家好&#xff0c;这是我编写的第一篇文章&#xff0c;之所以会分享这个故事&#xff0c;是因为我花了几个晚上的时间&#xff0c;终于找到了解决某个问题的方法。故事如下&#xff1a; 几个月前&#xff0c;我被邀请参加一个非公共的漏洞悬赏项目&#xff0c;在初期发现了一些…

科技云报道:财税数字化时代,财务人实现RPA自由了吗?

企业数字化转型&#xff0c;财务是一个重要的切入点。随着数字化业务不断展开&#xff0c;新的系统、流程和数据源被不断引入&#xff0c;财务部门面临的是不断暴增的对账、处理报表、审计等日常工作。 如此大的工作量&#xff0c;即使是经验丰富的资深财务&#xff0c;也难免…

一篇文章,教你彻底掌握接口测试!

什么是接口测试 所谓接口&#xff0c;是指同一个系统中模块与模块间的数据传递接口、前后端交互、跨系统跨平台跨数据库的对接。而接口测试&#xff0c;则是通过接口的不同情况下的输入&#xff0c;去对比输出&#xff0c;看看是否满足接口规范所规定的功能、安全以及性能方面…

解决问题:ModuleNotFoundError: No module named ‘mmcv._ext‘,及安装mmcv-full的详细教程

解决问题**ModuleNotFoundError: No module named ‘mmcv._ext’**之前得先搞懂mmcv和mmcv-full的关系。 mmcv 和 mmcv-full 都是针对 PyTorch 的计算机视觉基础库,两者的主要区别是: mmcv 包含了 mmcv 的核心组件,例如运行器、回调函数、可视化工具等,打包体积较小。mmcv-fu…

C#,OpenCV开发指南(01)

C#&#xff0c;OpenCV开发指南&#xff08;01&#xff09; 一、OpenCV的安装1、需要安装两个拓展包&#xff1a;OpenCvSharp4和OpenCvSharp4.runtime.win 二、C#使用OpenCV的一些代码1、需要加头文件2、读取图片3、在图片上画矩形框4、 在图片上画直线 一、OpenCV的安装 1、需…

浅谈新电改背景下电网企业综合能源服务商业模式研究及发展方向

安科瑞 华楠 摘要: 新电改方案实施后&#xff0c;由于输配电价的改革和售电侧的放开&#xff0c;电网企业的盈利模式也随之发生了变化。这就要求电网企业转变服务理念与经营方式&#xff0c;来寻求竞争优势。基于“魏朱六要素商业模式”模型&#xff0c;对电网企业综合能源服务…

在Linux中安装MySQL

在Linux中安装MySQL 检测当前系统中是否安装MySQL数据库 命令作用rpm -qa查询当前系统中安装的所有软件rpm -qa|grep mysql查询当前系统中安装的名称带mysql的软件rpm -qa | grep mariadb查询当前系统中安装的名称带mariadb的软件 RPM ( Red-Hat Package Manager )RPM软件包管理…

【雕爷学编程】 MicroPython动手做(35)——体验小游戏3

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

Docker与DevOps的无敌组合,引爆你的创新潜能

&#x1f3c6;荣誉认证&#xff1a;51CTO博客专家博主、TOP红人、明日之星&#xff1b;阿里云开发者社区专家博主、技术博主、星级博主。 &#x1f4bb;微信公众号&#xff1a;iOS开发上架 &#x1f4cc;本文由iOS开发上架原创&#xff01; &#x1f389;欢迎关注&#x1f50e;…

基于Java+SpringBoot+Vue的篮球论坛系统设计与实现(源码+LW+部署文档等)

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…