Flink -- 事件时间 Watermark

1、事件时间:

        指的是数据产生的时间或是说是数据发生的时间。

在Flink中有三种时间分别是:

                Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间

                Infestion Time:事件接收时间

                Processing Time:事件处理时间

为什么会提出事件时间这个概念?

        因为当使用Processing Time(事件的处理时间)来对数据进行处理,此时数据可能会乱序,没有办法还原数据本身的时间顺序,这种情况在Flink中会可能导致数据丢失,如果使用事件时间,它会根据事件真实发生的时间对数据排序,就不会出现数据乱序的情况。

总结来说,数据产生的时间就是事件时间,现实中实时的时间就是事件的处理时间

2、Processing Time 事件处理时间

处理时间是接收数据过后对数据操作的时间。处理时间的会按照实时的时间触发。

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
3、事件时间:

数据产生的时间就是事件时间,不过在使用的时候使用的是时间戳。需要注意的是数据的时间与现实的时间是不一致的。

在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)

java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000例如上述:数据总共有两个部分组成,前面是单词,后面的是单词数据产生的时间戳


public class Demo04EventTime {public static void main(String[] args) throws Exception{/*** 需求:统计5秒内的单词的数量,使用的是事件时间滚动窗口* 触发的条件是事件时间5秒*///构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//需要并行度改成一env.setParallelism(1);//使用socket模拟实时的环境DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);/*** java,1699035731000* java,1699035732000* java,1699035735000* java,1699035733000* java,1699035736000* java,1699035737000* java,1699035740000*///此时的数据的格式并不是某一个单词,需要告诉flink哪一个是事件时间//首先对数据进行格式处理SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {String[] split = line.split(",");String word = split[0];
//            String time = split[1];long time1 = Long.parseLong(split[1]);return Tuple2.of(word, time1);}, Types.TUPLE(Types.STRING, Types.LONG));//告诉Flink,哪一个是事件的时间SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定事件时间.withTimestampAssigner((kv, ts) -> kv.f1));//统计5秒钟的单词的数量DataStream<Tuple2<String, Integer>>keyByDS = assDS.map(kv -> Tuple2.of(kv.f0, 1),Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS1= keyByDS.keyBy(kv -> kv.f0);//开窗WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS1.window(TumblingEventTimeWindows.of(Time.seconds(5)));//对单词的数量进行统计SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);//打印数据countDS.print();//执行Flink的环境env.execute();}
}
        1、基于事件时间来说,触发窗口的条件:
    1、水位线需要大于等于窗口的结束时间2、窗口里面要存在数据3、窗口的划分时间是从1970年1月1日0时0分0秒开始的,按照窗口的大小轮替
4、水位线(watermark):默认是等于最新的一条数据的时间戳

5、在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
        解决方法:将水位线向后推移

假设一个时间窗口是5秒,如果将此时的水位线向后推移5秒,假设4进入的时候,此时的水位线就变成-3,但是此时就不满足触发窗口的条件,此时假设遗漏的数据是3,此时的水位线依旧是小于窗口的时间,依旧不会触发窗口。

但是不能完全的保证数据不丢失,推移的时间越久,对于Flink的延迟就会越大。

        1、在Flink中是默认使用的是单调递增的时间戳分配器:在没有乱序情况下,默认水位线是等于最新的一条数据的时间戳
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、指定水位线等于时间最新一条数据的时间戳,数据不存在乱序的时候使用,如果数据乱序,可能会丢失数据.<Tuple2<String, Long>>forMonotonousTimestamps()//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
        2、数据之间存在最大固定延迟的时间戳分配器:在乱序的情况下,就水位线先后推移固定的时间(是以最新的一条数据的时间戳为标准的)
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、水位线生成方式:最新一条数据的时间戳减去5秒,会导致计算延迟触发.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
6、水位线的生成:

上图表示的是以Flink的流程图,图中总共有两个并行度,每一个Task上面都带着任务的时间,在Flink中,会将任务的时间向后传递,当途中上游map(1)将任务时间传递给下游window(1)时,下面的上游map(2)也会任务时间传递给下游window(1)(上游的任务是并行的),此时下游window(1)就会产生两个任务时间,此时就会选择时间最小的时间的作为水位线。因为当选择时间大的作为水位线,那么对于时间较小的数据可能会丢失。

