高效改进!防止DataX从HDFS导入关系型数据库丢数据
针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题,优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片,确保了每个分片数据都得到全面读取和传输,有效提升了数据导入的可靠性和效率。这些改动不仅解决了丢数据的问题,还显著提高了处理多分片数据的性能。
背景
我们数据中台设计,数据同步功能是datax完成,在orc格式时datax从hdfs导数据到关系型数据库数据丢失,而在textfile格式时丢失数据,当文件超过250M多时会丢数据。因想使用orc格式节省数据空间,提高spark运行效率,需要解决这个问题。
问题
只读取了256M 左右的数据,数据条数对不上,导致hdfs,orc格式导入数据到pg,mysql等关系型数据库,数据丢失。
解决
修改hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
问题代码
InputSplit[] splits = in.getSplits(conf, 1);RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();// 获取列信息List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);
修改后
// OrcInputFormat getSplits params numSplits not used, splits size = block numbersInputSplit[] splits = in.getSplits(conf, -1);for (InputSplit split : splits) {{RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();// 获取列信息List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();
点击参考查看
重新打包替换hdfsreader.jar即可
解析
-
新增循环处理所有分片的逻辑: 之前的代码只处理了第一个分片(
splits[0]
),现在改为了处理所有的分片。新增的部分如下:java InputSplit[] splits = in.getSplits(conf, -1); for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();
旧的逻辑是:
java InputSplit[] splits = in.getSplits(conf, 1); RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); Object key = reader.createKey(); Object value = reader.createValue();
这样改动的目的是,同时处理多个分片,从而提升数据读取的效率。
-
移除了重复的分片处理逻辑: 不使用重复的分片处理逻辑:
java // OrcInputFormat getSplits params numSplits not used, splits size = block numbers InputSplit[] splits = in.getSplits(conf, -1);
-
代码块的重构: 将读取分片、解析记录以及处理记录的逻辑放入一个循环中,使代码更简洁、更易读:
改之前:
java InputSplit[] splits = in.getSplits(conf, 1); RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); Object key = reader.createKey(); Object value = reader.createValue();
改后使用循环:
java InputSplit[] splits = in.getSplits(conf, -1); for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();
-
处理每个记录字段并传输记录: 保持对每条记录的字段读取并将其传输转移到了新的循环处理逻辑中:
改之前:
while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat); } reader.close();
改后:
for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close(); }
-
为什么是256M没有更改前他是按每个文件进行分割,而在datax的配置中Java heap size 即默认xmx设置时256M,所以当单个文件超过256M时,超过的部分就被丢掉了,造成数据缺失,而更改后的是按hdfs block size 块的大小进行分割,循环遍历,所以直接修改xmx也能解决问题,但是你要想万一文件超过128G那,你不可能一直调大Java heap size,所以按hdfs block size分割是合理的解决方案
reader单个分片(InputSplit)的大小
在DataX的数据读取过程中,reader单个分片(InputSplit)的大小通常取决于底层存储系统和具体的配置参数。对于HDFS(Hadoop Distributed File System)를的读取,分片大小主要由以下几个因素决定:
- HDFS块大小(Block Size): HDFS将文件分为多个块,每个块通常是64MB、128MB或256MB大小,具体大小可以通过HDFS的配置参数
dfs.blocksize
进行设置。DataX会根据这些块来创建分片,也就是一个分片通常对应一个或多个HDFS块。 - 文件本身的大小: 如果文件比HDFS块小,或者没有跨越多个块,则一个文件可能只对应一个分片。
- DataX的任务配置: DataX允许在其配置文件中指定一些与分片相关的参数,类似于Hadoop的
mapreduce.input.fileinputformat.split.maxsize
和mapreduce.input.fileinputformat.split.minsize
,这些参数可以影响分片的逻辑。 - InputFormat: DataX使用的Hadoop的
InputFormat
也能控制分片的逻辑,比如FileInputFormat
、TextInputFormat
和OrcInputFormat
等。这些格式定义了如何分割输入数据,结合文件大小和块大小来决定分片。
总结
- 主要改动是将之前只处理单个分片的逻辑重构为一个循环,处理所有分片。这使代码更具扩展性和效率,也适应不同的输入数据量。
- 移除了无用且重复的注释和代码行,以保持代码清晰。