0基础学习PyFlink——水位线(watermark)触发计算

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。
在这里插入图片描述

为了解决长期不计算的问题,我们引入了在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》和《0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)》的方案。但是这个方案引入另外一个问题,就是每次处理数据可能不尽相同。这是因为它们使用了“处理时间”(Processing Time)来作为窗口划分的参考系,而每次程序处理时间会根据当前负载情况有很大的不同。这样我们对同一批数据做处理时,可能会得出不同的Window切分方案。
在这里插入图片描述
于是我们引入《0基础学习PyFlink——事件时间和运行时间的窗口》方案。它可以使用源自数据本身的“事件时间”(Event Time)作为Time Window的参考系,这样在不同负载、不同时间,相同数据的时间参考系是一样的,进而可以得出一致的结果。
在这里插入图片描述
但是现实中,我们没法保证上述数据是按照上面的顺序到达Flink的。
比如下面这个例子,红色部分都是乱序的,那么Flink如何处理这些数据呢?
在这里插入图片描述
只有两种可能性:

  1. 直接抛弃;
  2. 等待一段时间统一处理,超过等待的时间直接抛弃。因为不可能一直等下去,否则什么时候处理呢?

这些即有别于Count Window,也有别于Time Window。这个时候就要引入水位线(watermark)技术来解决这个问题。
在详细讲解之前,我们需要明确一些基本知识:

  1. EventTime就是Timestamp,即我们可以通过制定Timestamp函数设定元素的EventTime。
  2. EventTime从属于元素。
  3. Watermark源于EventTime和max_out_of_orderness(等待无序数据的时间),即Watermark=EventTime-max_out_of_orderness。
  4. Watermark从属于流。
  5. Window的Start源于EventTime;End源于Start和窗口时间,即End=Start+WindowTme;这是一个左闭右开的区间,即[Start, End)。
  6. Window从属于流,只有Watermark>=Window End时才会触发计算(且窗口中要有元素)。
  7. Window在单向递增前进,比如从[0,10)变成[10,20)、[20,30)……[90,100)。
  8. Wartermark单向递增前进,它不会因为新进入的元素EventTime较小,而导致Wartermark向变小的趋势发展。
    在这里插入图片描述
    上图中,第一个元素(A,1)的EventTime通过自定义公式可以得到101,于是初始的Window的Start值是该值向下取可以被Window Size整除的最大值,即100;这个进一步确认了第一个窗口是[100,105)。
    watermark是通过eventtime计算出来的,上例中我们希望如果事件在窗口时间之外到来则抛弃,即不等待任何时间,即Window End+0,即Eventtime-0。
    (A,0)数据来到的时候,watermark不会因为其Eventtime为100,比流中的watermark值(101)小而改变,依然维持watermark单调递增。这个在(A,2)和(A,5)到来时也是如此。
    (A,8)元素的到来,会让流的watermark变成108。这个值会越过当前窗口[100,105),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,1)、(A,0)、(A,3)和(A,4);
    (A,10)元素的到来,会让流的watermark变成110。这个值会越过当前窗口[100,110),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,8)、(A,6)、(A,7)和(A,9);而(A,2)因为不在这个区间内,就被抛弃了。我们也可以认为(A,2)迟到而被抛弃。
    为了更好讲述原理,上述例子存在一个假设:watertime更新是随着元素一个个进入而改变的。而实际元素进入个数不太确定,比如可能会两个两个进入,那么就会变成如下。主要区别就是(A,5)参与了第二次窗口计算,虽然它迟到了,而且watermark计算方法也不打算等待任何一个迟到的数据,但是它和(A,10)一起进入时间戳计算逻辑,导致触发的时机被滞后,从而“幸运”的赶上了最后一轮窗口计算。如果它稍微再晚一点到来,它也会被抛弃。
    在这里插入图片描述

测试代码

