玩转数据-大数据-Flink SQL 中的时间属性

一、说明

时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。

二、处理时间

2.1、准备WaterSensor类,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

执行结果:
在这里插入图片描述

2.3、创建数据文件sensor.txt 数据,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在创建表的 DDL 中定义

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

运行结果:
在这里插入图片描述

三、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

3.1、DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

运行结果:
在这里插入图片描述

3.2、使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

运行结果:
在这里插入图片描述
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/145743.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决前端二进制流下载的文件(例如:excel)打不开的问题

1. 现在后端请求数据后&#xff0c;返回了一个二进制的数据&#xff0c;我们要把它下载下来。 这是响应的数据&#xff1a; 2. 这是调用接口的地方&#xff1a; uploadOk(){if(this.files.length 0){return this.$Message.warning("请选择上传文件&#xff01;&#xff…

小白vite+vue3搭建项目整个流程

第一步 查看npm 版本npm -v&#xff0c;npm版本是7&#xff0c;创建项目命令&#xff1a; npm create vitelatest threejsVue -- --template vue第二步 // 进入项目名为threejsVue的项目命令 cd threejsVue // 安装路由 npm install vue-router4 // 安装css npm install -D s…

十、空闲任务及其钩子函数

1、空闲任务的介绍 (1)一个良好的程序&#xff0c;它的任务都是事件驱动的&#xff1a;平时大部分时间处于阻塞状态。 (2)有可能我们自己创建的所有任务都无法执行&#xff0c;但是调度器必须能找到一个可以运行的任务。所以&#xff0c;我们要提供空闲任务。 (3)在使用vTas…

Python日期的加减等操作

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 日期输出格式化 所有日期、时间的api都在datetime模块内。 &#x1f447; &#x1f447; &#x1f447; 更多精彩机密、教程&#xff0c;尽在下方&#xff0c;赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都…

揭秘:机构招生电子传单制作的五个黄金法则

机构招生微传单制作一直都是让很多人在意的事情。一款好的微传单不仅可以吸引更多的学生&#xff0c;还可以省去很多招生工作的时间和精力。但是&#xff0c;很多人却不知道如何制作一款精美的微传单。下面就让我们来学习一下如何制作一款机构招生的微传单吧。 首先&#xff0c…

亲和力的作用,以及提高亲和力的六个办法

亲和力指的是容易使人亲近&#xff0c;当人身处集体中&#xff0c;亲和力也即是影响力和凝聚力的体现。通常是在职场中会明确对亲和力的考评&#xff0c;尤其是某些管理型岗位&#xff0c;所以HR人力资源管理中对亲和力有详细的评级标准。不过这里小猫测试网不详细讨论亲和力的…

敏感性分析一览

敏感性分析 SobolMorrisFourier Amplitude Sensitivity Test (FAST)Random Balance Designs - Fourier Amplitude Sensitivity Test (RBD-FAST)Delta Moment-Independent MeasureDerivative-based Global Sensitivity Measure (DGSM)Fractional Factorial Sensitivity Analysis…

服务器数据恢复-zfs下raidz多块磁盘离线导致服务器崩溃的数据恢复案例

服务器数据恢复环境&#xff1a; 一台服务器共配备32块硬盘&#xff0c;组建了4组RAIDZ&#xff0c;Windows操作系统zfs文件系统。 服务器故障&#xff1a; 服务器在运行过程中突然崩溃&#xff0c;经过初步检测检测没有发现服务器存在物理故障&#xff0c;重启服务器后故障依…

YOLOv7改进:结合CotNet Transformer结构

1.简介 京东AI研究院提出的一种新的注意力结构。将CoT Block代替了ResNet结构中的3x3卷积&#xff0c;在分类检测分割等任务效果都出类拔萃 论文&#xff1a;Contextual Transformer Networks for Visual Recognition 论文地址&#xff1a;https://arxiv.org/abs/2107.12292 有…

技术Leader对下管理的法宝-SMART

