Flink之时间语义
简介
Flink中时间语义可以说是最重要的一个概念了,这里就说一下关于时间语义的机制,我们下看一下下面的表格,简单了解一下
时间 | 定义 |
---|---|
processing time | 处理时间,也就是现实世界的时间,或者说代码执行时,服务器的时间 |
event time | 事件时间,就是事件数据中所带的时间(业务意义上的时间),和现实世界中的时间无关,只以数据中所带的时间为准 |
ingestion time | 注入时间,也就是数据进入到Flink系统中最开始的那个时间,这个没什么用处.开发中使用的基本就是处理时间和事件时间. |
通过表格中的内容我们对这三类时间代表的内容应该都清楚了,后面会先介绍一下时间语义的执行机制和对应的API
使用.
机制
推进时间
何为"时间标记",其实这里说的"时间标记"就是Flink中的WaterMark
,是由于Operator[算子]
中的定时器决定的,我们先看下图
图中我们是以event time
为例子的,这样更便于理解
- 当数据从
Mysql
过来时,Operator
中的定时器会先判断事件数据中所带的时间戳的大小 - 当得知事件数据中的时间戳是截止目前为止的最大时间戳时,会和时间标记进行比较,其实也就是和
WaterMark
进行比较,发现大于WaterMark
时,就会将时间戳进行更换,如果小于怎么办?小于就证明这一条数据时迟到的数据,就会被抛弃(这是发生在1对1的情况下). - 当时间标记更换完成后,就会将这个事件标记发送给下游算子.
推进时间选择
上面我们讲到了当时间标记更新完成后会发送给下游算子,试想一下如果下游的某个subtask
接收的数据是上游的两个subtask
发送来的数据时,且两条数据中的时间戳不同该怎么办?请看下图
Operator(subtask)
这个算子实例,接收到了来自上游的两个推进时间300
和700
- 接收到之后首先判断出两个上游发送来的推进时间中的最小值,在图中也就是
300
- 判断出最小推进时间后,再和算子实例中存在的推进时间进行比较,如果大于当前算子实例中的推进时间则进行替换
- 更新算子实例中的推进时间后,继续发送给下游
迟到数据
前面讲到了关于推进时间的更换和推进时间的选择,这里讲一下Flink中的迟到数据
,什么是迟到数据?就是字面意思,来晚了.
比如说某个算子实例中的推进时间是1000
,但是来了一条数据的时间是500
,怎么办?这条数据会被舍弃掉,在使用Flink的时候我们要牢记一点未来尚可努力,过去不可更改
,Flink中的时间线和现实世界中的时间线是一样的,只会推进永远不会回退,顶多在Flink中的时间可以暂停,但是一定一定是不可以回退的.
请看下图:
- 首先当前算子实例还是会对推进时间进行判断,获取最新的推进时间(
800
) - 当判断出推进时间为
800
时得知500~1000
这个窗口还没有结束(窗口都是前闭后开) - 接收上游发送来的数据,根据数据中的事件时间将其发送到不同的桶中,如
800
和900
的数据都符合当前桶(500~1000
)的时间区间,那么就会将该数据分配到当前桶中,直到推进时间更新到1000
时则开始计算当前桶中的数据 - 如果发现数据中的事件时间超出当前桶的时间区间,则会根据该数据中的时间时间划分未来桶,如
1000
和1200
都属于1000~1500
这个时间区间,假如这时来了一条1500 <= data < 2000
的数据,这时又会划分出一个新的未来中,未来桶中的数据只要还没开始计算,就会一直这样划分下去. 700
的数据来时,发现时间时间700
小于当前算子实例中的推进时间800
,则将700
的数据抛弃, 虽然700
的数据在500~1000
的区间,但是时间遵循不可回退的原则,所以该条数据必然会被抛弃- 将推进时间和计算完成的数据继续发往下游算子实例
推进时间暂停(即停止更新)
在Flink实时计算中还会出现一种情况,就是前面提到的,虽然时间不可回退,但是在Flink中可能会出现推进时间暂停的情况,这里就对这种情况进行说明,请看下图
- 同样下游的算子实例在选择推进时间时,会选择两个上游算子实例发来的推进时间中较小的那一个作为更新当前算子实例中的推进时间依据
- 通过上图可知
WaterMak
为800
的这个算子实例不再有新的WaterMark
发送过来,所以对于下游的算子实例来说,不管另一个持续发送WaterMark
的算子实例时间推进到哪里都没有作用了,也就是对于下游的算子实例时间已经暂停在800
- 当下游的算子实例时间暂停后,上游其中一个算子实例还在源源不断的发送
WaterMark
和数据,这时在下游的算子实例的窗口中就会根据数据中的事件时间以500
为一个区间不断地构建一个一个的未来桶
,将这些数据先放起来 - 发生这种情况时,如果时间过长就可能会导致程序崩溃报错,那么是否有解决方式呢?当然是有的,在Flink中为我们提供了一种机制
watermark-idle-timeout
,这个机制的作用是什么呢?当侦测到某一个支线一直没有数据进来,并且超过了watermark-idle-timeout
设置的时间(比如说是2s
),那么这个机制就会将推进时间
往前推进2s
,也就是说当我们将这个机制设置为2s
时,那等待某个一直没有来数据支线的时间就是2s
,到达2s
这个临界值时就会自动更新推进时间
关于时间语义的机制大概就这些内容了,如有不对欢迎指正,如有问题共同探讨.