本文介绍DuckDB查询Parquet文件的典型应用场景,掌握DuckDB会让你的产品分析能力更强,相反系统运营成本相对较低。为了示例完整,我也提供了如何使用Python导出MongoDB数据。
Apache Parquet文件格式在存储和传输大型数据集方面变得非常流行。最近遇到一个典型应用场景,在MongoDB中存储大量结构化数据的成本过高。相反,我们将这些数据以Parquet格式存储在S3中。为了提供偶尔的查询,我们下载S3文件并使用DuckDB加载/查询。
导出MongoDB数据
将 MongoDB 中的数据导入为 Parquet 格式的步骤及相应的代码示例(使用 Python),主要包括三个步骤:
- 从 MongoDB 中读取数据。
- 将读取的数据存储为 DataFrame 格式(使用
pandas
)。 - 使用
pyarrow
将 DataFrame 转换为 Parquet 格式。
import pymongo
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq# 连接 MongoDB
def connect_to_mongodb(host='localhost', port=27017, db_name='your_db_name', collection_name='your_collection_name'):client = pymongo.MongoClient(host, port)db = client[db_name]collection = db[collection_name]return collection# 从 MongoDB 读取数据
def read_from_mongodb(collection):data = list(collection.find())df = pd.DataFrame(data)return df# 将 DataFrame 转换为 Parquet 并保存
def convert_to_parquet(df, output_path='output.parquet'):table = pa.Table.from_pandas(df)pq.write_table(table, output_path)if __name__ == "__main__":# 连接 MongoDBcollection = connect_to_mongodb(db_name='test_db', collection_name='test_collection')# 从 MongoDB 读取数据df = read_from_mongodb(collection)# 将数据存储为 Parquet 格式convert_to_parquet(df, output_path='mongodb_data.parquet')
注意:
-
首先,确保已经安装所需的 Python 库:
pip install pymongo pandas pyarrow
-
替换代码中的
db_name
和collection_name
为你实际的 MongoDB 数据库和集合名称。 -
运行代码,将从 MongoDB 中读取数据,并将其存储为 Parquet 格式的文件,文件名为
mongodb_data.parquet
。
Go 查询parquet文件
Parquet 是一种列式存储格式,专为高效存储和处理大规模数据而设计。它支持多种数据类型,能对数据进行压缩和编码,以减少存储空间并提高读写性能。Parquet 具有良好的可扩展性和兼容性,可与 Hadoop、Spark 等大数据处理框架无缝集成,广泛应用于数据仓库、数据分析等领域,能有效提升数据处理的效率和灵活性。
下面是用Golang编写的代码。
package mainimport ("database/sql""fmt""log"_ "github.com/marcboeker/go-duckdb"
)func main() {db := OpenDuckDB()rows, err := db.Query("SELECT id, first_name, family_name from read_parquet('employee.parquet');")if err != nil {panic(fmt.Sprintf("failed to run select query against parquet file %v", err))}type Employee struct {Id string `db:"id"`FirstName string `db:"first_name"`FamilyName string `db:"family_name"`}var row Employeedefer rows.Close()for rows.Next() {err := rows.Scan(&row.Id, &row.FirstName, &row.FamilyName)if err != nil {log.Fatal(err)}log.Printf("%v", row)}err = rows.Err()if err != nil {log.Fatal(err)}
}func OpenDuckDB() (db *sql.DB) {db, err := sql.Open("duckdb", "")if err != nil {panic(fmt.Sprintf("failed to open parquet file using duckdb %v", err))}_, err = db.Exec("INSTALL parquet;")if err != nil {panic(fmt.Sprintf("failed to INSTALL parquet extension. %v", err))}_, err = db.Exec("LOAD parquet;")if err != nil {panic(fmt.Sprintf("failed to LOAD parquet extension. %v", err))}return
}
虽然Parquet文件主要用于将数据从事务性数据库传输到数据仓库,但它也可以用于直接在Parquet文件之上构建查询和分析。