读取mysql数据写入parquet文件
创建表
-- Sample table used by the programs below: one row per person, with a
-- numeric id, two short text fields and a numeric country id.
CREATE TABLE tbs1 (
    id        INT,
    name      VARCHAR(50),
    address   VARCHAR(50),
    countryid INT
);
写入数据
// This program fills the tbs1 table with sample rows through a prepared
// INSERT statement.
package main

import (
	"database/sql"
	"fmt"
	"log"
	"strconv"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	dsn := "root:123456@tcp(192.168.230.71:3306)/fxdb"
	if err := insertRows(dsn, 1000); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Data inserted successfully")
}

// insertRows connects to the MySQL database identified by dsn and inserts
// rows with ids 1..n into tbs1. It stops at the first failed insert and
// returns that error, so the caller never reports success for a partial load.
//
// The work lives in a helper (rather than in main) so that the deferred
// Close calls run before main decides to exit via log.Fatal.
func insertRows(dsn string, n int) error {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// Prepare the INSERT once and reuse it for every row.
	stmt, err := db.Prepare("INSERT INTO tbs1 (id,name,address,countryid) VALUES (?,?,?,?)")
	if err != nil {
		return fmt.Errorf("preparing insert: %w", err)
	}
	defer stmt.Close()

	for i := 1; i <= n; i++ {
		suffix := strconv.Itoa(i)
		if _, err := stmt.Exec(i, "tom"+suffix, "wuhan"+suffix, 1000+i); err != nil {
			return fmt.Errorf("inserting row %d: %w", i, err)
		}
	}
	return nil
}
读取数据写入parquet文件
go.mod文件内容如下:
module parquetdemo

go 1.20

require (
	github.com/go-sql-driver/mysql v1.5.0
	github.com/xitongsys/parquet-go v1.6.2
	github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
)

require (
	github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
	github.com/apache/thrift v0.16.0 // indirect
	github.com/golang/snappy v0.0.4 // indirect
	github.com/google/flatbuffers v2.0.8+incompatible // indirect
	github.com/klauspost/compress v1.15.9 // indirect
	github.com/pierrec/lz4/v4 v4.1.15 // indirect
	github.com/stretchr/testify v1.8.0 // indirect
	golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
)
main.go内容:
// This program reads every row of the MySQL table tbs1 and writes it to the
// parquet file csv.parquet using parquet-go's CSV writer.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// writerParallelism is the "parallel number" handed to the parquet CSV writer.
const writerParallelism = 4

func main() {
	// DSN (data source name).
	dsn := "root:123456@tcp(192.168.230.71:3306)/fxdb"
	if err := exportTable(dsn, "SELECT * FROM tbs1", "csv.parquet"); err != nil {
		log.Fatal(err)
	}
	log.Println("Write Finished")
}

// exportTable runs query against the MySQL database identified by dsn and
// writes the result set to a parquet file at path.
//
// The parquet schema is derived from the result's column types via getType;
// a column whose type getType does not know aborts the export before the
// output file is created. Rows are streamed to the writer one at a time
// instead of being buffered in memory first. Any scan, iteration, write or
// close failure is returned, so the caller never reports success for a
// truncated file.
func exportTable(dsn, query, path string) (err error) {
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// sql.Open does not necessarily connect; verify the connection up front.
	if err = db.Ping(); err != nil {
		return fmt.Errorf("connecting to database: %w", err)
	}

	// NOTE(review): the query is deliberately run through a prepared
	// statement. With go-sql-driver/mysql, prepared statements are expected
	// to return integer columns as int64 (which the INT64 parquet columns
	// need), whereas plain text queries return them as []byte. Confirm
	// against the driver version in go.mod before simplifying to db.Query.
	stmt, err := db.Prepare(query)
	if err != nil {
		return fmt.Errorf("preparing query: %w", err)
	}
	defer stmt.Close()

	rows, err := stmt.Query()
	if err != nil {
		return fmt.Errorf("running query: %w", err)
	}
	defer rows.Close()

	types, err := rows.ColumnTypes()
	if err != nil {
		return fmt.Errorf("reading column types: %w", err)
	}

	// Build the writer metadata, one entry per column in result order.
	// Records must later be written in this same order. Example entries:
	//   "name=id,type=INT64"
	//   "name=name,type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"
	md := make([]string, 0, len(types))
	for _, ct := range types {
		pqType := getType(ct.DatabaseTypeName())
		if pqType == "" {
			// An empty type would yield a malformed "name=x,type=" entry.
			return fmt.Errorf("column %q: unsupported database type %q", ct.Name(), ct.DatabaseTypeName())
		}
		md = append(md, fmt.Sprintf("name=%s,type=%s", ct.Name(), pqType))
	}

	fw, err := local.NewLocalFileWriter(path)
	if err != nil {
		return fmt.Errorf("creating %s: %w", path, err)
	}
	// Close on every path; surface the close error unless an earlier error
	// is already being returned.
	defer func() {
		if cerr := fw.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing %s: %w", path, cerr)
		}
	}()

	pw, err := writer.NewCSVWriter(md, fw, writerParallelism)
	if err != nil {
		return fmt.Errorf("creating parquet writer: %w", err)
	}

	// scanned holds the current row's values; dest holds pointers into it
	// because Scan needs addressable destinations.
	scanned := make([]any, len(types))
	dest := make([]any, len(types))
	for i := range scanned {
		dest[i] = &scanned[i]
	}

	for rows.Next() {
		if err = rows.Scan(dest...); err != nil {
			return fmt.Errorf("scanning row: %w", err)
		}
		// Use a fresh slice per record: scanned is overwritten by the next
		// Scan, and the writer may still hold the record it was given.
		rec := make([]any, len(scanned))
		for i, v := range scanned {
			if b, ok := v.([]byte); ok {
				// Byte columns are written as strings.
				rec[i] = string(b)
			} else {
				rec[i] = v
			}
		}
		if err = pw.Write(rec); err != nil {
			return fmt.Errorf("writing record: %w", err)
		}
	}
	// rows.Next returning false can also mean iteration failed midway.
	if err = rows.Err(); err != nil {
		return fmt.Errorf("iterating rows: %w", err)
	}

	if err = pw.WriteStop(); err != nil {
		return fmt.Errorf("finalizing parquet file: %w", err)
	}
	return nil
}

// getType maps a database column type name (as reported by
// sql.ColumnType.DatabaseTypeName) to the type portion of a parquet-go CSV
// schema entry. It returns "" for any type it does not know.
func getType(colType string) string {
	switch colType {
	case "INT":
		return "INT64"
	case "VARCHAR":
		return "BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"
	default:
		return ""
	}
}