Apache Arrow in C++: a columnar in-memory data format

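Apache Arrow advantages:
    High-performance data processing: Arrow uses a columnar memory layout, which suits analytics and query workloads. The values of a column sit contiguously in memory, so they can be processed efficiently in batches with fewer CPU cache misses.
    Zero-copy data sharing: Arrow lets different systems and processes share in-memory data directly without copying it. This matters for data-intensive applications because it reduces both memory usage and CPU overhead.
    Cross-language compatibility: Arrow is a cross-language development platform that supports C++, Java, Python and many other languages, which makes it easier for different software components to interoperate.
    Standardized data format: Arrow defines one specification for its data format, so data can be passed between systems without conversion, lowering the cost and complexity of data transformation.
    Faster big-data processing: when integrated with frameworks such as Spark and pandas, Arrow can substantially speed up data loading, processing and analysis; for example, speedups of up to 53x have been reported for some PySpark workloads.
    Broad adoption: Arrow is used by many data-processing tools and libraries, such as pandas, Parquet, Drill and Spark, which together form a strong ecosystem.
Apache Arrow disadvantages:
    Memory consumption: compared with row-oriented storage, a columnar layout may need more memory, especially for sparse data or wide tables, because every column needs its own contiguous memory allocation.
    Not suited to every workload: when records are frequently accessed at random or frequently updated, Arrow's columnar storage can be less efficient than traditional row-oriented storage. Arrow arrays are immutable once built, so an update means building new arrays.
    Learning curve: new users, especially those used to other data-processing models, may need some time to understand Arrow's data structures and API.
    Ecosystem maturity: Arrow's ecosystem is growing quickly, but support and tooling may still be limited or immature in some niche domains or technology stacks.
    Implementation complexity: using Arrow efficiently can require careful memory management and optimization, which can make development harder in some cases.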
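The columnar-layout and zero-copy points above can be made concrete with a toy model in plain C++. This is not Arrow's API. `Int64Column`, `MakeColumn`, `Slice` and `Sum` are hypothetical names. The model is loosely based on how an Arrow array keeps three things: a contiguous values buffer, a validity bitmap (least-significant bit first), and an offset/length pair. Because of that pair, a slice can share the buffers instead of copying them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <vector>

// Toy column (NOT Arrow's API): a contiguous values buffer plus a validity
// bitmap with one bit per row (1 = valid, least-significant bit first),
// viewed through an offset and a length.
struct Int64Column {
  std::shared_ptr<const std::vector<int64_t>> values;
  std::shared_ptr<const std::vector<uint8_t>> validity;
  std::size_t offset = 0;
  std::size_t length = 0;

  bool IsValid(std::size_t i) const {
    std::size_t bit = offset + i;
    return (((*validity)[bit / 8] >> (bit % 8)) & 1u) != 0;
  }

  int64_t Value(std::size_t i) const { return (*values)[offset + i]; }

  // Zero-copy slice: only offset/length change; both buffers are shared.
  Int64Column Slice(std::size_t off, std::size_t len) const {
    return Int64Column{values, validity, offset + off, len};
  }
};

// Builds a column from optional values (std::nullopt = null).
Int64Column MakeColumn(const std::vector<std::optional<int64_t>>& input) {
  auto values = std::make_shared<std::vector<int64_t>>(input.size(), int64_t{0});
  auto validity =
      std::make_shared<std::vector<uint8_t>>((input.size() + 7) / 8, uint8_t{0});
  for (std::size_t i = 0; i < input.size(); ++i) {
    if (input[i].has_value()) {
      (*values)[i] = *input[i];
      (*validity)[i / 8] |= static_cast<uint8_t>(1u << (i % 8));
    }
  }
  return Int64Column{values, validity, 0, input.size()};
}

// Sums the valid values with a tight loop over one contiguous buffer,
// the access pattern a columnar layout is designed for.
int64_t Sum(const Int64Column& col) {
  int64_t total = 0;
  for (std::size_t i = 0; i < col.length; ++i) {
    if (col.IsValid(i)) total += col.Value(i);
  }
  return total;
}
```

For example, `MakeColumn({1, std::nullopt, 3, 4, 5})` sums to 13. Its `Slice(1, 3)` views rows 1 to 3 (null, 3, 4) through the same buffers and sums to 7. In Arrow itself the corresponding operations are `arrow::Array::Slice(offset, length)`, which is likewise zero-copy, and the `"sum"` aggregate function.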


```cpp
// Compute functions require Arrow to be built with the ARROW_COMPUTE option.
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <arrow/json/api.h>
#include <arrow/pretty_print.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

template <typename T>
using numbuildT = arrow::NumericBuilder<T>;

// Demo-only error handling: print the error and abort if an Arrow call failed.
inline void check_ok(arrow::Status const& st) {
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    std::abort();
  }
}

struct ArrowUtil {
  static arrow::Status read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb);
  static arrow::Status read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb);
  static arrow::Status read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb);
  static arrow::Status read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb);
  static arrow::Status write_ipc(arrow::Table const& tb, char const* file_name);
  static arrow::Status write_parquet(arrow::Table const& tb, char const* file_name);

  // Copies all chunks of a numeric ChunkedArray into a std::vector.
  // Nulls are not handled: Value(j) of a null slot is unspecified.
  template <typename T, typename arrayT>
  inline static std::vector<T> chunked_array_to_vector(
      std::shared_ptr<arrow::ChunkedArray> const& array_a) {
    std::vector<T> values;
    values.reserve(array_a->length());
    for (int i = 0; i < array_a->num_chunks(); ++i) {
      auto chunk = std::static_pointer_cast<arrayT>(array_a->chunk(i));
      for (int64_t j = 0; j < chunk->length(); ++j) {
        values.push_back(chunk->Value(j));
      }
    }
    return values;
  }

  // Copies all chunks of a numeric ChunkedArray into one contiguous Array.
  template <typename T, typename buildT, typename arrayT>
  inline static std::shared_ptr<arrow::Array> chunked_array_to_array(
      std::shared_ptr<arrow::ChunkedArray> const& array_a) {
    buildT builder;
    check_ok(builder.AppendValues(chunked_array_to_vector<T, arrayT>(array_a)));
    std::shared_ptr<arrow::Array> out;
    check_ok(builder.Finish(&out));
    return out;
  }

  // Copies all chunks of a string ChunkedArray into a std::vector<std::string>.
  inline static std::vector<std::string> chunked_array_to_str_vector(
      std::shared_ptr<arrow::ChunkedArray> const& array_a) {
    std::vector<std::string> values;
    values.reserve(array_a->length());
    for (int i = 0; i < array_a->num_chunks(); ++i) {
      auto chunk = std::static_pointer_cast<arrow::StringArray>(array_a->chunk(i));
      for (int64_t j = 0; j < chunk->length(); ++j) {
        // GetString copies exactly this value's bytes. Value(j).data() is not
        // NUL-terminated, so building a std::string from it can read past the value.
        values.push_back(chunk->GetString(j));
      }
    }
    return values;
  }

  // Copies all chunks of a string ChunkedArray into one contiguous StringArray.
  inline static std::shared_ptr<arrow::Array> chunked_array_to_str_array(
      std::shared_ptr<arrow::ChunkedArray> const& array_a) {
    arrow::StringBuilder builder;
    check_ok(builder.AppendValues(chunked_array_to_str_vector(array_a)));
    std::shared_ptr<arrow::Array> out;
    check_ok(builder.Finish(&out));
    return out;
  }
};

arrow::Status ArrowUtil::read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
  ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(file_name));
  ARROW_ASSIGN_OR_RAISE(
      auto csv_reader,
      arrow::csv::TableReader::Make(arrow::io::default_io_context(), input_file,
                                    arrow::csv::ReadOptions::Defaults(),
                                    arrow::csv::ParseOptions::Defaults(),
                                    arrow::csv::ConvertOptions::Defaults()));
  ARROW_ASSIGN_OR_RAISE(tb, csv_reader->Read());
  return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
  ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(file_name));
  ARROW_ASSIGN_OR_RAISE(auto ipc_reader, arrow::ipc::RecordBatchFileReader::Open(input_file));
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
  batches.reserve(ipc_reader->num_record_batches());
  for (int i = 0; i < ipc_reader->num_record_batches(); ++i) {
    ARROW_ASSIGN_OR_RAISE(auto batch, ipc_reader->ReadRecordBatch(i));
    batches.push_back(std::move(batch));
  }
  // Propagate a failure instead of silently ignoring it
  ARROW_ASSIGN_OR_RAISE(tb, arrow::Table::FromRecordBatches(ipc_reader->schema(), std::move(batches)));
  return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
  ARROW_ASSIGN_OR_RAISE(auto infile,
                        arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool()));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
  ARROW_RETURN_NOT_OK(reader->ReadTable(&tb));
  return arrow::Status::OK();
}

// Arrow's JSON reader expects newline-delimited JSON (one object per line).
arrow::Status ArrowUtil::read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
  ARROW_ASSIGN_OR_RAISE(auto infile,
                        arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool()));
  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::json::TableReader::Make(arrow::default_memory_pool(), infile,
                                     arrow::json::ReadOptions::Defaults(),
                                     arrow::json::ParseOptions::Defaults()));
  ARROW_ASSIGN_OR_RAISE(tb, reader->Read());
  return arrow::Status::OK();
}

arrow::Status ArrowUtil::write_ipc(arrow::Table const& tb, char const* file_name) {
  ARROW_ASSIGN_OR_RAISE(auto output_file, arrow::io::FileOutputStream::Open(file_name));
  ARROW_ASSIGN_OR_RAISE(auto batch_writer, arrow::ipc::MakeFileWriter(output_file, tb.schema()));
  ARROW_RETURN_NOT_OK(batch_writer->WriteTable(tb));
  ARROW_RETURN_NOT_OK(batch_writer->Close());
  return arrow::Status::OK();
}

arrow::Status ArrowUtil::write_parquet(arrow::Table const& tb, char const* file_name) {
  ARROW_ASSIGN_OR_RAISE(auto outfile, arrow::io::FileOutputStream::Open(file_name));
  // The last argument to the function call is the size of the RowGroup in
  // the parquet file. Normally you would choose this to be rather large but
  // for the example, we use a small value to have multiple RowGroups.
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(tb, arrow::default_memory_pool(), outfile, 3));
  return arrow::Status::OK();
}

void testReadCSV() {
  // Read a CSV file
  char const* csv_path = "./test.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  assert(tb->num_rows() == 2);
}

void testWriteIpc() {
  // Read a CSV file and write it out as an Arrow IPC file
  char const* csv_path = "./test.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  char const* ipc_path = "./test_dst.arrow";
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto write_res = ArrowUtil::write_ipc(*tb, ipc_path);
  assert(write_res.ok());
}

void testReadIPC() {
  // Read an Arrow IPC file
  char const* ipc_path = "./test_dst.arrow";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_ipc(ipc_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  assert(tb->num_rows() == 2);
}

void testWriteParquet() {
  // Read a CSV file and write it out as a Parquet file
  char const* csv_path = "./test.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  char const* write_parquet_path = "./test_dst.parquet";
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto write_res = ArrowUtil::write_parquet(*tb, write_parquet_path);
  assert(write_res.ok());
}

void testReadParquet() {
  // Read a Parquet file
  char const* parquet_path = "./test_dst.parquet";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_parquet(parquet_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  assert(tb->num_rows() == 2);
}

void testReadJson() {
  // Read a JSON file
  char const* json_path = "./test.json";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_json(json_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  assert(tb->num_rows() == 2);
}

void testComputeGreater() {
  // Compare two int columns element-wise (int1 > int2) with the "greater" function
  char const* csv_path = "./comp_gt.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int1"));
  auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int2"));
  arrow::Datum compared = arrow::compute::CallFunction("greater", {array_a_res, array_b_res}).ValueOrDie();
  auto array_a_gt_b_compute = compared.make_array();
  arrow::PrettyPrint(*array_a_gt_b_compute, {}, &std::cerr);
  auto schema = arrow::schema({arrow::field("int1", arrow::int64()), arrow::field("int2", arrow::int64()),
                               arrow::field("a>b? (arrow)", arrow::boolean())});
  std::shared_ptr<arrow::Table> my_table =
      arrow::Table::Make(schema, {array_a_res, array_b_res, array_a_gt_b_compute}, tb->num_rows());
  arrow::PrettyPrint(*my_table, {}, &std::cerr);
}

void testComputeMinMax() {
  // Compute the minimum and maximum of column int1
  char const* csv_path = "./comp_gt.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int1"));
  arrow::compute::ScalarAggregateOptions scalar_aggregate_options;
  scalar_aggregate_options.skip_nulls = false;
  arrow::Datum min_max =
      arrow::compute::CallFunction("min_max", {array_a_res}, &scalar_aggregate_options).ValueOrDie();
  // Unpack struct scalar result (a two-field {"min", "max"} scalar)
  auto const& min_max_struct = min_max.scalar_as<arrow::StructScalar>();
  auto min_value = min_max_struct.value[0];
  auto max_value = min_max_struct.value[1];
  assert(min_value->ToString() == "1");
  assert(max_value->ToString() == "8");
}

// Minimal stand-ins so the test bodies below compile without GoogleTest:
// GTEST_TEST(RWTests, ComputeMean) expands to void RWTests_ComputeMean().
#define GTEST_TEST(a, b) void a##_##b()
#define ASSERT_EQ(a, b) assert((a) == (b))

GTEST_TEST(RWTests, ComputeMean) {
  // Compute the mean of column int1
  char const* csv_path = "../data/comp_gt.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int1"));
  arrow::compute::ScalarAggregateOptions scalar_aggregate_options;
  scalar_aggregate_options.skip_nulls = false;
  arrow::Datum mean = arrow::compute::CallFunction("mean", {array_a_res}, &scalar_aggregate_options).ValueOrDie();
  ASSERT_EQ(mean.scalar()->ToString(), "4.5");
}

// Note: "add" and the comparison functions take no FunctionOptions.
// ScalarAggregateOptions apply only to aggregations such as min_max and mean.
GTEST_TEST(RWTests, ComputeAdd) {
  // Add 3 to every value of column int1
  char const* csv_path = "../data/comp_gt.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int1"));
  std::shared_ptr<arrow::Scalar> increment = std::make_shared<arrow::Int64Scalar>(3);
  arrow::Datum add = arrow::compute::CallFunction("add", {array_a_res, increment}).ValueOrDie();
  std::shared_ptr<arrow::Array> incremented_array = add.make_array();
  arrow::PrettyPrint(*incremented_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeAddArray) {
  // Add columns int1 and int2 element-wise
  char const* csv_path = "../data/comp_gt.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int1"));
  auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(
      tb->GetColumnByName("int2"));
  arrow::Datum add = arrow::compute::CallFunction("add", {array_a_res, array_b_res}).ValueOrDie();
  std::shared_ptr<arrow::Array> sum_array = add.make_array();
  arrow::PrettyPrint(*sum_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeStringEqual) {
  // Check whether columns s1 and s2 are equal, element by element
  char const* csv_path = "../data/comp_s_eq.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_str_array(tb->GetColumnByName("s1"));
  auto array_b_res = ArrowUtil::chunked_array_to_str_array(tb->GetColumnByName("s2"));
  arrow::Datum eq = arrow::compute::CallFunction("equal", {array_a_res, array_b_res}).ValueOrDie();
  std::shared_ptr<arrow::Array> eq_array = eq.make_array();
  arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeCustom) {
  // Compare the two string columns element by element in plain C++
  char const* csv_path = "../data/comp_s_eq.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto v1 = ArrowUtil::chunked_array_to_str_vector(tb->GetColumnByName("s1"));
  auto v2 = ArrowUtil::chunked_array_to_str_vector(tb->GetColumnByName("s2"));
  for (std::size_t i = 0; i < v1.size(); ++i) {
    if (v1[i] != v2[i]) {
      std::cerr << v1[i] << "!=" << v2[i] << "\n";
    }
  }
}

GTEST_TEST(RWTests, ComputeCustomDbl) {
  // Compare two double columns element by element in plain C++ (exact comparison)
  char const* csv_path = "../data/custom_dbl.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto v1 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(tb->GetColumnByName("dbl1"));
  auto v2 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(tb->GetColumnByName("dbl2"));
  for (std::size_t i = 0; i < v1.size(); ++i) {
    if (v1[i] != v2[i]) {
      std::cerr << v1[i] << "!=" << v2[i] << "\n";
    }
  }
}

GTEST_TEST(RWTests, ComputeEqualDbl) {
  // Compare two double columns with the "equal" function
  char const* csv_path = "../data/custom_dbl.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto dbl_arr1 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(
      tb->GetColumnByName("dbl1"));
  auto dbl_arr2 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(
      tb->GetColumnByName("dbl2"));
  arrow::Datum eq = arrow::compute::CallFunction("equal", {dbl_arr1, dbl_arr2}).ValueOrDie();
  std::shared_ptr<arrow::Array> eq_array = eq.make_array();
  arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, StrStartsWith) {
  // Check which values of column s1 start with "Zha"
  char const* csv_path = "../data/comp_s_eq.csv";
  std::shared_ptr<arrow::Table> tb;
  check_ok(ArrowUtil::read_csv(csv_path, tb));
  arrow::PrettyPrint(*tb, {}, &std::cerr);
  auto array_a_res = ArrowUtil::chunked_array_to_str_array(tb->GetColumnByName("s1"));
  arrow::compute::MatchSubstringOptions options("Zha");
  arrow::Datum starts = arrow::compute::CallFunction("starts_with", {array_a_res}, &options).ValueOrDie();
  std::shared_ptr<arrow::Array> starts_array = starts.make_array();
  arrow::PrettyPrint(*starts_array, {}, &std::cerr);
}

using arrow::DoubleBuilder;
using arrow::Int32Builder;
using arrow::Int64Builder;
using arrow::StringBuilder;

// One row of the row-oriented representation
struct row_data {
  int32_t col1;
  int64_t col2;
  double col3;
  std::string col4;
};

// No trailing semicolon after while (0), so EXIT_ON_FAILURE(x); is one statement
#define EXIT_ON_FAILURE(expr)                      \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      return EXIT_FAILURE;                         \
    }                                              \
  } while (0)

arrow::Status CreateTable(const std::vector<row_data>& rows, std::shared_ptr<arrow::Table>* table) {
  // Builders allocate from a MemoryPool. arrow::default_memory_pool() is backed by
  // jemalloc when Arrow is built with it, which can grow buffers in place more cheaply.
  arrow::MemoryPool* pool = arrow::default_memory_pool();
  Int32Builder col1_builder(pool);
  Int64Builder col2_builder(pool);
  DoubleBuilder col3_builder(pool);
  StringBuilder col4_builder(pool);
  // Loop over the existing rows and append each value to its column builder.
  // Append can fail (e.g. if no more memory can be allocated), so check every result.
  for (const row_data& row : rows) {
    ARROW_RETURN_NOT_OK(col1_builder.Append(row.col1));
    ARROW_RETURN_NOT_OK(col2_builder.Append(row.col2));
    ARROW_RETURN_NOT_OK(col3_builder.Append(row.col3));
    ARROW_RETURN_NOT_OK(col4_builder.Append(row.col4));
  }
  // Append one null to every column, so the last row is entirely null
  ARROW_RETURN_NOT_OK(col1_builder.AppendNull());
  ARROW_RETURN_NOT_OK(col2_builder.AppendNull());
  ARROW_RETURN_NOT_OK(col3_builder.AppendNull());
  ARROW_RETURN_NOT_OK(col4_builder.AppendNull());
  std::shared_ptr<arrow::Array> col1_array;
  ARROW_RETURN_NOT_OK(col1_builder.Finish(&col1_array));
  std::shared_ptr<arrow::Array> col2_array;
  ARROW_RETURN_NOT_OK(col2_builder.Finish(&col2_array));
  std::shared_ptr<arrow::Array> col3_array;
  ARROW_RETURN_NOT_OK(col3_builder.Finish(&col3_array));
  std::shared_ptr<arrow::Array> col4_array;
  ARROW_RETURN_NOT_OK(col4_builder.Finish(&col4_array));
  std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
      arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()),
      arrow::field("col3", arrow::float64()), arrow::field("col4", arrow::utf8())};
  auto schema = std::make_shared<arrow::Schema>(schema_vector);
  // The resulting table can be passed to any function that consumes Arrow data.
  // It owns all the data it references, so nothing dangles once this function
  // (and the builders and arrays created in it) goes out of scope.
  *table = arrow::Table::Make(schema, {col1_array, col2_array, col3_array, col4_array});
  return arrow::Status::OK();
}

arrow::Status TableToVector(const std::shared_ptr<arrow::Table>& table, std::vector<row_data>* rows) {
  // Check that the table has the expected schema
  std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
      arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()),
      arrow::field("col3", arrow::float64()), arrow::field("col4", arrow::utf8())};
  auto expected_schema = std::make_shared<arrow::Schema>(schema_vector);
  if (!expected_schema->Equals(*table->schema())) {
    // The table doesn't have the expected schema thus we cannot directly
    // convert it to our target representation.
    return arrow::Status::Invalid("Schemas are not matching!");
  }
  // Get typed pointers to each column's data. This assumes every column has
  // exactly one chunk, which holds for tables built by CreateTable.
  auto col1s = std::static_pointer_cast<arrow::Int32Array>(table->column(0)->chunk(0));
  auto col2s = std::static_pointer_cast<arrow::Int64Array>(table->column(1)->chunk(0));
  auto col3s = std::static_pointer_cast<arrow::DoubleArray>(table->column(2)->chunk(0));
  auto col4s = std::static_pointer_cast<arrow::StringArray>(table->column(3)->chunk(0));
  for (int64_t i = 0; i < table->num_rows(); i++) {
    if (col1s->IsNull(i)) {
      assert(i == 3);  // the 4th row (index 3) is the null row added by CreateTable
    } else {
      int32_t col1 = col1s->Value(i);
      int64_t col2 = col2s->Value(i);
      double col3 = col3s->Value(i);
      std::string col4 = col4s->GetString(i);
      rows->push_back({col1, col2, col3, col4});
    }
  }
  return arrow::Status::OK();
}

// Convert between a row array and column arrays in both directions
int testTableConvertSTL() {
  // Row-oriented input
  std::vector<row_data> rows = {{1, 11, 1.0, "John"}, {2, 22, 2.0, "Tom"}, {3, 33, 3.0, "Susan"}};
  std::shared_ptr<arrow::Table> table;
  EXIT_ON_FAILURE(CreateTable(rows, &table));
  std::vector<row_data> expected_rows;
  EXIT_ON_FAILURE(TableToVector(table, &expected_rows));
  std::cout << expected_rows.size() << std::endl;
  assert(rows.size() == expected_rows.size());
  return 0;
}

void test() {
  // Build int8 and int16 arrays
  arrow::Int8Builder builder;
  arrow::Int16Builder int16builder;
  int8_t days_raw[5] = {1, 12, 17, 23, 28};
  int8_t months_raw[5] = {1, 3, 5, 7, 1};
  int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
  check_ok(builder.AppendValues(days_raw, 5));
  std::shared_ptr<arrow::Array> days = builder.Finish().ValueOrDie();
  // Finish() resets the builder, so it can be reused for the next array
  check_ok(builder.AppendValues(months_raw, 5));
  std::shared_ptr<arrow::Array> months = builder.Finish().ValueOrDie();
  check_ok(int16builder.AppendValues(years_raw, 5));
  std::shared_ptr<arrow::Array> years = int16builder.Finish().ValueOrDie();

  // Define a custom table layout with a Schema.
  // Now, we want a RecordBatch, which has columns and labels for said columns.
  // This gets us to the 2d data structures we want in Arrow.
  // These are defined by schema, which have fields -- here we get both those object types
  // ready.
  std::shared_ptr<arrow::Field> field_day, field_month, field_year;
  std::shared_ptr<arrow::Schema> schema;
  // Every field needs its name and data type.
  field_day = arrow::field("Day", arrow::int8());
  field_month = arrow::field("Month", arrow::int8());
  field_year = arrow::field("Year", arrow::int16());
  // The schema can be built from a vector of fields, and we do so here.
  schema = arrow::schema({field_day, field_month, field_year});

  // Print
  // With the schema and Arrays full of data, we can make our RecordBatch! Here,
  // each column is internally contiguous. This is in opposition to Tables, which we'll
  // see next.
  std::shared_ptr<arrow::RecordBatch> rbatch;
  // The RecordBatch needs the schema, length for columns, which all must match,
  // and the actual data itself.
  rbatch = arrow::RecordBatch::Make(schema, days->length(), {days, months, years});
  std::cout << rbatch->ToString();
  /* Output:
     Day:   [1,12,17,23,28]
     Month: [1,3,5,7,1]
     Year:  [1990,2000,1995,2000,1995]
  */

  // Wrap an arrow::ArrayVector (a std::vector of arrays) in a ChunkedArray
  arrow::ArrayVector day_vecs{days};
  std::shared_ptr<arrow::ChunkedArray> day_chunks = std::make_shared<arrow::ChunkedArray>(day_vecs);

  testTableConvertSTL();
  testReadCSV();
  /* Output for ./test.csv:
     col1: string
     col2: string
     col3: string
     ----
     col1: [["val1","val1"]]
     col2: [["val2","val2"]]
     col3: [["val3","val3"]]
  */
  testWriteIpc();
  testReadIPC();
  // testComputeGreater();
  // testComputeMinMax();
}

int main() {
  test();
  return 0;
}
```

Compute Functions — Apache Arrow v17.0.0

GitHub - apache/arrow: Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing



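This article was contributed by an internet user. The views expressed are the author's own and do not represent this site's position. The site only provides storage space, does not own the content, and assumes no related legal liability. If you repost it, please credit the source: http://www.rhkb.cn/news/383799.html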
