hudi数据湖万字全方位教程+应用示例

1时间轴(TimeLine

Hudi的核心是维护表上在不同的即时时间(instants执行的所有操作的时间轴(timeline,这有助于提供表的即时视图

一个instant由以下三个部分组成:

1Instant action:在表上执行的操作类型

COMMITS:一次commit表示将一批数据原子性地写入一个表。

CLEANS:清除表中不再需要的旧版本文件的后台活动。

DELTA_COMMIT:增量提交指的是将一批数据原子性地写入一个MergeOnRead类型的表,其中部分或所有数据可以写入增量日志。

COMPACTION:合并Hudi内部差异数据结构的后台活动,例如:将更新操作从基于行的log日志文件合并到列式存储的数据文件。在内部,COMPACTION体现为timeline上的特殊提交。

ROLLBACK:表示当commit/delta_commit不成功时进行回滚,其会删除在写入过程中产生的部分文件。

SAVEPOINT:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。

2Instant time:时间

通常是一个时间戳(例如:20190117010349),它按照动作开始时间的顺序单调增加。

3State:状态

REQUESTED:表示某个action已经调度,但尚未执行。

INFLIGHT:表示action当前正在执行

        COMPLETED:表示timeline上的action已经完成

区分两个重要的时间概念:

Arrival time: 数据到达 Hudi 的时间,commit time。

Event time: record 中记录的时间。

10:20 来了一条 9:00 的数据,根据event time该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 (commit time)之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到

2、文件布局

Hudi存储分为两个部分:

(1)元数据.hoodie目录对应着表的元数据信息,包括表的版本管理(Timeline)、归档目录(存放过时的instant也就是版本),一个instant记录了一次提交(commit)的行为、时间戳和状态,Hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据;

(2)数据:和hive一样,以分区方式存放数据分区里面存放着Base File.parquet)和Log File.log.*

每个分区中,文件被组织成文件组,每个文件组包含几个文件片

每个文件片包含:

一个基本文件(.parquet):在某个commit/compaction即时时间(instant time)生成的(MOR可能没有)

多个日志文件(.log.*),这些日志文件包含自生成基本文件以来对基本文件的插入/更新(COW没有

索引(Index

给定的hoodie key(record key + partition path)与文件组id建立唯一映射。这种映射关系,数据第一次写入文件后保持不变

一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。

索引选项

Bloom Index 默认配置,使用布隆过滤器来判断记录存在与否

Simple Index  性能比较差

HBase Index 把index存放在HBase里面,对于小批次的keys,查询效率高

Flink State-based

Index

Flink只有一种state based index(和bucket_index),其他index是Spark可选配置。

全局索引:全局索引在全表的所有分区范围下强制要求键的唯一性,但是随着表增大,update/delete 操作损失的性能越高,因此更适用于小表

非全局索引:默认的索引实现,只能保证数据在分区的唯一性。更适用于大表

HBase索引本质上是一个全局索引,bloom和simple index都有全局选项:

hoodie.index.type=GLOBAL_BLOOM

hoodie.index.type=GLOBAL_SIMPLE

对维度表的随机更删,使用简单索引对此场景更合适,如果额外的运维成本可以接受的话,也可以采用HBase索引

表类型

Copy On Write

在COW表中,只有数据文件/基本文件(.parquet,没有增量日志文件(.log.*

对每一个新批次写入都将创建相应数据文件的新版本(新的FileSlice),新版本文件包括旧版本文件的记录以及来自传入批次的记录(全量最新)。

data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并。

由于在写入期间进行合并,COW 会产生一些写入延迟

Merge On Read

MOR表中,包含列存的基本文件(.parquet)和行存的增量日志文件(基于行的avro格式,.log.*

MOR表的合并成本在读取端。因此在写入期间我们不会合并或创建较新的数据文件版本。

每次的读取延迟都比较高

查询类型

1Snapshot Queries

快照查询,可以查询指定commit/delta commit即时操作后表的最新快照。

2Incremental Queries

增量查询,可以查询给定commit/delta commit即时操作以来新写入的数据。

3Read Optimized Queries

读优化查询,可查看给定的commit/compact即时操作的表的最新快照。

Read Optimized Queries是对Merge On Read表类型快照查询的优化。

写流程(INSERT

1Copy On Write

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file

2Merge On Read

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file

通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:

(1)写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。

(2)对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。

Compaction

(1)没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file

(2)有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file

Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。

flink操作hudi

插入数据

set sql-client.execution.result-mode=tableau;

-- 创建hudi

CREATE TABLE t1(

  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',

  'table.type' = 'MERGE_ON_READ' –- 默认是COW

);

-- 插入数据

INSERT INTO t1 VALUES

  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');

查询数据

select * from t1;

更新数据

insert into t1 values

  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

编写代码

package com.atguigu.hudi.flink;

 

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

import org.apache.flink.contrib.streaming.state.PredefinedOptions;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

 

import java.util.concurrent.TimeUnit;

 

 

public class HudiDemo {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

        // 设置状态后端RocksDB

        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);

        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(embeddedRocksDBStateBackend);

 

        // checkpoint配置

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        checkpointConfig.setCheckpointStorage("hdfs://hadoop1:8020/ckps");

        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));

        checkpointConfig.setTolerableCheckpointFailureNumber(5);

        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));

                checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 

        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);

 

        sTableEnv.executeSql("CREATE TABLE sourceT (\n" +

                "  uuid varchar(20),\n" +

                "  name varchar(10),\n" +

                "  age int,\n" +

                "  ts timestamp(3),\n" +

                "  `partition` varchar(20)\n" +

                ") WITH (\n" +

                "  'connector' = 'datagen',\n" +

                "  'rows-per-second' = '1'\n" +

                ")");

 

        sTableEnv.executeSql("create table t2(\n" +

                "  uuid varchar(20),\n" +

                "  name varchar(10),\n" +

                "  age int,\n" +

                "  ts timestamp(3),\n" +

                "  `partition` varchar(20)\n" +

                ")\n" +

                "with (\n" +

                "  'connector' = 'hudi',\n" +

                "  'path' = '/tmp/hudi_flink/t2',\n" +

                "  'table.type' = 'MERGE_ON_READ'\n" +

                ")");

 

        sTableEnv.executeSql("insert into t2 select * from sourceT");

 

    }

}

 

 

 

 

去重参数

通过如下语法设置主键:

-- 设置单个主键

create table hoodie_table (

  f0 int primary key not enforced,

  f1 varchar(20),

  ...

) with (

  'connector' = 'hudi',

  ...

)

 

-- 设置联合主键

create table hoodie_table (

  f0 int,

  f1 varchar(20),

  ...

  primary key(f0, f1) not enforced

) with (

  'connector' = 'hudi',

  ...

)

 

 

流读(Streaming Query

当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

CREATE TABLE t5(

  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t5',

  'table.type' = 'MERGE_ON_READ',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '4'   -- 默认60s

);

 

 

insert into t5 select * from sourceT;

 

select * from t5;

增量读取(Incremental Query

0.10.0 开始支持。

如果有增量读取 batch 数据的需求,增量读取包含三种场景。

(1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

(2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

(3)TimeTravelBatch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

flink读取kafka数据并写入hudi数据湖

(1)创建kafka源表

create table stu3_binlog_source_kafka(

  id bigint not null,

  name string,

  school string,

  nickname string,

  age int not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string

 ) with (

  'connector' = 'kafka',

  'topic' = 'cdc_mysql_stu3_sink',

  'properties.bootstrap.servers' = 'hadoop1:9092',

  'format' = 'json',

  'scan.startup.mode' = 'earliest-offset',

  'properties.group.id' = 'testGroup'

  );

(2)创建hudi目标表

 create table stu3_binlog_sink_hudi(

  id bigint not null,

  name string,

  `school` string,

  nickname string,

  age int not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string,

  primary key (id) not enforced

)

 partitioned by (`school`)

 with (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',

  'table.type' = 'MERGE_ON_READ',

  'write.option' = 'insert',

  'write.precombine.field' = 'school'

  );

(3)将kafka数据写入到hudi中

insert into stu3_binlog_sink_hudi

select * from  stu3_binlog_source_kafka;

离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1)原理

(1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。

(2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。

SET execution.runtime-mode = batch;

SET execution.checkpointing.interval = 0;

(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

案例

Flink SQL client创建hudi表

 create table stu4_sink_hudi(

  id bigint not null,

  name string,

  `school` string,

  nickname string,

  age int not null,

 score decimal(4,2) not null,

  class_num int not null,

  phone bigint not null,

  email string,

  ip string,

  primary key (id) not enforced

)

 partitioned by (`school`)

 with (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',

  'table.type' = 'MERGE_ON_READ',

  'write.option' = 'bulk_insert',

  'write.precombine.field' = 'school'

  );

Flink SQL client执行mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确

(2)设置 index.bootstrap.enabled = true开启索引加载功能

(6)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

Changelog 模式

如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息中间的变更可能会被 merge 改成 true 支持消费所有变更

批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau;

 

CREATE TABLE t6(

  id int,

  ts int,

  primary key (id) not enforced

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t6',

  'table.type' = 'MERGE_ON_READ',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '4',

  'changelog.enabled' = 'true'

);

 

insert into t6 values (1,1);

insert into t6 values (1,2);

 

set table.dynamic-table-options.enabled=true;

select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

 

 

 

 

Hudi Catalog

从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表避免每次使用都要重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。

DFS 模式 Catalog SQL样例:

CREATE CATALOG hoodie_catalog

  WITH (

    'type'='hudi',

    'catalog.path' = '${catalog 的默认路径}',

    'mode'='dfs'

  );

Hms 模式 Catalog SQL 样例:

CREATE CATALOG hoodie_catalog

  WITH (

    'type'='hudi',

    'catalog.path' = '${catalog 的默认路径}',

    'hive.conf.dir' = '${hive-site.xml 所在的目录}',

    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性

  );

 

离线 Compaction

MOR 表的 compaction 默认是自动打开的策略是 5 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

设置参数

compaction.async.enabled 为 false,关闭在线 compaction。

compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan。

命令行的方式

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

离线 Clustering

异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

5.14.1 设置参数

clustering.async.enabled 为 false,关闭在线 clustering。

clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。

命令行的方式

./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

常见基础问题

5.15.1 存储一直看不到数据

如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略:

当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)

当总的 buffer 大小积攒到一定大小(可配,默认 1GB)

当 checkpoint 触发,将内存里的数据全部 flush 出去

5.15.2 数据有重复

如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。)

如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认为 true。)

索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。)

5.15.3 Merge On Read 只有 log 文件

 Merge On Read 默认开启了异步的 compaction,策略是 5 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。

可以先观察 log,搜索 compaction 关键词,看是否有 compact 任务调度:

After filtering, Nothing to compact for 关键词说明本次 compaction strategy 是不做压缩。

集成 Hive

Hudi 源表对应一份 HDFS 数据,通过 Spark,Flink 组件或者 Hudi CLI,可以将 Hudi 表的数据映射为 Hive 外部表,基于该外部表, Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。

6.1 集成步骤

以 hive3.1.2、hudi 0.12.0为例,其他版本类似。

1)拷贝编译好的jar

hudi-hadoop-mr-bundle-0.12.0.jar , hudi-hive-sync-bundle-0.12.0.jar 放到 hive 节点的lib目录下;

cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/

 

cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/

2)配置完后重启 hive

// 按照需求选择合适的方式重启

nohup hive --service metastore &

nohup hive --service hiveserver2 &

 Hive 同步

6.2.1 Flink 同步Hive

1)使用方式

Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:

