Flink调优详解:案例解析(第42天)

系列文章目录

一、Flink-任务参数配置
二、Flink-SQL调优
三、阿里云Flink调优

文章目录

    • 系列文章目录
    • 前言
      • 一、Flink-任务参数配置
        • 1.1 运行时参数
        • 1.2 优化器参数
        • 1.3 表参数
      • 二、Flink-SQL调优
        • 2.1 mini-batch聚合
        • 2.2 两阶段聚合
        • 2.3 分桶
        • 2.4 filter去重(了解)
      • 三、阿里云Flink调优
        • 3.1 智能调优
        • 3.2 定时调优


前言

本文主要详解常见的Flink优化策略。

官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/

一、Flink-任务参数配置

1.1 运行时参数
  • 异步维度join
# 默认值:100
# 值类型:Integer
# 流批任务:流、批任务都支持
# 用处:异步 lookup join 中最大的异步 IO 执行数目
table.exec.async-lookup.buffer-capacity: 100
  • 开启微批
# 默认值:false
# 值类型:Boolean
# 流批任务:流任务支持
# 用处:MiniBatch 优化是一种专门针对 unbounded 流任务的优化(即非窗口类应用),其机制是在 `允许的延迟时间间隔内` 以及 `达到最大缓冲记录数` 时触发以减少 `状态访问` 的优化,从而节约处理时间。下面两个参数一个代表 `允许的延迟时间间隔`,另一个代表 `达到最大缓冲记录数`。
table.exec.mini-batch.enabled: false# 默认值:0 ms
# 值类型:Duration
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 MiniBatch 机制最大允许的延迟时间。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0 ms
table.exec.mini-batch.allow-latency: 0 ms# 默认值:-1
# 值类型:Long
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 MiniBatch 机制最大缓冲记录数。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0
table.exec.mini-batch.size: -1注意: 1- 如果想开启微批,那么首先需要将第一个参数值设置为true;接着可以设置后面两个参数。只要程序满足后面2个参数中的任意一个条件就会触发微批处理。2- 微批处理能够提升程序处理数据的吞吐量,但是会导致数据的处理时效性降低
  • 并行度的设置
# 默认值:-1
# 值类型:Integer
# 流批任务:流、批任务都支持
# 用处:可以用此参数设置 Flink SQL 中算子的并行度,这个参数的优先级 `高于` StreamExecutionEnvironment 中设置的并行度优先级,如果这个值设置为 -1,则代表没有设置,会默认使用 StreamExecutionEnvironment 设置的并行度
table.exec.resource.default-parallelism: -1
  • 数据异常时的处理方式
# 默认值:ERROR
# 值类型:Enum【ERROR, DROP】
# 流批任务:流、批任务都支持
# 用处:表上的 NOT NULL 列约束强制不能将 NULL 值插入表中。Flink 支持 `ERROR`(默认)和 `DROP` 配置。默认情况下,当 NULL 值写入 NOT NULL 列时,Flink 会产生运行时异常。用户可以将行为更改为 `DROP`,直接删除此类记录,而不会引发异常。
table.exec.sink.not-null-enforcer: ERROR
  • 上游cdc去重
# 默认值:false
# 值类型:Boolean
# 流批任务:流任务
# 用处:接入了 CDC 的数据源,上游 CDC 如果产生重复的数据,可以使用此参数在 Flink 数据源算子进行去重操作,去重会引入状态开销
table.exec.source.cdc-events-duplicate: false
  • 设置空闲等待
# 默认值:0 ms
# 值类型:Duration
# 流批任务:流任务
# 用处:如果此参数设置为 60 s,当 Source 算子在 60 s 内未收到任何元素时,这个 Source 将被标记为临时空闲,此时下游任务就不依赖此 Source 的 Watermark 来推进整体的 Watermark 了。
# 默认值为 0 时,代表未启用检测源空闲。
table.exec.source.idle-timeout: 0 ms
  • 设置状态有效期
# 默认值:0 ms
# 值类型:Duration
# 流批任务:流任务
# 用处:指定空闲状态(即未更新的状态)将保留多长时间。尤其是在 unbounded 场景中很有用。默认 0 ms 为不清除空闲状态
table.exec.state.ttl: 0 ms推荐: ttl time to live失效时间。保留窗口附近的1-3个state状态数据。举例: 例如滚动窗口大小是5秒,那么这个参数推荐设置为5000 - 15000毫秒之间

上述的参数中,常用的有:开启微批、设置状态有效期

1.2 优化器参数
  • 开启两阶段聚合
