高效改进!防止DataX从HDFS导入关系型数据库丢数据

在这里插入图片描述

高效改进!防止DataX从HDFS导入关系型数据库丢数据

针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题,优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片,确保了每个分片数据都得到全面读取和传输,有效提升了数据导入的可靠性和效率。这些改动不仅解决了丢数据的问题,还显著提高了处理多分片数据的性能。

背景

我们数据中台设计,数据同步功能是datax完成,在orc格式时datax从hdfs导数据到关系型数据库数据丢失,而在textfile格式时丢失数据,当文件超过250M多时会丢数据。因想使用orc格式节省数据空间,提高spark运行效率,需要解决这个问题。

问题

在这里插入图片描述
在这里插入图片描述

只读取了256M 左右的数据,数据条数对不上,导致hdfs,orc格式导入数据到pg,mysql等关系型数据库,数据丢失。

解决

修改hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java

问题代码

 InputSplit[] splits = in.getSplits(conf, 1);RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();// 获取列信息List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);

修改后

 // OrcInputFormat getSplits params numSplits not used, splits size = block numbersInputSplit[] splits = in.getSplits(conf, -1);for (InputSplit split : splits) {{RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();// 获取列信息List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();

点击参考查看

重新打包替换hdfsreader.jar即可

解析

  1. 新增循环处理所有分片的逻辑: 之前的代码只处理了第一个分片(splits[0]),现在改为了处理所有的分片。新增的部分如下:

    java
    InputSplit[] splits = in.getSplits(conf, -1);
    for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();
    

    旧的逻辑是:

    java
    InputSplit[] splits = in.getSplits(conf, 1);
    RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    

    这样改动的目的是,同时处理多个分片,从而提升数据读取的效率。

  2. 移除了重复的分片处理逻辑: 不使用重复的分片处理逻辑:

    java
    // OrcInputFormat getSplits params numSplits not used, splits size = block numbers
    InputSplit[] splits = in.getSplits(conf, -1);
    
  3. 代码块的重构: 将读取分片、解析记录以及处理记录的逻辑放入一个循环中,使代码更简洁、更易读:

    改之前:

    java
    InputSplit[] splits = in.getSplits(conf, 1);
    RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    

    改后使用循环:

    java
    InputSplit[] splits = in.getSplits(conf, -1);
    for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();
    
  4. 处理每个记录字段并传输记录: 保持对每条记录的字段读取并将其传输转移到了新的循环处理逻辑中:

    改之前:

    while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);
    }
    reader.close();
    

    改后:

    for (InputSplit split : splits) {RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);Object key = reader.createKey();Object value = reader.createValue();List<? extends StructField> fields = inspector.getAllStructFieldRefs();List<Object> recordFields;while (reader.next(key, value)) {recordFields = new ArrayList<Object>();for (int i = 0; i <= columnIndexMax; i++) {Object field = inspector.getStructFieldData(value, fields.get(i));recordFields.add(field);}transportOneRecord(column, recordFields, recordSender,taskPluginCollector, isReadAllColumns, nullFormat);}reader.close();
    }
    
  5. 为什么是256M没有更改前他是按每个文件进行分割,而在datax的配置中Java heap size 即默认xmx设置时256M,所以当单个文件超过256M时,超过的部分就被丢掉了,造成数据缺失,而更改后的是按hdfs block size 块的大小进行分割,循环遍历,所以直接修改xmx也能解决问题,但是你要想万一文件超过128G那,你不可能一直调大Java heap size,所以按hdfs block size分割是合理的解决方案

reader单个分片(InputSplit)的大小

在DataX的数据读取过程中,reader单个分片(InputSplit)的大小通常取决于底层存储系统和具体的配置参数。对于HDFS(Hadoop Distributed File System)를的读取,分片大小主要由以下几个因素决定:

  1. HDFS块大小(Block Size): HDFS将文件分为多个块,每个块通常是64MB、128MB或256MB大小,具体大小可以通过HDFS的配置参数dfs.blocksize进行设置。DataX会根据这些块来创建分片,也就是一个分片通常对应一个或多个HDFS块。
  2. 文件本身的大小: 如果文件比HDFS块小,或者没有跨越多个块,则一个文件可能只对应一个分片。
  3. DataX的任务配置: DataX允许在其配置文件中指定一些与分片相关的参数,类似于Hadoop的mapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize,这些参数可以影响分片的逻辑。
  4. InputFormat: DataX使用的Hadoop的InputFormat也能控制分片的逻辑,比如FileInputFormatTextInputFormatOrcInputFormat等。这些格式定义了如何分割输入数据,结合文件大小和块大小来决定分片。

