时间语义与窗口

时间语义

在Flink中,时间语义分为两种 : 处理时间和事件时间。时间语义与窗口函数是密不可分的。以窗口为单位进行某一段时间内指标统计,例如想要统计8点-9点的某个页面的访问量,此时就需要用到了窗口函数,这里的关键时间点是8点到9点,而这个时间点指的是事件时间,不是处理时间,也就是某个时间发生的时间。

一条日志数据发生的时间是8:59,而经过一系列转换到达系统开始处理的时间可能是9:01,很显然,如果想要统计8点到9点的数据,应该是根据事件的发生时间,而不是处理时间。

  • WaterMark

上面说到,在窗口函数开窗以及关窗中,不应该以当前系统的处理时间,而应该取当前的事件时间,这个事件时间就是WaterMark 用于标识当前流数据的时间到什么时候了,这个时间不是由系统去推进的,而是基于事件的到来去推进的。

  • WaterMark的延时

在流处理中,数据并不是一批一批到达的而是流式到达,且到达分布式系统处理的时间可能是乱序的

8:57 、 9:01 、 8:59 、 9:02 …

如果 想要统计8点到9点的数据统计,按照事件时间,当 9:01的数据到达之后,便认为,8点到9点的窗口可以关闭进行运算了,但是8:59 这条数据就 不在统计范围内了,这样就存在一定意义上的数据丢失。要解决这个问题,就是让Watermark的时间进行延时,也就是说,本来是计划9点关窗计算的,让Watermark的时间减少2分钟,那9:01这条数据到来之后,WaterMark的时间其实是8:59,9:02的数据来到以后,WaterMark才会到9:00,此时8点-9点才会关窗计算。这个等多久的时间就是WaterMark的延时。

窗口

窗口函数的统计基于时间语义的watermark,当窗口接收到数据以后,首先需要根据数据身上的时间戳来判断此条数据属于哪个窗口,将属于某个窗口的事件分配给窗口,而关窗计算的时间取决于WaterMark。在Flink中,窗口在同一时刻是可以存在多个,而不是只有一个。

在这里插入图片描述

数据流的WaterMark定义

  • 在DataStream上游数据流通过assignTimestampsAndWatermarks 分配watermark
new WatermarkStrategy<Event>() {@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return null;}@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return WatermarkStrategy.super.createTimestampAssigner(context);}}

createWatermarkGenerator 方法用于返回 WatermarkGenerator

@Public
public interface WatermarkGenerator<T> {void onEvent(T var1, long var2, WatermarkOutput var4);void onPeriodicEmit(WatermarkOutput var1);
}

WatermarkGenerator 两个方法分别对基于事件的WaterMark以及周期性的生成watermark两种生成策略发射watermark。

而每条数据上的时间戳该如何提前,是通过 TimestampAssigner 这个时间戳分配器提取的。

@FunctionalInterface
@Public
public interface TimestampAssigner<T> {long NO_TIMESTAMP = -9223372036854775808L;long extractTimestamp(T var1, long var2);
}

extractTimestamp 返回了一个时间戳,获取到这个时间以后,便可以根据这个时间去推进WaterMark的前进。

  • WaterMark时间分配器定义
//时间戳分配器,获取到事件时间后用于推进WaterMark的前进
class MyTimeStampAss implements TimestampAssigner<Event>{@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}
}
  • WaterMark生成器定义
//水位线提取器,基于事件或者周期性的生成水位线
class MyWaterMarkGenerator implements WatermarkGenerator<Event>{private long maxTimeStamp = 0;@Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {maxTimeStamp = Math.max(event.timestamp,maxTimeStamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {watermarkOutput.emitWatermark(new Watermark(maxTimeStamp)); //发射Watermark,让下游窗口感知}
}
  • 给DataStream设置WaterMark
public class WaterMarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStream<Event> dataStream = env.addSource(new ClickSource());//设置数据流的watermarkSingleOutputStreamOperator<Event> stream = dataStream.assignTimestampsAndWatermarks(new MyWaterMarkStrategy());env.execute();}}class MyWaterMarkStrategy implements WatermarkStrategy<Event>{@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWaterMarkGenerator();}@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimeStampAss();}
}

默认的周期性生成WaterMark为200ms

