Flink学习之Flink SQL(补)

Flink SQL

1、SQL客户端

1.1 基本使用
  • 启动yarn-session

    yarn-session.sh -d
    
  • 启动Flink SQL客户端

    sql-client.sh--退出客户端
    exit;
    
  • 测试

    重启SQL客户端之后,需要重新建表

    -- 构建Kafka Source
    -- 无界流
    drop table if exists students_kafka_source;
    CREATE TABLE if not exists students_kafka_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING
    ) WITH ('connector' = 'kafka','topic' = 'students1000','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv', -- 以 ,分隔的数据-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
    );-- 执行查询,基于KafkaSource是无界流,所以查询时连续变化的
    select * from students_kafka_source;
    select clazz,count(*) as cnt from students_kafka_source group by clazz;-- 向Kafka生产数据
    kafka-console-producer.sh --broker-list master:9092 --topic students1000
    
1.2 三种显示模式
  • 表格模式

    SQL客户端默认的结果显示模式

    在内存中实体化结果,并将结果用规则的分页表格可视化展示出来

    SET 'sql-client.execution.result-mode' = 'table';
    
  • 变更日志模式

    不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流

    SET 'sql-client.execution.result-mode' = 'changelog';
    
  • Tableau模式

    更接近传统的数据库,会将执行的结果(类似变更日志模式,由插入(+)和撤销(-)组成的持续查询产生结果流)以制表的形式直接打在屏幕之上

    SET 'sql-client.execution.result-mode' = 'tableau';
    
1.3 不同的执行模式
  • 批处理

    只能处理有界流

    结果是固定的

    底层是基于MR模型

    不会出现由插入(+)和撤销(-)组成的持续查询产生结果流这种结果,只会出现最终结果

    SET 'execution.runtime-mode' = 'batch';
    
  • 流处理

    默认的方式

    既可以处理无界流,也可以处理有界流

    结果是连续变化的

    底层是基于持续流模型

    SET 'execution.runtime-mode' = 'streaming';
    

2、常用的connector

