python 高效读取多个geojson 写入一个sq3(Sqlite) 、效率提高90%+

1.问题缘由:

        由于工作需求,需要将多个(总量10G+)geojson文件写入到sq3库,众所周知,sqlite 不支持多线程写入,那该怎么办呢,在网上也查了很多策略,都没有达到立竿见影的效果。于是还是回到写文件的本质:多线程写多文件,就绕开加锁的机制。

2.单线程读取的效果

单线程读写原始26个geojson文件,共294M,耗时:547S

写完的sq3文件大小:73.3M

3.多进程并发

多进程并发读写geojson,生成多个sq3文件,再合并到一个sq3文件耗时:16.5S

4.工具代码:

4.1 rw_data_geojson.py: 读写geojson文件

import os
import jsonGEOMETRY = 'geometry'def read_all_layer(src_path):"""读取geojson 文件,传入读取文件路径,返回dictdict 是以layername为key,获取每个layer的dict,子字典的key为要素ID:param src_path: 读取geojson文件路径:return: 封装dict  {layerName:[id:{要素dict}]}"""filenames = os.listdir(src_path)# 过滤出你想要处理的文件,例如只读取.txt文件txt_filenames = [f for f in filenames if f.endswith('.geojson')]geo_properties_map = {}# 循环读取每个文件for filename in txt_filenames:file_path = os.path.join(src_path, filename)with open(file_path, 'r') as file:content = file.read()geojson_data = json.loads(content)features = geojson_data.get('features', [])dict = {}for feature in features:properties = feature.get('properties')if GEOMETRY in feature:properties[GEOMETRY] = feature.get('geometry')dict[properties["id"]] = propertieslayername = filename.replace(".geojson", "")geo_properties_map[layername] = dictreturn geo_properties_mapdef read_single_layer(geojson_path):"""读取指定geojson 文件,返回dict  dict 是以layername为key,获取每个layer的dict,子字典的key为要素ID:param geojson_path: 读取geojson文件:return: 封装dict  {layerName:[id:{要素dict}]}"""geo_properties_map = {}if not geojson_path.endswith('.geojson'):return geo_properties_mapwith open(geojson_path, 'r') as file:content = file.read()geojson_data = json.loads(content)features = geojson_data.get('features', [])dict = {}for feature in features:properties = feature.get('properties')if GEOMETRY in feature:properties[GEOMETRY] = feature.get('geometry')dict[properties["id"]] = propertieskey = os.path.basename(geojson_path).replace(".geojson", "")geo_properties_map[key] = dictreturn geo_properties_mapdef build_geojson(src_feats, layer_name='', epsg_crs=None):"""按照图层,格式化成geojson规格"""attrs = []for attr in [attr for key, attr in src_feats.items()]:geos_obj = attr.get(GEOMETRY)gjson_dict = {"properties": attr, "type": "Feature"}if geos_obj is not None:gjson_dict[GEOMETRY] = geos_objdel attr[GEOMETRY]attrs.append(gjson_dict)layer = {"type": "FeatureCollection", "features": attrs}if layer_name:layer['name'] = layer_nameif epsg_crs and src_feats and any(GEOMETRY in a for a in attrs):if isinstance(epsg_crs, int) or (isinstance(epsg_crs, str) and epsg_crs.isdigit()):crs_str = "urn:ogc:def:crs:EPSG::%s" % epsg_crselse:crs_str = epsg_crslayer['crs'] = {"type": "name", "properties": {"name": crs_str}}return layerdef write_layer(target_path, layer_name, node_data):'''按图层写geojson数据到磁盘:param target_path: 目标文件目录:param layer_name: 目标文件名:param node_data: 写入的dict嵌套类型数据{dict:{[id:value]}}:return:'''if not os.path.exists(target_path):os.makedirs(target_path)with open(target_path + "/" + layer_name + ".geojson", 'w') as f:json.dump(node_data, f)print(target_path + "/" + layer_name + ".geojson 写入完毕")

4.2 db_sq3_tool.py :处理sq3数据库

