大数据-玩转数据-Flink 水印

一、Flink 中的水印

在Flink的流式操作中, 会涉及不同的时间概念:

1.1 处理时间

是指的执行操作的各个设备的时间,对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据,然后下个窗口是10:00am-11:00am, 等等。处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调。他提供了最好的性能和最低的延迟。 但是, 在分布式和异步的环境下,处理时间没有办法保证确定性,容易受到数据传递速度的影响: 事件的延迟和乱序。在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。

1.2 事件时间

是指的这个事件发生的时间。在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在。在事件时间体系中, 时间的进度依赖于数据本身,和任何设备的时间无关。事件时间程序必须制定如何产生Event Time Watermarks(水印) 。假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,这些记录落入该小时。在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。从1.12开始, Flink内部已经把默认的语义改成了事件时间。

1.3 Flink中的WaterMark

支持event time的流式处理框架需要一种能够测量event time 进度的方式。 比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间,从而在程序中去关闭这个窗口。事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中, 即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据,这些数据可以照样被正确处理, 就像实时发生的一样能够进入正确的窗口。这种在Flink中去测量事件时间的进度的机制就是watermark(水印)。

1.4 Flink中如何产生水印

在这里插入图片描述

二、代码集成

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import javax.naming.Context;
import java.time.Duration;public class WatorMark_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}});stream.assignTimestampsAndWatermarks(wms) // 指定水印和时间戳.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {String msg = "当前key: " + key+ "窗口: [" + ctx.window().getStart() / 1000 + "," + ctx.window().getEnd()/1000 + ") 一共有 "+ elements.spliterator().estimateSize() + "条数据 ";out.collect(msg);}}).print();env.execute();}
}

三、测试结果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/115923.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux刻录iso到u盘

需要的工具&#xff1a;Linux系统、U盘、ISO镜像文件。 首先在Linux系统中打开终端&#xff0c;使用dd命令&#xff0c;格式如下&#xff1a; sudo dd ifxxx.iso of/dev/sdb 命令中xxx.iso是你的ISO镜像文件的路径&#xff0c;of后面的你的U盘路径&#xff0c;一般就是/dev/sdb…