 env.getConfig().setAutoWatermarkInterval(100);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/119146.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多目标应用:基于多目标向日葵优化算法(MOSFO)的微电网多目标优化调度MATLAB

一、微网系统运行优化模型 参考文献&#xff1a; [1]李兴莘,张靖,何宇,等.基于改进粒子群算法的微电网多目标优化调度[J].电力科学与工程, 2021, 37(3):7 二、多目标向日葵优化算法 多目标向日葵优化算法&#xff08;Multi-objective sunflower optimization&#xff0c;MOS…

kubesphere中部署grafana实现dashboard以PDF方式导出

1&#xff0c;部署grafana-image-renderer 2&#xff0c;部署grafana GF_RENDERING_SERVER_URL http://ip:30323/render #grafana-image-renderer地址 GF_RENDERING_CALLBACK_URL http://ip:32403/ #grafana地址 GF_LOG_FILTERS rend…

【CSS】简记CSS效果:通过transition(动画过渡属性)实现侧边栏目滑入滑出

需求 在资金明细的页面中&#xff0c;点击按钮时筛选区域从左侧滑出&#xff0c;完成筛选点击确认后调用接口完成数据查询&#xff0c;筛选区域滑入左侧&#xff1b; 基于微信小程序页面实现 wxml代码 <view><!-- 操作按钮 --><button type"primary&qu…

docker笔记8:Docker网络

1.是什么 1.1 docker不启动&#xff0c;默认网络情况 ens33 lo virbr0 在CentOS7的安装过程中如果有选择相关虚拟化的的服务安装系统后&#xff0c;启动网卡时会发现有一个以网桥连接的私网地址的virbr0网卡(virbr0网卡&#xff1a;它还有一个固定的默认IP地址192.168.122…

Thymeleaf常见属性

参考文档 thymeleaf 语法——th:text默认值、字符串连接、th:attr、th:href 传参、th:include传参、th:inline 内联、th:each循环、th:with、th:if_猎人在吃肉的博客-CSDN博客 代码演示 Controller public class TestController {AutowiredMenuService menuService;GetMapp…

【洛谷】P3853 路标设置

原题链接&#xff1a;https://www.luogu.com.cn/problem/P3853 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 整体思路&#xff1a;二分答案 由题意知&#xff0c;公路上相邻路标的最大距离定义为该公路的“空旷指数”。在公路上增设一些路标&…

MySQL的备份与恢复以及日志管理

目录 一、数据备份的重要性 二、数据库备份的分类 1、物理备份 2、逻辑备份 &#xff08;1&#xff09;完全备份&#xff1a;每次对数据进行完整的备份 &#xff08;2&#xff09;差异备份&#xff1a;备份自从上次完全备份之后被修改的过文件 &#xff08;3&#xff09…

yolov5手机版移植

感谢阅读 运行export.py然后百度一个onnx转化工具下载yolov5移动版文件和ncnn修改代码CMakeLists.txt修改修改param的参数![在这里插入图片描述](https://img-blog.csdnimg.cn/7c929414761840db8a2556843abcb2b3.jpeg)yolov5ncnn_jni.cpp修改修改stride16和stride32完工 运行ex…

QT day 1

作业&#xff1a; widget.cpp #include "window.h" #include<QDebug> #include<QIcon> Window::Window(QWidget *parent): QWidget(parent) {qDebug() <<"size" <<this->size();//this->resize(430,330);this->resize(Q…

django-项目

一、RESTful设计风格 基础概念 全称&#xff1a;Representational State Transfer 1.资源 网络上的一个实体&#xff0c;每个资源都有一个独一无二的URL与之对应&#xff1b;获取资源-直接访问URL即可 2.表现层 资源的表现形式 如HTML、xml、JPG、json等 3.状态转化 …

点可云进销存开源系统V6.0.1 ERP系统进销存源码仓库管理

介绍 点可云进销存系统&#xff0c;基于thinkphplayui开发。 功能包含&#xff1a;采购、销售、零售、多仓库管理、财务管理等功能 和超详细的报表功能&#xff08;采购报表、销售报表、零售报表、仓库报表、资金报表等&#xff09; 软件架构 thinkphplayui 功能概览 购货 -购…

红日靶场五(vulnstack5)渗透分析

环境搭建 win7 192.168.111.132&#xff08;仅主机&#xff09; 192.168.123.212&#xff08;桥接&#xff09; .\heart p-0p-0p-0win2008 ip: 192.168.111.131&#xff08;仅主机&#xff09; sun\admin 2020.comkali ip: 192.168.10.131&#xff08;nat&#xff09;vps&…

UG\NX二次开发BlockUI 进入NX的BlockUI编辑界面

文章作者:里海 来源网站:王牌飞行员_里海_里海NX二次开发3000例,里海BlockUI专栏,C\C++-CSDN博客 简介: 要使用BlockUI,需要先进入NX的BlockUI编辑界面。在低版本中,可以在Toolbar工具条中进入开始→所有应用模块→块UI样式编辑器;在高版本中,可以在Ribbon工具栏…

LLMs NLP模型评估Model evaluation ROUGE and BLEU SCORE

在整个课程中&#xff0c;你看到过类似模型在这个任务上表现良好&#xff0c;或者这个微调模型在性能上相对于基础模型有显著提升等陈述。 这些陈述是什么意思&#xff1f;如何形式化你的微调模型在你起初的预训练模型上的性能改进&#xff1f;让我们探讨一些由大型语言模型开…

微信小程序开发教学系列(12)- 实战项目案例

十二、实战项目案例 本章将通过一个简单的实战项目案例来帮助读者巩固之前学习到的知识。我们将搭建一个名为“ToDoList”的微信小程序&#xff0c;实现一个简单的任务清单功能。 项目介绍 ToDoList是一个用于记录和管理任务的小程序。用户可以添加、编辑、完成和删除任务&a…

12.redis 持久化

redis 持久化 redis 持久化redis持久化策略RDB > Redis DataBase 定期备份rdb 文件处理rdb 优缺点 AOF > Append Only File 实时备份AOF 工作流程AOF 缓冲区刷新策略AOF 重写机制AOF 重写流程 混合持久化持久化流程总结 redis 持久化 redis 是一个内存数据库&#xff0c…

JVM学习(四)--内存问题分析思路

linux获取jvm当前dump文件 命令行为&#xff1a;jmap -dump:file[文件名] [pid] 然后等待生成dump文件&#xff0c;生成的dump文件就在当前目录下。如下图&#xff1a; 然后就可以下载到本地&#xff0c;用本地jdk里自带的jvisualvm来解析文件。 在用本地的jvisualvm解析之前…

Elasticsearch 优化

Elasticsearch 优化 2.1硬件选择 Elasticsearch 的基础是 Lucene &#xff0c;所有的索引和文档数据是存储在本地的磁盘中&#xff0c;具体的 路径可在 ES 的配置文件 ../config/elasticsearch.yml 中配置&#xff0c;如下&#xff1a; #----------------------------…

QT Day2!!1.登录跳转界面 2.枚举类型 3.左值与右值4.面试问题

1.作业登录跳转界面 //form.h #ifndef FORM_H #define FORM_H#include <QWidget>namespace Ui { class Form; }class Form : public QWidget {Q_OBJECTpublic:explicit Form(QWidget *parent nullptr);~Form();public slots:void jump_slot();private:Ui::Form *ui; };…

使用Python爬虫采集网络热点

在当今信息爆炸的时代&#xff0c;了解网络热搜词和热点事件对于我们保持时事敏感性和把握舆论动向非常重要。在本文中&#xff0c;我将与你分享使用Python爬虫采集网络热搜词和热点事件的方法&#xff0c;帮助你及时获取热门话题和热点新闻。 1. 网络热搜词采集 网络热搜词是人…