#  默认值:AUTO
#  值类型:String
#  流批任务:流、批任务都支持
#  用处:聚合阶段的策略。和 MapReduce 的 Combiner 功能类似,可以在数据 shuffle 前做一些提前的聚合,可以选择以下三种方式
#  TWO_PHASE:强制使用具有 localAggregate 和 globalAggregate 的两阶段聚合。请注意,如果聚合函数不支持优化为两个阶段,Flink 仍将使用单阶段聚合。
#  两阶段优化在计算 count,sum 时很有用,但是在计算 count distinct 时需要注意,key 的稀疏程度,如果 key 不稀疏,那么很可能两阶段优化的效果会适得其反
#  ONE_PHASE:强制使用只有 CompleteGlobalAggregate 的一个阶段聚合。
#  AUTO:聚合阶段没有特殊的执行器。选择 TWO_PHASE 或者 ONE_PHASE 取决于优化器的成本。
table.optimizer.agg-phase-strategy: AUTO注意: 两阶段聚合必须配合微批处理的参数一起进行设置。
  • 开启分桶
#  默认值:false
#  值类型:Boolean
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct\sum distinct 数据时的 group by 的 key 较少导致的数据倾斜,比如 group by 中一个 key 的 distinct 要去重 500w 数据,而另一个 key 只需要去重 3 个 key,那么就需要先需要按照 distinct 的 key 进行分桶。将此参数设置为 true 之后,下面的 table.optimizer.distinct-agg.split.bucket-num 可以用于决定分桶数是多少
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.enabled: false#  默认值:1024
#  值类型:Integer
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct 数据时的 group by 较少导致的数据倾斜。加了此参数之后,会先根据 group by key 结合 hash_code(distinct_key)进行分桶,然后再自动进行合桶。
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.bucket-num: 1024
  • 重用执行计划
#  默认值:true
#  值类型:Boolean
#  流批任务:流任务
#  用处:如果设置为 true,Flink 优化器将会尝试找出重复的自计划并重用。默认为 true 不需要改动
table.optimizer.reuse-sub-plan-enabled: true
  • source资源重用
#  默认值:true
#  值类型:Boolean
#  流批任务:流任务
#  用处:如果设置为 true,Flink 优化器会找出重复使用的 table source 并且重用。默认为 true 不需要改动
table.optimizer.reuse-source-enabled: true
  • 开启谓词下推
#  默认值:true
#  值类型:Boolean
#  流批任务:流任务
#  用处:如果设置为 true,Flink 优化器将会做谓词下推到 FilterableTableSource 中,将一些过滤条件前置,提升性能。默认为 true 不需要改动
table.optimizer.source.predicate-pushdown-enabled: true
1.3 表参数
  • 开启DML异步
#  默认值:false
#  值类型:Boolean
#  流批任务:流、批任务都支持
#  用处:DML SQL(即执行 insert into 操作)是异步执行还是同步执行。默认为异步(false),即可以同时提交多个 DML SQL 作业,如果设置为 true,则为同步,第二个 DML 将会等待第一个 DML 操作执行结束之后再执行
table.dml-sync: false
  • 设置方法的最大长度不超过64KB
#  默认值:64000
#  值类型:Integer
#  流批任务:流、批任务都支持
#  用处:Flink SQL 会通过生产 java 代码来执行具体的 SQL 逻辑,但是 jvm 限制了一个 java 方法的最大长度不能超过 64KB,但是某些场景下 Flink SQL 生产的 java 代码会超过 64KB,这时 jvm 就会直接报错。因此此参数可以用于限制生产的 java 代码的长度来避免超过 64KB,从而避免 jvm 报错。
table.generated-code.max-length: 64000
  • 本地时区
#  默认值:default
#  值类型:String
#  流批任务:流、批任务都支持
#  用处:在使用天级别的窗口时,通常会遇到时区问题。举个例子,Flink 开一天的窗口,默认是按照 UTC 零时区进行划分,那么在北京时区划分出来的一天的窗口是第一天的早上 8:00 到第二天的早上 8:00,但是实际场景中想要的效果是第一天的早上 0:00 到第二天的早上 0:00 点。因此可以将此参数设置为 GMT+08:00 来解决这个问题。
table.local-time-zone: default
  • 编译器
#  默认值:default
#  值类型:Enum【BLINK、OLD】
#  流批任务:流、批任务都支持
#  用处:Flink SQL planner,默认为 BLINK planner,也可以选择 old planner,但是推荐使用 BLINK planner
table.planner: BLINK
  • SQL方言
