flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

flink sql 实战实例 及延伸问题

  • Flink SQL 计算用户分布
  • Flink SQL 计算 DAU
  • 多topic 数据更新mysql topic接入mysql
    • 引入 upsert-kafka-connector 以1.14.4版本为例
  • 数据倾斜问题:
  • 让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?
  • 多维高阶聚合
  • FlinkSql Upsert 与 Primary Key
  • flinksql Hive 流批一体
    • Streaming Sink
    • Streaming Source
    • Hive Dialect
    • Filesystem Connector

Flink SQL 计算用户分布

⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。

- 如果需要可以打开 minibatch
select  level, count(1) as uv, max(time) as time
from (select uid, level, time, row_number() over (partition by uid order by time desc) rn from source
) tmp
where rn =1 
group by level

Flink SQL 计算 DAU

⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。

SELECT  window_start, window_end, platform, sum(bucket_dau) as dau
from (SELECTwindow_start, window_end, platform, count(distinct uid) as bucket_dauFROM TABLE(CUMULATE(TABLE user_log,DESCRIPTOR(time),INTERVAL '60' SECOND, INTERVAL '1' DAY))GROUP BY                                  window_start, window_end, platform, MOD(HASH_CODE(user_id), 1024)
) tmp
GROUP by   window_start, window_end, platform优点:如果是曲线图的需求,可以完美回溯曲线图。
缺点:大窗口之间如果有数据乱序,有丢数风险;并且由于是 watermark 推动产出,所以数据产出会有延迟。或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
select platform, count(1) as dau, max(time) as time
from (select uid, platform, time, row_number() over (partition by uid, platform, time / 24 / 3600 / 1000 order by time desc) rnfrom source
) tmp
where rn = 1
group byplatform优点:计算快。缺点:任务发生 failover,曲线图不能很好回溯。没法支持 cube 计算。或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
SELECT   max(time) as time, platform, sum(bucket_dau) as dau
from (SELECTmax(time) as time, platform, count(distinct uid) as bucket_dauFROM sourceGROUP BYplatform, MOD(HASH_CODE(user_id), 1024)
) t 
GROUP by   platform优点:计算快,支持 cube 计算。
缺点:任务发生 failover,曲线图不能很好回溯。

多topic 数据更新mysql topic接入mysql

