【SQL篇】一、Flink动态表与流的关系以及DDL语法

文章目录

  • 1、启动SQL客户端
  • 2、SQL客户端常用配置
  • 3、动态表和持续查询
  • 4、将流转为动态表
  • 5、用SQL持续查询
  • 6、动态表转为流
  • 7、时间属性
  • 8、DDL-数据库相关
  • 9、DDL-表相关

在这里插入图片描述

1、启动SQL客户端

启动Flink(基于yarn-session模式为例):

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

再启动sql的客户端:

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session

简单看下:

show databases;

2、SQL客户端常用配置

设置结果的显示模式,默认table,还可以设置为tableau、changelog

SET sql-client.execution.result-mode=changelog;

设置执行环境,默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度:

SET parallelism.default=1;

设置状态TTL:

SET table.exec.state.ttl=1000;

通过SQL文件初始化,可以发现,exit退出客户端时,刚创建的库表都被清空了,这个SQL初始化文件就是在启动客户端时你想执行的SQL语句

# 创建SQL文件vim conf/sql-client-init.sqlSET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;
# 启动时,-i指定SQL文件/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql

3、动态表和持续查询

和MySQL等关系型表不同的是,无限流下,会有源源不断的数据过来进入表中,即动态表,来一条数据,往表中插入一条数据。对应的,想获取最新结果就要写条SQL去不间断的查询,即持续查询(每次数据到来都会触发查询操作),持续查询的结果也是一个动态表。

关系型表流处理的动态表
处理的数据对象字段元组的有界集合字段元组的无限序列
查询时对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
查询终止条件生成固定大小的结果集后终止永不停止, 根据持续收到的数据不断更新查询结果

在这里插入图片描述

如图,持续查询的流程为:

  • 流(stream)被转换为动态表(dynamic table)
  • 对动态表进行持续查询(continuous query),生成新的动态表
  • 生成的动态表被转换成流

如此,就通过执行SQL实现了对数据流的处理。

4、将流转为动态表

把流看作一张表,来一条数据,insert一次,比如有个记录用户点击事件的无限流:

在这里插入图片描述

5、用SQL持续查询

代码中定义一条查询SQL:

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

此时,结果表(动态),可能是简单的insert,如Bob这条数据,也可能是对旧数据的更新update,如Alice,这就是更新查询。 此时,结果表转DataStream调用toChangelogStream()方法。
在这里插入图片描述

修改查询SQL,使用TUMBLE加一个开窗,每个窗口触发时,输出结果,此时对结果表就只有insert追加数据,没有update,即追加查询。

在这里插入图片描述

6、动态表转为流

仅追加(Append-only)流

动态表仅仅通过insert来修改,转为流时,对应一个仅追加的流,流中的每条数据,就是动态表的每行数据。

撤回(Retract)流

流中有添加消息add和撤回消息retract两种,对应表中:

  • insert就是add消息
  • delete就是retract
  • update就是被改行的retract+新结果的add

在这里插入图片描述

更新插入(Upsert)流

流中有更新插入消息upsert和删除消息delete两种,对应表中:

  • update和insert是upsert消息
  • delete为delete消息

在这里插入图片描述

最后,注意,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流两种。

7、时间属性

在表中加个时间字段,数据类型为TimeStamp,分为事件时间和处理时间。事件时间通过watermark语句来定义:

CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...
);# TIMESTAMP后的3为精确度

以上,ts字段为事件时间属性,且基于ts设置5s的水位线延迟,注意,延迟秒数5必须加单引号。时间戳类型需要转为秒或者毫秒时,可:

# ...
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
# ...
3即精确到毫秒

定义处理时间属性用procTime函数:

CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()
) WITH (...
);

8、DDL-数据库相关

建库:

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1=val1, key2=val2, ...)

查询所有库:

SHOW DATABASES

查当前库:

SHOW CURRENT DATABASE

修改库的某些属性:

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

删库:

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

注意,

  • RESTRICT:删除非空数据库会触发异常。默认启用
  • CASCADE:删除非空数据库也会删除所有相关的表和函数,慎用

切换当前库:

USE database_name;

9、DDL-表相关

建表:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name# 字段({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]# 定义watermark[ <watermark_definition> ][ <table_constraint> ][ , ...n])   # 注释  [COMMENT table_comment]# 分区[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]# Flink特色WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] | AS select_query ]

关于表中的字段:physical_column就是常规列。metadata_column是元数据列,可访问到数据源本身的一些元数据,必须加METADATA关键字标识,如:读取数据写入Kafka时,Kafka引擎给数据打上的时间戳标记:

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  # !!!
) WITH ('connector' = 'kafka'...
);

自定义的列名称和 Connector 中定义 metadata 字段的名称一样时,后面的FROM省略:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA  # !!!
) WITH (
'connector' = 'kafka'
...
);

自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致时,会自动强转,因此这两个类型必须可以强转:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

