Flink+Paimon多流拼接性能优化实战

目录

(零)本文简介

(一)背景

(二)探索梳理过程

(三)源码改造

(四)修改效果

1、JOB状态

2、Level5的dataFile总大小

3、数据延迟

(五)未来展望:异步Compact


(零)本文简介

Paimon多流拼接/合并性能优化;

        为解决离线T+1多流拼接数据时效性Flink实时状态太大任务稳定性问题,这里基于数据湖工具Apache Paimon进行近实时的多流拼接。

        使用Flink+Paimon基于ParmaryKey TablePartialUpdate)进行多流拼接的时候,跑一段时间有时会遇到周期性背压、checkpoint时间过长等情况,本文通过剖析源码逻辑、修改源码,在一定程度上解决了这个问题。

Apache Paimon基础 、多流拼接方法 及 与Hudi 的对比 可参考前面文章:

新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客

基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客

(一)背景

       这里使用 Flink 1.14 + Apache Paimon 0.5 snapshot 进行多流拼接(前端埋点流 + 服务端埋点流);

        当前情况是一天一个分区,一个分区100个bucket;就会出现如下情况:分区/bucket中的数据越来越多,到达下午或者傍晚的时候就会出现 paimon 作业周期性背压(因为mergeTree中维护的数据越来越多,tree越来越大),checkpoint时间也会比较长;于是决定将mergeTree中的过期数据删除,即让其不进入tree中,减少计算量;

        这里的“过期”按需自定义,比如调研发现99.9%的数据都可以使用3个小时之内的数据拼接上,那就根据时间戳与当前时间戳(假设没有很严重的消费积压)相比,时间差超过3小时的数据就将其丢弃;

具体细节涉及到(这里先将结论给出):

    1. data文件创建后是否还会修改?(不会)
    2. 根据时间排序的data数据文件是增量还是全量?(几个最新文件加起来就是全量)
    3. 应该根据dataFile的创建/修改时间判断过期 还是 通过具体每个record字段值的时间戳判断过期?(通过record)

(二)探索梳理过程

1、首先观察hdfs文件之后发现,dataFile只保留最近一个小时的文件,超过一小时的文件就会被删除,这里应该对应参数 partition.expiration-check-interval = 1h,由此可知data文件不是增量的【下文compact只有几个文件再次加强验证】(那么就不能通过dataFile的最新修改时间判断文件过期将数据过滤);

2、观察flink log发现,每次compaction都只读几个文件,如下所示:

        每次其实只读取一个level0的file,再加上几个level5的file(level5这里file就是之前的全部数据,包含多个流的),最后将compact之后的文件再命名为新的名字写到level5;

        随着分区数据量的增多,参与compact的file也会越来越多(这也是会导致tree偏大,出现周期性背压的原因);

另外,dataFile命名呈现如下规律:

        level5的第二个文件总是跟第一个中间隔一个(这个跟改源码没有关系,只是适合观察规律);

到晚间的时候参与compact的file更多了:

3、观察每次level5生成的dataFile(理论上level5的dataFile会越来越大/多,当单个文件大小超过128M *(1+rate)时,会生成新文件);

        所有level5的文件大小加起来会越来越大,即永远是呈增长趋势;

        如下每一层的总大小在不断增大,同时当文件到一定程度之后,每层2个文件变成3个文件;

4、【以上3点均为原始实现思路,从这里开始改造】思考:既然已知每个bucket中只要最新的几个dataFile就包含了全部的data数据(dataFile不是增量的),那么就不能通过文件最新修改时间来判断数据是否过期,只能从最新的几个dataFile的每条记录来进行判断了,即原本每次参与合并的record是从这个partition+bucket建立开始的全部数据,那么是否可以通过修改源码判断每条record是否过期,从而不参与mergeTree,在compact完成之后也不会再次写入新的dataFile(如果还是写进来,每次读进tree时都需要判断是否过期,是否进入tree)?【答案当然是可以的!】

(三)源码改造

1、首先说明一下,在源码中有这么一段

// IntervalPartition.partition()
public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;
}

        此处为了将文件排序、再将有overlap的放在一个list里边,一但产生gap(即没有overlap),那么就创建新的list,最终将这些 list 再放到List>中:

示意图如下:

2、后续通过一些处理变成 List> 的格式,这里的KeyValue就包含我们想要去操纵的record!

源码是这样的:

