一、背景
最近项目在做重构,将原有的系统拆分成了多个子系统。但是有些数据表需要在不同系统之间共享数据,同时为了解决项目的性能问题,我们采用了一个很简单的方案:把公共数据在每个系统中各存一份。
二、分析
分析这些表可以发现:它们与源数据表的表结构完全相同,只是表名加了 md_ 前缀(命名格式为 md_<系统简称>_<源表名>),而且数据也必须保持一致。因此我们可以从以下几个维度进行比对:
a、表结构
b、表数据量
c、表主键最大id
d、表数据(按 update_at、id 倒序取最近的 100 条逐行比对)
三、编码
因为最开始是以脚本的形式来写的,所以没有做配置文件。
1、数据库相关
数据库连接信息
# coding=utf8
"""
Database connection settings, one entry per system alias.

Every entry must provide host, user, password, db and port; the keys are the
aliases used elsewhere (e.g. mysql_data['CRM']).

NOTE(review): real credentials should not be committed to source control —
consider loading them from environment variables or an external config file.
"""

mysql_data = {
    "CRM": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "PLM": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    # The MES password literal was missing its opening quote (syntax error).
    "MES": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "SCM": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "UC": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "GDM": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "ESM": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "LOCAL": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "78": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
    "PLM-PRE": {"host": "127.0.0.1", "user": "user_name", "password": "********", "db": "db_name", "port": 3306},
}
数据库连接封装(注意:这并不是真正的连接池,每实例化一次 MSP_Connect 都会新建一个连接)
# coding=utf8
"""
Thin wrapper around one pymysql connection and its cursor.

NOTE(review): the original header of this module read "fetch CRM data into
PLM: 1. project data, 2. contract data", which does not describe the class
below — confirm and drop that note.
"""
import pymysql
from pymysql import err


class MSP_Connect:
    """Open a MySQL connection from a config dict and expose small helpers.

    Usable as a context manager so the connection is always released::

        with MSP_Connect(mysql_data['CRM']) as db:
            rows = db.execute('SELECT 1')
    """

    def __init__(self, mysql_info: dict):
        """
        :param mysql_info: dict with keys host, user, password, db and an
                           optional port (defaults to 3306)
        :raises KeyError: a required key is missing (a hint is printed first)
        :raises pymysql.err.OperationalError: the connection could not be
                opened (a hint is printed first)
        """
        # Set up-front so close() is safe even if construction fails part-way.
        self.con = None
        self.cur = None
        try:
            self.host = mysql_info['host']
            self.user = mysql_info['user']
            self.password = mysql_info['password']
            self.db = mysql_info['db']
        except KeyError:
            print("mysql_info键名错误,key值分别【host, user, password, db】")
            # Previously swallowed, leaving an object without con/cur that
            # failed later with an unrelated AttributeError.
            raise
        # The config carries a port, but it used to be ignored, so every
        # connection silently went to pymysql's default port.
        self.port = int(mysql_info.get('port', 3306))
        try:
            self.con = pymysql.connect(host=self.host,
                                       user=self.user,
                                       password=self.password,
                                       db=self.db,
                                       port=self.port,
                                       charset="utf8")
        except err.OperationalError as e:
            if e.args and e.args[0] == 1045:  # access denied
                print("MySql密码错误")
            else:
                print("发生了其它连接错误")
            raise
        self.cur = self.con.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def execute(self, sql, values=None):
        """Run one statement.

        :param sql: SQL text; only %-placeholders are interpolated, and only
                    when values is given
        :param values: parameters for the placeholders, or None
        :return: all fetched rows when values is None, otherwise the affected
                 row count returned by cursor.execute (rows are NOT fetched)
        """
        if values is None:
            self.cur.execute(sql)
            return self.cur.fetchall()
        return self.cur.execute(sql, values)

    def execute_many(self, sql, values):
        """Run sql once per parameter tuple in values; returns the affected row count."""
        return self.cur.executemany(sql, values)

    def commit(self):
        """Commit the current transaction."""
        self.con.commit()

    def close(self):
        """Release the cursor and the connection; safe to call more than once."""
        if self.cur is not None:
            self.cur.close()
            self.cur = None
        if self.con is not None:
            self.con.close()
            self.con = None
2、检查处理
# coding=utf8
import pymysql
import os
from datetime import datetime, timedelta
from msp_script.database.connect_config import MSP_Connect
from msp_script.database.db_info import mysql_data
from openpyxl import Workbook, load_workbook


