5 Paimon数据湖之表数据查询详解

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

虽然前面我们已经讲过如何查询Paimon表中的数据了,但是有一些细节的东西还需要详细分析一下。

  • 首先是针对Paimon中系统表的查询,例如snapshots\schemas\options等等这些系统表。
    其实简单理解就是我们可以通过sql的形式查询系统表来查看实体表的快照、schema等信息,这些信息我们也可以直接到hdfs中查看,只是不太方便。

  • 在查询数据的时候,可以细分为批量读取和流式读取,因为Paimon可以同时支持批处理和流处理。

  • 在查询数据的时候,如果想要从之前的某一个时间点开始查询数据,也就说任务启动的时候想要查询一些历史数据,则需要用到时间旅行这个特性,可以在SQL查询语句中通过动态表选项指定scan.mode参数来控制具体查询哪些历史数据。

Scan Mode的值可以有多种,不同的值代表不同的含义,下面我们来具体分析一下:
在这里插入图片描述

注意:在分析的时候,我们需要针对批处理和流处理这两种情况分别进行分析。

  • (1)default:如果我们在执行查询的时候,没有指定scan.mode参数,则默认是default。但是此时需要注意,如果我们也没有同时指定其他参数,例如:timestamp-millis\snapshot-id等scan相关的参数,那么默认会执行latest-full策略。
    所以说,我们在执行查询的时候,如果没有指定任何scan相关的参数,那么默认执行的策略就是latest-full。

  • (2)latest-full:和full是一样的效果,不过full这个参数已经被标记为过期了。针对批处理,表示只读取最新快照中的所有数据,读取完成以后任务就执行结束了。针对流处理,表示第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据,这个任务会一直运行。

  • (3)latest:针对批处理,他的执行效果和latest-full是一样的,只会读取最新快照中的所有数据。但是针对流处理就不一样了,此时表示只读取最新的变更数据,也就是说任务启动之后,只读取新增的数据,之前的历史快照中的数据不读取。类似于kafka中消费者里面的latest消费策略。

  • (4)from-snapshot:使用此策略的时候,需要同时指定snapshot-id参数。针对批处理,表示只读取指定id的快照中的所有数据。针对流处理,表示从指定id的快照开始读取变更数据(注意:此时不是读取这个快照中的所有数据,而是读取此快照中的变更数据,也可以理解为这个快照和上一个快照相比新增的数据),当然,后续新增的变更数据也是可以读取到的,因为这个是流处理,他会一直执行读取操作。

  • (5)from-snapshot-full:使用此策略的时候,也需要同时指定snapshot-id参数。针对批处理,他的执行效果和from-snapshot是一样的。针对流处理,表示第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据,此时任务会一直执行。

  • (6)from-timestamp:使用此策略的时候,需要同时指定timestamp-millis参数。针对批处理,表示只读取指定时间戳的快照中的所有数据。针对流处理,表示从指定时间戳的快照开始读取变更数据,(注意,这里也是读取这个快照中的变更数据,不是所有数据。),然后读取后续新增的变更数据。

  • (7)incremental:表示是增量查询,这个主要是针对批处理的,通过这种策略可以读取开始和结束快照之间的增量变化。开始和结束快照可以通过快照id或者是时间戳进行指定。
    如果是使用快照id,则需要通过incremental-between参数指定。
    如果是使用时间戳,则需要通过incremental-between-timestamp参数指定。

  • (8)compacted-full:想要使用这个参数有一个前提,Paimon表需要开启完全压缩(full compaction)。此时针对批处理,表示只读取最新完全压缩(full compaction)的快照中的所有数据。针对流处理,表示第一次启动时读取最新完全压缩(full compaction)的快照中的所有数据,然后继续读取后续新增的变更数据。

针对这里面的latest、latest-full、compacted-full这几种策略放在一起可能容易混淆,下面我们来通过一个图重新梳理一下:
在这里插入图片描述

首先看中间这条线,表示是数据的时间轴,左边是历史数据,右边是最新产生的数据。

中间这条线上面是批处理,下面是流处理。