默认metadata_column列可读可写,加VIRTUAL表示只读:

CREATE TABLE MyTable (`timestamp` BIGINT METADATA, `offset` BIGINT METADATA VIRTUAL,  # !!!!`user_id` BIGINT,`name` STRING,
) WITH ('connector' = 'kafka'...
);

computed_column即计算列,把几列的计算结果做为新列,这在关系型SQL中一般在查询语句中完成,而不存成一个新列。

CREATE TABLE MyTable (`user_id` BIGINT,`price` DOUBLE,`quantity` DOUBLE,`cost` AS price * quanitity   # !!!
) WITH ('connector' = 'kafka'...
);

主键的定义,只支持 not enforced:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced   # !!!
) WITH (
'connector' = 'kafka'
...
);

with子句,用于指定这个表相关的外部系统的相关配置,如Kafka:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

like子句,即在现有表的基础上,创建另一种表:

CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset'
);CREATE TABLE Orders_with_watermark (-- Add watermark definitionWATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (-- Overwrite the startup-mode'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;  # !!!!

举例:新表中的value字段加偏引号是因为value和关键字冲突了

CREATE TABLE test(id INT, ts BIGINT, vc INT
) WITH (
'connector' = 'print'
);CREATE TABLE test1 (`value` STRING
)
LIKE test;

create-table-as-select,即CTAS语句,通过查询结果创建表:

CREATE TABLE my_ctas_table
WITH ('connector' = 'kafka',...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;# 注意此时不能自己来定义列

查所有表:

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

查某张表信息:

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

修改表名:

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性:

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

删表:

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/182931.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java_类和对象详解

文章目录 前言简单认识类类定义和使用类的实例化引用的一些注意事项 类和对象的说明及关系this引用为什么要有this引用this应用this特性 构造方法构造特性及应用用this简化用idea编译器快捷创建构造 封装封装的概念访问限定符 封装的扩展-包包的概念导入包中的类自定义包常见的…

Skywalking介绍

一个优秀的项目&#xff0c;除了具有高拓展的架构、高性能的方案、高质量的代码之外&#xff0c;还应该在上线后具备多角度的监控功能。现在企业中的监控服务也有很多&#xff0c;Skywalking除了提供多维度、多粒度的监控之外&#xff0c;也提供了良好的图形化界面以及性能剖析…

LeetCode 面试题 16.17. 连续数列

文章目录 一、题目二、C# 题解 一、题目 给定一个整数数组&#xff0c;找出总和最大的连续数列&#xff0c;并返回总和。 示例&#xff1a; 输入&#xff1a; [-2,1,-3,4,-1,2,1,-5,4] 输出&#xff1a; 6 解释&#xff1a; 连续子数组 [4,-1,2,1] 的和最大&#xff0c;为 6。…

DDD学习笔记

1)ddd&#xff1a; 软件复杂性的应对之道。 但是不是说&#xff1a;redis这种不会使用。 开发过程中&#xff0c;一直面临的一种复杂性。 是一种架构思想: 领域之间的组合。 让开发软件具有搭积木的感觉。 领域的核心是边界。 以领域划分为基础。 以通用语言为建设…

持续集成交付CICD:安装Jenkins Slave(从节点)

目录 一、实验 1.安装Jenkins Slave&#xff08;从节点&#xff09; 二、问题 1.salve节点启动jenkins报错 2.终止命令行后jenkins从节点状态不在线 一、实验 1.安装Jenkins Slave&#xff08;从节点&#xff09; &#xff08;1&#xff09;查看jenkins版本 Version 2.…

c语言进阶部分详解(《高质量C-C++编程》经典例题讲解及柔性数组)

上篇文章我介绍了介绍动态内存管理 的相关内容&#xff1a;c语言进阶部分详解&#xff08;详细解析动态内存管理&#xff09;-CSDN博客 各种源码大家可以去我的github主页进行查找&#xff1a;唔姆/比特学习过程2 (gitee.com) 今天便接“上回书所言”&#xff0c;来介绍《高质…

ElasticSearch 实现 全文检索 支持(PDF、TXT、Word、HTML等文件)通过 ingest-attachment 插件实现 文档的检索

一、Attachment 介绍 Attachment 插件是 Elasticsearch 中的一种插件&#xff0c;允许将各种二进制文件&#xff08;如PDF、Word文档等&#xff09;以及它们的内容索引到 Elasticsearch 中。插件使用 Apache Tika 库来解析和提取二进制文件的内容。通过使用 Attachment 插件&a…

Qt的事件

一、鼠标按下事件 //鼠标按下事件&#xff0c;获取屏幕位置&#xff0c;并显示&#xff0c;移动显示框 void Widget::mousePressEvent(QMouseEvent *event) {if(event->button() ! Qt::LeftButton){return ;}QPoint point event->pos();QPointF winPt event->…

Python学习-shutil模块和OS模块学习

shutil模块 针对文件的拷贝&#xff0c;删除&#xff0c;移动&#xff0c;压缩和解压操作 # 1.copyfileobj只能复制文件内容&#xff0c;无法复制权限#复制文件时&#xff0c;要选择自己有权限的目录执行操作&#xff0c;创建的文件会根据系统umask设定的参数来指定用户权限 s…

首发scitb包,一个为制作统计表格而生的R包

目前&#xff0c;本人写的第3个R包scitb包已经正式在R语言官方CRAN上线&#xff0c;scitb包是一个为生成专业化统计表格而生的R包。 可以使用以下代码安装 install.packages("scitb")scitb包对我而言是个很重要的R包&#xff0c;我的很多想法需要靠它做平台来实现&a…

项目实战:优化Servlet,把所有围绕Fruit操作的Servlet封装成一个Servlet

1、FruitServlet 这些Servlet都是围绕着Fruit进行的把所有对水果增删改查的Servlet放到一个Servlet里面&#xff0c;让tomcat实例化一个Servlet对象 package com.csdn.fruit.servlet; import com.csdn.fruit.dto.PageInfo; import com.csdn.fruit.dto.PageQueryParam; import c…

多级缓存之JVM进程缓存

1.什么是多级缓存 传统的缓存策略一般是请求到达Tomcat后&#xff0c;先查询Redis&#xff0c;如果未命中则查询数据库&#xff0c;如图&#xff1a; 存在下面的问题&#xff1a; 请求要经过Tomcat处理&#xff0c;Tomcat的性能成为整个系统的瓶颈 Redis缓存失效时&#xff0…

STM32笔记—DMA

目录 一、DMA简介 二、DMA主要特性 三、DMA框图 3.1 DMA处理 3.2 仲裁器 3.3 DMA通道 扩展: 断言&#xff1a; 枚举&#xff1a; 3.4 可编程的数据传输宽度、对齐方式和数据大小端 3.5 DMA请求映像 四、DMA基本结构 4.1 DMA_Init配置 4.2 实现DMAADC扫描模式 实现要求…

PMP考试都是什么题?

PMP考试都是选择题&#xff0c;180道选择题&#xff0c;单选170道&#xff0c;多选10道&#xff0c;告知答案选项数量。 这里分享一下PMP考试中的常见翻译问题&#xff0c;pmp干货可在文末获取。 1、题目中出现的“启动会议”或“启动大会”开工会议&#xff08;kick-off mee…

康耐视深度学习ViDi-ViDi四大工具介绍与主要用途

Cognex ViDi 工具是一系列机器视觉工具&#xff0c;通过深度学习解决各种难以解决的挑战。虽然这些工具共享一个引擎&#xff0c;但它们在图像中寻找的内容不同。更具体地说&#xff0c;在分析单个点、单个区域或完整图像时&#xff0c;每个工具都有不同的侧重点。 Locate&…

极致性能优化:前端SSR渲染利器Qwik.js | 京东云技术团队

引言 前端性能已成为网站和应用成功的关键要素之一。用户期望快速加载的页面和流畅的交互&#xff0c;而前端框架的选择对于实现这些目标至关重要。然而&#xff0c;传统的前端框架在某些情况下可能面临性能挑战且存在技术壁垒。 在这个充满挑战的背景下&#xff0c;我们引入…

基础课18——智能客服系统架构

1.基础设施层 基础设施主要包括以下几点&#xff1a; 1. 硬件设施&#xff1a;包括服务器、存储设备、网络设备等&#xff0c;这是整个系统运行的物理基础。 2. 软件设施&#xff1a;包括操作系统、数据库管理系统、自然语言处理(NLP)工具和机器学习算法等&#xff0c;这些是…

Jmeter分布式压测 —— 易踩坑点

1、压测机 无论是从成本角度还是维护的难易方面&#xff0c;压测机的数量&#xff0c;适量就好。举个例子&#xff0c;8C16G的一台服务器&#xff0c;部署Jmeter后&#xff0c;根据我个人的测试比对数据&#xff0c;配置≤1500个线程数&#xff0c;最好。太多了性能损耗较大&a…

Cannot run program “D:\c\IntelliJ IDEA 2021.1.3\jbr\bin\java.exe“

如果你的idea在打开后出现了这个故障 Cannot run program "D:\c\IntelliJ IDEA 2021.1.3\jbr\bin\java.exe" (in directory "D:\c\IntelliJ IDEA 2021.1.3\bin"): CreateProcess error2, 系统找不到指定的文件。 打开IDEA的设置 file --> settings --&…

机器人制作开源方案 | 管内检测维护机器人

一、作品简介 作者&#xff1a;李泽彬&#xff0c;李晋晟&#xff0c;杜张坤&#xff0c;禹馨雅 单位&#xff1a;运城学院 指导老师&#xff1a;薛晓峰 随着我国的社会主义市场经济的飞速发展和科学技术的革新&#xff0c;各行各业的发展越来越离不开信息化和网络化的…