#  默认值:default
#  值类型:String
#  流批任务:流、批任务都支持
#  用处:Flink 解析一个 SQL 的解析器,目前有 Flink SQL 默认的解析器和 Hive SQL 解析器,其区别在于两种解析器支持的语法会有不同,比如 Hive SQL 解析器支持 between and、rlike 语法,Flink SQL 不支持
table.sql-dialect: default

二、Flink-SQL调优

2.1 mini-batch聚合

作用:微批处理能够提升程序处理数据的吞吐量,但是会导致数据的处理时效性降低

在这里插入图片描述

SQL中参数配置如下:

# 默认值:false
# 值类型:Boolean
# 流批任务:流任务支持
# 用处:MiniBatch 优化是一种专门针对 unbounded 流任务的优化(即非窗口类应用),其机制是在 `允许的延迟时间间隔内` 以及 `达到最大缓冲记录数` 时触发以减少 `状态访问` 的优化,从而节约处理时间。下面两个参数一个代表 `允许的延迟时间间隔`,另一个代表 `达到最大缓冲记录数`。
table.exec.mini-batch.enabled: false# 默认值:0 ms
# 值类型:Duration
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 MiniBatch 机制最大允许的延迟时间。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0 ms
table.exec.mini-batch.allow-latency: 0 ms# 默认值:-1
# 值类型:Long
# 流批任务:流任务支持
# 用处:此参数设置为多少就代表 MiniBatch 机制最大缓冲记录数。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0
table.exec.mini-batch.size: -1
2.2 两阶段聚合

作用:1- 提前对数据进行局部聚合操作,能够减少后续数据处理量;2-能够一定程度避免数据倾斜的问题

在这里插入图片描述

FlinkSQL中的配置:

#  默认值:AUTO
#  值类型:String
#  流批任务:流、批任务都支持
#  用处:聚合阶段的策略。和 MapReduce 的 Combiner 功能类似,可以在数据 shuffle 前做一些提前的聚合,可以选择以下三种方式
#  TWO_PHASE:强制使用具有 localAggregate 和 globalAggregate 的两阶段聚合。请注意,如果聚合函数不支持优化为两个阶段,Flink 仍将使用单阶段聚合。
#  两阶段优化在计算 count,sum 时很有用,但是在计算 count distinct 时需要注意,key 的稀疏程度,如果 key 不稀疏,那么很可能两阶段优化的效果会适得其反
#  ONE_PHASE:强制使用只有 CompleteGlobalAggregate 的一个阶段聚合。
#  AUTO:聚合阶段没有特殊的执行器。选择 TWO_PHASE 或者 ONE_PHASE 取决于优化器的成本。
table.optimizer.agg-phase-strategy: AUTO注意: 如果想要使用两阶段聚合,那么必须同时开始mini-batch微批处理
2.3 分桶

作用:能够一定程度避免数据倾斜的问题

在这里插入图片描述

FlinkSQL的配置如下:

#  默认值:false
#  值类型:Boolean
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct\sum distinct 数据时的 group by 的 key 较少导致的数据倾斜,比如 group by 中一个 key 的 distinct 要去重 500w 数据,而另一个 key 只需要去重 3 个 key,那么就需要先需要按照 distinct 的 key 进行分桶。将此参数设置为 true 之后,下面的 table.optimizer.distinct-agg.split.bucket-num 可以用于决定分桶数是多少
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.enabled: false#  默认值:1024
#  值类型:Integer
#  流批任务:流任务
#  用处:避免 group by 计算 count distinct 数据时的 group by 较少导致的数据倾斜。加了此参数之后,会先根据 group by key 结合 hash_code(distinct_key)进行分桶,然后再自动进行合桶。
#  后文会介绍具体的案例
table.optimizer.distinct-agg.split.bucket-num: 1024
2.4 filter去重(了解)
--普通的写法
SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day--filter优化写法
SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('web', 'other')) AS web_uv
FROM T
GROUP BY day

filter子句能够将三个状态合并成一个共享的状态。方便程序的读取等操作。能够提升效率。

在这里插入图片描述

三、阿里云Flink调优

Flink支持智能调优和定时调优两种调优模式。

在这里插入图片描述

3.1 智能调优