2.1 Kafka
  • 准备工作

    # 下载依赖
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar# 长传至${FLINK_HOME}/lib# 重启yarn-session.sh
    # 先找到yarn-session的application id
    yarn application -list# kill掉yarn-session在Yarn上的进程
    yarn application -kill application_1722331927004_0007# 再启动yarn-session
    yarn-session.sh -d# 再启动sql-client
    sql-client.sh
    
  • Source

    -- 构建Kafka Source
    -- 无界流
    drop table if exists students_kafka_source;
    CREATE TABLE if not exists students_kafka_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,-- Kafka Source提供的数据之外的数据(元数据)`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',`pt` BIGINT METADATA FROM 'partition',`offset` BIGINT METADATA FROM 'offset',`topic` STRING METADATA FROM 'topic'
    ) WITH ('connector' = 'kafka','topic' = 'students1000','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
    );-- 执行查询
    select id,name,event_time,pt,`offset`,`topic` from students_kafka_source limit 10;
    
  • Sink

    • 结果不带更新的Sink

    • csv:只能添加新数据,不能修改或删除现有数据

      drop table if exists students_lksb_sink;
      CREATE TABLE if not exists students_lksb_sink (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING
      ) WITH ('connector' = 'kafka','topic' = 'students_lksb_sink01','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );-- 执行不带更新的查询
      insert into students_lksb_sink
      select id,name,age,gender,clazz from students_kafka_source where clazz='理科四班';select * from students_lksb_sink;-- 若执行下述代码则会出错,org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.clazz_cnt_sink' doesn't support consuming update changes 指出你正在尝试将包含更新操作的流数据写入到一个不支持更新操作的表接收器(Table Sink)中。
      drop table if exists clazz_cnt_sink;
      CREATE TABLE if not exists clazz_cnt_sink (`clazz` String,`cnt` BIGINT
      ) WITH ('connector' = 'kafka','topic' = 'clazz_cnt_sink_02','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv' 
      );-- 执行查询并且将查询结果插入到Sink表中
      insert into clazz_cnt_sink
      select clazz,count(*) as cnt from students_kafka_source group by clazz;select * from clazz_cnt_sink;
      

      结果:数据以追加的形式进行更新

      在这里插入图片描述

    • 结果带更新的Sink

      Kafka只支持追加的写入,不支持更新数据

      故有更新的查询结果无法直接编码,写入Kafka

      虽然Kafka支支持append,但是可以将更新流编码成“ +、-”不断追加到Kafka中

      如果有更新,那么往Kafka写两条记录即可表示更新,即:先“-”再“+”

      但是csv这种格式无法表达“-”或“+”操作,故无法在有更新的结果写Kafka时使用

      需要使用:canal-json或者是debezium-json

      canal-json:{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}

      debezium-json:{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}

      -- 基于Kafka Source 统计班级人数 最终结果写入Kafka
      drop table if exists clazz_cnt_sink;
      CREATE TABLE if not exists clazz_cnt_sink (`clazz` String,`cnt` BIGINT
      ) WITH ('connector' = 'kafka','topic' = 'clazz_cnt_sink_02','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'canal-json' -- 或者是指定为debezium-json
      );-- 执行查询并且将查询结果插入到Sink表中
      insert into clazz_cnt_sink
      select clazz,count(*) as cnt from students_kafka_source group by clazz;select * from clazz_cnt_sink;
      

结果数据会持续的更新:

在这里插入图片描述

2.2 JDBC

用于连接数据库,例如:MySQL、Oracle、PG、Derby

  • 准备工作

    # 下载依赖
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar# 上传依赖至FLINK的lib目录下,还需要将Linu中MySQL的驱动拷贝一份到lib目录下,可以从Hadoop中进行拷贝# 重启yarn-session以及sql客户端
    
  • Source

    有界流,只会查询一次,查询完后直接结束(从jdbc中读取数据是有界流

    drop table if exists students_mysql_source;
    CREATE TABLE if not exists students_mysql_source (`id` INT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,PRIMARY KEY (id) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata_30?useSSL=false','table-name' = 'students2','username' = 'root','password' = '123456'
    );-- 执行查询
    select * from students_mysql_source;
    -- 将模式换成tableau 看结果变化的全过程
    SET 'sql-client.execution.result-mode' = 'tableau';
    -- 默认会以 流处理的方式 执行,所以可以看到结果连续变化的过程
    select gender,count(*) as cnt from students_mysql_source group by gender;
    -- 将运行模式切换成批处理
    SET 'execution.runtime-mode' = 'batch';
    -- 再试执行,只会看到最终的一个结果,没有变化的过程(这是与流处理的区别之处)
    select gender,count(*) as cnt from students_mysql_source group by gender;
    
  • Sink

    从Kafka接收无界流的学生数据,统计班级人数,将最终的结果写入MySQL

    -- 创建MySQL的结果表
    -- 查询库中已有表的建表语句
    show create table xxx;-- 无主键的MySQL建表语句
    -- 最终发现写入的结果是有连续变换的过程,并不是直接写入最终的结果
    drop table if exists `clazz_cnt`;
    CREATE TABLE if not exists `clazz_cnt`(`clazz` varchar(255) DEFAULT NULL,`cnt` bigint DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 将班级设置为主键
    -- 最终写入的结果是可以通过主键进行更新,所以可以展示最终的结果,并且可以实时更新
    drop table if exists `clazz_cnt`;
    CREATE TABLE if not exists `clazz_cnt`(`clazz` varchar(255) NOT NULL,`cnt` bigint(20) DEFAULT NULL,PRIMARY KEY (`clazz`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 创建MySQL的Sink表
    drop table if exists clazz_cnt_mysql_sink;
    CREATE TABLE if not exists clazz_cnt_mysql_sink (`clazz` STRING,`cnt`	BIGINT,-- 如果查询的结果有更新,则需要设置主键PRIMARY KEY (clazz) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name' = 'clazz_cnt','username' = 'root','password' = '123456'
    );-- 记得将执行模式切换成流处理,因为Kafka是无界流
    SET 'execution.runtime-mode' = 'streaming';
    -- 执行查询:实时统计班级人数,将结果写入MySQL
    insert into clazz_cnt_mysql_sink
    select clazz,count(*) as cnt from students_kafka_source where clazz is not null group by clazz;
    
2.3 HDFS
  • Source

    • 有界流

      默认的方式

      drop table if exists students_hdfs_source;
      CREATE TABLE if not exists students_hdfs_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,`file.path` STRING NOT NULL METADATA
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/students.txt','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );-- 查询表中是否有数据
      select * from students_hdfs_source limit 100;
      
    • 无界流

      同DataStream的FileSource一致

      可以通过设置source.monitor-interval参数,来指定一个监控的间隔时间,例如:5s

      FLink就会定时监控目录的一个变换,有新的文件就可以实时进行读取

      最终得到一个无界流

      -- 创建HDFS目录
      hdfs dfs -mkdir /bigdata30/flink-- 创建Source表
      drop table if exists students_hdfs_unbounded_source;
      CREATE TABLE if not exists students_hdfs_unbounded_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,`file.path` STRING NOT NULL METADATA
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/flink','source.monitor-interval' = '5s','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );-- 执行查询
      select * from students_hdfs_unbounded_source;-- 向目录上传文件
      hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt1
      hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt2
      
  • Sink

    • 查询结果没有更新,写入数据

      drop table if exists students_hdfs_sink;
      CREATE TABLE if not exists students_hdfs_sink (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,`file_path` STRING
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/sink/','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
      );insert into students_hdfs_sink
      select * from students_hdfs_source;
      
    • 查询结果有更新,写入数据

      同Kafka类似,HDFS不支持更新数据,故需要将变换的结果编码成canal-json或者是debezium-json的格式才能进行insert

      drop table if exists clazz_cnt_hdfs_sink;
      CREATE TABLE if not exists clazz_cnt_hdfs_sink (`clazz` STRING,`cnt` BIGINT
      ) WITH ('connector' = 'filesystem','path' = 'hdfs://master:9000/bigdata30/clazz_cnt/','format' = 'canal-json'
      );-- 使用有界的数据源来写入待更新的计算结果
      insert into clazz_cnt_hdfs_sink
      select clazz,count(*) as cnt from students_hdfs_source group by clazz;
      
2.4 HBase
hbase启动顺序:
zk(三台虚拟机都启动)-->hadoop(主从复制:在master端启动即可)-->hbase(在master端启动即可)hbase关闭顺序:
hbase-->hadoop-->zk# 启动
start-hbase.sh#关闭
stop-hbase.sh# 进入HBase的客户端
hbase shell
  • 准备工作

    # 下载依赖
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar# 上传依赖并重启yarn-session及sql客户端
    
  • Source

    同MySQL类似,得到是一个有界流

    drop table if exists students_hbase_source;
    CREATE TABLE if not exists students_hbase_source (rowkey STRING,info ROW<name STRING, age STRING,gender STRING,clazz STRING>,PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH ('connector' = 'hbase-2.2','table-name' = 'students','zookeeper.quorum' = 'master:2181'
    );select rowkey,info.name,info.age,info.gender,info.clazz from students_hbase_source;
    
  • Sink

    同MySQL类似

    -- 在HBase中建表
    create 'stu01','info'-- 构建HBase Sink表
    drop table if exists stu_hbase_sink;
    CREATE TABLE if not exists stu_hbase_sink (id STRING,info ROW<name STRING,clazz STRING>,PRIMARY KEY (id) NOT ENFORCED
    ) WITH ('connector' = 'hbase-2.2','table-name' = 'stu01','zookeeper.quorum' = 'master:2181'
    );-- 丢弃null的数据
    set 'table.exec.sink.not-null-enforcer'='DROP';-- 仅追加的结果写入,由于HBase有rk存在,相同的RK会进行覆盖
    insert into stu_hbase_sink
    select cast(id as STRING) as id,ROW(name,clazz) as info 
    from students_kafka_source
    ;-- hbase中遍历数据
    scan "stu01",LIMIT => 50-- 在HBase中建表
    create 'clazz_cnt_01','info'-- 构建HBase Sink表
    drop table if exists clazz_cnt_hbase_sink;
    CREATE TABLE if not exists clazz_cnt_hbase_sink (clazz STRING,info ROW<cnt BIGINT>,PRIMARY KEY (clazz) NOT ENFORCED
    ) WITH ('connector' = 'hbase-2.2','table-name' = 'clazz_cnt_01','zookeeper.quorum' = 'master:2181'
    );-- 带更新的查询结果可以实时在HBase中通过RK进行更新
    insert into clazz_cnt_hbase_sink
    select clazz,ROW(count(*)) as info
    from students_kafka_source
    group by clazz
    ;-- hbase中遍历数据
    scan "clazz_cnt_01",LIMIT => 50
    
2.5 DataGen

用于按照指定的规则生成数据,一般用于性能测试

drop table if exists datagen;
CREATE TABLE if not exists datagen (id BIGINT,random_id BIGINT,name STRING
) WITH ('connector' = 'datagen',-- optional options --'rows-per-second'='20', -- 设置每秒钟生成的数据量'fields.id.kind' = 'random','fields.id.min'='10000000','fields.id.max'='99999999','fields.random_id.kind' = 'random','fields.random_id.min'='10000000','fields.random_id.max'='99999999','fields.name.length'='5'
);
2.6 Blackhole

用于性能测试,可以作为Sink端

drop table if exists blackhole_table;
CREATE TABLE if not exists  blackhole_table
WITH ('connector' = 'blackhole')
LIKE datagen (EXCLUDING ALL);insert into blackhole_table
select * from datagen group by name;drop table if exists blackhole_table;
CREATE TABLE if not exists  blackhole_table(name String,cnt BIGINT
)
WITH ('connector' = 'blackhole')
;insert into blackhole_table
select name,count(*) as cnt from datagen group by name;
2.7 Print

将结果数据在TaskManager中输出

drop table if exists print_table;
CREATE TABLE if not exists print_table (name STRING,cnt BIGINT
) WITH ('connector' = 'print'
);insert into print_table
select name,count(*) as cnt from datagen group by name;

3、常用的格式

3.1 CSV

逗号分隔符文件,并非一定是.csv文件

在作为Sink时的format,仅支持写入不带更新的结果

解析每条数据是通过顺序匹配

常用参数:

csv.ignore-parse-errors 默认false,忽略解析错误,不会导致程序直接停止

csv.field-delimiter 默认 逗号,指定数据的列分隔符

3.2 JSON
3.2.1 json

普通的json格式,解析数据是通过列名进行匹配

同csv类似,只支持写入不带更新的结果

drop table if exists cars_json_source;
CREATE TABLE if not exists cars_json_source (car String,county_code INT,city_code INT,card BIGINT,camera_id String,orientation String,road_id BIGINT,`time` BIGINT,speed Double
) WITH ('connector' = 'kafka','topic' = 'cars_json','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'json'
);
3.2.2 canal-json

一种特殊的JSON格式

支持写入更新的结果

{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}

3.2.3 debezium-json

同canal-json,只是数据格式有些许差异

{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}

3.3 ORC

一般不用

3.4 PARQUET

一般不用

4、时间属性

4.1 处理时间

基于系统的时间

drop table if exists students_kafka_source;
CREATE TABLE if not exists students_kafka_source (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,-- 通过系统时间给表增加一列,即:处理时间proc_time as PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'students1000','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 查询的结果每个五秒统计一次
select  clazz,count(*) as cnt,tumble_start(proc_time,INTERVAL '5' SECONDS) as window_start,tumble_end(proc_time,INTERVAL '5' SECONDS) as window_end
from students_kafka_source 
group by clazz,tumble(proc_time,INTERVAL '5' SECONDS)
;-- 向Topic中生产数据
kafka-console-producer.sh --broker-list master:9092 --topic students1000

由于是csv格式,只支持写入不带更新的结果
在这里插入图片描述

4.2 事件时间

基于数据自带的时间

java,2024-08-03 10:41:50

java,2024-08-03 10:41:51

java,2024-08-03 10:41:52

java,2024-08-03 10:41:55

java,2024-08-03 10:41:56

java,2024-08-03 10:42:56

java,2024-08-03 10:43:00

java,2024-08-03 10:43:10

drop table if exists words_kafka_source;
CREATE TABLE if not exists words_kafka_source (`word` STRING,-- 从数据中过来的一列,作为事件时间event_time TIMESTAMP(3),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间;水位线前移五秒,相当于每隔10秒的事件时间,触发一次执行WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'words_event_times','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 创建topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic words_event_times-- 执行查询,使用滚动的事件时间窗口进行word count,每5s统计一次
select  word,count(*) as cnt,tumble_start(event_time,INTERVAL '5' SECONDS) as window_start,tumble_end(event_time,INTERVAL '5' SECONDS) as window_end
from words_kafka_source 
group by word,tumble(event_time,INTERVAL '5' SECONDS)
;-- 向Topic中生产数据
kafka-console-producer.sh --broker-list master:9092 --topic words_event_times

5、SQL语法

5.1 Hints

在SQL查询时动态修改表的参数配置

-- words_kafka_source 默认从最后开始消费
select * from words_kafka_source; // 只能查询到最新的数据,不会从头开始消费-- 假设现在需要从头开始消费
-- 第一种方案,将words_kafka_source删除重建
-- 第二种方案,通过alter table 对表进行修改
-- 第三种方案,通过hints动态调整表的配置
select * from words_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset') */ ;
5.2 With

用于将多次执行的同一查询通过with先定义,后面可以进行多次使用,避免重复的SQL

应用场景:1、多次使用的SQL查询可以缓存提高性能 2、将多级嵌套解开来,降低主SQL的复杂度

-- 创建表时,应注意表中的数据类型要与MySQL数据库中的表的数据类型相符合drop table if exists students_mysql_source;
CREATE TABLE if not exists students_mysql_source (`id` INT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata_30?useSSL=false','table-name' = 'students2','username' = 'root','password' = '123456'
);select id,name from students_mysql_source where clazz = '理科一班'
union all
select id,name from students_mysql_source where clazz = '理科一班'
;-- 通过with可以将多次使用的SQL进行定义
with stu_lkyb as (select id,name from students_mysql_source where clazz = '理科一班'
)
select * from stu_lkyb
union all
select * from stu_lkyb
union all
select * from stu_lkyb
;
5.3 Where

可以进行过滤

select id,name,clazz,age from students_mysql_source where clazz = '理科一班' and age > 20;-- 找到重复数据并进行过滤,但是这样会将所有重复的数据都过滤掉,一般都是想留下一条,其余重复的数据给它过滤
select	id,name,age,gender,clazz
from (
select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
) t1 where t1.cnt = 1;-- 聚合后的过滤可以使用Having
select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
having count(*) = 1;
5.4 Distinct

用于去重

需要对每条不同的数据维护一个状态,状态会无限制的增大,最终任务可能会失败

无界流是正常可以去重的

有界流必须在分组之后带上聚合操作才能去重,如果直接distinct或者是groupby不聚合,最终任务里不会产生shuffle,即不会分组,也就无法去重

-- 去重
select id,name,age,gender,clazz from students_mysql_source group by id,name,age,gender,clazz;-- 等价于distinct
select distinct id,name,age,gender,clazz from students_mysql_source;select distinct id from students_mysql_source;
5.5 Windowing TVFs(现在推荐使用)

目前提供了三类TVFs窗口操作:TUMBLE、HOP、CUMULATE

会话SESSION窗口只能通过GROUP WINDOW FUNCTION实现(老版本实现,新版本还没有

计数窗口在FLINK SQL中暂未支持

5.5.1 Tumble

会给bid表增加三个窗口列:window_start、window_end、window_time

需要设置一个滚动时间

每隔一段时间会触发一次窗口的统计

-- 创建Bid订单表
drop table if exists bid_kafka_source;
CREATE TABLE if not exists bid_kafka_source (`item` STRING,`price` DOUBLE,`bidtime` TIMESTAMP(3),`proc_time` as PROCTIME(),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间WATERMARK FOR bidtime AS bidtime
) WITH ('connector' = 'kafka','topic' = 'bid','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 准备数据
C,4.00,2020-04-15 08:05:00
C,4.00,2020-04-15 08:06:00
C,4.00,2020-04-15 08:07:00
D,5.00,2020-04-15 08:09:00
B,3.00,2020-04-15 08:11:00
E,1.00,2020-04-15 08:13:00
F,6.00,2020-04-15 08:17:00
F,6.00,2020-04-15 08:20:00
-- 创建Kafka Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic bid-- 生产数据
kafka-console-producer.sh --broker-list master:9092 --topic bid-- 基于事件时间的滚动窗口,每隔十分钟事件时间便会执行一次
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
) group by window_start,window_end
;-- 基于处理时间的滚动窗口,每隔10秒的处理时间便会执行一次
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '10' SECONDS)
) group by window_start,window_end
;

基于处理时间的滚动窗口,在时间段内有数据输入程序才会执行。

在这里插入图片描述

5.5.2 HOP

批处理中的窗口不同,有数据来时,才会进行滑动、滚动,根据这个区间进行数据的统计

滑动窗口

需要指定两个时间:滑动的时间、窗口的大小

-- 基于事件时间的滑动窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可HOP(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
) group by window_start,window_end
;-- 基于处理时间的滑动窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可HOP(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS)
) group by window_start,window_end
;

基于事件时间的滑动窗口
在这里插入图片描述
在这里插入图片描述

基于处理时间的滑动窗口:滑动大小为5秒,窗口大小为10秒
在这里插入图片描述
在这里插入图片描述

5.5.3 CUMULATE

累积窗口:首先会按照步长初始化一个窗口大小,然后按照步长的间隔时间触发窗口的统计,接下来窗口大小会不断增大,直到达到设置的最大size,然后重复这个过程

需要指定两个时间间隔:步长、最大的size

例如:步长为2分钟,size为10分钟

每隔2分钟会触发一次统计,第一次统计的最近两分钟的数据,第二次统计是最近4分钟的…第5次统计是最近10分钟的数据,第6次统计是最近2分钟的数据…

-- 基于事件时间的累计窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
) group by window_start,window_end
;-- 基于处理时间的累计窗口
SELECT window_start,window_end,sum(price) as sum_price
FROM TABLE(-- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '2' SECONDS, INTERVAL '10' SECONDS)
) group by window_start,window_end
;
5.5.4 SESSION

会话窗口,目前版本不支持TVFs写法

需要使用老版本的写法:GROUP WINDOW FUNCTION

间隔一段时间没有数据就会触发窗口的统计

-- 基于事件时间的会话窗口
select session_start(bidtime, INTERVAL '2' MINUTES) as session_start,session_end(bidtime, INTERVAL '2' MINUTES) as session_end,sum(price) as sum_price
from bid_kafka_source
group by session(bidtime, INTERVAL '2' MINUTES)
;-- 基于处理时间的会话窗口
select session_start(proc_time, INTERVAL '2' SECONDS) as session_start,session_end(proc_time, INTERVAL '2' SECONDS) as session_end,sum(price) as sum_price
from bid_kafka_source
group by session(proc_time, INTERVAL '2' SECONDS)
;
5.6 Over聚合(注意与hivesql中的用法有所不同)
5.6.1 聚合类

sum、max、min、count、avg

sum 比较特殊:如果指定了order By,则表示累加求和,不指定则表示整个窗口求和

max、min、count、avg 不需要指定order By

-- 准备数据
item,supply_id,price,bidtime
A,001,4.00,2020-04-15 08:05:00
A,002,2.00,2020-04-15 08:06:00
A,001,5.00,2020-04-15 08:07:00
B,002,3.00,2020-04-15 08:08:00
A,001,1.00,2020-04-15 08:09:00
A,002,6.00,2020-04-15 08:10:00
B,001,6.00,2020-04-15 08:11:00
A,001,6.00,2020-04-15 08:12:00
B,002,6.00,2020-04-15 08:13:00
B,002,6.00,2020-04-15 08:14:00
A,001,66.00,2020-04-15 08:18:00
B,001,7.00,2020-04-15 08:16:00
B,001,7.00,2020-04-15 08:17:00-- 创建order订单表
drop table if exists order_kafka_source;
CREATE TABLE if not exists order_kafka_source (`item` STRING,`supply_id` STRING,`price` DOUBLE,`bidtime` TIMESTAMP(3),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间WATERMARK FOR bidtime AS bidtime
) WITH ('connector' = 'kafka','topic' = 'order','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 创建Kafka Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic order-- 生产数据
kafka-console-producer.sh --broker-list master:9092 --topic order-- 聚合类函数在实时的Over窗口上只会产生追加的数据,没有更新
-- 最终需要维护的状态大小同partition by指定的字段有关
-- 1、统计每种商品的累计成交金额
select item-- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算-- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合,sum(price) over (partition by item order by bidtime) as sum_price
from order_kafka_source
;-- 2、统计每种商品的最大成交金额
select item-- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计-- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值,max(price) over (partition by item order by bidtime) as max_price
from order_kafka_source
;-- 3、统计每种商品的最小、平均成交金额/交易次数 同上
select item-- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计-- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值,min(price) over (partition by item order by bidtime) as max_price
from order_kafka_source
;-- 4、统计最近10分钟内每种商品的累计成交金额,每隔事件时间的十分钟统计一次
select item-- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算-- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合,sum(price) over (partition by item order by bidtime -- 每次统计时 只会将和当前数据的时间相差10分钟内的数据进行统计RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW-- ROWS BETWEEN 10 PRECEDING AND CURRENT ROW -- 统计最近10条) as sum_price
from order_kafka_source
;
5.6.2 排名类

组内排名

row_number、rank、dense_rank

-- 1、统计每种商品成交时间的排名
select item,price-- 对item进行分组,同上面的聚合类over一样,order by必须指定时间,而且必须时升序,row_number() over (partition by item order by bidtime) as rn
from order_kafka_source
;-- 2、统计每种商品成交金额的排名, 无法统计所有数据的排名,代价太大,所以只能做TopN
-- 统计每种商品成交金额的排名Top3
select t1.item,t1.price,t1.rn
from (select item,price,row_number() over (partition by item order by price desc) as rnfrom order_kafka_source
) t1 where t1.rn <= 3
;
5.7 Order By

全局排序,注意代价

select *
from order_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/
-- 如果直接基于非时间列排序是不被支持的
-- order by price desc 只有price是不行的,必须得带上时间
order by bidtime,price desc
;-- 会保留排序最大的前10条数据,但是这10条数据并不是按序输出的
select *
from order_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/
order by price desc
-- 还可以通过limit来限制代价
limit 10
;
5.8 Limit

用于限制返回的条数

在实时场景中一般结合Order By来降低排序的代价

5.9 Join(重点、难懂)

了解清楚,各个join所适用的场景,面试题

join分为:

内连接、外连接、全连接

外连接分为:左外连接、右外连接

union区别于join,为上下连接

5.9.1 Regular Join

更偏向于非实时或批量处理,Regular Join在实时处理过程中需要维护大量的状态信息,从而增加内存和存储的压力。

常规Join,同HiveSQL、SparkSQL一致

可以进行:inner join、left join、right join、full join

注意状态的大小,可以设置TTL(超过TTL,两张表便无法再进行数据的连接

drop table if exists students_join;
CREATE TABLE if not exists students_join (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'students_join','properties.bootstrap.servers' = 'master:9092',-- 读数据时,若没有grp1,则会自动创建'properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);drop table if exists score_join;
CREATE TABLE if not exists score_join (`id` BIGINT,`subject_id` BIGINT,`score` INT,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'score_join','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 进行join
-- 用法同离线join没有任何区别
-- 但是要注意:一个流中的数据会一直等待另一个流中的数据达到,意味着状态会一直变大,最终任务肯定会失败
-- 可以在join的时候指定状态的过期时间TTL,这样状态不会无限制的变大
-- 设置TTL
set 'table.exec.state.ttl' = '100000ms';
select t1.id,t1.name,t2.subject_id,t2.score-- join没有限制,可以实现内/外join
from students_join t1 left join score_join t2 on t1.id = t2.id;-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic students_join
kafka-console-producer.sh --broker-list master:9092 --topic score_join
5.9.2 Interval Join

时间间隔关联

在Regular Join的基础之上指定一个时间间隔

实际上也是通过时间间隔来让状态不会一直变大,类似TTL;超过时间间隔则数据不会进行join

select t1.id,t1.name,t2.subject_id,t2.score-- join没有限制,可以实现内/外join
from students_join t1 
left join score_join t2 
on t1.id = t2.id
and t1.proc_time BETWEEN t2.proc_time - INTERVAL '10' SECONDS AND t2.proc_time
;
5.9.3 Temporal Join

时态表关联

适用于流表关联时态表

时态表:随着时间一直变化

001,2,DOL,2024-08-05 11:20:20
001,3,DOL,2024-08-05 11:25:20
001,5,DOL,2024-08-05 11:30:20
001,6,DOL,2024-08-05 11:45:20
001,4,DOL,2024-08-05 12:15:20-- 构建订单表,流表
drop table if exists orders_join;
CREATE TABLE if not exists orders_join (order_id    STRING,price       DECIMAL(32,2),currency    STRING,order_time  TIMESTAMP(3),WATERMARK FOR order_time AS order_time
) WITH ('connector' = 'kafka','topic' = 'orders_join','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 构建货币汇率表,时态表
-- 会随着时间一直变化
drop table if exists currency_rates;
CREATE TABLE if not exists currency_rates(currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3),WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','topic' = 'currency_rates','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'canal-json'
);-- 准备数据
{"data":[{"currency":"DOL","conversion_rate":7.14,"update_time":"2024-08-05 10:30:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.24,"update_time":"2024-08-05 11:25:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.04,"update_time":"2024-08-05 12:00:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.34,"update_time":"2024-08-05 12:14:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.14,"update_time":"2024-08-05 12:20:00"}],"type":"INSERT"}-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic orders_join
kafka-console-producer.sh --broker-list master:9092 --topic currency_rates-- 时态表Join,订单表中的数据会与汇率表中的小于且距离订单表中的数据最近的数据进行join
-- 注:若汇率表中没有大于当前订单表中最新的数据的时间的数据,那么汇率表中没有与这个订单表中的这条最新的数据相join的数据
select t1.order_id,t1.currency,t1.price,t1.order_time,t2.conversion_rate,t2.update_time,t1.price * t2.conversion_rate as price_rmb
from orders_join t1 
-- 通过FOR SYSTEM_TIME 来表示进行时态JOIN
left join currency_rates FOR SYSTEM_TIME AS OF t1.order_time t2
on t1.currency = t2.currency
;
5.9.4 LookUp Join

适用于流表关联维表

维表:存储维度数据,通常变化频率不是很高

-- MySQL直接作为Source ---> 有界流(任务执行完便结束)
-- MySQL的学生信息表作为维表
drop table if exists students_mysql_join;
CREATE TABLE if not exists students_mysql_join (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name' = 'students','username' = 'root','password' = '123456',-- 设置LookUp缓存的条数以及过期时间'lookup.cache.max-rows' = '1000','lookup.cache.ttl' = '60s'
);-- 使用Regular Join中的score_join作为事实表
-- 如果MySQL有数据更新,程序不会识别到,因为MySQL的数据只会加载一次,有数据变更时需要重启任务
select t1.id,t1.score,t2.name,t2.clazz
from score_join t1
left join students_mysql_join t2
on t1.id = t2.id
;-- 使用Lookup Join
select t1.id,t1.score,t2.name,t2.clazz
from score_join t1
-- 来一数据就会去MySQL中查询一次,立马能够识别到更新的数据
-- 对MySQL的性能影响较大
-- 下述代码与Temporal Join中的一致
left join students_mysql_join FOR SYSTEM_TIME AS OF t1.proc_time t2
on t1.id = t2.id
;
5.10 模式匹配(CEP)(难懂)

CEP(Complex Event Processing,复杂事件处理),模式匹配是处理方法

语法:

SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
PARTITION BY userid
ORDER BY proctime
MEASURES -- 相当于select
A.id AS aid,
B.id AS bid,
C.id AS cid
-- 定义A B C三个规则,默认每个规则只需要匹配1次即可
-- 当所有的规则都满足,则输出数据
PATTERN (A B C) 
DEFINE -- 定义具体规则
A AS name = 'a',
B AS name = 'b',
C AS name = 'c'
) AS T
5.10.1 欺诈检测
  • 实时监控某个账户的交易流水,如果出现一笔交易小于1,紧接着下一笔交易大于500,那么就输出一个警告
-- 创建一个交易流水表
drop table if exists trans;
CREATE TABLE if not exists trans (`id` STRING,`price` DOUBLE,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'trans','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 准备数据
001,2
001,600
001,0.1
001,200
001,700
001,0.8
001,600
001,0.7
001,400-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic trans-- 进行模式匹配
SELECT T.min_price,T.max_price,T.a_proc_time,T.b_proc_time
FROM transMATCH_RECOGNIZE (PARTITION BY id -- 按什么分组统计ORDER BY proc_time -- 按时间升序排列数据MEASURES -- 相当于selectA.price as min_price,A.proc_time as a_proc_time,B.price as max_price,B.proc_time as b_proc_time-- 定义A B 两个规则,当所有的规则都满足,则输出数据PATTERN (A B) DEFINE -- 定义具体规则A AS price < 1,B AS price > 500) AS T
;
  • 实时监控某个账户的交易流水,如果出现三笔交易小于1,紧接着下一笔交易大于500,那么就输出一个警告
-- 进行模式匹配
SELECT *
FROM transMATCH_RECOGNIZE (PARTITION BY id -- 按什么分组统计ORDER BY proc_time -- 按时间升序排列数据MEASURES -- 相当于selectA.price as last_price -- 默认取出来的是最后一条A的记录,avg(A.price) as avg_price,A.proc_time as a_proc_time,B.price as max_price,B.proc_time as b_proc_time-- 定义A B 两个规则,当所有的规则都满足,则输出数据-- A{3} 表示需要匹配3次A的规则,才能进行B规则的匹配PATTERN (A{3} B) DEFINE -- 定义具体规则A AS price < 1,B AS price > 500) AS T
;001,0.1
001,0.2
001,0.4
001,600
  • 实时监控某个账户的交易流水,如果出现一笔交易小于1,紧接着下一笔交易大于500,两笔交易时间差如果小于5s钟,那么就输出一个警告
SELECT T.min_price,T.max_price,T.a_proc_time,T.b_proc_time
FROM transMATCH_RECOGNIZE (PARTITION BY id -- 按什么分组统计ORDER BY proc_time -- 按时间升序排列数据MEASURES -- 相当于selectA.price as min_price,A.proc_time as a_proc_time,B.price as max_price,B.proc_time as b_proc_time-- 定义A B 两个规则,当所有的规则都满足,则输出数据PATTERN (A B) WITHIN INTERVAL '5' SECONDDEFINE -- 定义具体规则A AS price < 1,B AS price > 500) AS T
;001,0.1
001,600001,0.1
-- 等待5s
001,600
5.10.2 股票检测
  • 实时监控股票价格走势,找出连续下降的区间
drop table if exists symbol;
CREATE TABLE if not exists symbol (`symbol` STRING,`rowtime` TIMESTAMP(3),`price` DECIMAL(10,2),WATERMARK FOR rowtime AS rowtime
) WITH ('connector' = 'kafka','topic' = 'symbol','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'earliest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);-- 准备数据
ACME,2024-08-05 10:00:00,17
ACME,2024-08-05 10:20:00,18
ACME,2024-08-05 10:40:00,20
ACME,2024-08-05 11:00:00,21
ACME,2024-08-05 11:20:00,22
ACME,2024-08-05 11:40:00,20
ACME,2024-08-05 12:00:00,15
ACME,2024-08-05 12:20:00,14
ACME,2024-08-05 12:40:00,13
ACME,2024-08-05 13:00:00,16
ACME,2024-08-05 13:20:00,19-- 创建Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic symbol-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic symbol-- 进行模式匹配
SELECT *
FROM symbolMATCH_RECOGNIZE (PARTITION BY symbol -- 按什么分组统计ORDER BY rowtime -- 按时间升序排列数据MEASURES -- 相当于selectA.price as a_price,A.rowtime as a_rowtime,max(B.price) as max_price,min(B.price) as min_price,min(B.rowtime) as start_time,max(B.rowtime) as end_time,C.price as c_price,C.rowtime as c_rowtimeAFTER MATCH SKIP PAST LAST ROW-- 定义A B C三个规则,当所有的规则都满足,则输出数据-- B+ 表示至少要符合一次PATTERN (A B{2,} C) DEFINE -- 定义具体规则-- 如果B是顶点数据,那么往前取一条B匹配到的数据是取不到的,则返回null-- 如果B是下降区间的数据,那么往前取一条B匹配到的数据是可以取到数据的B as (LAST(B.price, 1) > B.price) or (LAST(B.price, 1) is null and B.price > A.price),C as LAST(B.price) < C.price) AS T
;

6、整合Hive

获取解析Hive的元数据

Flink整合Hive主要有两个目的:

1、将Flink本身的元数据借助Hive保存

2、可以加载Hive中的数据,正常一般优先通过Spark处理Hive的数据

6.1 准备工作
# 1、下载Hive的connector依赖
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.15.4/flink-sql-connector-hive-3.1.2_2.12-1.15.4.jar# 2、上传到FLINK的lib目录下# 3、重启yarn-session以及sql客户端
6.2 创建Catalog

Catalog --> 库 --> 表 --> 数据

-- flinksql中的操作,操作后flinksql端可获得hive中的CATALOG,亦可查询其中的表数据
-- 使用catalog之前需要先启动hive的metastore服务
-- hive --service metastore
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'default','hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf/'
);-- 查看CATALOG
show CATALOGs;
-- 切换catalog
USE CATALOG myhive;-- hive中的操作
create database testdb;use testdb;drop table if exists students;
CREATE TABLE if not exists students (`id` STRING,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING
) row format delimited
fields terminated by ',';load data local inpath "/usr/local/soft/bigdata30/students.txt" into table students;
6.3 将FlinkSQL的表存入Hive

flink的SQL客户端默认会使用内存保存元数据Catalog,重启之后会丢失,需要重新创建

借助Hive的Catalog来保存Flink表的元数据,重启后还能保留

保存的元数据虽然能在hive中看到,但只能在Flink环境下使用

-- 将数据存储到从hive获取的 CATALOG中drop table if exists students_join;
CREATE TABLE if not exists students_join (`id` BIGINT,`name` STRING,`age` INT,`gender` STRING,`clazz` STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'students_join','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);kafka-console-producer.sh --broker-list master:9092 --topic students_join-- 重启后
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'default','hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf/'
);
-- 查看CATALOG
show CATALOGs;
-- 切换catalog
USE CATALOG myhive;
show databases;
use testdb;-- 查询数据,/*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/ :从头开始读取
select * from students_join /*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/ ; 
6.4 加载Hive的函数
LOAD MODULE hive WITH ('hive-version' = '3.1.2');select split('hello,world',',');drop table if exists words;
CREATE TABLE if not exists words (line STRING
) WITH ('connector' = 'kafka','topic' = 'words','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);select word
from words,
lateral table(explode(split(line,'#'))) as t(word);

7、CheckPoint

7.1 准备任务

需要通过sql-client.sh -f 来执行

将下列SQL放到word_cnt.sql文件中

drop table if exists words;
CREATE TABLE if not exists words (word STRING
) WITH ('connector' = 'kafka','topic' = 'words','properties.bootstrap.servers' = 'master:9092','properties.group.id' = 'grp1','scan.startup.mode' = 'latest-offset','format' = 'csv',-- 是否忽略脏数据'csv.ignore-parse-errors' = 'true'
);drop table if exists word_cnt;
CREATE TABLE if not exists word_cnt (word STRING,cnt BIGINT
) WITH ('connector' = 'print'
);insert into word_cnt
select word,count(*) as cnt
from words
group by word
;kafka-console-producer.sh --broker-list master:9092 --topic words
7.2 提交任务

第一次提交,不需要指定恢复的目录

sql-client.sh -f word_cnt.sql
7.3 故障之后的恢复
-- 1、先找到任务在HDFS保存的CK的路径
-- state.checkpoints.dir: hdfs://master:9000/file/checkpoint:flink中checkpoint的保存路径,在flink-conf.yaml中
/file/checkpoint/99d18855d54ed2a7e9670822307569f9/chk-25-- 2、在刚刚的word_cnt.sql文件中的insert语句前加入下面内容
SET 'execution.savepoint.path' = 'hdfs://master:9000/file/checkpoint/99d18855d54ed2a7e9670822307569f9/chk-25';-- 3、再次通过sql-client.sh提交任务
sql-client.sh -f word_cnt.sql

在这里插入图片描述

8、优化

8.1 Source被多次使用
drop table if exists students_01;
CREATE TABLE if not exists students_01(id STRING,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students_01', -- 指定topic'properties.bootstrap.servers' = 'master:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'csv', -- 指定数据的格式'csv.field-delimiter' = ',' ,-- 指定分隔符'csv.ignore-parse-errors' ='true' -- 跳过脏数据
);-- 创建sink表
CREATE TABLE clazz_cnt (clazz STRING,cnt BIGINT
) WITH ('connector' = 'print'
);CREATE TABLE gender_cnt (gender STRING,cnt BIGINT
) WITH ('connector' = 'print'
);CREATE TABLE age_cnt (age INT,cnt BIGINT
) WITH ('connector' = 'print'
);-- 假设同一个source被使用多次
-- 统计班级人数
insert into clazz_cnt
select clazz,count(*) as cnt
from students_01
group by clazz
;
-- 统计性别人数
insert into gender_cnt
select gender,count(*) as cnt
from students_01
group by gender
;
-- 统计年龄人数
insert into age_cnt
select age,count(*) as cnt
from students_01
group by age
;-- 每个insert都会提交一次job,最终会产生三个job
-- 但每个job的source都一样,故可以进行合并
-- 执行一组INSERT,最终只会生成一个Job
EXECUTE STATEMENT SET 
BEGIN-- 统计班级人数insert into clazz_cntselect clazz,count(*) as cntfrom students_01group by clazz;-- 统计性别人数insert into gender_cntselect gender,count(*) as cntfrom students_01group by gender;-- 统计年龄人数insert into age_cntselect age,count(*) as cntfrom students_01group by age; 
END;
8.2 反压

下游任务处理数据的速度 无法跟上 上游Source接收数据的速度

  • 准备数据

    -- 创建datagen source表
    drop table if exists words_datagen;
    CREATE TABLE words_datagen (word STRING
    ) WITH ('connector' = 'datagen','rows-per-second'='50000', -- 指定每秒生成的数据量'fields.word.length'='5' -- 生成word字段的值时,每个单词(或字符串)应该恰好包含5个字符。
    );drop table if exists blackhole_table;
    CREATE TABLE blackhole_table (word STRING,cnt BIGINT
    ) WITH ('connector' = 'blackhole'
    );
    
8.2.1 数据量太大
insert into blackhole_table
select word,count(1) as cnt
from words_datagen /*+ OPTIONS('rows-per-second'='50000') */
group by word;-- 开启微批处理
set 'table.exec.mini-batch.enabled' ='true';
set 'table.exec.mini-batch.allow-latency' = '5 s';
set 'table.exec.mini-batch.size' ='100000';-- 开启预聚合
set 'table.optimizer.agg-phase-strategy' ='TWO_PHASE';
8.2.2 状态过大

CK消耗的时间过大

insert into blackhole_table
select word,count(1) as cnt
from words_datagen /*+ OPTIONS('fields.word.length'='6') */
group by word;-- 增加资源
-- 1、增加并行度
SET 'parallelism.default' = '8';-- 2、增加TM内存
-- 修改配置文件
taskmanager.memory.process.size: 5000m
-- 通过命令提交时,可以通过参数指定: -ytm,--yarntaskManagerMemory
flink run -ytm 5000m

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/393099.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件生命周期(二)

1. 软件生命周期定义 软件生命周期&#xff08;SDLC&#xff09;是软件开始研制到最终废弃不用所经历的各个阶段 – 软件开发模型 2. 瀑布型生命周期模型 瀑布模型规定自上而下&#xff0c;相互衔接的固定次序&#xff0c;如同瀑布流水&#xff0c;逐级下落&#xff0c;具有…

sqli-labs(超详解)——Lass32~Lass38

Lass32&#xff08;宽字节注入&#xff09; 源码 function check_addslashes($string) {$string preg_replace(/. preg_quote(\\) ./, "\\\\\\", $string); //escape any backslash$string preg_replace(/\/i, \\\, $string); …

double类型 精度丢失的问题

前言 精度丢失的问题是在其他计算机语言中也都会出现&#xff0c;float和double类型的数据在执行二进制浮点运算的时候&#xff0c;并没有提供完全精确的结果。产生误差不在于数的大小&#xff0c;而是因为数的精度。 一、double进行运算时,经常出现精度丢失 0.10.2使用计算…

记录一次网关无响应的排查

1. 使用jstack pid > thread.txt 打印进 thread.txt 文件里 去观察线程的状态。 我发现&#xff0c;一个线程在经过 rateliter的prefilter后, 先是调用 consume方法&#xff0c;获取到锁。 接着在执行 jedis的 evalsha命令时 一直卡在socket.read()的状态。 发现jedis官…

软件测试必备技能

在软件测试领域&#xff0c;以下是一些必备的技能和能力&#xff0c;可以帮助你成为一名优秀的软件测试工程师&#xff1a; 1. 测试基础知识&#xff1a; 熟悉软件测试的基本概念、原则和流程&#xff0c;包括不同类型的测试&#xff08;如单元测试、集成测试、系统测试&#…

这几个高级爬虫软件和插件真的强!

亮数据&#xff08;Bright Data&#xff09; 亮数据是一款强大的数据采集工具&#xff0c;以其全球代理IP网络和强大数据采集技术而闻名。它能够轻松采集各种网页数据&#xff0c;包括产品信息、价格、评论和社交媒体数据等。 网站&#xff1a;https://get.brightdata.com/we…

LLM(大语言模型)「Agent」开发教程-LangChain(三)

v1.0官方文档&#xff5c;最新文档 一、LangChain入门开发教程&#xff1a;Model I/O 二、基于LangChain的RAG开发教程 LangChain是一个能够利用大语言模型&#xff08;LLM&#xff0c;Large Language Model&#xff09;能力进行快速应用开发的框架&#xff1a; 高度抽象的组件…

智能仪表板DevExpress Dashboard v24.1 - 新增级联参数过滤

使用DevExpress Analytics Dashboard&#xff0c;再选择合适的UI元素&#xff08;图表、数据透视表、数据卡、计量器、地图和网格&#xff09;&#xff0c;删除相应参数、值和序列的数据字段&#xff0c;就可以轻松地为执行主管和商业用户创建有洞察力、信息丰富的、跨平台和设…

揭秘LoRA:利用深度学习原理在Stable Diffusion中打造完美图像生成的秘密武器

文章目录 引言LoRA的原理LoRA在角色生成中的应用LoRA在风格生成中的应用LoRA在概念生成中的应用LoRA在服装生成中的应用LoRA在物体生成中的应用结论 引言 在生成式人工智能领域&#xff0c;图像生成模型如Stable Diffusion凭借其出色的生成效果和广泛的应用场景&#xff0c;逐…

NVIDIA Triton系列03-开发资源说明

NVIDIA Triton系列03-开发资源说明 大部分要学习 Triton 推理服务器的入门者&#xff0c;都会被搜索引擎或网上文章引导至官方的 https://developer.nvidia.com/nvidia-triton-inference-server 处&#xff08;如下截图&#xff09;&#xff0c;然后从 “Get Started” 直接安…

Google四年推迟两次,Cookie不弃了,但也不藏了

四年两次推迟&#xff0c;这段改变了数字广告生态系统发展的代码&#xff0c;还是被Google保留了下来。2020年&#xff0c;Google第一次提出&#xff0c;将在2022年初结束Cookie的使用&#xff0c;同步推出隐私沙盒计划&#xff1b;2021年6月&#xff0c;Google第一次进行了延迟…

人脸识别Arcface的Tensorrt C++

代码已经上传至github&#xff0c;欢迎使用&#xff0c;不是为了研究人脸识别&#xff0c;而是为了实现Tensorrt部署Arcface模型&#xff0c;推理耗时33ms左右~ GitHub - Broad-sky/face-recognition-arcface-tensort: This project mainly implements the transplantation of…

50etf期权行权采用什么交割方式 ?

50ETF期权是欧式期&#xff0c;要到期日当天才能行权交制&#xff0c;其交割方式是实物交割买卖双方在到期行权日时需要准备一手交钱&#xff0c;一手收货或是一手交&#xff0c;一手收钱&#xff0c;如果持有期权到达到期日之前&#xff0c;投资者认为行权并不划算&#xff0c…

Linux 照片图像编辑器

前言 照片图像编辑器是一种软件程序,它允许用户对数字照片或图像进行各种编辑和修改。以下是一些常见的功能及其解释: 裁剪与旋转 : 裁剪:移除图像的某些部分,以改善构图或符合特定尺寸要求。旋转:改变图像的方向,可以校正歪斜的照片或者为了艺术效果而旋转。调整亮度…

【画流程图工具】

画流程图工具 draw.io draw.io&#xff08;现称为 diagrams.net&#xff09;是一款在线图表绘制工具&#xff0c;可以用于创建各种类型的图表&#xff0c;如流程图、网络图、组织结构图、UML图、思维导图等。以下是关于它的一些优点、应用场景及使用方法&#xff1a; 优点&a…

密码学基础-身份认证

密码学基础-身份认证 概述 书信的亲笔签名&#xff1b;公文、证书的印章起到了核准、认证的功能。 如前文密码学基础-数据加密所述&#xff0c;信息安全少不了身份认证的话题。只有认证了信息的来源&#xff0c;我们才能知道这条信息是否是正确的&#xff0c;合法的&#xff…

如何在linux系统上安装tomcat应用程序?

1&#xff09;首先查看安装包信息 yum info tomcat yum info tomcat 2&#xff09;安装 yum -y install tomcat yum -y install tomcat 3&#xff09;查看安装是否成功 rpm -q tomcat rpm -q tomcat 4&#xff09;如果输出一下内容则代表安装成功 tomcat-7.0.76-16.el7_9.n…

力扣高频SQL 50题(基础版)第三十八题

文章目录 力扣高频SQL 50题&#xff08;基础版&#xff09;第三十八题1484.按日期分组销售产品题目说明实现过程准备数据实现方式结果截图总结 力扣高频SQL 50题&#xff08;基础版&#xff09;第三十八题 1484.按日期分组销售产品 题目说明 表 Activities&#xff1a; ---…

Python的100道经典练习题,每日一练,必成大神!!!

Python的100道经典练习题是一个广泛而深入的学习资源&#xff0c;可以帮助Python初学者和进阶者巩固和提升编程技能 完整的100多道练习题可在下面图片免沸获取哦~ 整理了100道Python的题目&#xff0c;如果你是一位初学者&#xff0c;这一百多道题可以 帮助你轻松的使用Python…

新书《计算机视觉从入门到进阶实战:基于Pytorch》

本书基于PyTorch深度学习框架&#xff0c;结合计算机视觉中的主流任务&#xff0c;介绍了深度学习相关算法的计算机视觉上的应用。 本书主要内容分为两部分。 第一部分为PyTorch框架使用的相关知识&#xff0c;以及计算机视觉和深度学习的入门知识。第二部分重点介绍深度学习在…