import sqlite3
import os
from shapely.geometry import shape
from read_file import rw_data_geojson
import random
import time
import multiprocessing
import datetimedef create_connection(db_file):""" 创建与SQLite数据库的连接 """conn = Nonetry:conn = sqlite3.connect(db_file)return connexcept sqlite3.Error as e:print(e)return conndef create_table(conn, create_table_sql):""" 使用给定的SQL语句创建表 """try:cursor = conn.cursor()cursor.execute(create_table_sql)conn.commit()except sqlite3.Error as e:print(e)def insert_data(conn, insert_sql, data):""" 向数据库插入数据 """try:cursor = conn.cursor()cursor.execute(insert_sql, data)conn.commit()except sqlite3.Error as e:print(e)def batch_insert_data(conn, data_list, table_name, columns):'''批量插入数据:param conn: 数据库连接:param data_list: 插入数据list:param table_name: 表名:param columns: 表的列名list:return:'''cursor = conn.cursor()# 构建插入语句的占位符placeholders = ', '.join(['?'] * len(columns))insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"try:cursor.executemany(insert_sql, data_list)conn.commit()except sqlite3.Error as e:print(f"An error occurred: {e}")conn.rollback()def select_data(conn, select_sql):""" 从数据库查询数据 """try:cursor = conn.cursor()cursor.execute(select_sql)rows = cursor.fetchall()return rowsexcept sqlite3.Error as e:print(e)def update_data(conn, update_sql, data):""" 更新数据库中的数据 """try:cursor = conn.cursor()cursor.execute(update_sql, data)conn.commit()except sqlite3.Error as e:print(e)def delete_data(conn, delete_sql, data):""" 从数据库中删除数据 """try:cursor = conn.cursor()cursor.execute(delete_sql, data)conn.commit()except sqlite3.Error as e:print(e)def dict_data_write_sqlite(node_data, table_name, conn, batch=1):'''将读完的dict 结构图层内容写入到sq3:param node_data: 要写入的数据dict:param table_name: 表名称:param conn: 数据库连接:param batch: 是否批量插入:return:'''try:if len(node_data) == 0:return# 获取第一行的key转保存表的列名random_key, random_value = random.choice(list(node_data.items()))row0 = random_valueflag2type = {'str': 'TEXT', 'int': 'BIGINT', 'float': 'REAL', 'dict': 'TEXT'}fld_types = []columns = []for key, value in row0.items():value_type = type(value).__name__# print(f'{table_name} key:{key} 类型: {value_type}')fld_types.append((key, flag2type[value_type]))columns.append(key)fld_sql = ','.join(f'{fld} {typ}' for fld, typ in fld_types if fld != 'id')pk_sql = 'id BIGINT PRIMARY KEY'create_tab_sql = f'CREATE TABLE IF NOT EXISTS {table_name} ({pk_sql}, {fld_sql});'if conn is not None:# 1.创建表结构create_table(conn, create_tab_sql)if batch:# 方式1:批量插入,一次提交,效率高data_list = []for id, data in node_data.items():feature_list = []for key, value in data.items():if 'geometry' == key:geometry = shape(value)feature_list.append(str(geometry.wkt))else:feature_list.append(str(value))data_list.append(feature_list)batch_insert_data(conn, data_list, table_name, columns)else:# 方式2:一条一条插入,适合小数据,效率低下for id, data in node_data.items():# 插入数据的SQL语句和数据cur_values = []for key, value in data.items():if 'geometry' == key:geometry = shape(value)cur_values.append("'" + str(geometry.wkt) + "'")else:cur_values.append(str(value))flds_str = ','.join(columns)vals_str = ','.join(cur_values)insert_sql = f"insert into {table_name} ({flds_str}) values ({vals_str})"insert_data(conn, insert_sql, data)print(f"{table_name} sq3写入成功")except Exception as e:print(" 写入sq3异常: " + e)def read_single_geojson_write_sq3(args):'''单文件读写sq3:param args::return:'''file_name, target_path = argslayer_name = file_name.replace(".geojson", "")db_file = target_path + '/' + layer_name + '.sq3'# 创建数据库连接conn = create_connection(db_file)# 读取数据,写数据库geojson_file = os.path.join(folder_path, file_name)node_data = rw_data_geojson.read_single_layer(geojson_file)if len(node_data[layer_name]) > 0:for layer_name, layer_value in node_data.items():dict_data_write_sqlite(layer_value, layer_name, conn, batch=1)else:# 删除空sq3os.remove(db_file)conn.close()return file_namedef merge_sq3(target_path):# 连接到目标数据库(要拷贝到的数据库)localtime = time.localtime()merge_folder = target_path + "/" + "merge_sq3_finish"if not os.path.exists(merge_folder):os.makedirs(merge_folder)target_db = merge_folder + "/" + str(time.strftime('%Y%m%d', localtime)) + ".sq3"if os.path.exists(target_db):os.remove(target_db)print(f"{target_db} 已被删除。")target_conn = create_connection(target_db)target_cursor = target_conn.cursor()# 连接到源数据库(要拷贝的数据库)for item in os.listdir(target_path):table_name = item.replace(".sq3", "")source_db_file = os.path.join(target_path, item)if os.path.isfile(source_db_file) and item != '.DS_Store':# 附加源数据库到目标数据库连接target_cursor.execute(f"ATTACH DATABASE '{source_db_file}' AS source_db;")# 将源sq3中的 table_name 表 复制到 目标.sq3target_cursor.execute(f"CREATE TABLE {table_name} AS SELECT * FROM source_db.{table_name}")# 分离附加的数据库target_cursor.execute("DETACH DATABASE source_db;")target_conn.commit()# 提交更改并关闭连接target_conn.close()