def check_msp_table_data(target_db_name, file_name=None):
    """
    Compare every md_ table of one target system with its source table and
    append the results to the Excel report.

    :param target_db_name: alias of the target database (a key of mysql_data)
    :param file_name: report file name without the .xlsx suffix; when empty a
                      '数据比对结果_<timestamp>' name is generated
    :return: list of the non-empty remarks, one per mismatching table
    """
    print('******************************************* [' + target_db_name + '] *******************************************')
    if not file_name:
        # write_excel cannot build a path from None.
        file_name = '数据比对结果_' + datetime.now().strftime('%Y%m%d%H%M%S')
    delete_excel_file()
    target_db = MSP_Connect(mysql_data[target_db_name])
    source_dbs = {}  # one connection per source system, reused for all its tables
    write_result = []
    info = []
    try:
        # 1. Collect the md_ tables of the target system.
        print('1、获取目标系统中的主数据表')
        md_tables = get_table_index_by_md(target_db)
        print('\t主数据表为' + str(list(table[1] for table in md_tables)))
        # 2. Compare each of them with its source table.
        print('2、对比源数据、目标数据库表结构')
        for table in md_tables:
            target_table_name = table[1]  # column 1 of the query is TABLE_NAME
            source_table_info = get_source_table_name(target_table_name)
            if source_table_info is None:
                # Name is not md_<system>_<table>; nothing to compare against.
                continue
            source_table_name, source_db_name = source_table_info
            if source_db_name not in ('CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM'):
                continue
            source_db = source_dbs.get(source_db_name)
            if source_db is None:
                source_db = MSP_Connect(mysql_data[source_db_name])
                source_dbs[source_db_name] = source_db
            check_result = check_table_info(source_db, source_table_name, target_db, target_table_name)
            check_result.append(target_db_name)
            check_result.append(source_db_name)
            write_result.append(check_result)
            if check_result[8]:  # remark is '' when the table passed
                info.append(check_result[8])
    finally:
        # Release every connection even when a check raises.
        for source_db in source_dbs.values():
            source_db.close()
        target_db.close()
    # 3. Write the report.
    print('3、写入比对结果到Excel')
    write_excel(write_result, file_name)
    return info


def get_source_table_name(target_table_name):
    """
    Derive the source table and system alias from a target table name,
    e.g. 'md_crm_customer' -> ('customer', 'CRM').

    :param target_table_name: target table name
    :return: (source table name, upper-case system alias), or None when the
             name does not have the md_<system>_<table> shape
    """
    parts = target_table_name.split('_', 2)
    if len(parts) < 3:
        print("[" + target_table_name + "] 可能是非主数据表")
        return None
    return parts[2], parts[1].upper()


def get_table_index_by_md(target_db: MSP_Connect):
    """
    List the md_ tables of the database the connection is bound to.

    The schema filter keeps tables of other databases on the same server out
    of the result, and the escaped '_' stops LIKE from treating it as a
    single-character wildcard (which also matched e.g. 'mdx...').

    :param target_db: connection to the target database
    :return: rows of (schema, table name, rows, data size, index size,
             create time, comment)
    """
    get_md_table_sql = """
        SELECT
            TABLE_SCHEMA AS '数据库名',
            TABLE_NAME AS '表名',
            TABLE_ROWS AS '行数',
            DATA_LENGTH AS '数据大小',
            INDEX_LENGTH AS '索引大小',
            CREATE_TIME AS '创建时间',
            table_comment AS '表描述'
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME LIKE 'md!_%' ESCAPE '!'"""
    return target_db.execute(get_md_table_sql, None)