SMART方法论 源于国外管理大师的《管理的实践》是管理者能够更加明确员工高效工作的利器&#xff0c;科学、规范的对员工绩效制定考核目标和考核标准5个单词缩写 Specific:目标要具体Measurable:目标成果要可衡量(量化) Attainable:目标要可实现&#xff0c;避免过高/过低Rel…

模块化CSS

1、什么是模块化CSS 模块化CSS是一种将CSS样式表的规则和样式定义封装到模块或组件级别的方法&#xff0c;以便于更好地管理、维护和组织样式代码。这种方法通过将样式与特定的HTML元素或组件相关联&#xff0c;提供了一种更具可维护性、可复用性和隔离性的方式来处理样式。简单…

SpringBoot读取配置的方式

在 Spring Boot 应用中&#xff0c;我们通常需要一些配置信息来指导应用的运行。这些配置信息可以包括如下内容&#xff1a;端口号、数据库连接信息、日志配置、缓存配置、认证配置、等等。Spring Boot 提供了多种方式来读取这些配置信息。读取配置的目的是为了在程序中使用这些…

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— Web APIs(三)

思维导图 全选案例 大按钮控制小按钮 小按钮控制大按钮 css伪类选择器checked <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><…

Apache Commons Pool2 池化技术

对象池是一种设计模式&#xff0c;用于管理和重用对象&#xff0c;以提高性能和资源利用率。对象池的概念在许多应用程序中都有广泛应用&#xff0c;特别是在需要频繁创建和销毁对象的情况下&#xff0c;例如数据库连接、线程、HTTP连接等 对象池通过预先创建一组对象并将它们存…

B. Comparison String

题目&#xff1a; 样例&#xff1a; 输入 4 4 <<>> 4 >><< 5 >>>>> 7 <><><><输出 3 3 6 2 思路&#xff1a; 由题意&#xff0c;条件是 又因为要使用尽可能少的数字&#xff0c;这是一道贪心题&#xff0c;所以…

最大内切圆算法计算裂缝宽度

本文这里是对CSDN上另一位博主的代码进行了整理&#xff1a; 基于opencv的裂缝宽度检测算法&#xff08;计算轮廓最大内切圆算法&#xff09; 我觉得这位博主应该是上传了一个代码草稿&#xff0c;我对其进行了重新整理&#xff0c;并添加了详细的注释。 import cv2 import …

ICCV 2023|Occ2Net,一种基于3D 占据估计的有效且稳健的带有遮挡区域的图像匹配方法...

本文为大家介绍一篇入选ICCV 2023的论文&#xff0c;《Occ2Net: Robust Image Matching Based on 3D Occupancy Estimation for Occluded Regions》&#xff0c; 一种基于3D 占据估计的有效且稳健的带有遮挡区域的图像匹配方法。 论文链接&#xff1a;https://arxiv.org/abs/23…

[红明谷CTF 2021]write_shell %09绕过过滤空格 ``执行

目录 1.正常短标签 2.短标签配合内联执行 看看代码 <?php error_reporting(0); highlight_file(__FILE__); function check($input){if(preg_match("/| |_|php|;|~|\\^|\\|eval|{|}/i",$input)){ 过滤了 木马类型的东西// if(preg_match("/| |_||php/&quo…

Java应用生产Full GC或者OOM问题如何定位

1 引言 生产应用服务频繁Full GC却无法释放内存&#xff0c;甚至可能OOM&#xff0c;这种情况很有可能是内存泄露或者堆内存分配不足&#xff0c;此时需要dump堆信息来定位问题&#xff0c;查看是哪些地方内存泄漏。 Dump文件也称为内存转储文件或内存快照文件&#xff0c;是…

【Axure】常见元件、常用交互效果

产品结构图 以微信为例&#xff0c;根据页面层级制作 动态面板 动态面板的作用&#xff1a;动态面板是一个可视区域&#xff0c;如果要把控件放进去&#xff0c;要全部放进去&#xff0c;放多少显示多少 文本框 隐藏边框方法&#xff1a;先拉一个矩形&#xff0c;去掉部分…