记录使用python解析sql文件异步批量插入数据

起因

最近对接了某机构数据,他们数据使用的是oracle,而我方数据库则是mysql,他们那边给我们使用的测试数据是以oracle的形式,不能直接执行sql插入到mysql中,或者是多个表数据提取一些需要的数据字段合并到一个表中,所以需要对该sql进行一些处理

当然也可以在我们内部服务器上搭建oracle数据库,然后将这个sql在oracle数据中执行,然后再使用数据同步工具,比如 Navicat中的 工具-数据传输功能,将oracle里面的表结构以及数据同步复制传输到mysql中,或者使用 Kettle 进行数据同步

使用Navicat 问题

想要使用Navicat就sql同步到mysql数据库中有两个步骤,

1 是先将sql文件执行导入到oracle里面,
2 将oracle数据表通过工具-数据传输功能,将数据传输到mysql中

但是在Navicat中,是按sql文件行读取导入的,这就出现一种情况,sql文件很大,有百万条数据,每条sql导入都要执行一次,这就很慢了,而且使用Navicat工具将数据同步到mysql中的时候,也很慢, 而且使用Navicat 只能将表同步,而不能将多个表的字段提取出来合并到一个新表中

如果使用Kettle ,那就需要编写各种流处理操作,也是需要先将sql导入到数据库中才能进行处理,这样会很慢

使用python实现

最后使用了python实现读取sql文件进行批量导入,将导入过程分成3步

1 异步读取文件,解析sql语句,替换sql语句中的一些特殊符号以及内容,将sql数据通过异步队列发送给异步任务

2 异步执行批量导入任务读取异步队列中的结果数据进行批量导入操作,如果出现异常,则将异常数据发送给异常处理

3 异常处理可以将异常数据写入到另一个记录文件中进行保存,也可以尝试就将批量数据结构挨条进行导入执行

import asyncio
import timeimport aiofiles
import aiomysql# 连接数据库
async def connect_db():conn = await aiomysql.connect(host='10.10.6.131', port=3306,user='root', password='root',db='test')return conn# 解析sql文件
async def parse_file(queue: asyncio.Queue):async with aiofiles.open('test.sql', 'r', encoding="utf-8") as f:while True:data = await f.readlines(10000)if not data:breakdata_list = []for i in data:if 'INSERT INTO "XXX"."PERFORMANCE_HISTORY" VALUES (' in i:d = i.replace('INSERT INTO "FDC"."PERFORMANCE_HISTORY" VALUES (', '')d = d.replace(");", "")d = d.replace("\n", "")d = d.replace("\t", "")d = d.replace(", 'SYYYY-MM-DD HH24:MI:SS')", "")d = d.replace("TO_DATE(", "")d = d.split(", ")d = [None if "NULL" in i else str(i.replace("'", "")) for i in d]data_list.append(d)if len(data_list) > 0:await queue.put(data_list)print(queue.qsize(), time.time())await queue.put(None)async def batch_insert_data(conn: aiomysql.Connection, queue: asyncio.Queue, qu2: asyncio.Queue):sql = "INSERT INTO performance(id, name, date) values (%s, %s, %s)"async with conn.cursor() as cursor:while 1:data = await queue.get()if data is None:breaktry:await cursor.executemany(sql, data)await conn.commit()except Exception as e:print("批量插入失败", data)await qu2.put(data)await qu2.put(None)async def insert_error_data(conn: aiomysql.Connection, queue: asyncio.Queue):sql = "INSERT INTO t_dm_f_performance(id, name, date) values (%s, %s, %s)"async with conn.cursor() as cursor:while 1:data = await queue.get()if data is None:breakfor i in data:try:await cursor.execute(sql, i)await conn.commit()except Exception as e:passasync def main():queue = asyncio.Queue(maxsize=1000)error_queue = asyncio.Queue(maxsize=1000)conn1 = await connect_db()conn2 = await connect_db()await asyncio.gather(parse_file(queue), batch_insert_data(conn1, queue, error_queue), insert_error_data(conn2, error_queue))await conn1.close()await conn2.close()if __name__ == '__main__':asyncio.run(main())

最终实现了批量插入功能,比起先将sql导入到oracle,再同步到mysql快了不少

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387517.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自动化测试与手动测试的区别!

自动化测试与手动测试之间存在显著的区别,这些区别主要体现在以下几个方面: 测试目的: 自动化测试的目的在于“验证”系统没有bug,特别是在系统处于稳定状态时,用于执行重复性的测试任务。 手工测试的目的则在于通过…

git配置环境变量

一.找到git安装目录 打开此git安装目录下的bin文件,复制此文件路径 二.配置环境变量 2.1 右键点击此电脑的属性栏 2.2 点击高级系统配置 2.3 点击环境变量 2.4 按图中步骤进行配置 三.配置完成 win r 输入cmd打开终端 终端页面中输入 git --version 如图所示…

如何将WordPress文章中的外链图片批量导入到本地

在使用采集软件进行内容创作时,很多文章中的图片都是远程链接,这不仅会导致前端加载速度慢,还会在微信小程序和抖音小程序中添加各种域名,造成管理上的麻烦。特别是遇到没有备案的外链,更是让人头疼。因此,…

kafka高性能的底层原理分析

目录 1.磁盘顺序写 2.零拷贝 3.数据压缩 4.消息批量处理 5.pageCache 6.稀疏索引 总结 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。那么他是如何做到高性能的呢,本篇文章从宏观上分析一下&#xff…

alibabacloud学习笔记12