# Verify that the MSP data is in sync
def check_table_info(source_db: 'MSP_Connect', source_table_name, target_db: 'MSP_Connect', target_table_name):
    """
    Compare one source table with its md_ copy in the target system.

    Checks run in order (structure, row count, max(id), latest 100 rows). The
    first failing one becomes the remark. The row sample is only fetched when
    the cheaper checks have all passed.

    :param source_db: connection to the source system
    :param source_table_name: source table name
    :param target_db: connection to the target system
    :param target_table_name: target (md_) table name
    :return: [structure flag ('相同'/'不同'), target table, source table,
              target row count, source row count, target max id,
              source max id, passed (bool), remark ('' when passed)]
    """
    def quote_identifier(name):
        # Table names can arrive from the HTTP API (check_data_custom);
        # doubling backticks keeps the name inside the identifier quotes.
        return '`' + str(name).replace('`', '``') + '`'

    source_table = quote_identifier(source_table_name)
    target_table = quote_identifier(target_table_name)

    # Column definitions
    source_result = source_db.execute("SHOW COLUMNS FROM " + source_table, None)
    target_result = target_db.execute("SHOW COLUMNS FROM " + target_table, None)
    # Row counts
    source_count = source_db.execute("select count(*) from " + source_table, None)
    target_count = target_db.execute("select count(*) from " + target_table, None)
    # Largest id (assumes both tables have an `id` column)
    source_max_id = source_db.execute("select max(id) from " + source_table, None)
    target_max_id = target_db.execute("select max(id) from " + target_table, None)

    flag = '相同' if source_result == target_result else '不同'
    prefix = '目标表【' + target_table_name + '】与源系统表【' + source_table_name + '】'
    if source_result != target_result:
        remark = prefix + '表结构不一致'
    elif source_count != target_count:
        remark = prefix + '数据量不一致,数据量差额为:' + str(abs(source_count[0][0] - target_count[0][0])) + '条'
    elif source_max_id != target_max_id:
        remark = prefix + '表最大值不一致'
    else:
        check_data_result = check_data(source_db, source_table_name, target_db, target_table_name)
        if check_data_result and check_data_result[0]:
            remark = ''
        elif len(check_data_result) == 3:
            # [False, message, 'error']: the message already explains the problem.
            remark = check_data_result[1]
        else:
            ids = check_data_result[1] if len(check_data_result) > 1 else ''
            remark = prefix + '最近的100条数据不相同,ID为:[' + ids + ']'
    if remark:
        print('\t' + remark)
    return [flag,
            target_table_name,
            source_table_name,
            target_count[0][0],
            source_count[0][0],
            target_max_id[0][0],
            source_max_id[0][0],
            not remark,
            remark]


# Write the comparison results to Excel
def write_excel(write_result: list, file_name):
    """
    Append comparison rows to <parent of cwd>/file/<file_name>.xlsx.

    The workbook is created, with a header row, when it does not exist yet.
    Several calls with the same file_name therefore accumulate into one report.

    :param write_result: rows returned by check_table_info, each extended
                         with the target and source system aliases
                         (indexes 9 and 10)
    :param file_name: report file name without the .xlsx suffix
    :return: None
    """
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    file_dir = os.path.join(parent_dir, 'file')
    file_path = os.path.join(file_dir, file_name) + '.xlsx'
    if os.path.exists(file_path):
        work_book = load_workbook(file_path)
        work_sheet = work_book.active
    else:
        # The report directory may not exist on a fresh checkout.
        os.makedirs(file_dir, exist_ok=True)
        work_book = Workbook()
        work_sheet = work_book.active
        headers = ['序号', '目标系统', '目标系统表', '源系统', '源系统表', '表结构是否相同', '目标系统表数量', '原系统表数量', '目标系统最大值', '原系统最大值', '比对结果', '备注']
        for col, header in enumerate(headers, start=1):
            work_sheet.cell(row=1, column=col, value=header)
    # Row 1 holds the header, so the number of data rows already present is
    # max_row - 1 and the next serial number is exactly max_row.
    next_serial = work_sheet.max_row
    for serial, result in enumerate(write_result, start=next_serial):
        work_sheet.append([
            serial,     # serial number
            result[9],  # target system
            result[1],  # target table
            result[10], # source system
            result[2],  # source table
            result[0],  # structure identical?
            result[3],  # target row count
            result[4],  # source row count
            result[5],  # target max id
            result[6],  # source max id
            result[7],  # passed
            result[8],  # remark
        ])
    work_book.save(file_path)


# Delete the Excel reports produced the day before
def delete_excel_file():
    """
    Delete yesterday's reports from <parent of cwd>/file.

    A report is removed when it is an .xlsx file whose name contains
    '数据比对结果_<yesterday as YYYYmmdd>'.

    NOTE(review): reports created through the HTTP API are named
    '<file_name>_<timestamp>' and only match when the caller chose the
    '数据比对结果' prefix; files older than yesterday are never removed.
    """
    format_time = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
    parent_path = os.path.join(os.path.abspath(os.path.join(os.getcwd(), os.pardir)), 'file')
    if not os.path.isdir(parent_path):
        # Nothing has been written yet; os.listdir would raise.
        return
    marker = "数据比对结果_" + format_time
    for file in os.listdir(parent_path):
        if file.endswith('.xlsx') and marker in file:
            os.remove(os.path.join(parent_path, file))


