C++和Python实现SQL Server数据库导出数据到S3并导入Redshift数据仓库

用C++实现高性能数据处理,Python实现操作Redshift导入数据文件。
在Visual Studio 2022中用C++和ODBC API导出SQL Server数据库中张表中的所有表的数据为CSV文件格式的数据流,用逗号作为分隔符,用双引号包裹每个数据,字符串类型的数据去掉前后的空格,数据中如果包含双引号,则将一个双引号替换为两个双引号,创建gzip压缩文件,输出数据流写入到gzip压缩文件包中的唯一的csv文件中,一张表对应一个gzip压缩文件。用异步编程的方法优化数据库表的读取和写入文件的过程,每个gzip压缩文件包中的csv文件的第一行都是用逗号分割的表头,对应数据库表的字段,程序需要异常处理和日志文件输出,当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导出数据行数、成功或者失败的状态,日志文件不存在则创建。

以下是一个使用C++和ODBC API在Visual Studio 2022中实现将SQL Server数据库表数据导出为CSV文件并进行gzip压缩的示例代码。代码使用异步编程优化数据库读取和文件写入过程,并包含异常处理和日志记录功能。

#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <future>
#include <sql.h>
#include <sqlext.h>
#include <zlib.h>
#include <ctime>
#include <iomanip>// 日志记录函数
void logMessage(const std::string& message) {std::ofstream logFile("export_log.txt", std::ios::app);if (logFile.is_open()) {auto now = std::chrono::system_clock::now();std::time_t now_c = std::chrono::system_clock::to_time_t(now);std::tm* now_tm = std::localtime(&now_c);std::ostringstream oss;oss << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S") << " " << message << std::endl;logFile << oss.str();logFile.close();}
}// 处理字符串中的双引号
std::string escapeDoubleQuotes(const std::string& str) {std::string result = str;size_t pos = 0;while ((pos = result.find('"', pos))!= std::string::npos) {result.replace(pos, 1, 2, '"');pos += 2;}return result;
}// 从数据库读取表数据
std::vector<std::vector<std::string>> readTableData(SQLHSTMT hstmt) {std::vector<std::vector<std::string>> data;SQLSMALLINT columnCount = 0;SQLNumResultCols(hstmt, &columnCount);std::vector<SQLCHAR*> columns(columnCount);std::vector<SQLINTEGER> lengths(columnCount);for (SQLSMALLINT i = 0; i < columnCount; ++i) {columns[i] = new SQLCHAR[SQL_MAX_MESSAGE_LENGTH];SQLBindCol(hstmt, i + 1, SQL_C_CHAR, columns[i], SQL_MAX_MESSAGE_LENGTH, &lengths[i]);}while (SQLFetch(hstmt) == SQL_SUCCESS) {std::vector<std::string> row;for (SQLSMALLINT i = 0; i < columnCount; ++i) {std::string value(reinterpret_cast<const char*>(columns[i]));value = escapeDoubleQuotes(value);row.push_back(value);}data.push_back(row);}for (SQLSMALLINT i = 0; i < columnCount; ++i) {delete[] columns[i];}return data;
}// 将数据写入CSV文件
void writeToCSV(const std::vector<std::vector<std::string>>& data, const std::vector<std::string>& headers, const std::string& filename) {std::ofstream csvFile(filename);if (csvFile.is_open()) {// 写入表头for (size_t i = 0; i < headers.size(); ++i) {csvFile << '"' << headers[i] << '"';if (i < headers.size() - 1) csvFile << ',';}csvFile << std::endl;// 写入数据for (const auto& row : data) {for (size_t i = 0; i < row.size(); ++i) {csvFile << '"' << row[i] << '"';if (i < row.size() - 1) csvFile << ',';}csvFile << std::endl;}csvFile.close();} else {throw std::runtime_error("Failed to open CSV file for writing");}
}// 压缩CSV文件为gzip
void compressCSV(const std::string& csvFilename, const std::string& gzipFilename) {std::ifstream csvFile(csvFilename, std::ios::binary);std::ofstream gzipFile(gzipFilename, std::ios::binary);if (csvFile.is_open() && gzipFile.is_open()) {gzFile gzOut = gzopen(gzipFilename.c_str(), "wb");if (gzOut) {char buffer[1024];while (csvFile.read(buffer, sizeof(buffer))) {gzwrite(gzOut, buffer, sizeof(buffer));}gzwrite(gzOut, buffer, csvFile.gcount());gzclose(gzOut);} else {throw std::runtime_error("Failed to open gzip file for writing");}csvFile.close();gzipFile.close();std::remove(csvFilename.c_str());} else {throw std::runtime_error("Failed to open files for compression");}
}// 导出单个表
void exportTable(const std::string& server, const std::string& database, const std::string& schema, const std::string& table) {SQLHENV henv = nullptr;SQLHDBC hdbc = nullptr;SQLHSTMT hstmt = nullptr;try {SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);std::string connectionString = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ";DATABASE=" + database + ";UID=your_username;PWD=your_password";SQLRETURN ret = SQLDriverConnect(hdbc, nullptr, (SQLCHAR*)connectionString.c_str(), SQL_NTS, nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT);if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {throw std::runtime_error("Failed to connect to database");}std::string query = "SELECT * FROM " + schema + "." + table;SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);ret = SQLExecDirect(hstmt, (SQLCHAR*)query.c_str(), SQL_NTS);if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {throw std::runtime_error("Failed to execute query");}std::vector<std::vector<std::string>> data = readTableData(hstmt);std::vector<std::string> headers;SQLSMALLINT columnCount = 0;SQLNumResultCols(hstmt, &columnCount);for (SQLSMALLINT i = 0; i < columnCount; ++i) {SQLCHAR columnName[SQL_MAX_COLUMN_NAME_LEN];SQLSMALLINT nameLen;SQLDescribeCol(hstmt, i + 1, columnName, SQL_MAX_COLUMN_NAME_LEN, &nameLen, nullptr, nullptr, nullptr, nullptr);headers.push_back(reinterpret_cast<const char*>(columnName));}std::string csvFilename = table + ".csv";writeToCSV(data, headers, csvFilename);std::string gzipFilename = table + ".gz";compressCSV(csvFilename, gzipFilename);std::ostringstream oss;oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Rows Exported: " << data.size() << ", Status: Success";logMessage(oss.str());} catch (const std::exception& e) {std::ostringstream oss;oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Status: Failed, Error: " << e.what();logMessage(oss.str());} finally {if (hstmt) SQLFreeHandle(SQL_HANDLE_STMT, hstmt);if (hdbc) SQLDisconnect(hdbc); SQLFreeHandle(SQL_HANDLE_DBC, hdbc);if (henv) SQLFreeHandle(SQL_HANDLE_ENV, henv);}
}int main() {std::string server = "<实例名>";std::string database = "<数据库名>";std::string schema = "<Schema名>";std::vector<std::string> tables = {"<表名1>", "<表名2>", "<表名3>"};std::vector<std::future<void>> futures;for (const auto& table : tables) {futures.push_back(std::async(std::launch::async, exportTable, server, database, schema, table));}for (auto& fut : futures) {fut.get();}return 0;
}