## hms mode 配置

 

CREATE TABLE t1(

  uuid VARCHAR(20),

  name VARCHAR(10),

  age INT,

  ts TIMESTAMP(3),

  `partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

with(

  'connector'='hudi',

  'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',

  'table.type'='COPY_ON_WRITE',        -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出

  'hive_sync.enable'='true',           -- required,开启hive同步功能

  'hive_sync.table'='${hive_table}',              -- required, hive 新建的表名

  'hive_sync.db'='${hive_db}',             -- required, hive 新建的数据库名

  'hive_sync.mode' = 'hms',            -- required, hive sync mode设置为hms, 默认jdbc

  'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口

);

Flink 使用 HiveCatalog

6.3.1 直接使用Hive Catalog

1)上传hive connectorflinklib

hive3.1.3的connector存在guava版本冲突,需要解决:官网下载connector后,用压缩软件打开jar包,删除/com/google文件夹。处理完后上传flink的lib中。

2)解决与hadoop的冲突   

避免与hadoop的冲突,拷贝hadoop-mapreduce-client-core-3.1.3.jar到flink的lib中(5.2.1已经做过)

3)创建catalog

CREATE CATALOG hive_catalog

  WITH (

    'type' = 'hive',

    'default-database' = 'default',

    'hive-conf-dir' = '/opt/module/hive/conf',

'hadoop-conf-dir'='/opt/module/hadoop-3.1.3/etc/hadoop'

  );

 

use catalog hive_catalog;

 

-- hive-connector内置了hive module,提供了hive自带的系统函数

load module hive with ('hive-version'='3.1.2');

show modules;

show functions;

 

-- 可以调用hivesplit函数

select split('a,b', ',');

查询 Hive 外表

6.5.1 设置参数

使用 Hive 查询 Hudi 表前,需要通过set命令设置 hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置 hive.input.format 导致的:

java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx

除此之外对于增量查询,还需要 set 命令额外设置3个参数。

set hoodie.mytableName.consume.mode=INCREMENTAL;

set hoodie.mytableName.consume.max.commits=3;

set hoodie.mytableName.consume.start.timestamp=commitTime;

注意这3个参数是表级别参数。

COW 表查询

这里假设同步的 Hive 外表名为 hudi_cow。

1实时视图

设置 hive.input.format 为以下两个之一:

org.apache.hadoop.hive.ql.io.HiveInputFormat

org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat

像普通的hive表一样查询即可:

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

select count(*) from hudi_cow;

2增量视图

除了要设置 hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加 where 关键字并`_hoodie_commit_time > 'startCommitTime' 作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

set hoodie.hudicow.consume.mode= INCREMENTAL;

set hoodie.hudicow.consume.max.commits=3;

set hoodie.hudicow.consume.start.timestamp= xxxx;

select count(*) from hudicow where `_hoodie_commit_time`>'xxxx'

-- (这里注意`_hoodie_commit_time` 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号)

6.5.3 MOR 表查询

这里假设 MOR 类型 Hudi 源表的表名为hudi_mor,映射为两张 Hive 外部表hudi_mor_ro(ro表)和 hudi_mor_rt(rt表)。

1实时视图

设置了 hive.input.format 之后,即可查询到Hudi源表的最新数据

set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

select * from hudicow_rt;

2读优化视图

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可

3)增量视图