Vue在表格中拿到该行信息的方式(作用域插槽-#default-scope-解决按钮与行点击的顺序问题)

遇到的问题 在做表格的时候&#xff0c;表格是封装好了的&#xff0c;用于展示数据。如果想给单行增加按钮&#xff0c;可以单独写一列存放按钮&#xff0c;最基本的需求是&#xff0c;点击按钮后要拿到数据然后发起请求。 且Vue的element-plus&#xff0c;当我们点击按钮之后…

向openssl中添加一个最简单的算法

文章目录 一、尝试在sha.c中添加新的函数二、添加自定义算法2.1 添加对应文件2.2 相关配置2.3 编译运行 一、尝试在sha.c中添加新的函数 在尝试添加新算法前&#xff0c;我先尝试在原有的旧算法中添加一个新函数&#xff0c;看是否能被编译并生成对应的动态链接库。 关于open…

艾昆纬携手亚马逊云科技赋能生物医药企业,加速武田数字化转型

IQVIA艾昆纬是全球以及中国医疗健康服务领域的领导者&#xff0c;利用其数据及分析&#xff0c;前沿数字化技术和医疗领域的专业知识&#xff0c;智能连接医疗生态的各个环节。过去几年&#xff0c;IQVIA艾昆纬所服务的生物医药行业客户武田中国也迎来了数字化转型的高潮。 武田…

iOS开发Swift-6-深色模式,类与对象,MVC模式,弹出框,闭包-趣味问答App

1.创建趣味问答App项目 2.创建一个问题文本&#xff0c;水平居中约束。 创建蓝、红两个按钮&#xff0c;放入Stack View中&#xff0c;给StackView水平居中约束&#xff0c;下边约束&#xff0c;设置两按钮间距为20. 设置进度条view与safe View关系为equal width。设置他们的比…

VScode 调试python程序,debug状态闪断问题的解决方法

0. Few words 之前一直在VSCode中debug C和Python的程序没出过闪断的问题&#xff0c;但是最近在另一台电脑上debug&#xff0c;同样的方法&#xff0c;设置launch.json和CMakeList加debug状态等等操作&#xff0c;如我另一篇blog写的一样&#xff0c;可以点这里查看。 但是&a…

jsch网页版ssh

使用依赖 implementation com.jcraft:jsch:0.1.55Server端代码 import com.jcraft.jsch.Channel; import com.jcraft.jsch.JSch; import com.jcraft.jsch.Session; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.TimeUnit; import o…

前端面试中Vue的有经典面试题一

1. 谈谈你对MVVM开发模式的理解 MVVM分为Model、View、ViewModel三者。 Model&#xff1a;代表数据模型&#xff0c;数据和业务逻辑都在Model层中定义&#xff1b; View&#xff1a;代表UI视图&#xff0c;负责数据的展示&#xff1b; ViewModel&#xff1a;负责监听Model中…

【【萌新的STM32学习-27--USART异步通信配置步骤】】

萌新的STM32学习-27–USART异步通信配置步骤 USART/UART 异步通信配置步骤 1.配置串口工作参数 HAL_UART_Init() 我们会在此处调用MSP中的回调函数 2.串口底层初始化 用户定义HAL_UART_MspInit() 配置GPIO NVIC CLOCK 等 3.开启串口异步接收中断 HAL_UART_Receive_IT() 4.…

题目有点太简单了,不知道怎么选了

有个公司给了下面一个题目&#xff0c;看了下太简单了&#xff0c;都怕选错了。 后来拿着程序跑了下&#xff0c;就是这个意思嘛。 结论 程序跑出来的结果就是对输入的列表进行倒序排列。 public void testGetPut() throws Exception {List<Integer> numbers List.of(…

Shell - 加固系统配置

文章目录 Shell Script其他方案 Shell Script #! /bin/bash # Function:对账户的密码的一些加固 read -p "设置密码最多可多少天不修改&#xff1a;" A read -p "设置密码修改之间最小的天数&#xff1a;" B read -p "设置密码最短的长度&#xff1a…

ElasticSearch学习4--复杂查询

1、查询分类 查询所有&#xff1a;查询出所有数据&#xff0c;一般测试用。例如&#xff1a;match_all全文检索&#xff08;full text&#xff09;查询&#xff1a;利用分词器对用户输入内容分词&#xff0c;然后去倒排索引库中匹配。例如&#xff1a; match_query 根据单个字段…

保姆级 Keras 实现 Faster R-CNN 十一

保姆级 Keras 实现 Faster R-CNN 十一 一 RoI 区域二. 定义 RoiPoolingLyaer1. call 函数2. compute_output_shape 函数 三. 将 RoiPoolingLayer 加入模型 上一篇 文章中我们实现了 ProposalLyaer 层, 它将的功能是输出建议区域矩形. 本文要实现另一个自定义层 RoiPoolingLayer…

网络安全体系架构介绍

网络安全体系是一项复杂的系统工程&#xff0c;需要把安全组织体系、安全技术体系和安全管理体系等手段进行有机融合&#xff0c;构建一体化的整体安全屏障。针对网络安全防护&#xff0c;美国曾提出多个网络安全体系模型和架构&#xff0c;其中比较经典的包括PDRR模型、P2DR模…

git 查看当前分支最近一次提交的commit SHA

获取当前分支最近一次commit SHA &#xff08;长度为40个16进制数字的字符&#xff09;命令如下&#xff1a; git rev-parse HEAD 获取简写&#xff08;短&#xff09; commit SHA git rev-parse --short HEAD

Acwing 1233. 全球变暖 (每日一题)

如果你觉得这篇题解对你有用&#xff0c;可以点个赞或关注再走呗&#xff0c;谢谢你的关注~ 题目描述 你有一张某海域 NN 像素的照片&#xff0c;”.”表示海洋、”#”表示陆地&#xff0c;如下所示&#xff1a; … .##… .##… …##. …####. …###. … 其中”上下左右”…

时序预测 | MATLAB实现TCN-BiGRU时间卷积双向门控循环单元时间序列预测

时序预测 | MATLAB实现TCN-BiGRU时间卷积双向门控循环单元时间序列预测 目录 时序预测 | MATLAB实现TCN-BiGRU时间卷积双向门控循环单元时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现TCN-BiGRU时间卷积双向门控循环单元时间序列预测&a…

SQLAlchemy 封装的工具类,数据库pgsql(数据库连接池)

1.SQLAlchemy是什么&#xff1f; SQLAlchemy 是 Python 著名的 ORM 工具包。通过 ORM&#xff0c;开发者可以用面向对象的方式来操作数据库&#xff0c;不再需要编写 SQL 语句。 SQLAlchemy 支持多种数据库&#xff0c;除 sqlite 外&#xff0c;其它数据库需要安装第三方驱动。…

element中Notification组件(this.$notify)自定义样式

1、自定义样式效果 2、vue代码 this.notifications this.$notify({title: ,dangerouslyUseHTMLString: true,duration: obj.remindMethod3 ? 0:4500,customClass: notify-warning,offset: 50,showClose: false,message: this.$createElement("div",null,[this.$…

jvm-堆

1.堆的核心概念 一个jvm实例只存在一个堆内存&#xff0c;堆也是java内存管理核心区域 java堆区在jvm启动的时候即被创建&#xff0c;其空间大小就确定了&#xff0c;是jvm管理最大的一块内存空间&#xff1b; 堆可以处于物理上不连续的内存空间&#xff0c;但在逻辑上它应该被…