监控Checkpoints
监控 checkpoint 行为最简单的方法是通过 UI 的 checkpoint 部分。 监控这两个指标:
- 算子收到第一个 checkpoint barrier 的时间。当触发 checkpoint 的耗费时间一直很高时,这意味着 checkpoint barrier 需要很长时间才能从 source 到达 operators。 这通常表明系统处于反压下运行。
- 对齐时间Alignment Duration:处理第一个和最后一个 checkpoint barrier 之间的时间。在 unaligned(未对齐)checkpoints 下,exactly-once 和 at-least-once checkpoints 的 subtasks 处理来自上游 subtasks 的所有数据,且没有任何中断。 然而,对于 aligned exactly-once checkpoints,已经收到 checkpoint barrier 的通道被阻止继续发送数据,直到所有剩余的通道都赶上并接收它们的 checkpoint barrier(对齐时间)。
理想情况下,这两个值都应该很低 。 较高的数值意味着 由于存在反压(没有足够的资源来处理传入的记录),导致checkpoint barriers 在作业中的移动速度较慢,这也可以通过处理记录的端到端延迟在增加来观察到。
在出现瞬态反压、数据倾斜或网络问题时,这些数值偶尔会很高。
Unaligned checkpoints 可用于加快checkpoint barriers的传播。 但是并不能解决导致反压的根本问题(端到端记录延迟仍然很高)。
Checkpoint 调优
应用程序可以配置定期触发 checkpoints。 当 checkpoint 完成时间超过 checkpoint 间隔时,在正在进行的 checkpoint 完成之前,不会触发下一个 checkpoint。默认情况下,一旦正在进行的 checkpoint 完成,将立即触发下一个 checkpoint。
当 checkpoints 完成的时间经常超过 checkpoints 基本间隔时(例如,因为状态比计划的更大,或者访问 checkpoints 所在的存储系统暂时变慢), 系统不断地进行 checkpoints(一旦完成,新的 checkpoints 就会立即启动)。这可能意味着过多的资源被不断地束缚在 checkpointing 中,并且 checkpoint 算子进行得缓慢。 此行为对使用 checkpointed 状态的流式应用程序的影响较小,但仍可能对整体应用程序性能产生影响。
为了防止这种情况,应用程序可以定义 checkpoints 之间的最小等待时间:
StreamExecutionEnvironment.getCheckpointConfig().
setMinPauseBetweenCheckpoints(milliseconds)
此持续时间是指从最近一个 checkpoint 结束到下一个 checkpoint 开始之间必须经过的最小时间间隔。