这个增量查询针对的rt,不是ro表。同 COW 表的增量查询类似:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat

set hoodie.hudimor.consume.mode=INCREMENTAL;

set hoodie.hudimor.consume.max.commits=-1;

set hoodie.hudimor.consume.start.timestamp=xxxx;

select * from hudimor_rt where `_hoodie_commit_time`>'xxxx';// 这个表名要是rt

索引

说明:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于 rt 表的增量查询 当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 用于其他表的查询。

set hoodie.mytableName.consume.mode=INCREMENTAL; 仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;

 

湖仓一体的优势如下:

流数仓架构本质上有两个痛点实时/离线计算层不统一;实时/离线存储层不统一。

减少数据冗余:湖仓一体提单一的数据存储平台减少了数据的冗余和重复,避免维护多个存储系统的成本和时间。

成本效益:湖仓一体利用低成本对象存储实现高效益数据存储,降低了存储成本,并避免了维护多个数据存储系统的成本。

事务支持:湖仓体支持 ACID 事务,确保了多方同时读取或写入数据的一致性。

Schema的实施和治理:湖仓一体支持Schema的实施和演化,确保数据的完整性,并提供了强大的治理和审计机制。

开放性:湖仓一体采用开放和标准化的存储格式,如Parquet,可以让各种工具和引擎直接访问数据。

存算分离:湖仓一体将存储和计算解耦,可以横向扩展到更大规模和更多并发用户。

支持多种工作负载:湖仓一体支持数据科学、机器学习、SQL和数据分析等各种工作负载,减少了需要维护多个工具的成本。

端到端的流计算支持:湖仓一体支持流计算,实时/离线存储层统一,实现实时报告的需求,避免了使用单独系统来实时数据应用程序的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/374470.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频号矩阵系统源码,实现AI自动生成文案和自动回复私信评论,支持多个短视频平台

在当今短视频蓬勃发展的时代,视频号矩阵系统源码成为了自媒体人争相探索的宝藏。这一强大的技术工具不仅能帮助我们高效管理多个短视频平台,更能通过AI智能生成文案和自动回复私信评论,为自媒体运营带来前所未有的便利与效率。 一、视频号矩…

layui-表单(输入框)

1.基本使用方法 先写一个表单元素块 form 加上layui-form 里面写行区块结构,如下: 2.输入框样式选项 input框 placeholder默认文本 autocomplete自动填充 lay-verify required必填 3.下拉菜单样式选项 默认选择第一项 select框 disable禁…

导员:你这么牛,那你来讲讲你项目的核心流程-判题模块吧

耗时一个月开发的OJ在线判题系统,文末有项目地址,目前还在更新代码~ 今天我们来开发OJ系统后端核心流程之一的判题模块 文章目录 判题机模块与代码沙箱的关系代码沙箱架构开发判题服务开发判题服务业务流程判断逻辑策略模式优化 小知识-Lombox Builder …

新品牌快速成长指南:揭秘品牌成功的黄金法则

打造一个新品牌是一个系统性工程,不是一两句话就能说清楚的。 作为一个13年的营销人,今天试图给大家以最简练和通俗的文字,详细讲讲打造一个全新的品牌都需要做些啥?码字不易,请多给点支持哦。 一、市场调研与定位&a…

Elasticsearch 开放推理 API 增加了对 Amazon Bedrock 的支持

作者:来自 Elastic Mark Hoy, Hemant Malik Elasticsearch 开放推理 API 增加了对托管在 Amazon Bedrock 上的模型生成嵌入的支持。 Elasticsearch 开放 infereence API 使开发人员能够创建推理端点并使用来自领先提供商的机器学习模型。从今天开始,托管…

超简单的通配证书签发工具,免费,无需安装任何插件到本地

常见的acme.sh 或者 lego等工具需要配置,安装不灵活,续签需要配置计划任务,签发单域名证书或者通配证书需要不同的指令和配置,繁琐,如果自己程序想要对接签发证书的api有的不支持,有的用起来繁琐。 最近发…

确保智慧校园安全,充分利用操作日志功能

智慧校园基础平台系统的操作日志功能是确保整个平台运行透明、安全及可追溯的核心组件。它自动且详尽地记录下系统内的每一次关键操作细节,涵盖操作的具体时间、执行操作的用户账号、涉及的数据对象(例如学生信息更新、课程调度变动等)、操作…

华为HCIP Datacom H12-821 卷34

1.单选题 防火墙默认已经创建了一些安全区域,以下哪一个安全区域不是防火墙上默认存在的? A、Trust B、DMZ C、Internet D、Local 正确答案: C 解析: 防火墙默认情况下为我们提供了三个安全区域,分别是 Trust、DMZ和Untrust 2.判断题 …

(图文详解)小程序AppID申请以及在Hbuilderx中运行

今天小编给大家带来了如何去申请APPID,如果你是小程序的开发者,就必须要这个id。 申请步骤 到小程序注册页面,注册一个小程序账号 微信公众平台 填完信息后提交注册 会在邮箱收到 链接激活账号 确认。邮箱打开链接后,会输入实…

设备管理中的数据结构

一、有哪些数据结构属于设备管理数据结构 1. 设备控制表(DCT) “Device Control Table”的首字母缩写 2. 控制器控制表(COCT) “Controller Of Control Table”的首字母缩写。 3. 通道控制表(CHCT) “…

guided-diffusion 相比于improved-diffusion的sample增加的cond_fn()

目录 1、cond_fn()函数代码2、softmax与log_softmax函数 1、cond_fn()函数代码 def cond_fn(x, t, yNone):assert y is not Nonewith th.enable_grad():x_in x.detach().requires_grad_(True)logits classifier(x_in, t)log_probs F.log_softmax(logits, dim-1)selected l…

Transformer特辑

https://github.com/LongxingTan/Machine-learning-interview 模型结构 基本单元:token_embedding positional encoding, encoder, token_embedding positional encoding, decoderencoder: (self-attention, skip-connect, ln), (ffn, skip-connect, ln)decoder:…

顶顶通呼叫中心中间件实现随时启动和停止质检(mod_cti基于FreeSWITCH)

文章目录 前言联系我们拨号方案启动停止ASR执行FreeSWITCH 命令接口启动ASR接口停止ASR接口 通知配置cti.json配置质检结果写入数据库 前言 顶顶通呼叫中心中间件的实时质检功能是由两个模块组成:mod_asr 和 mod_qc。 mod_asr:负责调用ASR将用户们在通…

二、Qemu+Vscode调试内核

编译内核、busybox、配置Qemu参考:Qemu调试内核 一、修改启动脚本 1、修改Qemu启动脚本 #! /bin/shqemu-system-aarch64 \-machine virt,virtualizationtrue,gic-version3 \-nographic \-m size1024M \-cpu cortex-a72 \-smp 2 \-kernel Image \-drive formatraw…

写作遇到AI痕迹困扰?这里有降低AI痕迹的实用技巧

请问有没有什么免费的论文降重网站? 副本 一句“知网是什么”,我查重查了千百遍。天临六年五月,大家的论文差不多都到了查重的阶段。好不容易论文写(shui)完了,一看查重报告,满屏的红字让人心心…

Linux--线程ID封装管理原生线程

目录 1.线程的tid(本质是线程属性集合的起始虚拟地址) 1.1pthread库中线程的tid是什么? 1.2理解库 1.3phtread库中做了什么? 1.4线程的tid,和内核中的lwp 1.5线程的局部存储 2.封装管理原生线程库 1.线程的tid…

java设计模式(十五)命令模式(Command Pattern)

1、模式介绍: 命令模式(Command Pattern)是一种行为设计模式,其主要目的是将请求封装成一个对象,从而允许使用不同的请求、队列或者日志来参数化其他对象。这种模式使得命令的请求者和实现者解耦。 2、应用场景&…

服务启动何时触发 Nacos 的注册流程?

前言: 前面的系列文章让我们对 Nacos 有了一个基本了解,并知道了如何去试用 Nacos 作为注册中心和配置中心,本篇我们将从源码层面去分析 Nacos 的服务注册流程。 Nacos 系列文章传送门: Nacos 初步认识和 Nacos 部署细节 Naco…

C++基础学习笔记

1.命名空间(namespace) 1.什么是命名空间&命名空间的作用 1.在C/C中,变量、函数、类都是大量存在的,这些变量等的名称将都存在于全局作用域中,就会导致很多的命名冲突等。使用命名空间的目的就是对标识符的名称进行本地化,以…

短视频矩阵系统全解析:让获客变得更简单

随着数字媒体的迅猛发展,短视频已成为人们生活中不可或缺的一部分。对于企业而言,如何有效利用短视频平台吸引目标用户,实现高效获客,成为了一个亟待解决的问题。本文将全面解析短视频矩阵系统,带您领略其独特魅力&…