import time
from pyflink.common import Duration, WatermarkStrategy, Time, Types
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.datastream.functions import AllWindowFunction, ProcessFunction, ProcessAllWindowFunction, KeyedProcessFunction
from pyflink.table.expressions import lit, col
from pyflink.table.window import Tumble
from pyflink.common.time import Instant
from pyflink.table.udf import udf
from pyflink.common import Rowclass WindowFunc(AllWindowFunction[tuple, tuple, TimeWindow]):def apply(self, window, inputs):out = "**************************WindowFunc**************************" \"\nwindow: start:{} end:{} \ninputs: {}" \"\n**************************WindowFunc**************************" \.format(Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end), inputs)print(out)for value in inputs:yield (value, Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end))class TimestampAssignerAdapter(TimestampAssigner):def extract_timestamp(self, value, record_timestamp: int):return value[1] * 1000class TimestampAssignerProcessFunctionAdapter(ProcessFunction):def process_element(self, value, ctx: 'ProcessFunction.Context'):out_put = "-----------------------TimestampAssignerProcessFunctionAdapter {}-----------------------" \"\nvalue: {} \ttimestamp: {} \tcurrent_processing_time: {} \tcurrent_watermark: {}" \"\n-----------------------TimestampAssignerProcessFunctionAdapter-----------------------" \.format(int(time.time()), value, Instant.of_epoch_milli(ctx.timestamp()),Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),Instant.of_epoch_milli(ctx.timer_service().current_watermark()))print(out_put)yield (value, Instant.of_epoch_milli(ctx.timestamp()), Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),Instant.of_epoch_milli(ctx.timer_service().current_watermark()))def gen_random_int_and_timestamp():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()# stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_execute_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)stream_execute_env.set_parallelism(1)stream_execute_env.get_config().set_auto_watermark_interval(2)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)ordinal_num_start = 0ordinal_num_end = 10rows_per_second = 1schame = Schema.new_builder().column('in_ord', DataTypes.INT()) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.in_ord.kind', 'sequence') \.option('fields.in_ord.start', str(ordinal_num_start)) \.option('fields.in_ord.end', str(ordinal_num_end)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')@udf(result_type=DataTypes.ROW([DataTypes.FIELD("in_ord", DataTypes.INT()), DataTypes.FIELD("calc_order", DataTypes.INT())]), input_types=[DataTypes.INT()])def colFunc(oneCol):ordinal_num_data_map = {0: 1, 1: 0, 2: 3, 3: 4, 4: 8, 5: 6, 6: 7, 7: 2, 8: 9, 9: 10, 10: 5}# ordinal_num_data_map = {0: 16, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9,#                       10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 0, 17: 17, 18: 18, 19: 19,#                       20: 20, 21: 121, 22: 122, 23: 123, 24: 124, 25: 125, 26: 126, 27: 127, 28: 128, 29: 129,}data = ordinal_num_data_map[oneCol] + 100return Row(oneCol, data)input_table=table.map(colFunc(col('in_ord')))datastream = stream_table_env.to_data_stream(input_table)###############################################################################################    # datastream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(10))) \#                     .apply(WindowFunc())################################################################################################ watermark_strategy = WatermarkStrategy.no_watermarks().with_timestamp_assigner(TimestampAssignerAdapter())# datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)# datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())# datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.milliseconds(10))) \#                     .apply(WindowFunc())        ################################################################################################ watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(TimestampAssignerAdapter())watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(0)).with_timestamp_assigner(TimestampAssignerAdapter())datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \.apply(WindowFunc())###############################################################################################stream_execute_env.execute()if __name__ == '__main__':gen_random_int_and_timestamp()

-----------------------TimestampAssignerProcessFunctionAdapter 1699856800-----------------------
value: Row(in_ord=0, calc_order=101) timestamp: Instant<101, 0> current_processing_time: Instant<1699856800, 705000000> current_watermark: Instant<-9223372036854776, 192000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=1, calc_order=100) timestamp: Instant<100, 0> current_processing_time: Instant<1699856802, 700000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=2, calc_order=103) timestamp: Instant<103, 0> current_processing_time: Instant<1699856802, 702000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=3, calc_order=104) timestamp: Instant<104, 0> current_processing_time: Instant<1699856804, 700000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=4, calc_order=108) timestamp: Instant<108, 0> current_processing_time: Instant<1699856804, 709000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<100, 0> end:Instant<105, 0>
inputs: [Row(in_ord=0, calc_order=101), Row(in_ord=1, calc_order=100), Row(in_ord=2, calc_order=103), Row(in_ord=3, calc_order=104)]
WindowFunc
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=5, calc_order=106) timestamp: Instant<106, 0> current_processing_time: Instant<1699856806, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=6, calc_order=107) timestamp: Instant<107, 0> current_processing_time: Instant<1699856806, 705000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=7, calc_order=102) timestamp: Instant<102, 0> current_processing_time: Instant<1699856808, 700000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=8, calc_order=109) timestamp: Instant<109, 0> current_processing_time: Instant<1699856808, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=9, calc_order=110) timestamp: Instant<110, 0> current_processing_time: Instant<1699856809, 440000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=10, calc_order=105) timestamp: Instant<105, 0> current_processing_time: Instant<1699856809, 441000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<105, 0> end:Instant<110, 0>
inputs: [Row(in_ord=4, calc_order=108), Row(in_ord=5, calc_order=106), Row(in_ord=6, calc_order=107), Row(in_ord=8, calc_order=109), Row(in_ord=10, calc_order=105)]
WindowFunc
WindowFunc
window: start:Instant<110, 0> end:Instant<115, 0>
inputs: [Row(in_ord=9, calc_order=110)]

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/194973.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PyCharm 【unsupported Python 3.1】

PyCharm2020.1版本&#xff0c;当添加虚拟环境发生异常&#xff1a; 原因&#xff1a;Pycharm版本低了&#xff01;不支持配置的虚拟环境版本 解决&#xff1a;下载PyCharm2021.1版本&#xff0c;进行配置成功&#xff01;

2023年,全球CIO最关注的问题是什么?

面对AI大潮&#xff0c;全球CIO们在焦虑什么&#xff1f;随着全球数字化转型步伐的加速&#xff0c;CIO的角色发生了哪些转变&#xff1f; 继2022年5月发布首份全球CIO报告之后&#xff0c;联想集团今年又发布了以“韧性的全球首席信息官&#xff08;The Resilient CIO&#xf…

python大数据毕设选题

文章目录 0 前言1 大数据毕设选题推荐2 开题指导3 最后 0 前言 大家好&#xff01;大四的同学们&#xff0c;毕业设计的时间即将到来&#xff0c;你们准备好了吗&#xff1f;为了帮助大家更好地开始毕设&#xff0c;我作为学长给大家整理了最新的计算机大数据专业的毕设选题。…

微信公众号与小程序打通:流量变现的新路径

随着移动互联网的迅速发展&#xff0c;微信公众号和小程序已经成为企业营销和运营的重要工具。将微信公众号与小程序打通&#xff0c;不仅可以提高用户体验&#xff0c;还能有效提升流量的变现效率。本文将为您解析如何打通微信公众号与小程序&#xff0c;让流量快速变现。 一、…

开发知识点-Git

团队协作-Git Giteegitee 创建仓库打开项目所在目录&#xff0c;右键选择Git Bush Here(你要确定电脑上已经安装了Git&#xff09;初始化本地仓库配置验证信息。 完美解决github访问速度慢介绍Git 与 SVN 区别IDEA 添加 gitee Gitee Git Gitee 大家都知道国内访问 Github 速度…

【前段基础入门之】=>CSS3新特性 响应式布局

文章目录 概念媒体查询媒体类型媒体特性媒体运算符 概念 所谓对响应式布局方案的理解&#xff0c;众说纷纭&#xff0c;核心点就是同一套代码在不同尺度屏幕下的布局呈现方式的不同 社区中有很多人分享&#xff0c;并列出了多种实现响应式布局的方案&#xff0c;比如【 rem&…

quickapp_快应用_快应用组件

快应用组件 web组件web页面与快应用页面通信网页接收/发送消息网页接收消息 快应用页面接收/发送消息给网页发送消息 通信前提- trustedurl list组件refresh组件语法error-使用refresh组件会改变页面高度&#xff01;refresh组件list组件实现下拉刷新 tab组件 web组件 作用&am…

微信抽奖活动怎么做

微信抽奖活动&#xff1a;打破传统&#xff0c;创新互动&#xff0c;带给你超乎想象的惊喜体验&#xff01; 随着互联网的飞速发展&#xff0c;人们越来越热衷于参与各种线上活动。而微信&#xff0c;作为中国最大的社交平台之一&#xff0c;自然成为了各种活动的聚集地。今天…

IntelliJ IDEA 安装 GitHub Copilot插件 (最新)

注意&#xff1a; GitHub Copilot 插件对IDEA最低版本要求是2021.2&#xff0c;建议直接用2023.3&#xff0c;一次到位反正后续要升级的。 各个版本的依赖关系&#xff0c;请参照&#xff1a; ##在线安装&#xff1a; 打开 IntelliJ IDEA扩展商店&#xff0c;输入 "Git…

IntelliJ IDEA启动一个普通的java web项目的配置

原创/朱季谦 这是我很久以前刚开始用IntelliJ IDEA时记录的笔记&#xff0c;应该是五年前的一篇笔记了。正好赶上最近离职了&#xff0c;可以有比较多的时间把以前的记录整理一下&#xff0c;可以让刚接触到IntelliJ IDEA的童鞋学习如何在IntelliJ IDEA引入一个单机版的jar形式…

【python零基础入门学习】python进阶篇之数据库连接-PyMysql-全都是干货-一起来学习吧!!!

本站以分享各种运维经验和运维所需要的技能为主 《python零基础入门》&#xff1a;python零基础入门学习 《python运维脚本》&#xff1a; python运维脚本实践 《shell》&#xff1a;shell学习 《terraform》持续更新中&#xff1a;terraform_Aws学习零基础入门到最佳实战 《k8…

【npm 错误】:npm ERR! code ERESOLVE、npm ERR! ERESOLVE could not resolve问题

用过npm的小伙伴都会有这么一个情况出现&#xff0c;就是npm install /npm install xxxx 会出现改一连串的错误&#xff0c;如下&#xff1a; 解决办法&#xff1a; 只要在npm install后面加上--legacy-peer-deps就可以解决问题,安装插件也一样 npm install --legacy-peer-dep…

原论文一比一复现 | 更换 RT-DETR 主干网络为 【VGG13】【VGG16】【VGG19】| 对比实验必备

本专栏内容均为博主独家全网首发,未经授权,任何形式的复制、转载、洗稿或传播行为均属违法侵权行为,一经发现将采取法律手段维护合法权益。我们对所有未经授权传播行为保留追究责任的权利。请尊重原创,支持创作者的努力,共同维护网络知识产权。 论文地址:https://arxiv.o…

V10服务器安装virt-manage

kvm是什么 KVM(Kernel-based Virtual Machine, 即内核级虚拟机) 是一个开源的系统虚拟化模块。它使用Linux自身的调度器进行管理&#xff0c;所以相对于Xen&#xff0c;其核心源码很少。目前KVM已成为学术界的主流VMM之一&#xff0c;它包含一个为处理器提供底层虚拟化 可加载…

[CISCN 2023 华北]pysym

源码如下 from flask import Flask, render_template, request, send_from_directory import os import random import string app Flask(__name__) app.config[UPLOAD_FOLDER]uploads app.route(/, methods[GET]) def index():return render_template(index.html) app.route…

IDEA写mybatis程序,java.io.IOException:Could not find resource mybatis-config.xml

找不到mybatis-config.xml 尝试maven idea:module&#xff0c;不是模块构造问题 尝试检验pom.xml&#xff0c;在编译模块添加了解析resources内容依旧不行 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.or…

vb.net 实时监控双门双向门禁控制板源代码

本示例使用设备介绍&#xff1a;实时网络双门双向门禁控制板可二次编程控制网络继电器远程开关-淘宝网 (taobao.com) Imports System.Net.Sockets Imports System.Net Imports System.Text Imports System.ThreadingImports System.Net.NetworkInformation Imports System.Man…

释放搜索潜力:基于Milvus快速搭建语义检索系统(快速版),让信息尽在掌握

搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术细节以及项目实战(含码源) 专栏详细介绍:搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术…

用照片预测人的年龄【图像回归】

在图像分类任务中&#xff0c;卷积神经网络 (CNN) 是非常强大的神经网络架构。 然而&#xff0c;鲜为人知的是&#xff0c;它们同样能够执行图像回归任务。 图像分类和图像回归任务之间的基本区别在于分类任务中的目标变量&#xff08;我们试图预测的东西&#xff09;不是连续…

git的简单使用

git 的简单使用 前言&#xff1a; 为了方便理解&#xff0c;文中一些内容表达的不是十分准确&#xff0c;如有错误&#xff0c;欢迎大家友善的指出。 接下来就开始了&#xff01;&#xff01; 使用git其实就是围绕下面这个图展开的&#xff0c;大家可以先看下图&#xff0c…