5.单线程读写代码

    folder_path = '/Users/admin/Desktop/123/sq3效率/geojson'target_path = "/Users/admin/Desktop/123/sq3效率/merge_sq3"# 1.单线程全量读写start_time = time.time()node_data = rw_data_geojson.read_all_layer(folder_path)# 创建数据库连接db_file = target_path + '/' + '20240928.sq3'if os.path.exists(db_file):os.remove(db_file)print(f"{db_file} 已被删除。")conn = create_connection(db_file)for layer_name, layer_value in node_data.items():if len(node_data[layer_name]) > 0:dict_data_write_sqlite(layer_value, layer_name, conn, batch=0)end_time = time.time()execution_time = end_time - start_timeprint(f"写入sq3 函数执行时间:{execution_time} 秒")exit()

6.多线程读写,合并到一个sq3数据库

# 2.多文件多线程读写start_time = time.time()for root, dirs, files in os.walk(target_path):for file in files:db_file = os.path.join(root, file)os.remove(db_file)print(f"{db_file} 已被删除。")with multiprocessing.Pool(processes=5) as pool:for file_name in os.listdir(folder_path):if file_name == '.DS_Store':continueparams = [(file_name, target_path)]pool.map(read_single_geojson_write_sq3, params)# 合并多个sq3文件merge_sq3(target_path)end_time = time.time()execution_time = end_time - start_timeprint(f"写入sq3 函数执行时间:{execution_time} 秒")exit()

6.在上述基础上,再继续提效

        若单个geojson文件太大时,可多线程分批读取,将读取的块内容,写到一个分块的.sq3,再并发合并到单个图层的sq3,最后将多个图层合并到一个sq3中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/434481.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工控主板在工业控制中扮演什么角色

工控主板在工业控制中扮演着至关重要的角色,它是工业控制系统的核心组件,负责连接、控制和管理各种工业设备,实现自动化生产和智能化管理。具体来说,工控主板在工业控制中的作用可以归纳为以下几个方面: 一、核心控制…

年龄性别与手势识别系统源码分享

年龄性别与手势识别检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Comput…

计算机视觉学习路线:从基础到进阶

计算机视觉学习路线:从基础到进阶 计算机视觉(Computer Vision)是人工智能和机器学习领域中重要的分支,致力于让计算机能够理解和分析图像、视频等视觉信息。随着深度学习的发展,计算机视觉的应用变得越来越广泛&…

Python 解析 html

一、场景分析 假设有如下 html 文档&#xff1a; 写一段 python 脚本&#xff0c;解析出里面的数据&#xff0c;包括经度维度。 <div classstorelist><ul><li lng"100.111111" lat"10.111111"><h4>联盟店1</h4><p>…

基于Qt/C++UDP 调试软件功能及用途介绍

概述 UDP 调试软件是一个基于 Qt 框架的图形化应用程序&#xff0c;旨在提供一个简单易用的界面用于测试和调试 UDP&#xff08;用户数据报协议&#xff09;通信。该软件支持客户端和服务器模式&#xff0c;能够实现数据的发送和接收&#xff0c;方便开发者和网络工程师进行网…

牛顿迭代法求解x 的平方根

牛顿迭代法是一种可以用来快速求解函数零点的方法。 为了叙述方便&#xff0c;我们用 C C C表示待求出平方根的那个整数。显然&#xff0c; C C C的平方根就是函数 f ( x ) x c − C f(x)x^c-C f(x)xc−C 的零点。 牛顿迭代法的本质是借助泰勒级数&#xff0c;从初始值开始快…

