Background
After the Iceberg table went to production, a Flink streaming job writes to it in real time. Because the checkpoint interval is 1 minute, every checkpoint commits new data files, so the table accumulates a large number of small files and downstream query performance on the table is poor. A small-file compaction strategy is needed.
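To quantify the problem, the number and size of the table's current data files can be checked from the Iceberg files metadata table in spark-sql. A minimal sketch, assuming the table can be referenced as test_table1 (qualify it with the database, e.g. ods_test.test_table1, if required in your session):

-- Count current data files and their sizes; many files far below the
-- 512 MB target size indicate that compaction is worthwhile
SELECT count(*)                AS file_count,
       avg(file_size_in_bytes) AS avg_file_bytes,
       min(file_size_in_bytes) AS min_file_bytes
FROM test_table1.files;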
Small-file compaction strategy
Spark SQL is used to trigger the small-file compaction. All of the SQL below is executed in spark-sql.
Steps
1. Rewrite small files into larger files
CALL spark_catalog.system.rewrite_data_files(
  table => 'test_table1',
  options => map(
    'max-concurrent-file-group-rewrites', '100',
    'target-file-size-bytes', '536870912',
    'max-file-group-size-bytes', '10737418240',
    'rewrite-all', 'true'
  )
);
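If rewriting the whole table on every run is too expensive, the rewrite can be restricted to recently written data with the where argument of rewrite_data_files. A sketch, assuming a hypothetical partition column dt:

-- Rewrite only the files of one partition; 'dt' is a hypothetical partition column
CALL spark_catalog.system.rewrite_data_files(
  table => 'test_table1',
  where => 'dt = 20241118',
  options => map('target-file-size-bytes', '536870912')
);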
In these CALL statements, spark_catalog is the Iceberg catalog configured for the Spark engine. The catalogs are defined in spark-defaults.conf:
spark.sql.catalog.hive_prod.type hive
spark.sql.catalog.local org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type hadoop
spark.sql.catalog.local.warehouse $PWD/warehouse
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension
spark.sql.files.maxPartitionBytes 128m
spark.sql.orc.columnarReaderBatchSize 8192
spark.sql.parquet.columnarReaderBatchSize 8192
spark.sql.shuffle.partitions 200
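Before expiring snapshots in the next step, the existing snapshots and their commit times can be listed from the snapshots metadata table to pick a safe cutoff. A minimal sketch:

-- List snapshots with commit time and operation (append, overwrite, replace, ...)
SELECT committed_at, snapshot_id, operation
FROM test_table1.snapshots
ORDER BY committed_at;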
2. Expire old snapshots
CALL spark_catalog.system.expire_snapshots('test_table1', TIMESTAMP '2024-11-19 12:00:00.000');
-- Note: the timestamp can be set closer to the present so that as many expired snapshots as possible are removed
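If downstream jobs rely on time travel or incremental reads, a minimum number of snapshots can be kept even when they are older than the cutoff. A sketch using the retain_last argument (keeping 10 snapshots here is an arbitrary choice):

-- Expire snapshots older than the cutoff but always keep the latest 10
CALL spark_catalog.system.expire_snapshots(
  table => 'test_table1',
  older_than => TIMESTAMP '2024-11-19 12:00:00.000',
  retain_last => 10
);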
3. Remove orphan files
CALL spark_catalog.system.remove_orphan_files(
  table => 'test_table1',
  location => 'hdfs://hdfsservice/apps/hive/warehouse/ods_test.db/test_table1/data/'
);
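remove_orphan_files physically deletes files, so it can be worth previewing what would be removed first. A sketch using the dry_run argument (by default only files older than 3 days are considered orphans):

-- Preview orphan files without deleting them
CALL spark_catalog.system.remove_orphan_files(
  table => 'test_table1',
  location => 'hdfs://hdfsservice/apps/hive/warehouse/ods_test.db/test_table1/data/',
  dry_run => true
);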
4. Rewrite position delete files
CALL spark_catalog.system.rewrite_position_delete_files(
  table => 'ods_test.test_table1',
  options => map(
    'max-concurrent-file-group-rewrites', '100',
    'target-file-size-bytes', '67108864',
    'max-file-group-size-bytes', '10737418240',
    'rewrite-all', 'true'
  )
);
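To confirm the effect of the whole procedure, the files metadata table can be grouped by content type (0 = data files, 1 = position deletes, 2 = equality deletes). A minimal sketch, assuming an Iceberg version whose files metadata table also lists delete files:

-- File count and total size per content type after compaction
SELECT content, count(*) AS file_count, sum(file_size_in_bytes) AS total_bytes
FROM ods_test.test_table1.files
GROUP BY content
ORDER BY content;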