代码说明:

日志记录: logMessage 函数用于记录操作日志,包括时间戳、数据库名、schema名、表名、导出数据行数和操作状态。
字符串处理: escapeDoubleQuotes 函数用于处理字符串中的双引号,将其替换为两个双引号。
数据库读取: readTableData 函数使用ODBC API从数据库中读取表数据,并将其存储在二维向量中。
CSV写入: writeToCSV 函数将数据写入CSV文件,包括表头和数据行,并用双引号包裹每个数据,使用逗号作为分隔符。
文件压缩: compressCSV 函数将生成的CSV文件压缩为gzip格式,并删除原始CSV文件。
表导出: exportTable 函数负责连接数据库、执行查询、读取数据、写入CSV文件并压缩。
主函数: main 函数定义了数据库服务器、数据库名、schema名和表名,并使用异步任务并行导出每个表的数据。

用Python删除当前目录下所有功能扩展名为gz文件,接着运行export_sqlserver.exe程序,输出该程序的输出内容并等待它运行完成,然后连接SQL Server数据库和Amazon Redshift数据仓库,从数据库中获取所有表和它们的字段名,然后在Redshift中创建字段名全部相同的同名表,字段长度全部为最长的varchar类型,如果表已经存在则不创建表,自动上传当前目录下所有功能扩展名为gz文件到S3,默认覆盖同名的文件,然后使用COPY INTO将S3上包含csv文件的gz压缩包导入对应创建的Redshift表中,文件数据的第一行是表头,导入所有上传的文件到Redshift表,程序需要异常处理和日志文件输出,当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导入数据行数、成功或者失败的状态,日志文件不存在则创建。