C++ | Leetcode C++题解之第438题找到字符串中所有字母异位词

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> findAnagrams(string s, string p) {int sLen s.size(), pLen p.size();if (sLen < pLen) {return vector<int>();}vector<int> ans;vector<int> count(26);for (int i …

828华为云征文|基于华为云Flexus X实例部署Uptime-Kuma服务器监控面板

目录 前言 一、Flexus云服务器X介绍 1.1 Flexus云服务器X实例简介 1.2 Flexus云服务器X实例特点 1.3 Flexus云服务器X实例场景需求 二、Flexus云服务器X购买 2.1 Flexus X实例购买 2.2 重置密码 2.3 登录服务器 三、Flexus X安装uptime-kuma面板 3.1 uptime-kuma介绍 3.2 uptim…

【频分复用】5G中OFDM和GFDM的比较(频谱效率、误码率、星座图、复杂度)【附MATLAB代码及报告】

微信公众号&#xff1a;EW Frontier QQ交流群&#xff1a;554073254 背景 5G需要满足低延迟、高数据速率、连接密度和其他应用需求&#xff0c;这些应用需要增强的移动的宽带、超可靠和低延迟连接以及海量机器类型连接[1]。这种通信所需的信道容量受到噪声、衰减、失真和符号间…

R包:ggheatmap热图

加载R包 # devtools::install_github("XiaoLuo-boy/ggheatmap")library(ggheatmap) library(tidyr)数据 set.seed(123) df <- matrix(runif(225,0,10),ncol 15) colnames(df) <- paste("sample",1:15,sep "") rownames(df) <- sapp…

TypeScript 设计模式之【策略模式】

文章目录 策略模式&#xff1a;灵活切换算法的导航系统策略模式的奥秘策略模式有什么利与弊?如何使用策略模式来优化你的系统代码实现案例策略模式的主要优点策略模式的主要缺点策略模式的适用场景总结 策略模式&#xff1a;灵活切换算法的导航系统 当你使用导航软件规划路线…

如何使用ssm实现北关村基本办公管理系统的设计与实现

TOC ssm721北关村基本办公管理系统的设计与实现jsp 第一章 绪论 1.1 选题背景 目前整个社会发展的速度&#xff0c;严重依赖于互联网&#xff0c;如果没有了互联网的存在&#xff0c;市场可能会一蹶不振&#xff0c;严重影响经济的发展水平&#xff0c;影响人们的生活质量。…

【教学类-18-04】20240508《蒙德里安“黑白格子画” 七款图案挑选》

背景需求 最近有2位客户买了蒙德里安黑白格子画的素材&#xff0c;其中一位问是否是1000张。 【教学类-18-03】20240508《蒙德里安“红黄蓝黑格子画”-A4横版》&#xff08;大小格子&#xff09;_processing简单图形画蒙德里安-CSDN博客文章浏览阅读1.1k次&#xff0c;点赞35次…

基于小波变换与稀疏表示优化的RIE数据深度学习预测模型

加入深度实战社区:www.zzgcz.com&#xff0c;免费学习所有深度学习实战项目。 1. 项目简介 本项目旨在通过深度学习模型进行RSOP&#xff08;Remote Sensing Observation Prediction&#xff09;的数据预测。RSOP数据是基于远程传感技术采集的多维信息&#xff0c;广泛应用于…

apache paimon简介(官翻)

介绍 如下架构所示: 读/写操作: Paimon 支持多样化的数据读写方式,并支持 OLAP 查询。 读取: 支持从历史快照(批处理模式)中消费数据,从最新偏移量(流处理模式)中读取数据,或以混合方式读取增量快照。写入: 支持从数据库变更日志(CDC)进行流式同步,从离线数据中…

Spring5入门

Spring5 课程&#xff1a;3、IOC理论推导_哔哩哔哩_bilibili 文档&#xff1a;狂神SSM教程- 专栏 -KuangStudy 一.Spring概述 1.介绍 Spring : 春天 —->给软件行业带来了春天2002年&#xff0c;Rod Jahnson首次推出了Spring框架雏形interface21框架。2004年3月24日&…

OpenHarmony(鸿蒙南向)——平台驱动开发【PWM】

往期知识点记录&#xff1a; 鸿蒙&#xff08;HarmonyOS&#xff09;应用层开发&#xff08;北向&#xff09;知识点汇总 鸿蒙&#xff08;OpenHarmony&#xff09;南向开发保姆级知识点汇总~ 持续更新中…… 概述 功能简介 PWM&#xff08;Pulse Width Modulation&#xff…

Goland的使用

一、安装Goland 一、Goland简介 Goland是由JetBrains公司旨在为go开发者提供的一个符合人体工程学的新的商业IDE。这个IDE整合了IntelliJ平台的有关go语言的编码辅助功能和工具集成特点 二、下载相应的安装包 1、官网下载地址 GoLand by JetBrains: More than just a Go IDE 三…

工程师 - Windows下使用WSL来访问本地的Linux文件系统

Access Linux filesystems in Windows and WSL 2 从 Windows Insiders 预览版构建 20211 开始&#xff0c;WSL 2 将提供一项新功能&#xff1a;wsl --mount。这一新参数允许在 WSL 2 中连接并挂载物理磁盘&#xff0c;从而使您能够访问 Windows 本身不支持的文件系统&#xff0…

在 Docker 中进入 Redis 容器后,可以通过以下方法查看 Redis 版本:

文章目录 1、info server2、redis-cli -v 1、info server [rootlocalhost ~]# docker exec -it spzx-redis redis-cli 127.0.0.1:6379> auth 123456 OK 127.0.0.1:6379> info server # Server redis_version:6.2.6 redis_git_sha1:00000000 redis_git_dirty:0 redis_bui…