DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
1. 窗口的划分
1.1 窗口分为:基于时间的窗口 和 基于数量的窗口
基于时间的窗口:基于起始时间戳 和终止时间戳来决定窗口的大小
基于数量的窗口:根据固定的数量定义窗口 的大小
这里我看到的都是只有基于时间的窗口做的划分,没有数量的,发现运用数量窗口划分也比较少,因此很多地方都省略了。
Count Window 也有滚动窗口、滑动窗口等,可以借鉴Flink实战之CountWindowr的滚动窗口、滑动窗口WindowsAPI使用示例
接下来就开始介绍基于时间的窗口了
1.2 在Flink中窗口的设定和数据本身无关,而是系统事先定义好的
窗口是Flink划分数据一个基本单位,窗口的划分方式是默认会根据自然时间划分,并且划分方式是前闭后开。
如图:
左闭右开。
2.时间概念
流数据中,数据具有时间属性。Flink
根据时间的产生时间把时间划分为3中类型:1.事件生成时间(Event time)2.事件接入时间 (Ingestion Time)3.事件处理时间 (Processing Time)
可以借鉴下图理解:
1.基站产生数据,分区传入Flink数据源
2.传入数据的时间 IngstionTime
3.划分窗口进行处理时间 ProcessingTime
2.1 事件生成时间
是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink前就已经进入到事件当中了,也就是说,事件时间是从原始的消息中提取到的。比如 Kafka消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间.
2.2事件接入时间
是数据进入Flink系统的时间,它主要依赖于其数据源算子所在主机的系统时钟。理论上,接入时间处于事件时间和处理时间之间。接入时间不能处理乱序问题或者延迟数据。如果需要处理此类问题,建议使用事件时间
2.3 事件处理时间
是指数据在操作算子计算过程中获取到的所在主机时间,这个时间是由Flink系统自己提供的。这种处理时间方式实时性是最好的,但计算结果未必准确,主要用于时间计算精度要求不是特别高的计算场景,比如延时比较高的日志数据
2.4事件时间和处理时间区别
这里的时间方便了解,比如事件时间,一个在米国产生的时间,一个在中国产生的时间,这两个有时差 不一样,但是数据世界是一样的,应该是从1970计算的时间戳。
在Flink初始化流式运行环境时,会设置流处理的时间特性
//设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//把时间特性设置为“事件时间”
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //或者,把时间特性设置为“处理时间”
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
3.窗口计算
3.1 窗口计算的程序结构
3.1.1.分组
分为:分组窗口和非分组窗口
看一组图:
左侧为分组窗口,右侧不分组。
3.2窗口的计算过程
如图
在窗口计算时,需要将数据流先分组 keyby,再分窗口。那么在写程序的时候也是先进行这两步,当窗口程序计算完成后(ruduce,aggregate,process等),又变为DataStream。
分组数据流程序结构如下:
dataStream.keyBy(...) //是分组数据流.window(...) //指定窗口分配器类型[.trigger(...)] //指定触发器类型(可选)[.evictor(...)] //指定驱逐器或者不指定(可选)[.allowedLateness()] //指定是否延迟处理数据(可选).reduce/fold/apply() //指定窗口计算函数
非分组数据流程序结构如下:
dataStream.windowAll(...) //指定窗口分配器类型[.trigger(...)] //指定触发器类型(可选)[.evictor(...)] //指定驱逐器或者不指定(可选)[.allowedLateness()] //指定是否延迟处理数据(可选).reduce/fold/apply() //指定窗口计算函数
不分组理解为所有数据为一个窗口。 windowAll(…)
3.2 窗口分配器
窗口分配器是负责将每一个到来的元素分配给一个或者多个窗口。
Flink提供预定义窗口分配器
窗口分配器在程序就一行可以了。
这几个窗口可以理解为火车站/汽车站/飞机场的屏幕,假设有25条消息,一页显示10条消息。
滚动:一页显示10条消息,滚动一下,下一页 11-20,滚动 21-25.
滑动:这里涉及一次滑动步长(假设为1),1-10 滑动 ,2-11滑动,3-12 …
3.2.1滚动窗口
滚动窗口是根据固定时间或大小对数据流进行切分,且窗口和窗口之间的元素不会重叠
DataStream API提供了两种滚动窗口类型,
即基于事件时间的滚动窗口(TumblingEventTimeWindows)和
基于处理时间的滚动窗口(TumblingProcessingTimeWindows),
二者对应的窗口分配器分别为TumblingEventTimeWindows
和TumblingProcessingTimeWindows。
窗口的长度
窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time中的
seconds、minutes、hours和days来设置。
3.2.1.1 滚动窗口的实例
1.事件 时间 滚动 ,窗口大小5秒
关键词:TumblingEventTimeWindows
val dataStream: DataStream[T] = ...//基于事件时间的滚动窗口,窗口大小为5秒钟
dataStream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).<window function>(...)
2.基于处理时间的滚动
关键词:TumblingProcessingTimeWindows
//基于处理时间的滚动窗口,窗口大小为5秒钟
dataStream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<window function>(...)
3.事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
偏移量调整窗口开始时间的数字。比如就会从整点的15分,30分,45分,00分开始,允许数据进行移位,用于时效性不强的数据。
//基于事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
dataStream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))).<window function>(...)
还可以使用快捷方法 timeWindow() 来定义TumblingEventTimeWindows
和TumblingProcessingTimeWindows
,举例如下:
dataStream.keyBy(...).timeWindow(Time.seconds(1)).<window function>(...)
如果使用的是timewindow,那么就没说明是-事件时间-还是-处理时间。窗口类型就要根据程序中设置的TimeCharacteristic
的值来决定。
当我们在程序中设置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
时,Flink会创建TumblingEventTimeWindows
,
当设置了env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
时,Flink会创建TumblingProcessingTimeWindows
。
认识英文单词就能很好的理解啦。
3.2.2滑动窗口
滑动窗口有重叠。(就是大屏幕的一个个向下滑动那种)
3.2.2.1 滑动窗口的实例
继续学习英文单词 slide :滑动(v) 它还有ppt幻灯片的意思。
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
里面得有两个参数,一个是窗口大小,一个是滑动步长。
不一样的就是处理时间和事件时间。
val dataStream: DataStream[T] = ...//基于事件时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<window function>(...)
//基于处理时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream.keyBy(<...>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<window function>(...)
这个是参数多了偏移量。
//基于处理时间的滑动窗口,窗口大小为12小时,滑动步长为1小时,偏移量为8小时
dataStream.keyBy(<...>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<window function>(...)
3.2.2.2 滑动步长与窗口大小的关系
3.2.3 会话窗口
会话窗口根据会话间隙(Session Gap)切分不同的窗口,当一个窗口在大于会话间隙的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。
接下来再来两个实例
看代码的withgap,中间的gap时间。
val input: DataStream[T] = ...//基于事件时间的会话窗口,会话间隙为10分钟
input.keyBy(...).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<window function>(...)//基于处理时间的会话窗口,会话间隙为10分钟
input.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<window function>(...)
3.3窗口的计算函数
在Flink的窗口计算程序中,在确定了窗口分配器以后,接下来就要确定窗口计算函数,从而完成对窗口内数据集的计算。
Flink提供了四种类型的窗口计算函数,分别是
1. ReduceFunction、
2. AggregateFunction、
3. FoldFunction
4. ProcessWindowFunction。
根据计算原理,ReduceFunction、AggregateFunction和FlodFunction属于增量聚合函数,而ProcessWindowFunction则属于全量聚合函数(这里处理的是window 那么就是整个窗口了,就是全量了)。
3.3.1 ReduceFunction
ReduceFunction定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合计算,然后输出类型相同的一个结果元素。
从这句话可以理解,先keyBy,再将同组的聚合计算。
接下来看代码
import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ReduceWindowFunctionTest {def main(args: Array[String]) {//设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度
env.setParallelism(1)//设置为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//创建数据源,股票价格数据流val stockPriceStream: DataStream[StockPrice] = env//该数据流由StockPriceSource类随机生成.addSource(new StockPriceSource)
//确定针对数据集的转换操作逻辑val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))//打印输出
sumStream.print()//程序触发执行env.execute("ReduceWindowFunctionTest")}class StockPriceSource extends RichSourceFunction[StockPrice]{var isRunning: Boolean = trueval rand = new Random()//初始化股票价格var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)var stockId = 0var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {while (isRunning) {//每次从列表中随机选择一只股票stockId = rand.nextInt(priceList.size)val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis//将数据源收集写入SourceContextsrcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))Thread.sleep(rand.nextInt(1000))}}override def cancel(): Unit = {isRunning = false}}
}
分析代码:
具体看这几行
val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
现根据ID分组变成Keyedstream,再分窗口,然后reduce聚合同ID的price。
使用Maven工具对程序进行编译打包,然后提交到Flink中运行,在运行日志中可以看到类似如下的输出结果:
StockPrice(stock_1,1602036130952,39.78897954489408)
StockPrice(stock_4,1602036131741,49.950455275162945)
StockPrice(stock_2,1602036132184,30.073529000410154)
StockPrice(stock_3,1602036133154,79.88817093404676)
StockPrice(stock_0,1602036133919,9.957551599687758)
StockPrice(stock_1,1602036134385,39.68343765292602)
……
3.3.2 AggregateFunction
这个单词的意思就是聚合。
Flink的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于是迭代计算方式,所以,在窗口处理过程中,不用缓存整个窗口的数据,所以效率执行比较高。
AggregateFunction比ReduceFunction更加通用,它定义了3个需要复写的方法:
- add
- getResult
- merge
其中,add()定义了数据的添加逻辑,getResult()定义了累加器计算的结果,merge()定义了累加器合并的逻辑。
1.add 函数:
功能:该函数用于将输入元素添加到累加器中。在聚合过程中,每当有新的数据元素流入时,Flink 会调用此函数来更新累加器的状态。
参数:add 函数通常接收两个参数,一个是当前的累加器状态,另一个是待聚合的输入元素。
返回值:该函数返回更新后的累加器状态。
2.getResult 函数:
功能:该函数用于从累加器中提取聚合结果。在窗口触发或查询结束时,Flink 会调用此函数来获取最终的聚合结果。
参数:getResult 函数通常只接收一个参数,即当前的累加器状态。
返回值:该函数返回聚合后的结果,其类型通常由 AggregateFunction 的输出类型参数指定。
3.merge 函数:
功能:该函数用于在并行执行时合并两个累加器的状态。在分布式计算环境中,同一个窗口的数据可能会分配到不同的节点上进行处理。当这些节点上的聚合操作完成后,需要将它们的累加器状态合并起来以得到全局的聚合结果。
参数:merge 函数通常接收两个参数,即两个待合并的累加器状态。
返回值:该函数返回合并后的累加器状态。
这三个函数共同构成了 Flink 中 AggregateFunction 的核心逻辑。通过实现这三个函数,用户可以定义自定义的聚合操作,以满足各种复杂的数据处理需求。
注意 除了这三个函数外,AggregateFunction
接口通常还包含一个 createAccumulator
方法,用于初始化一个新的累加器实例。该方法在聚合操作开始时被调用,并返回一个空的或初始化的累加器状态。
举例代码:
import java.util.Calendar
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double) object AggregateWindowFunctionTest {def main(args: Array[String]) { // 设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//设置为处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //创建数据源,股票价格数据流val stockPriceStream: DataStream[StockPrice] = env .addSource(new StockPriceSource) //该数据流由StockPriceSource类随机生成stockPriceStream.print("input“) //设定针对数据集的转换操作逻辑val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).aggregate(new MyAggregateFunction) //自定义的聚合函数,那么就需要实现三个方法//打印输出sumStream.print("output“) //程序触发执行env.execute("AggregateWindowFunctionTest")}class StockPriceSource extends RichSourceFunction[StockPrice]{var isRunning: Boolean = trueval rand = new Random()// 初始化股票价格var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)var stockId = 0var curPrice = 0.0d override def run(srcCtx: SourceContext[StockPrice]): Unit = {while (isRunning) {// 每次从列表中随机选择一只股票stockId = rand.nextInt(priceList.size)val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis// 将数据源收集写入SourceContextsrcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))Thread.sleep(rand.nextInt(500))}}override def cancel(): Unit = {isRunning = false}}//自定义函数class MyAggregateFunction extends AggregateFunction[StockPrice,(String,Double,Long),(String,Double)] {//回忆一下:case class StockPrice(stockId:String,timeStamp:Long,price:Double)
//返回值不要时间了,只要id 和price//创建累加器
override def createAccumulator(): (String,Double, Long) = ("",0D,0L)//定义把输入数据累加到累加器的逻辑override def add(input:StockPrice,acc:(String,Double,Long))={(input.stockId,acc._2+input.price,acc._3+1L) //平均价格 所以数量 +1L}//根据累加器得出结果
override def getResult(acc:(String,Double,Long)) = (acc._1,acc._2 / acc._3)//定义累加器合并的逻辑override def merge(acc1:(String,Double,Long),acc2:(String,Double,Long)) = {(acc1._1,acc1._2+acc2._2,acc1._3+acc2._3)}}
}
这里我开始理解为了股票的三个属性(String,Double,Long),这里理解错了。
在 Apache Flink 的 AggregateFunction 接口中,当你定义一个自定义的聚合函数时,你需要指定三个类型参数:
1.输入类型(InputType):这是流中元素的类型,例子中为 StockPrice。
2.累加器类型(AccumulatorType):这是用于在聚合过程中存储中间状态的类型。例子中,这是一个三元组 (String, Double, Long),其中 String 表示股票ID(尽管这里的处理可能不是最理想的,因为通常累加器不应该包含像股票ID这样的非聚合字段),Double 表示价格的总和,Long 表示价格的数量(或说是处理了多少个价格数据点)。
3.输出类型(OutputType):这是聚合函数最终产生的结果类型。例子中,这也是一个二元组 (String, Double),其中 String 同样是股票ID(这里同样需要注意可能的逻辑问题),Double 是计算出的平均价格。
我才开始没有理解acc1._3 + acc2._3,现写如下:
acc1._1,acc1._2 + acc2._2, // 合并价格总和acc1._3 + acc2._3 // 合并价格数量
根据代码的先后顺序及运行顺序,最后执行getresult。
先聚合,最后平均。
在大多数情况下,add 方法会首先被调用,用于处理流入的数据并更新累加器状态。然后,根据并行度和数据分布,merge 方法可能会被调用以合并累加器状态。最后,在窗口触发或查询结束时,getResult 方法会被调用以提取最终的聚合结果。
代码分析完后,输出结果:
input> StockPrice(stock_2,1602040572049,29.99367518574229)
input> StockPrice(stock_2,1602040572205,30.03665296896211)
input> StockPrice(stock_2,1602040572601,30.00867347810531)
input> StockPrice(stock_0,1602040572856,9.974154737531954)
input> StockPrice(stock_1,1602040572934,19.997437804748245)
output> (stock_2,30.013000544269904)
output> (stock_1,19.997437804748245)
output> (stock_0,9.974154737531954)
3.3.3 FoldFunction
FoldFunction决定了窗口中的元素如何和一个输出类型的元素进行结合。对于每个进入窗口的元素而言,FoldFunction会被增量调用。窗口中的第一个元素将会和这个输出类型的初始值进行结合。需要注意的是,FoldFunction不能用于会话窗口和那些可合并的窗口。
//前面的代码和ReduceWindowFunctionTest程序中的代码相同,因此省略
val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).fold("CHINA_"){ (acc, v) => acc + v.stockId }
3.3.4 ProcessWindowFunction
前面提到的ReduceFunction和AggregateFunction都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景的需求
但是,在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到 ProcessWindowFunction
,因为它能够更加灵活地支持基于窗口全部数据元素的结果计算。
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ProcessWindowFunctionTest {def main(args: Array[String]) {//设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//设置为处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//创建数据源,股票价格数据流val source = env.socketTextStream("localhost", 9999)//指定针对数据流的转换操作逻辑val stockPriceStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))val sumStream = stockPriceStream.assignTimestampsAndWatermarks(WatermarkStrategy//为了测试方便,这里把水位线设置为0.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp})).keyBy(s => s.stockId).timeWindow(Time.seconds(3)).process(new MyProcessWindowFunction())//打印输出sumStream.print()//执行程序env.execute("ProcessWindowFunction Test")}class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {//一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {//聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据var sumPrice = 0.0;elements.foreach(stock => {sumPrice = sumPrice + stock.price})out.collect(key, sumPrice/elements.size)}}
}
这个代码里需要注意的是 .assignTimestampsAndWatermarks 和MyProcessWindowFunction
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp})
)
WatermarkStrategy.forBoundedOutOfOrderness 用于创建一个处理有界乱序数据的水位线策略。参数 Duration.ofSeconds(0) 表示没有乱序,即所有数据都是按时序到达的(在实际应用中,这通常是一个简化的假设,实际数据往往会有一定程度的乱序)。withTimestampAssigner 方法用于指定如何从数据元素中提取时间戳,这里是从 StockPrice 对象的 timeStamp 字段中提取。
process 方法用于应用一个自定义的 ProcessWindowFunction。在这个例子中,MyProcessWindowFunction 是一个自定义的窗口函数,它接收一个键(股票ID)、一个上下文对象(包含窗口的元数据,如开始和结束时间)、一个包含窗口内所有元素的迭代器,以及一个用于收集输出结果的收集器。
在 MyProcessWindowFunction 的 process 方法中,代码遍历了窗口内的所有 StockPrice 元素,计算了价格的总和,并计算了平均值(总和除以元素数量)。然后,它将结果(股票ID和平均价格)收集到输出流中。
下一小节该总结触发器啦。