def check_data(source_db: MSP_Connect, source_table_name, target_db: MSP_Connect, target_table_name):
    """
    Compare the 100 most recently updated rows (update_at desc, id desc) of
    the source and target tables.

    :param source_db: source database connection
    :param source_table_name: source table
    :param target_db: target database connection
    :param target_table_name: target table
    :return: [True] when both samples are identical;
             [False, 'v1,v2,...'] with the first-column values (presumably
             ids) of the rows that differ;
             [False, message, 'error'] when a table lacks a referenced column
    :raises pymysql.err.OperationalError: for any other database error
             (previously these produced an empty list, which crashed callers)
    """
    def latest_rows_sql(table_name):
        # Backticks are doubled so an API-supplied name cannot escape the quotes.
        return ("select * from `" + str(table_name).replace('`', '``')
                + "` order by update_at desc, id desc limit 0, 100")

    def fetch(db, table_name, label):
        """Return (rows, None), or (None, error result) on a missing column."""
        try:
            return db.execute(latest_rows_sql(table_name)), None
        except pymysql.err.OperationalError as e:
            # 1054 = unknown column. NOTE(review): also raised when `id` is
            # missing, although the message mentions update_at.
            if e.args and e.args[0] == 1054:
                message = label + "【" + table_name + "】表缺少update_at字段"
                print("\t" + message)
                return None, [False, message, 'error']
            raise

    source_result, error = fetch(source_db, source_table_name, "源系统表")
    if error:
        return error
    target_result, error = fetch(target_db, target_table_name, "目标系统表")
    if error:
        return error

    if target_result == source_result:
        return [True]
    differing = [row for row in target_result if row not in source_result]
    if not differing:
        # The target sample is contained in the source one (e.g. target rows
        # are missing); report the source rows absent from the target instead
        # of an empty id list.
        differing = [row for row in source_result if row not in target_result]
    return [False, ','.join(str(row[0]) for row in differing)]


def check_data_custom(source_db, target_db: list, tables: list):
    """
    Check selected tables of one source system against their
    md_<source>_<table> copies in several target systems.

    :param source_db: source database alias (a key of mysql_data)
    :param target_db: target database aliases
    :param tables: source table names to check
    :return: one {TARGET_ALIAS: [{'mark': remark, 'table': table}, ...]} dict
             per target system
    """
    result = []
    # One source connection for the whole run; the old code opened one per
    # table per target and never closed them.
    src_conn = MSP_Connect(mysql_data[source_db])
    try:
        for db in target_db:
            tar_db = MSP_Connect(mysql_data[db.upper()])
            try:
                mismatches = []
                for table in tables:
                    res = check_table_info(src_conn, table, tar_db, 'md_' + source_db.lower() + '_' + table)
                    if not res[7]:
                        mismatches.append({
                            'mark': res[8],
                            'table': table,
                        })
            finally:
                tar_db.close()
            # NOTE(review): the original guard `if dict` was always true (the
            # dict always has one key), so every target is reported even with
            # an empty list; kept that way for API compatibility.
            result.append({db.upper(): mismatches})
    finally:
        src_conn.close()
    return result


if __name__ == '__main__':
    # Full list: ['PLM', 'CRM', 'GDM', 'MES', 'SCM', 'ESM']
    db_name = ['PLM', 'CRM']
    format_time = datetime.now().strftime('%Y%m%d%H%M%S')
    file_name = '数据比对结果_' + format_time
    for name in db_name:
        print()
        print('****************************************************************************************')
        print(name + "系统数据开始检验...")
        check_msp_table_data(name, file_name)