public <T> RecordReader<T> mergeSort(List<ReaderSupplier<KeyValue>> lazyReaders,Comparator<InternalRow> keyComparator,MergeFunctionWrapper<T> mergeFunction)throws IOException {if (ioManager != null && lazyReaders.size() > spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());for (ReaderSupplier<KeyValue> supplier : lazyReaders) {try {readers.add(supplier.get());} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers, keyComparator, mergeFunction, sortEngine);
}

        这里的return就会创建sortMergeReader了,我们可以在将数据传入这里之前,先进行过滤(通过判断每一条record是否超过过期时间),修改如下:

public <T> RecordReader<T> mergeSort(List<ReaderSupplier<KeyValue>> lazyReaders,Comparator<InternalRow> keyComparator,MergeFunctionWrapper<T> mergeFunction)throws IOException {if (ioManager != null && lazyReaders.size() > spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());for (ReaderSupplier<KeyValue> supplier : lazyReaders) {try {// 过滤掉过期数据RecordReader<KeyValue> filterSupplier =supplier.get().filter((KeyValue keyValue) ->isNotExpiredRecord(keyValue.value(), expireTimeMillis));readers.add(filterSupplier);} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers,keyComparator,mergeFunction,sortEngine,keyType.getFieldTypes(),valueType.getFieldTypes());
}// 判断这条数据是否过期
public boolean isNotExpiredRecord(InternalRow row, long expireTimeMillis) {if (expireTimeMillis <= 0) {return true;}// 只要有一个字段不为空,且大于0,且过期时间大于expireTimeMillis,就判断为过期for (Integer pos : expireFieldsPosSet) {if ((!row.isNullAt(pos))&& row.getLong(pos) > 0&& (System.currentTimeMillis() - row.getLong(pos)) > expireTimeMillis) {return false;}}return true;
}

与此同时,将相关参数暴露出来,可以在建表时进行自定义配置:

public static final ConfigOption<Integer> RECORDS_EXPIRED_HOUR =key("record.expired-hour").intType().defaultValue(-1).withDescription("Records in streams WON'T be offered into MergeTree when they are expired."+ " (Inorder to avoid too large MergeTree; -1 means never expired). ");public static final ConfigOption<String> RECORDS_EXPIRED_FIELDS =key("record.expired-fields").stringType().noDefaultValue().withDescription("Records in streams WON'T be offered into MergeTree when they are judged as [expired] according to these fields."+ "If you specify multiple fields, delimiter is ','.");

使用方法:

val createPaimonJoinTable = (s"CREATE TABLE IF NOT EXISTS ${paimonTable}(\n"+ " uuid STRING,\n"+ " metaid STRING,\n"+ " cid STRING,\n"+ " area STRING,\n"+ " ts1 bigint,\n"+ " ts2 bigint,\n"+ " d STRING, \n"+ " PRIMARY KEY (d, uuid) NOT ENFORCED \n"+ ") PARTITIONED BY (d) \n"+ " WITH (\n" +"    'merge-engine' = 'partial-update',\n" +"    'changelog-producer' = 'full-compaction', \n" +"    'file.format' = 'orc', \n" +s"    'sink.managed.writer-buffer-memory' = '${sinkWriterBuffer}', \n" +s"    'full-compaction.delta-commits' = '${fullCompactionCommits}', \n" +s"    'scan.mode' = '${scanMode}', \n" +s"    'bucket' = '${bucketNum}', \n" +s"    'sink.parallelism' = '${sinkTaskNum}', \n" +s"    'record.expired-hour' = '3' , \n" +   // user defined para"     'record.expired-fileds' = '4,5' , \n" +   // user defined para"     'sequence.field' = 'ts1' \n" +")")
tableEnv.executeSql(createPaimonJoinTable)

(四)修改效果

1、JOB状态

运行到晚上20点尚未出现背压:

checkpoint时间也没有过长(如果不剔除过期数据,到这个时间cp时长应该在3分钟左右):

生产到Kafka的消息也没有严重的断流或者锯齿现象:

还是有可能出现exception如下(但对数据量没有任何影响):

2、Level5的dataFile总大小

        上边只是现象,最终还是要数据说话。

        修改源码之后,观察dataFile,理论上每一层的size总大小可能会出现减小的情况 (因为过期数据就不会再写入到 level5 新的data文件中了)

        如下图:levelSize diff(下一次level总size - 上一次level总size),确实出现了“有正有负”的情况,于是验证源码修改生效(即每次进行compact只会读取近 n 个小时的数据进行合并)!

3、数据延迟

有意思的是,当我们修改源码(将过期的数据丢弃)之后,数据延迟也变小了。

数据延迟计算方法:paimon处理完将数据写到kafka队列的时间戳 - 前端埋点被触发被服务器接收到的时间戳;

修改前:

修改后:

(五)未来展望:异步Compact

官方提供的paimon源码,里边的compaction是 sync 模式的,我尝试改成过 async 的,但是时不时会出现很少量的数据丢失(感觉可能是因为同一时刻有多个compact任务在进行),后续有机会可以再继续尝试一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/113570.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java“牵手”京东商品详情数据,京东API接口申请指南

京东平台商品详情接口是开放平台提供的一种API接口&#xff0c;通过调用API接口&#xff0c;开发者可以获取京东商品的标题、价格、库存、月销量、总销量、库存、详情描述、图片等详细信息 。 获取商品详情接口API是一种用于获取电商平台上商品详情数据的接口&#xff0c;通过…

一种借助MYSQL递归CTE生成所有的组合情况的实现方法

需求说明 有如下表和数据&#xff1a; Nname1户口2查询机构数过多3危险驾驶4多头用信 需要输出name里的所有组合情况&#xff0c;即单个值&#xff0c;两两组合&#xff0c;三个组合、四个组合。结果为2的n次方-1中情况&#xff0c;这里是15。 预期结果为&#xff1a; Com…

Spring 如何解决循环依赖问题 - 三级缓存

1. 什么是循环依赖问题 ? 循环依赖问题是指对象与对象之间存在相互依赖关系&#xff0c;而且形成了一个闭环&#xff0c;导致两个或多个对象都无法准确的完成对象的创建和初始化。 两个对象间的循环依赖&#xff1a; 多个对象间的循环依赖 &#xff1a; 解决 Spring 中的循环…

uniapp 微信小程序仿抖音评论区功能,支持展开收起

最近需要写一个评论区功能&#xff0c;所以打算仿照抖音做一个评论功能&#xff0c;支持展开和收起&#xff0c; 首先我们需要对功能做一个拆解&#xff0c;评论区功能&#xff0c;两个模块&#xff0c;一个是发表评论模块&#xff0c;一个是评论展示区。接下来对这两个模块进行…

【数据结构】C语言队列(详解)

前言: &#x1f4a5;&#x1f388;个人主页:​​​​​​Dream_Chaser&#xff5e; &#x1f388;&#x1f4a5; ✨✨专栏:http://t.csdn.cn/oXkBa ⛳⛳本篇内容:c语言数据结构--C语言实现队列 目录 一.队列概念及结构 1.1队列的概念 1.2队列的结构 二.队列的实现 2.1头文…

django-发送邮件

一、业务场景 业务警告 邮箱验证 密码找回 二、邮件相关协议 1.SMYTP&#xff08;简答邮件传输协议 25端口&#xff09; 属于“推送”协议 负责发送 2.IMAP&#xff08;交互式邮件访问协议&#xff0c;应用层协议&#xff0c;143端口&#xff09; 用于从本地邮件客户端…

ssh访问远程宿主机的VMWare中NAT模式下的虚拟机

1.虚拟机端配置 1.1设置虚拟机的网络为NAT模式 1.2设置虚拟网络端口映射(NAT) 点击主菜单的编辑-虚拟网络编辑器&#xff1a; 启动如下对话框&#xff0c;选中NAT模式的菜单项&#xff0c;并点击NAT设置&#xff1a; 点击添加&#xff0c;为我们的虚拟机添加一个端口映射。…

下岗吧,Excel

ChatGPT的诞生使Excel公式变得过时。通过使用 ChatGPT 的代码解释器你可以做到&#xff1a; 分析数据创建图表 这就像用自然语言与电子表格交谈一样。我将向大家展示如何使用 ChatGPT 执行此操作并将结果导出为Excel格式&#xff1a; 作为示例&#xff0c;我将分析并创建美国…

AI人员打架识别算法

AI打架识别算法通过yolov8网络模型算法框架&#xff0c;AI打架识别算法识别校园打架斗殴行为&#xff0c;发现立即打架斗殴行为算法会立即抓拍告警推送打架事件信息。目标检测架构分为两种&#xff0c;一种是two-stage&#xff0c;一种是one-stage&#xff0c;区别就在于 two-s…

【锐捷】OSPF 多区域配置

【实验名称】 配置 OSPF 多区域。 【实验目的】 配置 OSPF 多区域&#xff0c;理解 OSPF 层次型网络的特点。 【背景描述】 本实验拓扑图中有 3 台路由器&#xff0c;路由器在区域 0 和区域 1 中&#xff0c;路由器 B 在区域 0 和区域 30&#xff0c; 路由器 C 在区域 30。 【需…

【考研数学】矩阵、向量与线性方程组解的关系梳理与讨论

文章目录 引言一、回顾二、梳理齐次线性方程组非齐次线性方程组 写在最后 引言 两个原因让我想写这篇文章&#xff0c;一是做矩阵题目的时候就发现这三货经常绑在一起&#xff0c;让人想去探寻其中奥秘&#xff1b;另一就是今天学了向量组的秩&#xff0c;让我想起来了之前遗留…

基于jenkins自动化部署PHP环境

实验环境 操作系统 IP地址 主机名 角色 CentOS7.5 192.168.147.141 git git服务器 CentOS7.5 192.168.147.142 Jenkins git客户端 jenkins服务器 CentOS7.5 192.168.147.143 web web服务器 具体环境配置见上一篇&#xff01; 准备git仓库 [rootgit ~]# su -…

无涯教程-Android - Absolute Layout函数

Absolute Layout 可让您指定其子级的确切位置(x/y坐标)&#xff0c;绝对布局的灵活性较差且难以维护。 Absolute Layout - 属性 以下是AbsoluteLayout特有的重要属性- Sr.NoAttribute & 描述1 android:id 这是唯一标识布局的ID。 2 android:layout_x 这指定视图的x坐标…

【Git】(六)子模块跟随主仓库切换分支

场景 主仓库&#xff1a;TestGit 子模块&#xff1a;SubModule 分支v1.0 .gitmodules文件 [submodule "Library/SubModule"]path Library/SubModuleurl gitgitee.com:sunriver2000/SubModule.gitbranch 1.0.0.0 分支v2.0 .gitmodules文件 [submodule "Li…

工服穿戴检测算法 工装穿戴识别算法

工服穿戴检测算法 工装穿戴识别算法利用yolo网络模型图像识别技术&#xff0c;工服穿戴检测算法 工装穿戴识别算法可以准确地识别现场人员是否穿戴了正确的工装&#xff0c;包括工作服、安全帽等。一旦检测到未穿戴的情况&#xff0c;将立即发出警报并提示相关人员进行整改。Yo…

W5100S-EVB-PICO主动PING主机IP检测连通性(十)

前言 上一章节我们用我们开发板在UDP组播模式下进行数据回环测试&#xff0c;本章我们用开发板去主动ping主机IP地址来检测与该主机之间网络的连通性。 什么是PING&#xff1f; PING是一种命令&#xff0c; 是用来探测主机到主机之间是否可通信&#xff0c;如果不能ping到某台…

自然语言处理(二):近似训练

近似训练 近似训练&#xff08;Approximate Training&#xff09;是指在机器学习中使用近似的方法来训练模型&#xff0c;以降低计算复杂度或提高训练效率。这种方法通常用于处理大规模数据集或复杂模型&#xff0c;其中精确的训练算法可能过于耗时或计算资源不足。 近似训练…

java安全问题处理

一、客户端的计算不可信 1、服务端计算价格&#xff0c;如果不这么做的话&#xff0c;很可能会被黑客利用&#xff0c;商品总价被恶意修改为比较低的价格。 二、客户端提交的参数需要校验 1、误以为客户端的数据来源是服务端&#xff0c;客户端就不可能提交异常数据 2、对参数进…

无涯教程-Android - Frame Layout函数

Frame Layout 旨在遮挡屏幕上的某个区域以显示单个项目&#xff0c;通常&#xff0c;应使用FrameLayout来保存单个子视图&#xff0c;因为在子视图彼此不重叠的情况下&#xff0c;难以以可扩展到不同屏幕尺寸的方式组织子视图。 不过&#xff0c;您可以使用android:layout_grav…

TSMaster小功能分享—Python小程序如何导入外部库

今天给大家介绍TSMaster功能之Python小程序如何导入外部库。通过在 TSMaster 默认的解析器路径下导入外部库来介绍&#xff0c;以便我们去使用 Python 外部库。TSMaster 默认 Python 解析器下安装外部库。 步骤一 在 TSMaster 工具->系统信息->python 环境设置中选择打开…