Docker介绍和使用场景 讲解阿里云ECS服务安装Docker实战 遇到这个报错可以执行: 执行这个docker info出这个就证明docker关闭成功。 快速掌握Dokcer基础知识 掌握Docker容器常见命令 查看本地已有镜像: 拉取镜像: 可以查到刚才拉取的镜像。 …

028-GeoGebra中级篇-脚本的初步的探索

GeoGebra 的脚本功能允许用户通过不同的触发机制(如点击、更新、输入框变化、拖动结束)和全局 JavaScript 自定义图形和交互行为,实现动态数学模型和用户交互,同时 ggbOnInit() 函数可在应用初始化时设置默认状态,提供…

构建基于数据驱动的应用程序与Llamaindex——理解大型语言模型

如果你在阅读这本书,你可能已经探索过大型语言模型(LLMs)的领域,并且已经认识到它们的潜在应用以及它们的缺陷。本书旨在解决LLMs所面临的挑战,并提供一本实用指南,教你如何使用LlamaIndex构建数据驱动的LL…

【通俗理解】自由能与自由意志的桥梁——从物理到哲学的跨越

【通俗理解】自由能与自由意志的桥梁——从物理到哲学的跨越 自由能与自由意志的类比 你可以把自由能比作一个“能量货币”,它代表着系统能够用来做功的能量。而自由意志则是一个“选择的能力”,它代表着个体在做出决策时的自主性和可能性。 自由能与自由…

HCIA总结

一、情景再现:ISP网络为学校提供了DNS服务,所以,DNS服务器驻留在ISP网络内,而不再学校网络内。DHCP服务器运行在学校网络的路由器上 小明拿了一台电脑,通过网线,接入到校园网内部。其目的是为了访问谷歌网站…

基于 SASL/SCRAM 让 Kafka 实现动态授权认证

一、说明 在大数据处理和分析中 Apache Kafka 已经成为了一个核心组件。然而在生产环境中部署 Kafka 时,安全性是一个必须要考虑的重要因素。SASL(简单认证与安全层)和 SCRAM(基于密码的认证机制的盐化挑战响应认证机制&#xff…

搭建自己的金融数据源和量化分析平台(四):自动化更新上市公司所属一级、二级行业以及股票上市状态

前面做了更新沪深交易所的上市股票列表的读取和更新,但一旦股票退市则需要在数据库里将该股票状态更新为退市,同时附上退市日期,将股票名更改为XX退。 此外深交所下载的xls解析出来是没有上市公司所属的二级行业的,因此还需要建立…

魔众文库-PHP文库管理系统

魔众文库是一套基于PHPMYSQL开发的适用于多平台的文档管理系统,提供doc、ppt、excel、pdf、压缩包、图片、CAD 等资源的在线预览和下载,文件被转换为H5或图片格式,文字放大无失真,响应速度更快速对SEO更友好,收录更快、…

【第二节】python编程基础语法

目录 一、运算符介绍 1.1 算术运算符 1.2 比较运算符 1.3 赋值运算符 1.4 位运算符 1.5 逻辑运算符 1.6 成员运算符 1.7 身份运算符 二、python运算符优先级 三、三大流程结构 四、列表 五、元组 六、字典 一、运算符介绍 1.1 算术运算符 1.2 比较运算符 1.3 赋值…

【传输层协议】UDP和TCP协议

UDP协议 UDP协议全称为User Datagram Protocol,用户数据报协议。UDP协议报文格式如下: 16UDP长度。表示整个数据报的最大长度,即UDP首部UDP数据。这个字段帮助我们确保在网络字节流中获取完整的UDP报文信息。校验和:用于检测数…

巴斯勒相机(Basler) ACE2 dart 系列说明和软件

巴斯勒相机(Basler) ACE2 dart 系列说明和软件

C语言指针·入门用法超详解

目录 1. 什么是指针 2. 指针变量的定义格式 3. 指针的作用 3.1 查询数据 3.2 存储数据(修改数据) 3.3 操作其他函数中的变量 3.4 函数返回多个值 3.5 函数的结果和计算状态分开 1. 什么是指针 通过内存地址,指向的空间&#…

vue3后台管理系统 vue3+vite+pinia+element-plus+axios上

前言 项目安装与启动 使用vite作为项目脚手架 # pnpm pnpm create vite my-vue-app --template vue安装相应依赖 # sass pnpm i sass # vue-router pnpm i vue-router # element-plus pnpm i element-plus # element-plus/icon pnpm i element-plus/icons-vue安装element-…

C++第一篇 入门基础

目录 1.C的第一个程序 2.c历代版本 3.命名空间 3.1 namespace关键字 namespace的用法: namespace中定义函数 namespace中定义结构体 C中的域: 3.2就近原则 4.命名空间的使用 5.C输入输出 6.缺省参数 全缺省: 半缺省:必须从右往左连续缺省(也…

爆“卷”的AI视频,大厂向左,创企向右

文|白 鸽 编|王一粟 “生成的人物一转身就变成老外,怎么解决呢?” “没有办法,10s中动作大的,人物一致性有问题,只能抽卡,多刷几个,选择一个变化不大的。” 在一个以…

RocketMQ Server Windows安装

RocketMQ阿里开发 开源给apache 官网:RocketMQ 官方网站 | RocketMQ 下载后解压 配置环境变量 注意启动顺序 双击 注意 4.9.0这个版本必须 jdk 8 高了用不了 namesrv是注册中心的作用 broke是核心用于接收生产者消息 存储消息 发送给消费者消息 类似DubboZookeeper…