简单理解,就是阿里云Flink智能来进行调优。默认会从并发度和内存来进行调优。

  • 智能调优会调整作业的并发度来满足作业流量变化所需要的吞吐。

    智能调优会监控消费源头数据的延迟变化情况、TaskManager(TM) CPU实际使用率和各个算子处理数据能力来调整作业的并发度。详情如下:

    • 作业延迟Delay指标正常(不超过60s),不修改当前作业并发。
    • 作业延迟Delay指标超过默认阈值60s,分以下两种情况来调整并发度:
      • 延迟正在下降,不进行并发度调整。
      • 延迟增加并且连续上升3分钟(默认值), 默认调整作业并发度到当前实际TPS的两倍,但不超过设置最大的资源(默认值为64 CU)。
    • 作业不存在延迟指标。
      • 作业某VERTEX节点连续6分钟实际处理数据时间占比超过80%,调大作业并发度使得SLOT使用率降低到50%,但不超过设置最大的资源(默认为64 CU)。
      • 所有TM的平均利用率连续6分钟超过80%,调高并发度使TM的CPU使用率降低到50%。
    • 所有TM的最大CPU使用率连续24小时低于20%,且VERTEX的实际处理数据时间低于20%时,调低作业的并发度使CPU和VERTEX实际处理的时间占比提高到50%。
  • 智能调优也会监控作业的内存使用和Failover情况,来调整作业的内存配置。详情如下:

    • 在JobManager GC频繁或者发生OOM异常时,会调高JM的内存,默认最大调整到16 GiB。
    • 在TM GC频繁或者发生OOM异常、HeartBeatTimeout异常时,会调高TM的内存,默认最大调整到16 GiB。
    • 在TM内存使用率超过95%时,会调大TM的内存。
    • 在TM的实际内存使用率连续24小时低于30%时,降低TM内存的配置,默认最小调整到1.6 GiB。
3.2 定时调优

定时调优计划描述了资源和时间点的对应关系,一个定时调优计划中可以包含多组资源和时间点的关系。在使用定时调优计划时,您需要明确知道各个时间段的资源使用情况,根据业务时间区间特征,设置对应的资源。

例如,某业务全天早09:00~19:00是业务高峰,19:00到第二天09:00是业务低峰。此时您可以使用定时调优功能,在高峰时间段使用30 CU,在业务低峰时使用10 CU。

说明:1 CU=1核CPU+4 GiB内存+20 GB本地存储(放置日志、系统检查点等信息)

小结:在使用阿里云调优时,可以使用一些开源的参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/380634.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码解读:Diffusion Models中的长宽桶技术(Aspect Ratio Bucketing)