我们首先来看批处理:
如果我们指定了scan.modelatest-full或者是latest,则会读取最新的快照中的所有数据,也就是Last Snapshot中的数据。
如果我们指定了scan.modecompacted-full,则会读取最新的完全压缩(full compaction)的快照中的数据,也就是Last Compact Snapshot中的数据。

接下来看一下流处理:
如果我们指定了scan.modelatest-full,则会在任务第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据。也就是第一次启动时先读取Last Snapshot中的所有数据,接着读取后续新产生的数据。
如果我们指定了scan.modelatest,则此时只读取最新的变更数据,不读取LastSnapshot快照中的数据。
如果我们指定了scan.modecompacted-full,则第一次启动时会读取最新完全压缩(full compaction)的快照中的所有数据,也就是Last Compact Snapshot中的数据,接着读取后续新产生的数据。

这就是这些策略在批处理和流处理中的执行流程。

(1)查询系统表

下面我们来通过具体的案例来演示一下前面提到的查询数据相关的用法。

首先创建一个向Paimon表中模拟写入数据的类,便于一会测试使用
创建package:tech.xuwei.paimon.query

创建object:FlinkSQLWriteToPaimon

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimon {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jack',18)")tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('tom',19)")tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('mick',20)")}}

在idea中运行这个代码。

接下来创建一个类来查询一下Paimon中的系统表。

创建object:FlinkPaimonSystemTable

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTable {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println("====================snapshot信息表===========================")tEnv.executeSql("SELECT * FROM query_table$snapshots").print()//schema信息表,对应的其实就是hdfs中表的schema目录下的schema-*文件信息println("====================schema信息表===========================")tEnv.executeSql("SELECT * FROM query_table$schemas").print()//manifest信息表,对应的其实就是hdfs中表的manifest目录下的manifest-*文件信息println("====================manifest信息表===========================")tEnv.executeSql("SELECT * FROM query_table$manifests").print()//file信息表,对应的其实就是hdfs中表的bucket-*目录下的data-*文件信息println("====================file信息表===========================")tEnv.executeSql("SELECT * FROM query_table$files").print()//option信息表,对应的就是建表语句中with里面指定的参数信息,在表的schema-*文件中也能看到option信息println("====================option信息表===========================")tEnv.executeSql("SELECT * FROM query_table$options").print()//consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到println("====================consumer信息表===========================")tEnv.executeSql("SELECT * FROM query_table$consumers").print()//audit log信息表,相当于是表的审核日志,可以看到表中每条数据的rowkind,也就是+I\-U\+U\-Dprintln("====================audit log信息表===========================")tEnv.executeSql("SELECT * FROM query_table$audit_log").print()}
}

运行代码。
注意:在本地执行flink sql中的print,会看到下面错误:

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1026)at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:899)at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:823)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:219)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at tech.xuwei.paimon.query.FlinkPaimonSystemTable$.main(FlinkPaimonSystemTable.scala:35)at tech.xuwei.paimon.query.FlinkPaimonSystemTable.main(FlinkPaimonSystemTable.scala)

这个异常不影响程序执行,实际工作中我们不会写这种代码,一般都是在sql中写insert into select语句了,在这主要是为了方便测试,忽略这个异常即可。

如果感觉看起来比较乱,可以修改一下log4j.properties日志中的告警级别,改为error级别即可。

log4j.rootLogger=error,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

