Flink SQL -- 命令行的使用

1、启动Flink SQL
首先启动Flink的集群,选择独立集群模式或者是session的模式。此处选择是时session的模式:yarn-session.sh -d 在启动Flink SQL的client:
sql-client.sh
2、kafka SQL 连接器
在使用kafka作为数据源的时候需要上传jar包到flnik的lib下:/usr/local/soft/flink-1.15.2/lib可以去官网找对应的版本下载上传。

 

1、创建表:再流上定义表
再flink中创建表相当于创建一个视图(视图中不存数据,只有查询视图时才会去原表中读取数据)CREATE TABLE students (sid STRING,name STRING,age INT,sex STRING,clazz STRING    
) WITH ('connector' = 'kafka','topic' = 'student','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)2、查询数据(连续查询):select clazz,count(1) as c from students group by clazz;
3、客户端为维护和可视化结果提供了三种的模式:

        1、表格模式(默认使用的模式),(table mode),在内存中实体化结果,并将结果用规则的分页表格可视化展示出来

SET 'sql-client.execution.result-mode' = 'table';

        2、变更日志模式,(changelog mode),不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET 'sql-client.execution.result-mode' = 'changelog';

        3、Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

SET 'sql-client.execution.result-mode' = 'tableau';

4、 Flink SQL流批一体:
        1、流处理:

                a、流处理即可以处理有界流也可以处理无界流

                b、流处理的输出的结果是连续的结果

                c、流处理的底层是持续流的模型,上游的Task和下游的Task同时启动等待数据的到达

SET 'execution.runtime-mode' = 'streaming'; 
        2、批处理:

                a、批处理只能用于处理有界流

                b、输出的是最终的结果

                c、批处理的底层是MapReduce模型,会先执行上游的Task,在执行下游的Task 

SET 'execution.runtime-mode' = 'batch';
Flink做批处理,读取一个文件:-- 创建一个有界流的表
CREATE TABLE students_hdfs (sid STRING,name STRING,age INT,sex STRING,clazz STRING
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/spark/stu/students.txt',  -- 必选:指定路径'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);select clazz,count(1) as c from 
students_hdfs
group by clazz
5、Flink SQL的连接器:
        1、kafka SQL 连接器

对于一些参数需要从官网进行了解。

                1、kafka source 

-- 创建kafka 表
CREATE TABLE students_kafka (`offset` BIGINT METADATA VIRTUAL, -- 偏移量`event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students', -- 数据的topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表'properties.group.id' = 'testGroup', -- 消费者组'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset'format' = 'csv' -- 读取数据的格式
);

                2、kafka sink 

-- 创建kafka 表
CREATE TABLE students_kafka_sink (sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students_sink', -- 数据的topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表'properties.group.id' = 'testGroup', -- 消费者组'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset'format' = 'csv' -- 读取数据的格式
);-- 将查询结果保存到kafka中
insert into students_kafka_sink
select * from students_hdfs;kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink

        3、将更新的流写入到kafka中 

因为在Kafka是一个消息队列,是不会去重的。所以只需要将读取数据的格式改成canal-json。当数据被读取回来还是原来的流模式。

CREATE TABLE clazz_num_kafka (clazz STRING,num BIGINT
) WITH ('connector' = 'kafka','topic' = 'clazz_num', -- 数据的topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表'properties.group.id' = 'testGroup', -- 消费者组'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset'format' = 'canal-json' -- 读取数据的格式
);-- 将更新的数据写入kafka需要使用canal-json格式,数据中会带上操作类型
{"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
{"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}insert into clazz_num_kafka
select clazz,count(1) as num from 
students
group by clazz;kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
        2、 hdfs SQL 连接器

                1、hdfs source

                        Flink读取文件可以使用有界流的方式,也可以是无界流方式。

-- 有界流
CREATE TABLE students_hdfs_batch (sid STRING,name STRING,age INT,sex STRING,clazz STRING
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);select * from students_hdfs_batch;-- 无界流
-- 基于hdfs做流处理,读取数据是以文件为单位,延迟比kafka大
CREATE TABLE students_hdfs_stream (sid STRING,name STRING,age INT,sex STRING,clazz STRING
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径'format' = 'csv' ,                    -- 必选:文件系统连接器指定 format'source.monitor-interval' = '5000' -- 每隔一段时间扫描目录,生成一个无界流
);select * from students_hdfs_stream;

                2、hdfs sink

-- 1、批处理模式(使用方式和底层原理和hive类似)
SET 'execution.runtime-mode' = 'batch';-- 创建表
CREATE TABLE clazz_num_hdfs (clazz STRING,num BIGINT
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/clazz_num',  -- 必选:指定路径'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);
-- 将查询结果保存到表中
insert into clazz_num_hdfs
select clazz,count(1) as num
from students_hdfs_batch
group by clazz;-- 2、流处理模式
SET 'execution.runtime-mode' = 'streaming'; -- 创建表,如果查询数据返回的十更新更改的流需要使用canal-json格式
CREATE TABLE clazz_num_hdfs_canal_json (clazz STRING,num BIGINT
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/clazz_num_canal_json',  -- 必选:指定路径'format' = 'canal-json'                     -- 必选:文件系统连接器指定 format
);insert into clazz_num_hdfs_canal_json
select clazz,count(1) as num
from students_hdfs_stream
group by clazz;
3、MySQL SQL 连接器

        1、整合:

# 1、上传依赖包到flink 的lib目录下/usr/local/soft/flink-1.15.2/lib
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.49.jar# 2、需要重启flink集群
yarn application -kill [appid]
yarn-session.sh -d# 3、重新进入sql命令行
sql-client.sh

         2、mysql   source 

-- 有界流
-- flink中表的字段类型和字段名需要和mysql保持一致
CREATE TABLE students_jdbc (id BIGINT,name STRING,age BIGINT,gender STRING,clazz STRING,PRIMARY KEY (id) NOT ENFORCED -- 主键
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/student','table-name' = 'students','username' ='root','password' ='123456'
);select * from students_jdbc;

        3、mysql sink 

-- 创建kafka 表
CREATE TABLE students_kafka (`offset` BIGINT METADATA VIRTUAL, -- 偏移量`event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students', -- 数据的topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表'properties.group.id' = 'testGroup', -- 消费者组'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset'format' = 'csv' -- 读取数据的格式
);-- 创建mysql sink表
CREATE TABLE clazz_num_mysql (clazz STRING,num BIGINT,PRIMARY KEY (clazz) NOT ENFORCED -- 主键
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/student','table-name' = 'clazz_num','username' ='root','password' ='123456'
);--- 再mysql创建接收表
CREATE TABLE clazz_num (clazz varchar(10),num BIGINT,PRIMARY KEY (clazz) -- 主键
) ;-- 将sql查询结果实时写入mysql
-- 将更新更改的流写入mysql,flink会自动按照主键更新数据
insert into clazz_num_mysql
select 
clazz,
count(1) as num from 
students_kafka
group by clazz;kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students 插入一条数据
        4、DataGen:用于生成随机数据,一般用在高性能测试上
-- 创建包(只能用于source表)
CREATE TABLE students_datagen (sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'datagen','rows-per-second'='5', -- 每秒随机生成的数据量'fields.age.min'='1','fields.age.max'='100','fields.sid.length'='10','fields.name.length'='2','fields.sex.length'='1','fields.clazz.length'='4'
);

        5、print:用于高性能测试 只能用于sink表
CREATE TABLE print_table (sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'print'
);insert into print_table
select * from students_datagen;结果需要在提交的任务中查看。
        6、BlackHole :是用于高性能测试使用,在后面可以用于Flink的反压的测试。
CREATE TABLE blackhole_table (sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'blackhole'
);insert into blackhole_table
select * from students_datagen;
6、SQL 语法
        1、Hints:

               用于提示执行,在Flink中可以动态的修改表中的属性,在Spark中可以用于广播。在修改动态表中属性后,不需要在重新建表,就可以读取修改后的需求。

CREATE TABLE students_kafka (`offset` BIGINT METADATA VIRTUAL, -- 偏移量`event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用sid STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students', -- 数据的topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表'properties.group.id' = 'testGroup', -- 消费者组'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset'format' = 'csv' -- 读取数据的格式
);-- 动态修改表属性,可以在查询数据时修改读取kafka数据的位置,不需要重新创建表
select * from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;-- 有界流
CREATE TABLE students_hdfs (sid STRING,name STRING,age INT,sex STRING,clazz STRING
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);-- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */;
         2、WITH:

                当一段SQL语句在被多次使用的时候,就将通过with给这个SQL起一个别名,类似于封装起来,就是为这个SQL创建一个临时的视图(并不是真正的视图),方便下次使用。

CREATE TABLE students_hdfs (sid STRING,name STRING,age INT,sex STRING,clazz STRING
)WITH ('connector' = 'filesystem',           -- 必选:指定连接器类型'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);-- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */;-- tmp别名代表的时子查询的sql,可以在后面的sql中多次使用
with tmp as (select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */where clazz='文科一班'
)
select * from tmp
union all
select * from tmp;
        3、DISTINCT:

在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大 状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题

select 
count(distinct sid) 
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;select count(sid)  
from (select distinct *from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
);

注意事项:

       1、 当Flink Client客户端退出来以后,里面创建的动态表就不存在了。这些表结构是元数据,是存储在内存中的。

        2、当在进行where过滤的时候,字符串会出现三种情况:空的字符串、空格字符串、null的字符串,三者是有区别的:

        这三者是不同的概念,在进行where过滤的时候过滤的条件是不同的。

1、过滤空的字符串:where s!= ‘空字符串’2、过滤空格字符串:where s!= ‘空格’3、过滤null字符串:where s!= null
Flink SQL中常见的函数:from_unixtime: 以字符串格式 string 返回数字参数 numberic 的表示形式(默认为 ‘yyyy-MM-dd HH:mm:ss’to_timestamp:  将格式为 string2(默认为:‘yyyy-MM-dd HH:mm:ss’)的字符串 string1 转换为 timestamp

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/186964.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uni-app多端开发

uni-app 多端开发 一、命令创建uni-app 项目二、在微信小程序后台找到 appId 填写 appId三、运行项目四、使用 uni-ui4-1、下载4-2、自动导入4-3、ts项目下载类型校验 (uni-ui 组件库)4-3-1、下载4-3-2、配置 五、持久化 pinia六、数据请求封装七、获取组…

python自动化测试selenium核心技术3种等待方式详解

这篇文章主要为大家介绍了python自动化测试selenium的核心技术三种等待方式示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪 UI自动化测试过程中,可能会出现因测试环境不稳定、网络慢等情况&a…

零基础必知的Python简介!

文章目录 前言1.Python 简介2.Python 发展历史3.Python 特点3.为什么是Python而不是其他语言?4.Python的种类总结关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python…

java数据结构--阻塞队列

目录 一.概念 二.生产者消费者问题 三.阻塞队列接口BlockingQueue 四.基于数组实现单锁的阻塞队列 1.加锁方式 2.代码实现 3.解释说明 (1).offer添加元素 (2)poll取出元素 4.timeout超时时间 5.测试 五.基于数组实现双锁的阻塞队列 1.问题 …

Python最基础的五个部分代码,零基础也能轻松看懂。

文章目录 前言一、表达式二、赋值语句三、引用四、分支语句五、循环语句关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资料六、Python兼…

JSON——数组语法

一段JSON可能是以 ”{“ 开头 也可能仅包含一段JSON数组 如下 [ { "name" : "hello,world"}, {"name" : "SB JSON”}, {“name” : "SB互联网房地产CNM“}, ] 瞧,蛋疼不...CJSON过来还是得搜下网…

星岛专栏|从Web3发展看金融与科技的融合之道

11月起,欧科云链与香港主流媒体星岛集团开设Web3.0安全技术专栏,该专栏主要面向香港从业者、交易机构、监管机构输出专业性的安全合规建议,旨在促进香港Web3.0行业向安全与合规发展。 出品|欧科云链研究院 自2016年首届香港金融…

[EFI]戴尔Latitude 5310电脑 Hackintosh 黑苹果efi引导文件

硬件型号驱动情况 主板戴尔Latitude 5310 处理器Intel Core i5-10210U(1.6GHz/L3 6M)已驱动 内存8GB已驱动 硬盘三星 MZVLW1T0HMLH-000L2 (1024 GB / 固态硬盘)已驱动 显卡Intel UHD620已驱动 声卡瑞昱 Audio ALC299 英特尔 High Definition Audio 控制器已驱动 网卡RT…

kubernetes etcd

目录 一、备份 二、回复 官网: https://v1-25.docs.kubernetes.io/zh-cn/docs/tasks/administer-cluster/configure-upgrade-etcd/#restoring-an-etcd-cluster 一、备份 从镜像中拷贝etcdctl二进制命令 输入ctrlpq快捷键,把容器打入后台 docker run…

【算法 | 数论 No.1】AcWing1246. 等差数列

个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【AcWing算法提高学习专栏】 🍔本专栏旨在提高自己算法能力的同时,记录一下自己的学习过程&a…

阻塞队列+定时器+常见的锁策略

1)阻塞队列:是一个线程安全的队列,是可以保证线程安全的 1.1)如果当前队列为空,尝试出队列,进入阻塞状态,一直阻塞到队列里面的元素不为空 1.2)如果当前队列满了,尝试入队列,也会产生阻塞,一直阻…

【vue会员管理系统】篇六之退出系统功能

一、效果图 点击之后跳转到登陆界面 二、实现步骤 2.1Easy Mock新增接口 打开Easy Mock新建接口 方法:post URL:user/logout 描述:退出系统 2.2新增api 在api/login.js下添加以下代码 export function logout(token) {return request({url: /user/logout,method:…

2023双十一:实体门店闯入,第二战场全面开战

“闺女,吃饺子了吗?”11月8日,立冬,忙碌一天的陈曦回家路上接到母亲电话,才想起来家里冷冻水饺没了,又不想再去超市,直接打开美团买菜买了两袋,回家就煮了吃。当然,最终她…

vr航空博物馆综合展馆趣味VR科普体验

第十期广州科普开放日 10月28日周六上午九点半,广州卓远VR科普基地再次迎来一批前来体验的亲子家庭,陆续到达的市民朋友让整个基地都热闹了起来,他们在这里开启了一场别开生面的VR科普体验。 一期一会,趣味VR科普 10月广州科普开放…

力扣21:合并两个有序链表

力扣21:合并两个有序链表 **题目描述:**将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4] 示例 2&…

【LeetCode: 54. 螺旋矩阵 | 模拟】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

【Kurbernetes资源管理】声明式资源管理+配置清单文件详解(附实例)

声明式 一、声明式资源管理方式1.1 简介1.2 基本语法1.3 子命令详解1.3.1 获取资源配置清单1.3.2 创建/更新资源补充:creat和apply的区别 1.3.3 删除资源----- delete1.3.4 编辑资源配置 -----edit1.3.5 获取资源的解释-----explain 二、资源清单格式详解2.1 yaml语…

字形变换-头歌

将一个给定字符串 s 根据给定的行数 numRows ,以从上往下、从左到右进行Z字形排列。之后,你的输出需要从左往右逐行读取,产生出一个新的字符串 示例 : 输入: s "QAZWSXEDCRFVTG",numRows 4 输出:"QETAXDVGZSCFWR&…

SpringBoot 学习笔记(四) - 原理篇

一、自动配置 1.1 bean加载方式 bean的加载方式1 - xml方式声明bean 导入依赖&#xff1a; <dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.9</ver…

【Docker】设置容器系统字符集zh_CN.UTF-8退出失效:关于Docker容器配置环境变量,再次进入失效问题

设置容器系统字符集zh_CN.UTF-8退出失效&#xff1a;关于Docker容器配置环境变量&#xff0c;再次进入失效问题 修改正在运行的Docker容器内的字符集: 先进入Docker容器&#xff1a;docker exec -it 容器ID /bin/bash查看是否支持中文字符集&#xff1a;locale -a | grep zh&a…