Diffusion Models专栏文章汇总:入门与实战 前言:自从SDXL提出了长宽桶技术之后,彻底解决了不同长宽比的图像输入问题,现在已经成为训练扩散模型必选的方案。这篇博客从代码详细解读如何在模型训练的时候运用长宽桶技术(Aspect Rat…

UNiapp 微信小程序渐变不生效

开始用的一直是这个,调试一直没问题,但是重新启动就没生效,经查询这个不适合小程序使用:不适合没生效 background-image:linear-gradient(to right, #33f38d8a,#6dd5ed00); 正确使用下面这个: 生效,适合…

Python list comprehension (列表推导式 - 列表解析式 - 列表生成式)

Python list comprehension {列表推导式 - 列表解析式 - 列表生成式} 1. Python list comprehension (列表推导式 - 列表解析式 - 列表生成式)2. Example3. ExampleReferences Python 中的列表解析式并不是用来解决全新的问题,只是为解决已有问题提供新的语法。 列…

(10)深入理解pandas的核心数据结构:DataFrame高效数据清洗技巧

目录 前言1. DataFrame数据清洗1.1 处理缺失值(NaNs)1.1.1 数据准备1.1.2 读取数据1.1.3 查找具有 null 值或缺失值的行和列1.1.4 计算每列缺失值的总数1.1.5 删除包含 null 值或缺失值的行1.1.6 利用 .fillna() 方法用Portfolio …

Windows搭建RTMP视频流服务器

参考了一篇文章,见文末。 博客中nginx下载地址失效,附上一个有效的地址: Index of /download/ 另外,在搭建过程中,遇到的问题总结如下: 1 两个压缩包下载解压并重命名后,需要 将nginx-rtmp…

如何使用简鹿水印助手或 Photoshop 给照片添加文字

在社交媒体中,为照片添加个性化的文字已经成为了一种流行趋势。无论是添加注释、引用名言还是表达情感,文字都能够为图片增添额外的意义和风格。本篇文章将使用“简鹿水印助手”和“Adobe Photoshop”这两种工具给照片添加文字的详细步骤。 使用简鹿水印…

【python基础】组合数据类型:元组、列表、集合、映射

文章目录 一. 序列类型1. 元组类型2. 列表类型(list)2.1. 列表创建2.2 列表操作2.3. 列表元素遍历 ing元素列表求平均值删除散的倍数 二. 集合类型(set)三. 映射类型(map)1. 字典创建2. 字典操作3. 字典遍历…

【EI检索】第二届机器视觉、图像处理与影像技术国际会议(MVIPIT 2024)

一、会议信息 大会官网:www.mvipit.org 官方邮箱:mvipit163.com 会议出版:IEEE CPS 出版 会议检索:EI & Scopus 检索 会议地点:河北张家口 会议时间:2024 年 9 月 13 日-9 月 15 日 二、征稿主题…

【香橙派开发板测试】:在黑科技Orange Pi AIpro部署YOLOv8深度学习纤维分割检测模型

文章目录 🚀🚀🚀前言一、1️⃣ Orange Pi AIpro开发板相关介绍1.1 🎓 核心配置1.2 ✨开发板接口详情图1.3 ⭐️开箱展示 二、2️⃣配置开发板详细教程2.1 🎓 烧录镜像系统2.2 ✨配置网络2.3 ⭐️使用SSH连接主板 三、…

Web开发:图片九宫格与非九宫格动态切换效果(HTML、CSS、JavaScript)

目录 一、业务需求 二、实现思路 三、实现过程 1、基础页面 2、图片大小调整 3、图片位置调整 4、鼠标控制切换 5、添加过渡 四、完整代码 一、业务需求 默认显示基础图片; 当鼠标移入,使用九宫格效果展示图片; 当鼠标离开&#…

CTF-Web习题:[BJDCTF2020]ZJCTF,不过如此

题目链接:[BJDCTF2020]ZJCTF,不过如此 解题思路 访问靶场链接,出现的是一段php源码,接下来做一下代码审阅,发现这是一道涉及文件包含的题 主要PHP代码语义: file_get_contents($text,r); 把$text变量所…

基于NeRF的路面重建算法——RoME / EMIE-MAP / RoGS

基于NeRF的路面重建算法——RoME / EMIE-MAP / RoGS 1. RoMe1.1 Mesh Initialization / Waypoint Sampling1.2 Optimization1.3 Experiments 2. EMIE-MAP2.1 Road Surface Representation based on Explicit mesh and Implicit Encoding2.2 Optimizing Strategies2.3 Experimen…

Uniapp鸿蒙项目实战

Uniapp鸿蒙项目实战 24.7.6 Dcloud发布了uniapp兼容鸿蒙的文档:Uniapp开发鸿蒙应用 在实际使用中发现一些问题,开贴记录一下 设备准备 windows电脑准备(家庭版不行,教育版、企业版、专业版也可以,不像uniapp说的只有…

Promise 详解(原理篇)

目录 什么是 Promise 实现一个 Promise Promise 的声明 解决基本状态 添加 then 方法 解决异步实现 解决链式调用 完成 resolvePromise 函数 解决其他问题 添加 catch 方法 添加 finally 方法 添加 resolve、reject、race、all 等方法 如何验证我们的 Promise 是否…

分布式搜索之Elasticsearch入门

Elasticsearch 是什么 Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。 Elastic Stack 又是什么呢&a…

企业须善用数字化杠杆经营获取数字化时代红利

​在当今数字化时代,企业面临着新机遇和新挑战。数字化技术的迅速发展正在重塑商业格局,企业若能善用数字化杠杆经营,将能够在激烈的市场竞争中脱颖而出,获取丰厚的时代红利。 数字化杠杆的内涵 数字化杠杆是指企业借助数字化技术…

SAPUI5基础知识16 - 深入理解MVC架构

1. 背景 经过一系列的练习,相信大家对于SAPUI5的应用程序已经有了直观的认识,我们在练习中介绍了视图、控制器、模型的概念和用法。在本篇博客中,让我们回顾总结下这些知识点,更深入地理解SAPUI5的MVC架构。 首先,让…

Android 性能优化之卡顿优化

文章目录 Android 性能优化之卡顿优化卡顿检测TraceView配置缺点 StricktMode配置违规代码 BlockCanary配置问题代码缺点 ANRANR原因ANRWatchDog监测解决方案 Android 性能优化之卡顿优化 卡顿检测 TraceViewStricktModelBlockCanary TraceView 配置 Debug.startMethodTra…

XMl基本操作

引言 使⽤Mybatis的注解⽅式,主要是来完成⼀些简单的增删改查功能. 如果需要实现复杂的SQL功能,建议使⽤XML来配置映射语句,也就是将SQL语句写在XML配置⽂件中. 之前,我们学习了,用注解的方式来实现MyBatis 接下来我们…

【STM32】按键控制LED光敏传感器控制蜂鸣器(江科大)

一、按键控制LED LED.c #include "stm32f10x.h" // Device header/*** 函 数:LED初始化* 参 数:无* 返 回 值:无*/ void LED_Init(void) {/*开启时钟*/RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENAB…