flink watermark 实例分析

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');

watermark使用demo

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/222417.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

指标体系构建-01-什么是数据指标

参考 四千字全面解析数据产品经理必知概念&#xff1a;标签、维度、指标 什么是数据指标 指标是指于其中打算达到的指数&#xff0c;规格&#xff0c;标准等,是用数据对事物进行描述的工具。通常指标对应是否有价值取决于这个指标的实际意义。同时关注指标对应的数值&#x…

养老院自助饮水机(字符设备驱动)

目录 1、项目背景 2、驱动程序 2.1 三层架构 2.2 驱动三要素 2.3 字符设备驱动 2.3.1 驱动模块 2.3.2 应用层 3、设计实现 3.1 项目设计 3.2 项目实现 3.2.1 驱动模块代码 3.2.2 用户层代码 4、功能特性 5、技术分析 6. 总结与未来展望 1、项目背景 养老院的老人…

网络基础【网线的制作、OSI七层模型、集线器、交换机介绍、路由器的配置】

目录 一.网线的制作 1.1.网线的标准 1.2.水晶头的做法 二.OSI七层模型、集线器、交换机介绍 集线器&#xff08;Hub&#xff09;&#xff1a; 交换机&#xff08;Switch&#xff09;&#xff1a; 三.路由器的配置 3.1.使用 3.2.常用的功能介绍 1、如何管理路由器 2、家…

Linux线程

文章目录 线程线程原理页表线程VS进程线程相关函数pthread_create函数pthread_selfpthread_exitpthread_cancelpthread_joinpthread_detach 线程ID 线程 什么是线程&#xff1f;为什么要有线程&#xff1f; 线程本质上就是轻量化的进程&#xff0c;一个进程就是一个执行流&…

信息论安全与概率论

目录 一. Markov不等式 二. 选择引理 三. Chebyshev不等式 四. Chernov上限 4.1 变量大于 4.2 变量小于 信息论安全中会用到很多概率论相关的上界&#xff0c;本文章将梳理几个论文中常用的定理&#xff0c;重点关注如何理解这些定理以及怎么用。 一. Markov不等式 假定…

Protobuf 编码规则及c++使用详解

Protobuf 编码规则及c使用详解 Protobuf 介绍 Protocol Buffers (a.k.a., protobuf) are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data Protocol Buffers&#xff08;简称为protobuf&#xff09;是谷歌的语言无关、…

多层负载均衡实现

1、单节点负载均衡 1&#xff09;站点层与浏览器层之间加入了一个反向代理层&#xff0c;利用高性能的nginx来做反向代理 2&#xff09;nginx将http请求分发给后端多个web-server 优点&#xff1a; 1&#xff09;DNS-server不需要动 2&#xff09;负载均衡&#xff1a;通过ngi…

Python深度学习028:神经网络模型太多,傻傻分不清?

文章目录 深度学习网络模型常见CNN网络深度学习网络模型 在深度学习领域,有许多常见的网络模型,每种模型都有其特定的应用和优势。以下是一些广泛使用的深度学习模型: 卷积神经网络(CNN): 应用:主要用于图像处理,如图像分类、物体检测。 特点:利用卷积层来提取图像特…

《数据分析-JiMuReport》积木报表详细入门教程

积木报表详细入门教程 一、JimuReport部署入门介绍 积木报表可以通过源码部署、SpringBoot集成、Docker部署以及各种成熟框架部署&#xff0c;具体可查看积木官方文档 当前采用源码部署&#xff0c;首先下载Jimureport-example-1.5.6 1 jimureport-example目录查看 使用ID…

喜报|迪捷软件“ModelCoder 建模及形式化验证代码生成软件”荣登浙江省首版次产品目录

近日&#xff0c;浙江省经济和信息化厅公布《2023年浙江省首版次软件产品应用推广指导目录》&#xff0c;浙江迪捷软件科技有限公司的“ModelCoder 建模及形式化验证代码生成软件”经过多轮审核及专家评定被纳入目录&#xff0c;这是迪捷软件自主研发的产品继“天目全数字实时仿…

【前缀和】【单调栈】LeetCode2281:巫师的总力量和

作者推荐 map|动态规划|单调栈|LeetCode975:奇偶跳 涉及知识点 单调栈 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 题目 作为国王的统治者&#xff0c;你有一支巫师军队听你指挥。 给你一个下标从 0 开始的整数数组 strength &…

【Matlab in VSCode】在VSCode中编辑MATLAB文件

【Matlab in VSCode】在VSCode中编辑MATLAB文件 1.安装插件 插件&#xff1a;在vscode拓展商店下载 MATLABMatlab in VSCode 其他&#xff1a;Windows环境MATLAB2019bpython3.7.9 2.插件配置 MATLAB插件下载后不用配置。 Matlab in VSCode需要进行相应的配置。 Windows…

【C语言】自定义类型:结构体深入解析(二)结构体内存对齐宏offsetof计算偏移量结构体传参

文章目录 &#x1f4dd;前言&#x1f320; 结构体内存对齐&#x1f309;内存对齐包含结构体的计算&#x1f320;宏offsetof计算偏移量&#x1f309;为什么存在内存对⻬?&#x1f320; 结构体传参&#x1f6a9;总结 &#x1f4dd;前言 本小节&#xff0c;我们学习结构的内存对…

C++面向对象(OOP)编程-STL详解(vector)

本文主要介绍STL六大组件&#xff0c;并主要介绍一些容器的使用。 目录 1 泛型编程 2 CSTL 3 STL 六大组件 4 容器 4.1 顺序性容器 4.1.1 顺序性容器的使用场景 4.2 关联式容器 4.2.1 关联式容器的使用场景 4.3 容器适配器 4.3.1 容器适配器的使用场景 5 具体容器的…

大模型ChatGLM下载、安装与使用

在人工智能领域&#xff0c;清华技术成果转化的公司智谱AI启动了支持中英双语的对话机器人ChatGLM内测。ChatGLM是一个初具问答和对话功能的千亿中英语言模型&#xff0c; 并针对中文进行了优化&#xff0c;现已开启邀请制内测&#xff0c;后续还会逐步扩大内测范围。 ChatGLM…

Unity中Shader平移矩阵

文章目录 前言方式一&#xff1a;对顶点本地空间下的坐标进行相加平移1、在属性面板定义一个四维变量记录在 xyz 上平移多少。2、在常量缓冲区进行申明3、在顶点着色器中&#xff0c;在进行其他坐标转化之前&#xff0c;对模型顶点本地空间下的坐标进行转化4、我们来看看效果 方…

Tomcat报404问题解决方案大全(包括tomcat可以正常运行但是报404)

文章目录 Tomcat报404问题解决方案大全(包括tomcat可以正常运行但是报404)1、正确的运行页面2、报错404问题分类解决2.1、Tomcat未配置环境变量2.2、IIs访问权限问题2.3、端口占用问题2.4、文件缺少问题解决办法&#xff1a; Tomcat报404问题解决方案大全(包括tomcat可以正常运…

智能优化算法应用:基于龙格-库塔算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于龙格-库塔算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于龙格-库塔算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.龙格-库塔算法4.实验参数设定5.算法结果…

@vue/cli脚手架

0_vue/cli 脚手架介绍 目标: webpack自己配置环境很麻烦, 下载vue/cli包,用vue命令创建脚手架项目 vue/cli是Vue官方提供的一个全局模块包(得到vue命令), 此包用于创建脚手架项目 脚手架是为了保证各施工过程顺利进行而搭设的工作平 vue/cli的好处 开箱即用 0配置webpack babe…

算法模板之栈图文详解

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;算法模板、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. ⛳️模拟栈1.1 &#x1f514;用数组模拟实现栈1.1.1 &#x1f47b;栈的定义1.1.…