【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性

Flink SQL 基础概念》系列,共包含以下 5 篇文章:

  • Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
  • Flink SQL 基础概念(二):数据类型
  • Flink SQL 基础概念(三):SQL 动态表 & 连续查询
  • Flink SQL 基础概念(四):SQL 的时间属性
  • Flink SQL 基础概念(五):SQL 时区问题

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 基础概念(四):SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {@Overridepublic long extractTimestamp(Row element) {return (long) element.getField("f2");}});// 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource());// 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/278170.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

操作系统(AndroidIOS)图像绘图的基本原理

屏幕显示图像的过程 我们知道&#xff0c;屏幕是由一个个物理显示单元组成&#xff0c;每一个单元我们可以称之为一个物理像素点&#xff0c;而每一个像素点可以发出多种颜色。 而图像&#xff0c;就是在不同的物理像素点上显示不同的颜色构成的。 像素点的颜色 像素的颜色是…

【PyTorch】成功解决TypeError: iteration over a 0-d tensor

【PyTorch】成功解决TypeError: iteration over a 0-d tensor &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到您…

ssh 下连接Mysql 查看数据库数据表的内容的方法及步骤

要通过SSH连接到MySQL数据库&#xff0c;可以按照以下步骤进行操作&#xff1a; 在本地计算机上打开终端或命令提示符。 使用SSH命令连接到远程服务器。命令的格式如下&#xff1a; ssh usernameserver_ip其中&#xff0c;username是指在远程服务器上的用户名&#xff0c;serv…

Linux 块设备驱动

Linux 三大驱动分别是&#xff1a;字符设备驱动、块设备驱动、网络设备驱动。 块设备是针对存储设备的&#xff0c;比如 SD 卡、EMMC、NAND Flash、Nor Flash、SPI Flash、机械硬盘、固态硬盘等。因此块设备驱动其实就是这些存储设备驱动&#xff0c;块设备驱动相比字符设备驱…

jetson nano——编译一些包的网址导航,pyside2,qt(持续更新)

目录 1.PySide2下载地址2.tesserocr下载地址3.Qt下载地址4.OpenSSL官网5.latex编译器下载地址5.1MikTex5.2TeX Live 1.PySide2下载地址 https://download.qt.io/official_releases/QtForPython/pyside2/ 如下图&#xff1a; 2.tesserocr下载地址 https://github.com/simonflue…

ToolPlatform烧录HI3403实战

既然是嵌入式&#xff0c;烧录是逃不掉的。 连接串口&#xff01;必须 主机有串口&#xff0c;或者用USB转接。 软件 01.software\pc\ToolPlatform 启动 其实只有这一个选项 BurnTool面板&#xff1a; 选择配置 选择烧写eMMC&#xff0c;再点击游览&#xff0c;选择xml…

NetSuite多脚本性能研究

在项目中&#xff0c;随着复杂度的提升&#xff0c;客制脚本以及各类SuiteAPP的应用&#xff0c;导致某个对象上挂载的脚本大量增加&#xff0c;最终导致了性能问题。表现在保存单据时时间过长&#xff0c;严重影响人机界面的用户感受。基于此问题&#xff0c;我们开展了NetSui…

大语言模型RAG-langchain models (二)

大语言模型RAG-langchain models (二) 往期文章&#xff1a;大语言模型RAG-技术概览 (一) 文章目录 大语言模型RAG-langchain models (二)**往期文章&#xff1a;[大语言模型RAG-技术概览 (一)](https://blog.csdn.net/tangbiubiu/article/details/136651625)**核心模块总览Mod…

《硬件历险》之Mac抢救出现问题的时间机器硬盘中的数据

本文虽然使用“抢救”一词&#xff0c;但是运气比较好&#xff0c;远没有达到访问和修改底层的信息来抢救的地步。如果你是需要通过访问和修改底层信息来抢救数据&#xff0c;建议阅读刘伟的《数据恢复技术深度揭秘&#xff08;第二版&#xff09;》或者寻找专业人士的帮助。 《…

关于 NXP PCA85073A 实时时钟读取数据时出现 IIC 传输失败的原因解析和解决方法

一、前言 对使用 I2C 传输的 RTC 外设 PCA85073&#xff0c;在 I2C 传输过程中若有复位信号输入&#xff0c;则有概率出现 I2C 死锁的状态&#xff0c;即 SCL为高&#xff0c;SDA一直为低的现象。 二、I2C 基本协议 在分析问题出现的原因之前&#xff0c;我…

es索引操作命令

索引操作 index 创建索引 put 方法创建索引 使用 put 创建索引时必须指明文档id&#xff0c;否则报错 # PUT 创建命令 # test1 索引名称 # type1 类型名称&#xff0c;默认为_doc&#xff0c;已经被废弃 # 1 文档id PUT /test1/type1/1 {"name":"zhangsan&…

【体验有奖】用 AI 画春天,函数计算搭建 Stable Diffusion WebUI

人工智能生成内容 AIGC&#xff08;Artificial Intelligence Generated Content&#xff09;是当下备受关注的概念之一&#xff0c;是继 PGC 和 UGC 之后的新型生产方式。AIGC 技术的核心思想是利用人工智能算法生成具有一定创意和质量的内容。例如&#xff0c;根据用户的描述或…

YOLOv9详解

1.概述 在逐层进行特征提取和空间转换的过程中&#xff0c;会损失大量信息&#xff0c;例如图中的马在建模过程中逐渐变得模糊&#xff0c;从而影响到最终的性能。YOLOv9尝试使用可编程梯度信息PGI解决这一问题。 具体来说&#xff0c; PGI包含三个部分&#xff0c;&#xff0…

AJAX 02 案例、Bootstrap框架

AJAX 学习 AJAX 2 综合案例黑马 API01 图书管理Bootstrap 官网Bootstrap 弹框图书管理-渲染列表图书管理-添加图书图书管理-删除图书图书管理 - 编辑图书 02 图片上传03 更换图片04 个人信息设置信息渲染头像修改补充知识点&#xff1a;label扩大表单的范围 AJAX 2 综合案例 黑…

【鸿蒙HarmonyOS开发笔记】自定义组件详解

自定义组件 除去系统预置的组件外&#xff0c;ArkTS 还支持自定义组件。使用自定义组件&#xff0c;可使代码的结构更加清晰&#xff0c;并且能提高代码的复用性。 我们开发的每个页面其实都可以视为自定义组件内置组件的结合 语法说明 自定义组件的语法如下图所示 各部分…

深度序列模型与自然语言处理:基于TensorFlow2实践

目录 写在前面 推荐图书 编辑推荐 内容简介 作者简介 推荐理由 写在最后 写在前面 本期博主给大家推荐一本深度学习的好书&#xff0c;对Python深度学习感兴趣的小伙伴快来看看吧&#xff01; 推荐图书 《深度序列模型与自然语言处理 基于TensorFlow2实践》 直达链接…

基于centos7的k8s最新版v1.29.2安装教程

k8s概述 Kubernetes 是一个可移植、可扩展的开源平台&#xff0c;用于管理容器化的工作负载和服务&#xff0c;可促进声明式配置和自动化。 Kubernetes 拥有一个庞大且快速增长的生态&#xff0c;其服务、支持和工具的使用范围相当广泛。 Kubernetes 这个名字源于希腊语&…

AI系统性学习01- Prompt Engineering

文章目录 面向开发者的Prompt Engineering一、简介二、Prompt设计原则1 环境配置2.两个基本原则2.1 原则1&#xff1a;编写清晰、具体的指令2.1.1 策略一&#xff1a;分割2.1.2 策略2&#xff1a;结构化输出2.1.3 策略3&#xff1a;模型检测2.1.4 策略4&#xff1a;提供示例 2.…

css设置选中文字和选中图片字的颜色

要改变页面中选中文字的颜色&#xff0c;可以使用 CSS 的 ::selection 伪元素来实现 *::selection {/* 改变选中文字的背景色 */background-color: #c42121;/* 改变选中文字的文本颜色 */color: #fff; } 用通配符选择器给所有元素都加上了 ::selection伪元素&#xff0c;用于…

鸿蒙开发之MPChart图表开发

一、简介 随着移动应用的不断发展,数据可视化成为提高用户体验和数据交流的重要手段之一,因此需要经常使用图表,如折线图、柱形图等。OpenHarmony提供了一个强大而灵活的图表库是实现这一目标的关键。 在 ohpm 中心仓(https://ohpm.openharmony.cn/)中,汇聚了众多开发者…