-- 作业开发逻辑-- mysql -h数据库 -ubigdata_rw -pe20ycoy3yp09qij0kj8ngpcgxyywgmc9-- -Dyarn.application.queue=stream_data   -Dyarn.provided.lib.dirs=/streamx/flink/flink-1.12.5/lib/
CREATE TABLE Direction_Wind_create_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_create_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_like_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_like_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_comment_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_comment_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_share_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_share_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_create_sink (feed_id VARCHAR,user_id VARCHAR,feed_type VARCHAR,time_stamp timestamp(3),area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_create_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.user_id as user_id,cast(properties.feed_type as varchar) as feed_type--  ,TO_DATE(properties.`timestamp`) as time_stamp,TO_TIMESTAMP(FROM_UNIXTIME(cast(properties.`timestamp` as bigint))) as time_stamp,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_create_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_like_sink (feed_id VARCHAR,like_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_like_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.like_count  as like_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_like_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_comment_sink (feed_id VARCHAR,comment_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_comment_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.comment_count  as comment_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_comment_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_share_sink (feed_id VARCHAR,use_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_share_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.share_count  as   use_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_share_source
) where rowNum = 1
;
这里要注意,如果去重直接用group by的方式,在批处理中还好,流式处理中,这部分数据会存放到内容中,并且越积越大,没有ttl,时间一长就会oom了,
Flink SQL Deduplicate 写法,row_number partition by user_id order by proctime asc,此 SQL 最后生成的算子只会在第一条数据来的时候更新 state,后续访问不会更新 state TTL,因此 state 会在用户设置的 state TTL 时间之后过期。  所以按理说 这种去重方式 不会百分百 有用,只能保持一段时间的 去重,感觉是不对的,正在测试中。

经过测试 在flink 1.12 版本时,flinksql的upsert into 功能 ,也就是 这种写法
在这里插入图片描述
是可以实现 update 功能的,但 必须要 group by 数据才行,并且要求把 把所有select 语句中的字段 都加入到 group by 语句,但这么写,又会导致 state 不停增大,过一段时间就会 OOM

引入 upsert-kafka-connector 以1.14.4版本为例

基本工作机制:

  • source:

在这里插入图片描述

  • sink:
    在这里插入图片描述
tenv.executeSql("CREATE TABLE upsert_kafka ("+ "province STRING, "+ "pv BIGINT, "+ "PRIMARY KEY (province) NOT ENFORCED"+ ") WITH ("+ "'connector' = 'upsert-kafka', "+ "'topic' = 'upsert_kafka2', "+ "'properties.bootstrap.servers' = 'doitedu:9092', "+ "'key.format' = 'csv', "+ "'value.format' = 'csv'"+ ")"
);DataStreamSource<Row> stream = env.fromElements(Row.ofKind(RowKind.INSERT, "sx", 1),Row.ofKind(RowKind.INSERT, "sx", 2),Row.ofKind(RowKind.INSERT, "gx", 1),Row.ofKind(RowKind.INSERT, "sx", 2),Row.ofKind(RowKind.INSERT, "gx", 2)
);tenv.createTemporaryView("s", stream, Schema.newBuilder().column("f0", DataTypes.STRING().notNull()).column("f1", DataTypes.INT()).build());
// 将查询结果(changelog 流),写入 kafka
tenv.executeSql("insert into upsert_kafka select f0, sum(f1) as pv from s group by f0");

写入的数据为
在这里插入图片描述

// 从 kafka 再读出上面的 changelog 结果 tenv.executeSql(" select * from upsert_kafka").print();
tenv.executeSql("select * from upsert_kafka").print();

读出的数据为
在这里插入图片描述

数据倾斜问题:

⭐ 场景:拿计算直播间的同时在线观看用户数来说,大 v 直播间的人数会比小直播间的任务多几个量级,因此如果计算一个直播间的数据需要注意这种业务数据倾斜的特点
⭐ 解决方案:计算这种数据时,我们可以先按照直播间 id 将数据进行打散,如下 SQL 案例所示(DataStream 也是相同的解决方案),内层打散,外层合并:

select id, sum(bucket_uv) as uv
from (select id, count(distinct uid) as bucket_uv from source group byid, mod(uid, 1000) -- 将大 v 分桶打散
)
group by id

⭐ 数据任务处理时参数\代码处理逻辑导致倾斜:
⭐ 场景:比如有时候虽然用户已经按照 key 进行分桶计算,但是【最大并发度】设置为 150,【并发度】设置为 100,会导致 keygroup 在 sub-task 的划分不均匀(其中 50 个 sub-task 的 keygroup 为 2 个,剩下的 50 个 sub-task 为 1 个)导致数据倾斜。
⭐ 解决方案:设置合理的【最大并发度】【并发度】,【最大并发度】最好为【并发度】的倍数关系,比如【最大并发度】1024,【并发度】512
⭐ 我已经设置【数据分桶打散】+【最大并发为并发 n 倍】,为啥还出现数据倾斜?
⭐ 场景:你的【数据分桶】和【最大并发数】之间可能是不均匀的。因为 Flink 会将 keyby 的 key 拿到之后计算 hash 值,然后根据 hash 值去决定发送到那个 sub-task 去计算。这是就有可能出现你的【数据分桶】key 经过 hash 计算完成之后,并不能均匀的发到所有的 keygroup 中。比如【最大并发数】4096,【数据分桶】key 只有 1024 个,那么这些数据必然最多只能到 1024 个 keygroup 中,有可能还少于 1024,从而导致剩下的 3072 个 keygroup 没有任何数据
⭐ 解决方案:其实可以利用【数据分桶】key 和【最大并行度】两个参数,在 keyby 中实现和 Flink key hash 选择 keygroup 的算法一致的算法,在【最大并发数】4096,【数据分桶】为 4096 时,做到分桶值为 1 的数据一定会发送到 keygroup1 中,2 一定会发到 keygroup2 中,从而缓解数据倾斜。

最大并行度的设置:最大并行度可以自己设置,也可以框架默认生成;默认的算法是取当前算子并行度的 1.5 倍和 2 的 7 次方比较,取两者之间的最大值,然后用上面的结果和 2 的 15 次方比较,取其中的最小值为默认的最大并行度,非常不建议自动生成,建议用户自己设置。

让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?

在这里插入图片描述
⭐ 有提到将用户上线标记为 1,下线标记为 0 的,然后将上线下线数据发到消息队列用实时计算引擎统计的

⭐ 有提到将用户心跳日志借助 Session Window Dynamic Gap 计算的

在这里插入图片描述

多维高阶聚合

在这里插入图片描述

FlinkSql Upsert 与 Primary Key

在flink1.11 及以后,flinksql 与blink 做了merge 所以有重大变更
流计算的一个典型场景是把聚合的数据写入到 Upsert Sink 中,比如 JDBC、HBase,当遇到复杂的 SQL 时,时常会出现:
在这里插入图片描述
UpsertStreamTableSink 需要上游的 Query 有完整的 Primary Key 信息,不然就直接抛异常。这个现象涉及到 Flink 的 UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,需要按 Key 来更新,所以必须要有 Key 信息。

如何发现 Primary Key?一个方法是让优化器从 Query 中推断,如下图发现 Primary Key 的例子。

这种情况下在简单 Query 当中很好,也满足语义,也非常自然。但是如果是一个复杂的 Query,比如聚合又 Join 再聚合,那就只有报错了。不能期待优化器有多智能,很多情况它都不能推断出 PK,而且,可能业务的 SQL 本身就不能推断出 PK,所以导致了这样的异常。

在这里插入图片描述
怎么解决问题?Flink 1.11 彻底的抛弃了这个机制,不再从 Query 来推断 PK 了,而是完全依赖 Create table 语法。比如 Create 一个 jdbc_table,需要在定义中显式地写好 Primary Key(后面 NOT ENFORCED 的意思是不强校验,因为 Connector 也许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就告诉框架这个Jdbc Sink 会按照对应的 Key 来进行更新。如此,就跟 Query 完全没有关系了,这样的设计可以定义得非常清晰,如何更新完全按照设置的定义来。

CREATE TABLE jdbc_table (id BIGINT,...PRIMARY KEY (id) NOT ENFORCED
)

flinksql Hive 流批一体

首先看传统的 Hive 数仓。一个典型的 Hive 数仓如下图所示。一般来说,ETL 使用调度工具来调度作业,比如作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的延迟。调度产生 Table1,再产生 Table2,再调度产生 Table3,计算延时需要叠加起来
在这里插入图片描述
问题是慢,延迟大,并且 Ad-hoc 分析延迟也比较大,因为前面的数据入库,或者前面的调度的 ETL 会有很大的延迟。Ad-hoc 分析再快返回,看到的也是历史数据。

所以现在流行构建实时数仓,从 Kafka 读计算写入 Kafka,最后再输出到 BI DB,BI DB 提供实时的数据服务,可以实时查询。Kafka 的 ETL 为实时作业,它的延时甚至可能达到毫秒级。实时数仓依赖 Queue,它的所有数据存储都是基于 Queue 或者实时数据库,这样实时性很好,延时低。但是:

第一,基于 Queue,一般来说就是行存加 Queue,存储效率其实不高。
第二,基于预计算,最终会落到 BI DB,已经是聚合好的数据了,没有历史数据。而且 Kafka 存的一般来说都是 15 天以内的数据,没有历史数据,意味着无法进行 Ad-hoc 分析。所有的分析全是预定义好的,必须要起对应的实时作业,且写到 DB 中,这样才可用。对比来说,Hive 数仓的好处在于它可以进行 Ad-hoc 分析,想要什么结果,就可以随时得到什么结果。
在这里插入图片描述
能否结合离线数仓和实时数仓两者的优势,然后构建一个 Lambda 的架构?

核心问题在于成本过高。无论是维护成本、计算成本还是存储成本等都很高。并且两边的数据还要保持一致性,离线数仓写完 Hive 数仓、SQL,然后实时数仓也要写完相应 SQL,将造成大量的重复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁移、对数据等将带来大量人力成本。如今,实时分析会越来越多,不断的发生迁移,导致重复开发的成本也越来越高。少部分重要的作业尚可接受,如果是大量的作业,维护成本其实是非常大的。

如何既享受 Ad-hoc 的好处,又能实现实时化的优势?一种思路是将 Hive 的离线数仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink 1.11 在 Hive 流批一体上做了一些探索和尝试,如下图所示。它能实时地按 Streaming 的方式来导出数据,写到 BI DB 中,并且这套系统也可以用分析计算框架来进行 Ad-hoc 的分析。这个图当中,最重要的就是 Flink Streaming 的导入。
在这里插入图片描述

Streaming Sink

早期 Flink 版本在 DataStreaming 层,已经有一个强大的 StreamingFileSink 将流数据写到文件系统。它是一个准实时的、Exactly-once 的系统,能实现一条数据不多,一条数据不少的 Sink。
在这里插入图片描述
具体原理是基于两阶段提交:

第一阶段:SnapshotPerTask,关闭需要 Commit 的文件,或者记录正在写的文件的 Offset。
第二阶段:NotifyCheckpointComplete,Rename 需要 Commit 的文件。注意,Rename 是一个原子且幂等的操作,所以只要保证 Rename 的 At-least-once,即可保证数据的 Exactly-once。

这样一个 File system 的 Writer 看似比较完美了。但是在 Hive 数仓中,数据的可见性是依赖 Hive Metastore 的,那在这个流程中,谁来通知 Hive Metastore 呢?
在这里插入图片描述
SQL 层在 StreamingFileSink,扩展了 Partition 的 Committer。

相当于不仅要进行 File 的 Commit,还要进行 Partition 的 Commit。如图所示,FileWriter 对应之前的 StreamingFileSink,它提供的是 Exactly-once 的 FileWriter。而后面再接了一个节点 PartitionCommitter。支持的 Commit Policy 有:

  • 内置支持 Add partition 到 Hive metastore;
  • 支持写 SuccessFile 到文件系统当中;
  • 并且也可以自定义 Committer,比如可以 analysis partition、合并 partition 里面的小文件。

Committer 挂在 Writer 后, 由 Commit Trigger 决定什么时机来 commit :

  • 默认的 commit 时机是,有文件就立即 commit。因为所有 commit 都是可重入的,所以这一点是可允许的。

  • 另外,也支持通过 partition 时间和 Watermark 来共同决定的。比如小时分区,如果现在时间到 11 点,10 点的分区就可以 commit 了。Watermark 保证了作业当前的准确性。

Streaming Source

Hive 数仓中存在大量的 ETL 任务,这些任务往往是通过调度工具来周期性的运行,这样做主要有两个问题:

  • 实时性不强,往往调度最小也是小时级。
  • 流程复杂,组件多,容易出现问题。

针对这些离线的 ETL 作业,Flink 1.11 为此开发了实时化的 Hive 流读,支持:

  • Partition 表,监控 Partition 的生成,增量读取新的 Partition。
  • 非 Partition 表,监控文件夹内新文件的生成,增量读取新的文件。

甚至可以使用 10 分钟级别的分区策略,使用 Flink 的 Hive streaming source 和 Hive streaming sink ,可以大大提高 Hive 数仓的实时性到准实时分钟级,在实时化的同时,也支持针对 Table 全量的 Ad-hoc 查询,提高灵活性。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'=’true’, 'streaming-source.consume-start-offset'='2020-05-20') */;/*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.partition.include' = 'latest','streaming-source.partition-order' = 'create-time','streaming-source.monitor-interval' = '1 h') */

另外除了 Scan 的读取方式,Flink 1.11 也支持了 Temporal Join 的方式,也就是以前常说的 Streaming Dim Join。

SELECTo.amout, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency

目前支持的方式是 Cache All,并且是不感知分区的,比较适合小表的情况。

Hive Dialect

Flink SQL 遵循的是 ANSI-SQL 的标准,而 Hive SQL 有它自己的 HQL 语法,它们之间的语法、语义都有些许不同。

如何让 Hive 用户迁移到 Flink 生态中,同时避免用户太大的学习成本?为此, Flink SQL 1.11 提供了 Hive Dialect,可以使得用户在 Flink 生态中使用 HQL 语言来计算。目前只支持 DDL,后续版本会逐步攻坚 Qeuries。

Filesystem Connector

Hive Integration 提供了一个重量级的集成,功能丰富,但是环境比较复杂。如果只是想要一个轻量级的 Filesystem 读写呢?

Flink table 在长久以来只支持一个 CSV 的 Filesystem Table,并且还不支持 Partition,行为上在某些方面也有些不符合大数据计算的直觉。

Flink 1.11 重构了整个 Filesystem connector 的实现:

  • 结合 Partition,现在,Filesystem connector 支持 SQL 中 Partition 的所有语义,支持 Partition 的 DDL,支持 Partition Pruning,支持静态 / 动态 Partition 的插入,支持 Overwrite 的插入。
  • 支持各种 Formats: ■ CSV ■ JSON ■ Aparch AVRO ■ Apache Parquet ■ Apache ORC
  • 支持 Batch 的读写。
  • 支持 Streaming sink,也支持 Partition commit,支持写 Success 文件。

用几句简单的 SQL,不用搭建 Hive 集成环境即可:

  • 启动一个流作业写入 Filesystem 中,然后在 Hive 端即可查询到 Filesystem 上的数据,相比之前 Datastream 的作业,简单 SQL 即可搞定离线数据的入库。
  • 通过 Filesystem Connector 来查询 Hive 数仓中的数据,功能没有 Hive 集成那么全,但是定义简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/261908.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode】递归精选8题——基础递归、链表递归

目录 基础递归问题&#xff1a; 1. 斐波那契数&#xff08;简单&#xff09; 1.1 递归求解 1.2 迭代求解 2. 爬楼梯&#xff08;简单&#xff09; 2.1 递归求解 2.2 迭代求解 3. 汉诺塔问题&#xff08;简单&#xff09; 3.1 递归求解 4. Pow(x, n)&#xff08;中等&…

机器人初识 —— 电机传动系统

一、背景 波士顿动力公司开发的机器人&#xff0c;其电机传动系统是其高性能和动态运动能力的核心部分。电机传动系统通常包括以下几个关键组件&#xff1a; 1. **电动马达**&#xff1a;波士顿动力的机器人采用了先进的电动马达作为主要的动力源&#xff0c;如伺服电机或步进…

【linux】shell命令 | Linux权限

目录 1. shell命令以及运行原理 2. Linux权限的概念 3. Linux权限管理 3.1 文件访问者的分类 3.2 文件类型和访问权限 3.3 文件权限值的表示方法 3.4 文件访问权限的相关设置方法 4. file指令 5. 目录的权限 6. 粘滞位 7. 关于权限的总结 1. shell命令以及运行原理 …

C++入门学习(三十二)二维数组定义方式

一维数组类似于一条“线”&#xff0c;而二维数组类似于一个“面”&#xff0c;二维数组也更像一个表格&#xff0c;由我们在“表格”中查询数据。 1、先定义数组&#xff0c;后赋值 int arr[2][3]; #include <iostream> using namespace std;int main() { int arr…

【2024软件测试面试必会技能】Jmeter_性能测试(5):负载测试和压力测试

负载测试 负载测试/容量测试&#xff1a;通过在测试过程中不断的调整负载&#xff0c;找到在多少用户量情况下&#xff0c;系统出现性能下降拐点&#xff1b;也被称为容量测试&#xff1b; 举例&#xff1a; 微信发送红包的负载测试&#xff1a; 1、找运维人员了解目前系统…

vue3中使用 tui-image-editor进行图片处理,并上传

效果图 下载包 pnpm i tui-image-editor pnpm i tui-color-picker调用组件 //html部分 <el-dialog v-model"imgshow" destroy-on-close width"40%" draggable align-center :show-close"true":close-on-click-modal"false">&l…

web安全学习笔记【13】——信息打点(3)

信息打点-JS架构&框架识别&泄漏提取&API接口枚举&FUZZ爬虫&插件项目[1] #知识点&#xff1a; 1、业务资产-应用类型分类 2、Web单域名获取-接口查询 3、Web子域名获取-解析枚举 4、Web架构资产-平台指纹识别 ------------------------------------ 1、开源…

【Web前端笔记10】CSS3新特性

10 CSS3新特性 &#xff11;、圆角 &#xff12;、阴影 &#xff08;&#xff11;&#xff09;盒阴影 &#xff13;、背景渐变 &#xff08;&#xff11;&#xff09;线性渐变&#xff08;主要掌握这种就可&#xff09; &#xff08;&#xff12;&#xff09;径向渐变 &…

HTTP与HTTPS-HTTPS 的应用数据是如何保证完整性的?

资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) HTTPS 的应用数据是如何保证完整性的? TLS 在实现上分为握手协议和记录协议两层 TLS 握手协议就是我们前面说的 TLS 四次握手的过程&#xff0c;负责协商加密算法和生成对称密钥&#xff0c;后续用此密…

【Python】实现一个类似于Glass2k的Windows窗口透明化软件

一 背景说明 网上看到一款Windows下的窗口透明化工具Glass2k&#xff08;Glass2k官网&#xff09;&#xff0c;可以简单地通过快捷键实现任意窗口的透明化&#xff0c;还挺方便的&#xff0c;想用Python自己实现一下类似的功能。 软件已经开源到&#xff1a;窗口透明化小工具开…

Java 面向对象进阶 14 抽象类和抽象方法(黑马)

抽象类不能实例化&#xff08;创建对象&#xff09;&#xff1a; 抽象类中不一定有抽象方法&#xff1a; 有抽象方法的类一定是抽象类&#xff1a; 可以有构造方法&#xff1a;&#xff08;作用&#xff1a;在创建子类对象时&#xff0c;给属性进行赋值的&#xff09; Perso…

RabbitMQ保证消息的可靠性

1. 问题引入 消息从发送&#xff0c;到消费者接收&#xff0c;会经理多个过程&#xff1a; 其中的每一步都可能导致消息丢失&#xff0c;常见的丢失原因包括&#xff1a; 发送时丢失&#xff1a; 生产者发送的消息未送达exchange消息到达exchange后未到达queue MQ宕机&…

C++ Webserver从零开始:配置环境(九)——下载github的项目进行测试

前言 大家好&#xff0c;我又来更新Webserver的博客了。上一次更新这个专栏时2024.2.5号&#xff0c;离现在已经13天了。非常抱歉&#xff0c;中间隔了那么久。一方面是基础知识学完之后&#xff0c;就要开始自己写代码了。看基础知识和写代码是两回事&#xff0c;理论和实践的…

Git常用命令整理

在介绍安装和简单使用前&#xff0c;先看一下百度百科中的简介吧&#xff1a; ———————————————————————————————————————— Git --- The stupid content tracker, 傻瓜内容跟踪器。 Linux 是这样给我们介绍 Git 的&#xff1a; Git 是用…

Kafka进阶

文章目录 概要应用场景消息队列两种模式kafka的基础架构分区常见问题小结 概要 kafka的传统定义&#xff1a;kafka是一个分布式的基于发布\订阅模式的消息队列&#xff0c;主要用于大数据实时处理领域。 kafka的最新概念&#xff1a;kafka是一个开源的分布式事件流平台&#x…

UIKit 在 UICollectionView 中拖放交换 Cell 视图的极简实现

概览 UIKit 中的 UICollectionView 视图是我们显示多列集合数据的不二选择&#xff0c;而丰富多彩的交互操作更是我们选择 UICollectionView 视图的另一个重要原因。 如上图所示&#xff1a;我们实现了在 UICollectionView 中拖放交换任意两个 Cell 子视图的功能&#xff0c;这…

不要抱怨,不如抱 Java 运算符吧 (1)

本篇会加入个人的所谓‘鱼式疯言’ ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

学习鸿蒙基础(5)

一、honmonyos的page路由界面的路径 新建了一个page,然后删除了。运行模拟器的时候报错了。提示找不到这个界面。原来是在路由界面没有删除这个page。新手刚接触找了半天才找到这个路由。在resources/base/profile/main_pages.json 这个和微信小程序好类似呀。 吐槽&#xf…

普中51单片机学习(串口通信)

串口通信 原理 计算机通信是将计算机技术和通信技术的相结合&#xff0c;完成计算机与外部设备或计算机与计算机之间的信息交换 。可以分为两大类&#xff1a;并行通信与串行通信。并行通信通常是将数据字节的各位用多条数据线同时进行传送 。控制简单、传输速度快&#xff1…

百度智能云分布式数据库 GaiaDB-X 与龙芯平台完成兼容认证

近日&#xff0c;百度智能云的分布式关系型数据库软件 V3.0 与龙芯中科技术股份有限公司的龙芯 3C5000L/3C5000 处理器平台完成兼容性测试&#xff0c;功能与稳定性良好&#xff0c;获得了龙架构兼容互认证证书。 龙芯系列处理器 通用 CPU 处理器是信息产业的基础部件&#xf…