3、api
# coding=utf8
import loggingfrom flask import Flask, jsonify, request, send_file
from msp_script.public.check_msp_syn import check_msp_table_data, check_data_custom
from datetime import datetime, timedelta
import osapp = Flask(__name__)
app.debug = True@app.route('/msp/api/check_syn_data', methods=['GET'])
def check_syn_data():""":param target_db_name 目标数据库名称,必要参数:param file_name 报告文件名称,必要参数"""target_db_name = request.args.get('target_db_name')file_name = request.args.get('file_name')info = {}if file_name == None or file_name == '':info['code'] = 500info['message'] = '[file_name]文件名称不能为空!'return jsonify(info)else:file_name = file_name + '_' + datetime.now().strftime('%Y%m%d%H%M%S')if target_db_name == None or target_db_name == '':info['code'] = 500info['message'] = '[target_db_name]数据库别名不能为空!'return jsonify(info)if target_db_name.__contains__(','):db_name = target_db_name.split(',')for db in db_name:if db.upper() not in ('CRM', 'PLM', 'MES', 'SCM', 'ESM', 'GDM'):info['code'] = 500info['message'] = '数据库别名只能是[CRM, PLM, MES, SCM, ESM, GDM]!'return jsonify(info)for name in db_name:data = check_msp_table_data(name.upper(), file_name)info[name] = datainfo['code'] = 200info['file'] = '/msp/api/download/' + file_name + '.xlsx'return jsonify(info)elif target_db_name.upper() not in ('CRM', 'PLM', 'MES', 'SCM', 'ESM', 'GDM'):info['code'] = 500info['message'] = '数据库别名只能是[CRM, PLM, MES, SCM, ESM, GDM]!'return jsonify(info)else:data = check_msp_table_data(target_db_name.upper(), file_name)info['code'] = 200info['file'] = '/msp/api/download/' + file_name + '.xlsx'info[target_db_name] = datareturn jsonify(info)@app.route('/msp/api/download/<string:file_name>', methods=['GET'])
def download_file(file_name):""":param file_name: 文件名称:param suffix: 文件后缀:return:"""info = {}if file_name == "" or file_name == None:info['code'] = 500info['message'] = '文件名称不能为空!'return jsonify(info)else:parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))file_dir = os.path.join(parent_dir, 'file')file_path = os.path.join(file_dir, file_name)if os.path.exists(file_path) and os.path.isfile(file_path):return send_file(file_path, as_attachment=True)else:info['code'] = 500info['message'] = '文件不存在!'return jsonify(info)@app.route('/msp/api/check_table_data', methods=['POST'])
def check_tables():""":param source_db 源数据库名称,必要参数:param target_db 目标数据库名称,必要参数:param tables 需要检查的源数据表名称,必要参数"""source_db = request.json.get('source_db').upper()target_db = [db.upper() for db in request.json.get('target_db')]tables = request.json.get('tables')result = {}if source_db == '' or source_db == None:result['code'] = 500result['message'] = 'source_db原数据库名称不能为空'return jsonify(result)elif source_db not in ['CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM']:result['code'] = 500result['message'] = '原数据库名称[' + source_db + ']不存在'return jsonify(result)if target_db == '' or target_db == None or len(target_db) == 0:result['code'] = 500result['message'] = 'target_db目标数据库名称不能为空'return jsonify(result)else:for t_db in target_db:if t_db not in ['CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM']:result['code'] = 500result['message'] = '目标数据库名称[' + t_db + ']不存在'return jsonify(result)if source_db in target_db:result['code'] = 500result['message'] = 'source_db源数据库' + source_db + '不允许同时存在目标数据库列表' + str(target_db) + '中'return jsonify(result)if tables == '' or tables == None or len(tables) == 0:result['code'] = 500result['message'] = 'tables需要验证的表不能为空'return jsonify(result)else:res = check_data_custom(source_db, target_db, tables)result['code'] = 200result['message'] = '成功'result['source_db'] = source_dbresult['target_db'] = target_dbresult['tables'] = tablesresult['no_match_data'] = resreturn jsonify(result)@app.route("/demo", methods=['GET'])
def demo():
    """Smoke-test endpoint returning a fixed JSON object."""
    # Renamed from `dict`, which shadowed the builtin.
    payload = {"a": 1, "v": 2}
    return jsonify(payload)


if __name__ == '__main__':
    app.run()
四、测试
这里一共提供三个业务接口(另有一个 /demo 测试接口):
1、全量检查数据
/msp/api/check_syn_data GET
参数:target_db_name 需要检查的数据库别名,多个用英文逗号分隔(可选值:CRM、PLM、MES、SCM、ESM、GDM)
file_name 执行完输出的报告名称,实际文件名会追加 _yyyyMMddHHmmss 时间戳
2、自定义检查系统表数据
/msp/api/check_table_data POST
参数(JSON 请求体):source_db 源数据库名称
target_db 目标数据库名称列表(不能包含源数据库)
tables 需要检查的源数据表名称列表
3、下载检查结果
/msp/api/download/<文件名称> GET(文件名称需包含 .xlsx 后缀,即全量检查接口返回的 file 字段中的文件名)