总结

  • 主要改动是将之前只处理单个分片的逻辑重构为一个循环,处理所有分片。这使代码更具扩展性和效率,也适应不同的输入数据量。
  • 移除了无用且重复的注释和代码行,以保持代码清晰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/455287.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git文件操作指令和文件状态

一、Git 文件操作指令 1、查看指定文件的状态 git status [filename] 我们在新创建且初始化过后的 git 仓库中新建一个 文件&#xff0c;然后在 git 的命令行中输入此指令后&#xff0c;就可以看到 的状态&#xff1a; 在此显示的是 Untracked 的状态&#xff0c;也就是未…

visual studio设置修改文件字符集方法

该方法来自网文&#xff0c;特此记录备忘。 添加两个组件&#xff0c;分别是Force UTF-8,FileEncoding。 截图如下&#xff1a; 方法如下&#xff1a;vs中点击“扩展”->“管理扩展”&#xff0c;输入utf搜索&#xff0c;安装如下两个插件&#xff0c;然后重启vs&#xf…

【pytorch DistributedDataParallel 及amp 使用过程遇到的问题记录 】

目录 环境问题单机多卡时&#xff1a;超时错误部分报错内容:解决方法: 存在没有使用梯度的参数报错内容:解决方法:方法1 找到不参与梯度计算的层**且**没有用处的层&#xff0c;删除方法2 DistributedDataParallel 增加参数:find_unused_parameters True DDP 训练时第一个batc…

2 两数相加

解题思路&#xff1a; \qquad 这道题可以用模拟很直观的解决&#xff0c;模式加法的计算过程&#xff0c;只不过套了一层链表的外衣。题目给出的数字在链表中是按照逆序排列的&#xff0c;即链表头节点的值代表相加数字的个位&#xff0c;这样只需要从链表头开始计算加法即可得…

系统登录接口文档Demo

接口描述 该接口用于用户登录验证。通过用户名和密码进行身份验证&#xff0c;成功后返回一个用于后续请求的认证 token。这个 token 是访问受保护资源的凭证。 时序图&#xff1a; 登录请求&#xff1a; 登录查询接口: POST {url}/api/user/login 请求体: {"username…

简单的 curl HTTP的POSTGET请求以及ip port连通性测试

简单的 curl HTTP的POST&GET请求以及ip port连通性测试 1. 需求 我们公司有一个演示项目&#xff0c;需要到客户那边进行项目部署&#xff0c;项目部署完成后我们需要进行项目后端接口的测试功能&#xff0c;但是由于客户那边么有条件安装类似于postman这种的测试工具&am…

Linux:进程优先级 进程调度切换 调度算法

#1024程序员节&#xff5c;征文# 目录 1.进程优先级 1.1 概念 1.2 为什么有优先级 1.3 Linux进程优先级 2. 概念预备 2.1 并发 2.2 寄存器 主要类型&#xff1a; 2. 进程的调度与切换 3.1 进程调度 3.2 进程切换 4. 调度算法 4.1 runqueue内部结构 4.2 如何调度…

Git使用GUI界面实现任意历史版本对比

首先进入版本历史查看界面 标记某次提交 选择某次提交并和标记的提交对比 可以查看比较结果了&#xff0c;具体到每一个文件每一行代码

一篇文章快速认识 YOLO11 | 目标检测 | 模型训练 | 自定义数据集

本文分享YOLO11的目标检测&#xff0c;主要内容是自定义数据集、数据标注、标签格式转换、模型训练、模型推理等。 目录 1、数据标注 2、Labelme的json转为YOLO的txt 3、配置YOLO11代码工程 4、数据集yaml配置文件 5、YOLO11模型结构配置文件 6、编写训练代码 7、开始训…

Unity 开发学习笔记(0):

文章目录 前言为什么要去学Unity安装国际版Unity总结 前言 我最近打算学习一下Unity。所以打算从零开始做一下相关的学习笔记。 为什么要去学Unity 上位机的上限就这样&#xff0c;没有运动控制和机器视觉&#xff0c;薪资上不去C# 我非常熟练&#xff0c;所以学习Unity成本…