        1、水位线对齐:

                因为上游的任务是并行执行的,指的时对于上游的所有的Task的水位线都需要逐步的向后推移。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/186544.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

远程运维如何更高效的远程管理?向日葵的这几项功能会帮到你

具备一定规模的企业&#xff0c;其IT运维需求普遍会面临设备数量众多、难以统一高效管理、始终存在安全敞口等问题&#xff0c;尤其是针对分部广泛的无人值守设备时&#xff0c;更是如此。 举一个简单的例子&#xff0c;一台位于商圈的无人值守可互动广告机设备&#xff0c;所…

【MySQL习题】各个视频的平均完播率【全网最详细教学】

目录 数据表描述 问题描述 输出示例 解题思路【重点】 正解代码 数据表描述 有以下两张表&#xff1a; 表1&#xff1a;用户-视频互动表tb_user_video_log 数据举例&#xff1a; 说明&#xff1a; uid-用户ID,video_id-视频ID start_time-开始观看时间end_time-结束观…

【Python3】【力扣题】258. 各位相加

【力扣题】题目描述&#xff1a; 【Python3】代码&#xff1a; 1、解题思路&#xff1a;将整数转为字符串&#xff0c;遍历字符串中的数字&#xff0c;求和。 知识点&#xff1a;str(...)&#xff1a;转为字符串。为了遍历每个数字。 int(...)&#xff1a;转为整数。为了数字…

三菱FX3U系列-定位指令

目录 一、简介 二、指令形式 1、相对定位[DRVI、DDRVI] 2、绝对定位[DRVA、DDRVA] 三、总结 一、简介 定位指令用于控制伺服电机或步进电机的位置移动。可以通过改变脉冲频率和脉冲数量来控制电机的移动速度和移动距离&#xff0c;同时还可以指定移动的方向。 二、指令形…

带你一分钟看懂 “kubernetes”

目录 什么是 Kubernetes Kubernetes 概述 为什么需要 Kubernetes&#xff0c;它能做什么&#xff1f; 什么是 Kubernetes 从官方网站上可以看到&#xff0c;它是一个工业级的容器编排平台。Kubernetes 这个单词是希腊语&#xff0c;它的中文翻译是“舵手”或者“飞行员”。在…

互联网金融P2P主业务场景自动化测试

互联网金融P2P行业&#xff0c;近三年来发展迅速&#xff0c;如火如荼。 据不完全统计&#xff0c;全国有3000的企业。 “互联网”企业&#xff0c;几乎每天都会碰到一些奇奇怪怪的bug&#xff0c;作为在互联网企业工作的测试人员&#xff0c;风险和压力都巨大。那么我们如何降…

python用tkinter随机数猜数字大小

python用tkinter随机数猜数字大小 没事做&#xff0c;看到好多人用scratch做的猜大小的示例&#xff0c;也用python的tkinter搞一个猜大小的代码玩玩。 猜数字代码 from tkinter import * from random import randint# 定义确定按钮的点击事件 def hit(x,y):global s_Labprint(…

CCF ChinaSoft 2023 论坛巡礼 | 云计算标准化论坛

2023年CCF中国软件大会&#xff08;CCF ChinaSoft 2023&#xff09;由CCF主办&#xff0c;CCF系统软件专委会、形式化方法专委会、软件工程专委会以及复旦大学联合承办&#xff0c;将于2023年12月1-3日在上海国际会议中心举行。 本次大会主题是“智能化软件创新推动数字经济与社…

DHorse(K8S的CICD平台)的实现原理

综述 首先&#xff0c;本篇文章所介绍的内容&#xff0c;已经有完整的实现&#xff0c;可以参考这里。 在微服务、DevOps和云平台流行的当下&#xff0c;使用一个高效的持续集成工具也是一个非常重要的事情。虽然市面上目前已经存在了比较成熟的自动化构建工具&#xff0c;比如…

this.$message提示内容添加换行

0 效果 1 代码 let msgArr [只允许上传doc/docx/xls/xlsx/pdf/png/jpg/bmp/ppt/pptx/rar/zip格式文件,且单个文件大小不能超过20MB,已过滤无效的文件] let msg msgArr.join(<br/>) this.$message({dangerouslyUseHTMLString: true,message: msg,type: warning })

linux系统,确认账户密码正确

linux系统&#xff0c;确认账户密码正确 1、问题背景2、解决方法 1、问题背景 有时在linux系统安装软件时&#xff0c;有的软件可能会在安装过程中创建系统用户&#xff0c;同时会给出这个用户的密码。过了一段时间我们不确定这个密码是否还正确&#xff0c;那怎么确认这个密码…

适用于 iOS 的 10 个最佳数据恢复工具分享

在当今的数字时代&#xff0c;我们的移动设备占据了我们生活的很大一部分。从令人难忘的照片和视频到重要的文档和消息&#xff0c;我们的 iOS 设备存储了大量我们无法承受丢失的数据。然而&#xff0c;事故时有发生&#xff0c;无论是由于软件故障、无意删除&#xff0c;甚至是…

centos 7.9系统安装老版本jenkins,并解决插件问题

1.初衷 因为jenkins随着时间推移&#xff0c;其版本也越来越新&#xff0c;支持它运行的JDK也越来越新。基于不折腾的目标&#xff0c;我们安装一个老的固定版本就行。以前安装新版本&#xff0c;经常碰到的问题就是插件安装不兼容的问题。现在这个问题&#xff0c;可以把以前…

CentOS7安装部署StarRocks

文章目录 CentOS7安装部署StarRocks一、前言1.简介2.环境 二、正文1.StarRocks基础1&#xff09;架构图2&#xff09;通讯端口 2.部署服务器3.安装基础环境1&#xff09;安装JDK 112&#xff09;修改机器名3&#xff09;安装GCC4&#xff09;关闭交换分区&#xff08;swap&…

【机器学习】给大家推荐几个资源

我写博客的目的就是让大家了解人工智能背后的数学原理&#xff0c;但人工智能这个话题太大了&#xff0c;背后涉及到的知识非常庞大&#xff0c;仅靠写几篇文章传播力度有限&#xff0c;况且知识传播过程中也容易引入误解&#xff0c;所以授之以鱼不如授之以渔&#xff0c;这里…

Facebook广告被暂停是什么原因?广告账号被封怎么办?

许多做海外广告投放的小伙伴经常遇到一个难题&#xff0c;那就是投放的Facebook广告被拒或广告帐户被关闭赞停的经历&#xff0c;随之而来的更可能是广告账户被封&#xff0c;导致资金的损失。本文将从我自身经验&#xff0c;为大家分享&#xff0c;FB广告被暂停的原因有哪些&a…

什么是伺服电机?Parker派克伺服电机盘点

一、什么是伺服电机&#xff1f; 要准确地定义伺服电机&#xff0c;我们首先需理解其核心特性&#xff1a;反馈与闭环控制。伺服电机凭借这些特性&#xff0c;能精确控制扭矩、速度或位置&#xff0c;即使在零速度下&#xff0c;也能保持足够的扭矩以锁定负载。 伺服电机与其…

Elastic Stack 8.11:引入一种新的强大查询语言 ES|QL

作者&#xff1a;Tyler Perkins, Ninoslav Miskovic, Gilad Gal, Teresa Soler, Shani Sagiv, Jason Burns Elastic Stack 8.11 引入了数据流生命周期、一种配置数据流保留和降采样&#xff08;downsampling&#xff09; 的简单方法&#xff08;技术预览版&#xff09;&#xf…

电梯用电量-第10届蓝桥杯国赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第8讲。 电梯用电量&#x…

学者观察 | 数字经济中长期发展中的区块链影响力——清华大学柴跃廷

导语 区块链是一种全新的分布式基础架构与计算范式&#xff0c;既能利用非对称加密和冗余分布存储实现信息不可篡改&#xff0c;又可以利用链式数据结构实现数据信息可溯源。当前&#xff0c;区块链技术已成为全球数据交易、金融结算、国际贸易、政务民生等领域的信息基础设施…