import os
import subprocess
import pyodbc
import redshift_connector
import boto3
import logging
from datetime import datetime# 配置日志记录
logging.basicConfig(filename='operation_log.log', level=logging.INFO,format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')def delete_gz_files():try:for file in os.listdir('.'):if file.endswith('.gz'):os.remove(file)logging.info('所有.gz文件已删除')except Exception as e:logging.error(f'删除.gz文件时出错: {e}')def run_export_sqlserver():try:result = subprocess.run(['export_sqlserver.exe'], capture_output=True, text=True)print(result.stdout)logging.info('export_sqlserver.exe运行成功')except Exception as e:logging.error(f'运行export_sqlserver.exe时出错: {e}')def create_redshift_tables():# SQL Server 连接配置sqlserver_conn_str = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_sqlserver_server;DATABASE=your_database;UID=your_username;PWD=your_password'try:sqlserver_conn = pyodbc.connect(sqlserver_conn_str)sqlserver_cursor = sqlserver_conn.cursor()# Redshift 连接配置redshift_conn = redshift_connector.connect(host='your_redshift_host',database='your_redshift_database',user='your_redshift_user',password='your_redshift_password',port=5439)redshift_cursor = redshift_conn.cursor()sqlserver_cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")tables = sqlserver_cursor.fetchall()for table in tables:table_name = table[0]sqlserver_cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}'")columns = sqlserver_cursor.fetchall()column_definitions = ', '.join([f"{column[0]} VARCHAR(MAX)" for column in columns])try:redshift_cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_definitions})")redshift_conn.commit()logging.info(f'在Redshift中成功创建表 {table_name}')except Exception as e:logging.error(f'在Redshift中创建表 {table_name} 时出错: {e}')sqlserver_conn.close()redshift_conn.close()except Exception as e:logging.error(f'连接数据库或创建表时出错: {e}')def upload_gz_files_to_s3():s3 = boto3.client('s3')bucket_name = 'your_bucket_name'try:for file in os.listdir('.'):if file.endswith('.gz'):s3.upload_file(file, bucket_name, file)logging.info(f'成功上传文件 {file} 到S3')except Exception as e:logging.error(f'上传文件到S3时出错: {e}')def copy_data_to_redshift():redshift_conn = redshift_connector.connect(host='your_redshift_host',database='your_redshift_database',user='your_redshift_user',password='your_redshift_password',port=5439)redshift_cursor = redshift_conn.cursor()bucket_name = 'your_bucket_name'try:for file in os.listdir('.'):if file.endswith('.gz') and file.endswith('.csv.gz'):table_name = file.split('.')[0]s3_path = f's3://{bucket_name}/{file}'sql = f"COPY {table_name} FROM '{s3_path}' IAM_ROLE 'your_iam_role' CSV HEADER"try:redshift_cursor.execute(sql)redshift_conn.commit()row_count = redshift_cursor.rowcountlogging.info(f'成功将数据导入表 {table_name},导入行数: {row_count}')except Exception as e:logging.error(f'将数据导入表 {table_name} 时出错: {e}')except Exception as e:logging.error(f'连接Redshift或导入数据时出错: {e}')finally:redshift_conn.close()if __name__ == "__main__":delete_gz_files()run_export_sqlserver()create_redshift_tables()upload_gz_files_to_s3()copy_data_to_redshift()

代码说明:

日志记录:使用 logging 模块配置日志记录,记录操作的时间戳和操作信息到 operation_log.log 文件。
删除.gz文件: delete_gz_files 函数删除当前目录下所有扩展名为 .gz 的文件。
运行export_sqlserver.exe: run_export_sqlserver 函数运行 export_sqlserver.exe 程序并输出其内容。
创建Redshift表: create_redshift_tables 函数连接SQL Server和Redshift数据库,获取SQL Server中所有表和字段名,在Redshift中创建同名表,字段类型为 VARCHAR(MAX) 。
上传.gz文件到S3: upload_gz_files_to_s3 函数上传当前目录下所有扩展名为 .gz 的文件到S3。
将数据从S3导入Redshift: copy_data_to_redshift 函数使用 COPY INTO 语句将S3上的CSV压缩包数据导入对应的Redshift表中。

请根据实际的数据库配置、S3桶名和IAM角色等信息修改代码中的相关参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/10213.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

消息队列篇--通信协议篇--TCP和UDP(3次握手和4次挥手,与Socket和webSocket的概念区别等)

1、TCP和UDP概述 TCP&#xff08;传输控制协议&#xff0c;Transmission Control Protocol&#xff09;和UDP&#xff08;用户数据报协议&#xff0c;User Datagram Protocol&#xff09;都算是最底层的通信协议&#xff0c;它们位于OSI模型的传输层。*传输层的主要职责是确保…

unity学习22:Application类其他功能

目录 1 是否允许后台运行 1.1 Application.runInBackground&#xff0c;显示是否允许后台运行 1.2 设置的地方 2 打开URL 2.1 Application.OpenURL("") 打开超链接 3 退出游戏 3.1 Application.Quit() 退出游戏 4 场景相关 5 返回游戏状态 6 控制游戏的行…

deepseek R1 14b显存占用

RTX2080ti 11G显卡&#xff0c;模型7b速度挺快&#xff0c;试试14B也不错。 7B显存使用5.6G&#xff0c;14B显存刚好够&#xff0c;出文字速度差不多。 打算自己写个移动宽带的IPTV播放器&#xff0c;不知道怎么下手&#xff0c;就先问他了。

.NET Core缓存

目录 缓存的概念 客户端响应缓存 cache-control 服务器端响应缓存 内存缓存&#xff08;In-memory cache&#xff09; 用法 GetOrCreateAsync 缓存过期时间策略 缓存的过期时间 解决方法&#xff1a; 两种过期时间策略&#xff1a; 绝对过期时间 滑动过期时间 两…

C++,STL 六大组件:容器、迭代器、算法、函数对象、适配器、分配器

文章目录 引言一、容器&#xff08;Containers&#xff09;主要分类 二、迭代器&#xff08;Iterators&#xff09;三、算法&#xff08;Algorithms&#xff09;四、函数对象&#xff08;Functors&#xff09;五、适配器&#xff08;Adapters&#xff09;六、分配器&#xff08…

996引擎 - NPC-添加NPC引擎自带形象

996引擎 - NPC-添加NPC引擎自带形象 截图参考添加NPC参考资料截图参考 添加NPC 编辑NPC表:Envir\DATA\cfg_npclist.xls 1.1. 需要临时隐藏NPC时可以在id前加 // 1.2. 如果NPC朝向不对,可以调整dir 列。(按8方向,上是0顺时针数。我这里给的4) 1.3. 形象代码:NPC代码、怪物…

【懒删除堆】力扣2349. 设计数字容器系统

设计一个数字容器系统&#xff0c;可以实现以下功能&#xff1a; 在系统中给定下标处 插入 或者 替换 一个数字。 返回 系统中给定数字的最小下标。 请你实现一个 NumberContainers 类&#xff1a; NumberContainers() 初始化数字容器系统。 void change(int index, int numb…

利用飞书机器人进行 - ArXiv自动化检索推荐

相关作者的Github仓库 ArXivToday-Lark 使用教程 Step1 新建机器人 根据飞书官方机器人使用手册&#xff0c;新建自定义机器人&#xff0c;并记录好webhook地址&#xff0c;后续将在配置文件中更新该地址。 可以先完成到后续步骤之前&#xff0c;后续的步骤与安全相关&…

SpringBoot 日志

目录 一. 日志概述 二. 日志的使用 1. 打印日志 (1) 获取日志对象 (2) 输出要打印的内容 2. 日志框架简介 (1) 门面模式简介 (2) SLF4J 框架简介 3. 日志的格式 4. 日志的级别 5. 日志配置 (1) 配置日志级别 (2) 日志持久化存储 ① 配置日志文件名 ② 配置日志的…

RK3568中使用QT opencv(显示基础图像)

文章目录 一、查看对应的开发环境是否有opencv的库二、QT使用opencv 一、查看对应的开发环境是否有opencv的库 在开发板中的/usr/lib目录下查看是否有opencv的库&#xff1a; 这里使用的是正点原子的ubuntu虚拟机&#xff0c;在他的虚拟机里面已经安装好了opencv的库。 二、…

LMI Gocator GO_SDK VS2019引用配置

LMI SDK在VS2019中的引用是真的坑爹,总结一下经验,希望后来的人能少走弯路.大致内容如下: &#xff08;1&#xff09; 环境变量 &#xff08;2&#xff09;C/C 附加包含目录 E:\GWQ\Gocator\GO_SDK\Gocator\GoSdk E:\GWQ\Gocator\GO_SDK\Platform\kApi &#xff08;3&#…

模型I/O

文章目录 什么是模型I/O模型I/O功能之输出解析器输出解析器的功能输出解析器的使用Pydantic JSON输出解析器结构化输出解析器 什么是模型I/O 模型I/O在所有LLM应用中&#xff0c;核心元素无疑都是模型本身。与模型进行有效的交互是实现高效、灵活和可扩展应用的关键。LangChain…

C语言练习(31)

有5个学生&#xff0c;每个学生有3门课程的成绩&#xff0c;从键盘输入以上数据&#xff08;包括学号、姓名、3门课程成绩&#xff09;&#xff0c;计算出平均成绩&#xff0c;将原有数据和计算出的平均分数存放在磁盘文件stud中。 设5名学生的学号、姓名和3门课程成绩如下&am…

【Block总结】DynamicFilter,动态滤波器降低计算复杂度,替换传统的MHSA|即插即用

论文信息 标题: FFT-based Dynamic Token Mixer for Vision 论文链接: https://arxiv.org/pdf/2303.03932 关键词: 深度学习、计算机视觉、对象检测、分割 GitHub链接: https://github.com/okojoalg/dfformer 创新点 本论文提出了一种新的标记混合器&#xff08;token mix…

「AI学习笔记」深度学习的起源与发展:从神经网络到大数据(二)

深度学习&#xff08;DL&#xff09;是现代人工智能&#xff08;AI&#xff09;的核心之一&#xff0c;但它并不是一夜之间出现的技术。从最初的理论提出到如今的广泛应用&#xff0c;深度学习经历了几乎一个世纪的不断探索与发展。今天&#xff0c;我们一起回顾深度学习的历史…

Axure PR 9 旋转效果 设计交互

大家好&#xff0c;我是大明同学。 这期内容&#xff0c;我们将学习Axure中的旋转效果设计与交互技巧。 旋转 创建旋转效果所需的元件 1.打开一个新的 RP 文件并在画布上打开 Page 1。 2.在元件库中拖出一个按钮元件。 创建交互 创建按钮交互状态 1.选中按钮元件&#xf…

【外文原版书阅读】《机器学习前置知识》2.用看电影推荐的例子带你深入了解向量点积在机器学习的作用

目录 3.3 Where Are You Looking, Vector? The Dot Product 个人主页&#xff1a;Icomi 大家好&#xff0c;我是Icomi&#xff0c;本专栏是我阅读外文原版书《Before Machine Learning》对于文章中我认为能够增进线性代数与机器学习之间的理解的内容的一个输出&#xff0c;希望…

论文阅读(八):结构方程模型用于研究数量遗传学中的因果表型网络

1.论文链接&#xff1a;Structural Equation Models for Studying Causal Phenotype Networks in Quantitative Genetics 摘要&#xff1a; 表型性状可能在它们之间发挥因果作用。例如&#xff0c;农业物种的高产可能会增加某些疾病的易感性&#xff0c;相反&#xff0c;疾病的…

每日一题——序列化二叉树

序列化二叉树 BM39 序列化二叉树题目描述序列化反序列化 示例示例1示例2 解题思路序列化过程反序列化过程 代码实现代码说明复杂度分析总结 BM39 序列化二叉树 题目描述 请实现两个函数&#xff0c;分别用来序列化和反序列化二叉树。二叉树的序列化是将二叉树按照某种遍历方式…

JVM_程序计数器的作用、特点、线程私有、本地方法的概述

①. 程序计数器 ①. 作用 (是用来存储指向下一条指令的地址,也即将要执行的指令代码。由执行引擎读取下一条指令) ②. 特点(是线程私有的 、不会存在内存溢出) ③. 注意:在物理上实现程序计数器是在寄存器实现的,整个cpu中最快的一个执行单元 ④. 它是唯一一个在java虚拟机规…