excel判断某一列(A列)中的数据是否在另一列(B列)中

如B列如果有7个元素&#xff0c;在A列右边的空白列中&#xff0c;输入如下公式&#xff1a; COUNTIF($B$1:$B$7,A1), 其中&#xff0c;$B$1:$B$7代表A列中的所有数据即绝对范围&#xff0c;A1代表B列中的一个单元格.

JVM 加载 class 文件的原理机制

JVM 加载 class 文件的原理机制 JVM&#xff08;Java虚拟机&#xff09;是一个可以执行Java字节码的虚拟机。它负责执行Java应用程序和应用程序的扩展&#xff0c;如Java库和框架。 文章目录 JVM 加载 class 文件的原理机制1. JVM1.1 类加载器1.2 魔数1.3 元空间 2. 类加载2.1 …

openpnp - 底部相机视觉识别CvPipeLine的参数bug修正

文章目录 openpnp - 底部相机视觉识别的CvPipeLine的参数bug概述笔记openpnp的视觉识别参数的错误原因备注补充 - 如果要直接改默认的底部视觉要注意END openpnp - 底部相机视觉识别的CvPipeLine的参数bug 概述 底部相机抓起一个SOD323的元件&#xff0c;进行视觉识别。 识别…

点餐系统需求分析说明书(软件工程分析报告JAVA)

目录 1 引言 4 1.1 编写目的 4 1.2 项目背景 4 1.3 定义 4 1.4 预期的读者 5 1.5 参考资料 5 2 任务概述 5 2.1 目标 5 2.2 运行环境 5 2.3 条件与限制 6 3 数据描述 6 3.1 静态数据 6 3.2 动态数据 6 3.3 数据库介绍 6 3.4 对象模型 6 3.5 数据采集 7 4 动态模型 7 4.1 脚本 …

《深度学习》 了解YOLO基本知识

目录 一、关于YOLO 1、什么是YOLO 2、经典的检测方法 1&#xff09;one-stage单阶段检测 模型指标介绍&#xff1a; 2&#xff09;two-stage多阶段检测 二、关于mAP指标 1、概念 2、IOU 3、关于召回率和准确率 4、示例 5、计算mAP 一、关于YOLO 1、什么是YOLO YOL…

基于泊松洞过程建模的异构蜂窝网络信干比增益与近似覆盖率分析

大家好&#xff0c;我是带我去滑雪&#xff01; 移动通信业务的高速增长使得传统同构蜂窝网络结构不能满足用户对通信质量的要求&#xff0c;而异 构网络架构可以有效解决这种问题。文中对泊松洞过程下异构蜂窝网络的覆盖率进行研究。首 先&#xff0c;利用泊松洞过程( Poisson…

求助帖:ubuntu22.10 auto install user-data配置了为何还需要选择语言键盘(如何全自动)

0-现象&#xff1a;配置好autoinstll的PXE与user-data文件安装过程仍然要人工选语言、键盘等&#xff0c;非全自动&#xff1b;—— 1.硬件环境&#xff1a;x86_64机器 U盘&#xff08;/dev/sda&#xff09;&#xff1b; 2.软件环境&#xff1a;DHCPPXE启动做好grub与pxe的aut…

【AI绘画】Midjourney进阶:留白构图详解

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AI绘画 | Midjourney 文章目录 &#x1f4af;前言&#x1f4af;什么是构图为什么Midjourney要使用构图 &#x1f4af;留白构图特点使用场景提示词书写技巧测试 &#x1f4af;小结 &#x1f4af;前言 【AI绘画】Midjourney进阶&…

Springboot 的手动配置操作讲解

1.创建新项目: 手动创建使用maven 项目 并选择骨架: quickstart 骨架用来搭建spingboot 2.手动输入pom.xml依赖: 要想创建springboot 首先继承springboot 的父类 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-bo…

算法的学习笔记—两个链表的第一个公共结点(牛客JZ52)

&#x1f600;前言 在链表问题中&#xff0c;寻找两个链表的第一个公共结点是一个经典问题。这个问题的本质是在两个单链表中找到它们的相交点&#xff0c;或者说它们开始共享相同节点的地方。本文将详细讲解这个问题的解题思路&#xff0c;并提供一种高效的解决方法。 &#x…