重新运行代码,可以看到如下结果:

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 8f74d97b-bf6b-4ac7-bb47-3bb... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:22.859 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 49412497-1749-4566-8bf8-1c5... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:24.802 |                    2 |                    1 |                      0 | -9223372036854775808 |
| +I |                    3 |                    0 | e55e756d-e528-4b7c-97f0-a01... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:26.409 |                    3 |                    1 |                      0 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
3 rows in set
====================schema信息表===========================
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |            schema_id |                         fields |                 partition_keys |                   primary_keys |                        options |                        comment |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    0 | [{"id":0,"name":"name","typ... |                             [] |                       ["name"] |                             {} |                                |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
1 row in set
====================manifest信息表===========================
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| op |                      file_name |            file_size |      num_added_files |    num_deleted_files |            schema_id |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| +I | manifest-800ac729-22d3-494b... |                 1665 |                    1 |                    0 |                    0 |
| +I | manifest-61d14e4e-d2a0-42ac... |                 1675 |                    1 |                    0 |                    0 |
| +I | manifest-fd8e45b0-d456-467a... |                 1673 |                    1 |                    0 |                    0 |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
3 rows in set
====================file信息表===========================
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                      partition |      bucket |                      file_path |                    file_format |            schema_id |       level |         record_count |   file_size_in_bytes |                        min_key |                        max_key |              null_value_counts |                min_value_stats |                max_value_stats |           creation_time |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I |                             [] |           0 | data-6b23bcaf-3dbe-46c0-a67... |                            orc |                    0 |           0 |                    1 |                  566 |                         [jack] |                         [jack] |                {age=0, name=0} |            {age=18, name=jack} |            {age=18, name=jack} | 2023-07-28 17:35:22.453 |
| +I |                             [] |           0 | data-ce40f0df-aa2a-4682-8b6... |                            orc |                    0 |           0 |                    1 |                  581 |                         [mick] |                         [mick] |                {age=0, name=0} |            {age=20, name=mick} |            {age=20, name=mick} | 2023-07-28 17:35:26.257 |
| +I |                             [] |           0 | data-ac9bd895-2b8e-4efe-969... |                            orc |                    0 |           0 |                    1 |                  572 |                          [tom] |                          [tom] |                {age=0, name=0} |             {age=19, name=tom} |             {age=19, name=tom} | 2023-07-28 17:35:24.603 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
3 rows in set
====================option信息表===========================
Empty set
====================tag信息表===========================
Empty set
====================consumer信息表===========================
Empty set
====================audit log信息表===========================
+----+--------------------------------+--------------------------------+-------------+
| op |                        rowkind |                           name |         age |
+----+--------------------------------+--------------------------------+-------------+
| +I |                             +I |                           jack |          18 |
| +I |                             +I |                           mick |          20 |
| +I |                             +I |                            tom |          19 |
+----+--------------------------------+--------------------------------+-------------+
3 rows in set
(2)批量读取

下面演示一下如何在批量读取中使用时间旅行功能。

创建object:tech.xuwei.paimon.query.FlinkPaimonBatchQuery

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQuery {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'batch';env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//批量查询数据tEnv.executeSql("""|SELECT * FROM query_table|-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,只读取最新快照中的所有数据|-- /*+ OPTIONS('scan.mode'='latest') */ -- 在批处理模式下和latest-full的效果一致|-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 只读取指定id的快照中的所有数据|-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 在批处理模式下和from-snapshot的效果一致|-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 只读取指定时间戳的快照中的所有数据|-- /*+ OPTIONS('scan.mode'='incremental','incremental-between' = '1,3') */ -- 指定两个快照id,查询这两个快照之间的增量变化|-- /*+ OPTIONS('scan.mode'='incremental','incremental-between-timestamp' = '1690536922859,1690536926409') */ -- 指定两个时间戳,查询这两个快照之间的增量变化|""".stripMargin).print()}
}

运行代码,查看每一种策略的数据结果。

注意:在演示compacted-full这种策略的时候需要给表开启full-compaction

所以重新创建一个新的表。

创建object:FlinkSQLWriteToPaimonForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table_compact`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)WITH(|    'changelog-producer' = 'full-compaction',|    'full-compaction.delta-commits' = '1'|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('jack',18)")tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('tom',19)")tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('mick',20)")}}

运行代码。

再创建一个新的读取数据的类:

创建object:FlinkPaimonBatchQueryForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQueryForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'batch';env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//批量查询数据tEnv.executeSql("""|SELECT * FROM query_table_compact|/*+ OPTIONS('scan.mode' = 'compacted-full') */ --表需要开启full-compaction,设置changelog-producer和full-compaction.delta-commits|""".stripMargin).print()}
}

运行代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+

由于目前每一次提交数据都会触发完全压缩,所以我们查询最新的完全压缩快照中的数据是可以获取到所有数据的。

此时可以通过系统表查看一下这个表的snapshot信息:
创建object:FlinkPaimonSystemTableForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println("====================snapshot信息表===========================")tEnv.executeSql("SELECT * FROM query_table_compact$snapshots").print()}
}

执行代码,可以看到如下结果

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:07.293 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:08.211 |                    3 |                    2 |                      1 | -9223372036854775808 |
| +I |                    3 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:09.423 |                    4 |                    1 |                      0 | -9223372036854775808 |
| +I |                    4 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:09.641 |                    8 |                    4 |                      1 | -9223372036854775808 |
| +I |                    5 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:11.500 |                    9 |                    1 |                      0 | -9223372036854775808 |
| +I |                    6 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:12.130 |                   15 |                    6 |                      1 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+

此时可以看到在commit_kind这一列中显示的有APPENDCOMPACT,表示这个快照是追加产生的还是完全压缩产生的。

由于我们配置的每一次提交数据都会触发完全压缩,所以对应的有3个完全压缩产生的快照。

为了便于验证,我们可以把最新的那个完全压缩的快照删除掉,再执行查询,看看结果是什么样的:

删除最新的完全压缩的快照:

[root@bigdata04 ~]# hdfs dfs -rm -r /paimon/default.db/query_table_compact/snapshot/snapshot-6

注意:这个删除操作建议大家在命令行执行,不要在web页面执行,在web页面删除可能会直接把这个表的目录删除掉!!!!!

然后再执行FlinkPaimonBatchQueryForCompact,结果如下:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+
2 rows in set

注意:此时最新的完全压缩的快照就是snapshot-4了,这个快照中只有2条数据。

这就是批量读取中时间旅行参数的使用。

(3)流式读取

下面演示一下如何在流式读取中使用时间旅行功能。

创建object:FlinkPaimonStreamingQuery

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQuery {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'streaming';env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//流式查询数据tEnv.executeSql("""|SELECT * FROM query_table|-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据|-- /*+ OPTIONS('scan.mode'='latest') */ -- 只读取最新的变更数据|-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 从指定id的快照开始读取变更数据(包含后续新增)|-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据|-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 从指定时间戳的快照开始读取变更数据(包含后续新增)|""".stripMargin).print()}
}
(4)Consumer ID

最后我们在流式读取这里扩展一个知识点:Consumer ID,这个功能是针对流式读取设计的。

相当于我们在kafka消费者中指定一个groupid,这样可以通过groupid维护消费数据的偏移量信息,便于任务停止以后重启的时候继续基于之前的进度进行查询。

在这里Consumer ID的主要作用是为了方便记录每次查询到的数据快照的位置,他会把下一个还未读取的快照id记录到hdfs文件中。
当之前的任务停止以后,新启动的任务可以基于之前任务记录的快照id继续查询数据,不需要从状态中恢复位置信息。

这个特性目前属于实验特性,还没有经过大量生产环境的验证,大家可以先提前了解一下。

下面来结合一个案例演示一下:
具体的思路是这样的:

  • 1:首先使用Consumer ID查询一次query_table表中的数据。
  • 2:然后停止之前的查询任务,向query_table表中模拟产生1条数据。
  • 3:重新启动第1步骤中的任务,验证一下是否只读取到了新增的那1条数据

创建object:FlinkPaimonStreamingQueryForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQueryForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'streaming';env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv = StreamTableEnvironment.create(env)//注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。env.enableCheckpointing(5000)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//流式查询数据tEnv.executeSql("""|SELECT * FROM query_table|/*+ OPTIONS('consumer-id'='con-1') */ -- 指定消费者id|""".stripMargin).print()}
}

注意:在这需要开启checkpoint,否则Consumer ID的功能无法正常触发。

第一次执行此代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |

停止此代码。

此时其实可以到hdfs中查看一下维护的Consumer ID信息:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/query_table/consumer/consumer-con-1
{"nextSnapshot" : 4
}

这里面记录的是下一次需要读取的快照id,数值为4,此时最新的快照id是3,因为快照id为3的快照已经读取过了,下一个快照id就是4了。

其实直接查询consumer系统表也是可以看到这些信息的。

创建object:FlinkPaimonSystemTableForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到println("====================consumer信息表===========================")tEnv.executeSql("SELECT * FROM query_table$consumers").print()}
}

执行代码,可以看到如下结果:

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    4 |
+----+--------------------------------+----------------------+
1 row in set

从这可以看出来,next_snapshot_id4,查出来的结果是一样的。

接下来我们向query_table中新增一条数据。

创建object:FlinkSQLWriteToPaimonForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jessic',30)")}}

执行代码。

最后,我们再重新启动FlinkPaimonStreamingQueryForConsumerid,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                         jessic |          30 |

能看到这个结果,说明这个consumer id生效了,当我们第二次使用相同的consumer id读取这个表的时候,是可以基于之前的进度继续读取的。

停止此任务。

此时再执行FlinkPaimonSystemTableForConsumerid,查看最新的next_snapshot_id

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    5 |
+----+--------------------------------+----------------------+
1 row in set

此时next_snapshot_id变成了5,这是正确的。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/187639.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【每日OJ—— 206. 反转链表(链表)】

每日OJ—— 206. 反转链表(链表) 1.题目:206. 反转链表(链表)2.方法讲解:2.1解法:2.1.1.图文解析2.1.2.代码实现2.1.3.提交通过展示 1.题目:206. 反转链表(链表&#xff…

LoRAShear:微软在LLM修剪和知识恢复方面的最新研究

LoRAShear是微软为优化语言模型模型(llm)和保存知识而开发的一种新方法。它可以进行结构性修剪,减少计算需求并提高效率。 LHSPG技术( Lora Half-Space Projected Gradient)支持渐进式结构化剪枝和动态知识恢复。可以通过依赖图分析和稀疏度…

Sentinel网关限流

背景 在微服务架构下,每个服务的性能都不同,为避免出现流量洪峰将服务冲垮,需要依赖限流工具来保护服务的稳定性。sentinel是阿里提供的限流工具,社区活跃,功能也很全面,包含实时监控、流控、熔断等功能。…

基于SpringBoot+Redis的前后端分离外卖项目-苍穹外卖(二)

新增员工功能开发 1. 新增员工1.1 需求分析和设计1.1.1 产品原型1.1.2 接口设计1.1.3 表设计 1.2 代码开发1.2.1 设计DTO类1.2.2 Controller层1.2.3 Service层接口1.2.4 Service层实现类1.2.5 Mapper层 1.3 功能测试1.3.1 接口文档测试 1.4 代码完善1.4.1 问题一1.4.2 问题二1.…

PyGWalker :数据分析中最优秀工具库!

假设你在 Jupyter Notebook 中有一堆数据需要分析和可视化。PyGWalker 就像一个神奇的工具,使这一切变得非常容易。它接受你的数据并将其转换成一种特殊的表格,你可以像使用 Tableau 一样与之交互。 你可以通过视觉方式探索数据,进行互动&am…

电脑想要微信多开——打开多个微信的必胜法宝!

一个不知名大学生,江湖人称菜狗 original author: Jacky Li Email : 3435673055qq.com Time of completion:2023.11.11 Last edited: 2023.11.11 导读:在生活当中经常遇到工作和生活相撞的事情,导致在处理私人的事情同时不得不处理…

asp.net学生宿舍管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net 学生宿舍管理系统是一套完善的web设计管理系统,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为vs2010,数据库为sqlserver2008,使用c#语言 开发 asp.net学生宿舍管理系统1 应用技…

python实现全向轮EKF_SLAM

python实现全向轮EKF_SLAM 代码地址及效果运动预测观测修正参考算法 代码地址及效果 代码地址 运动预测 简化控制量 u t u_t ut​ 分别定义为 v x Δ t v_x \Delta t vx​Δt, v y Δ t v_y \Delta t vy​Δt,和 ω z Δ t \omega_z \Delta t ωz…

如何设计一个网盘系统的架构

1. 概述 现代生活中已经离不开网盘,比如百度网盘。在使用网盘的过程中,有没有想过它是如何工作的?在本文中,我们将讨论如何设计像百度网盘这样的系统的基础架构。 2. 系统需求 2.1. 功能性需求 用户能够上传照片/文件。用户能…

【华为OD题库-007】代表团坐车-Java

题目 某组织举行会议,来了多个代表团同时到达,接待处只有一辆汽车,可以同时接待多个代表团,为了提高车辆利用率,请帮接待员计算可以坐满车的接待方案,输出方案数量。 约束: 1.一个团只能上一辆车&#xff0…

Postman基本页面和请求/响应页签介绍

近期在复习Postman的基础知识,在小破站上跟着百里老师系统复习了一遍,也做了一些笔记,希望可以给大家一点点启发。 一、Postman的界面介绍 Home主页、Workspace工作空间、Collections集合、Environments环境变量、Mock Server虚拟服务器、Mo…

PDF有限制密码,不能复制怎么办?

大家现在接触PDF文件越来越多,有的时候在网上下载的PDF文件打开之后,发现选中文字之后无法复制。甚至其他功能也都无法使用,这是怎么回事?该怎么办? 当我们发现文件打开之后,编辑功能无法使用,很…

传统企业数字化转型都要面临哪些挑战?_数据治理平台_光点科技

数字化转型已经成为传统企业发展的必经之路,但在这个过程中,企业往往会遭遇多方面的挑战。 1.文化和组织惯性 最大的挑战之一是企业文化和组织惯性的阻力。传统企业往往有着深厚的历史和根深蒂固的工作方式,员工和管理层可能对新的数字化工作…

【Java】I/O流—转换流、序列化流的初学者指南及RandomAccessFile类

🌺个人主页:Dawn黎明开始 🎀系列专栏:Java ⭐每日一句:我不在意你曾堕落,我只在意你是否会崛起 📢欢迎大家:关注🔍点赞👍评论📝收藏⭐️ 文章目录…

Clickhouse学习笔记(3)—— Clickhouse表引擎

前言: 有关Clickhouse的前置知识详见: 1.ClickHouse的安装启动_clickhouse后台启动_THE WHY的博客-CSDN博客 2.ClickHouse目录结构_clickhouse 目录结构-CSDN博客 Cickhouse创建表时必须指定表引擎 表引擎(即表的类型)决定了&…

HTML点击链接强制触发下载

常见网页中会有很多点击链接即下载的内容&#xff0c;以下示范一下如何实现 <a href"文件地址" download"下载的文件名字&#xff08;不包括后缀&#xff09;">强制下载</a> 下面举个例子&#xff1a; <a href"./image/test.jpg"…

solidworks对电脑要求高吗?2023solidworks配置要求

solidworks对电脑要求高吗&#xff1f;SolidWorks是一款功能强大的三维CAD软件&#xff0c;对电脑配置有一定的要求。一般来说&#xff0c;运行SolidWorks需要的电脑配置包括较高的处理器性能、足够的内存和存储空间&#xff0c;以及一块性能良好的显卡。此外&#xff0c;对于大…

YOLOv5改进 | 添加CA注意力机制 + 增加预测层 + 更换损失函数之GIoU

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。在小目标场景的检测中&#xff0c;存在远距离目标识别效果差的情形&#xff0c;本节课提出一种基于改进YOLOv5的小目标检测方法。首先&#xff0c;在YOLOv5s模型的Neck网络层融合坐标注意力机制&#xff0c;以提升模型的特…

成集云 | 英克对接零售O2O+线上商城 | 解决方案

方案介绍 零售O2O线上商城是一种新型的商业模式&#xff0c;它通过线上和线下的融合&#xff0c;提供更加便捷的购物体验。其中&#xff0c;O2O指的是线上与线下的结合&#xff0c;通过互联网平台与实体店面的结合&#xff0c;实现线上线下的互动和协同。线上商城则是指通过互…

flink1.18.0 自适应调度器 资源弹性缩放 flink帮你决定并行度

jobmanager.scheduler Elastic Scaling | Apache Flink 配置文件修改并重启flink后,webui上会显示调整并行度的按钮,他可以自己调整,你也可以通过webUI手动调整: 点击 之后: 调整完成后: