使用Flink SQL实时入湖Hudi/Hive

文章目录

  • 1 Hudi 简介
  • 2 COW和MOR
  • 3 接入COW模式Hudi表
  • 4 使用Flink SQL查看新接表
  • 5 使用Hive查看新接表
  • 6 总结

1 Hudi 简介

Hudi是一个流式数据湖平台,使用Hudi可以直接打通数据库与数据仓库,连通大数据平台,支持对数据增删改查。Hudi还支持同步数据入库,提供了事务保证、索引优化,是打造实时数仓、实时湖仓一体的新一代技术。下面以我实际工作中遇到的问题,聊下湖仓一体的好处,如有不对,敬请指正。
在这里插入图片描述
像传统关系型数据库,MySQL/Oracle等大多支持OLTP,但不支持OLAP。如果写很复杂的SQL,传统关系型数据库根本跑不动,尤其是需要跨系统/跨数据库联合查询分析,传统关系型数据库并不支持(这个可以使用Presto解决)。

在这里插入图片描述
而离线数仓无法支持实时/准实时需求,无法记录级更新,当业务表数据量很大时,无论使用增量还是全量接入Hive,对业务库都有很大压力(使用从库可缓解)。Hudi能很好解决这个问题,通过配置可以准实时的写入Hudi,并同步到Hive,相当于业务表数据准实时的同步到Hive,这时取快照或者直接当作ODS层都可,再也不用担心ODS接入延迟了。
在这里插入图片描述

2 COW和MOR

Hudi有两种表类型,COW和MOR,如果接入表读多写少可选择COW,如字典表,读少写多使用MOR。
Copy on write:写时复制,使用列式文件格式(如 parquet)存储数据。不同进程在访问同一资源的时候,只有更新操作,才会去复制一份新的数据并更新替换,否则都是访问同一个资源。
Merge on read:读时合并,使用列式+基于行的(例如avro)文件格式的组合存储数据。更新被记录到增量文件中,然后被压缩以同步或异步地生成新版本的列式文件。
在这里插入图片描述
如果Hudi表是COPY_ON_WRITE类型,那么映射的Hive表对应是指定的Hive表名,此表中存储着Hudi所有数据。

如果Hudi表类型是MERGE_ON_READ模式,那么映射的Hive表将会有2张,一张后缀为rt ,另一张表后缀为ro。后缀rt对应的Hive表中存储的是Base文件Parquet格式数据+log Avro格式数据,也就是全量数据。后缀为ro Hive表中存储的是存储的是Base文件对应的数据。

3 接入COW模式Hudi表

开发测试时,可在客户端调试

./bin/sql-client.sh embedded -s yarn-session

调试没问题后,在DolphinScheduler配置上线
在这里插入图片描述
选择FLINK_STREAM
在这里插入图片描述
根据集群类型,选择部署方式

初始化脚本
初始化脚本配置一些参数和建表

SET 'yarn.application.queue' = 'root.etl';
set execution.checkpointing.interval='300s';
SET execution.checkpointing.mode = AT_LEAST_ONCE;
-- 保存checkpoint文件的目录
set state.checkpoints.dir='hdfs://cluster/tmp/flink/checkpoints/h_account_holiday';
-- 恢复时需设置检查点 set execution.savepoint.path='hdfs://cluster/tmp/flink/checkpoints/h_account_holiday/077107d6530a1c63cb9126258cfe2546/chk-72';set taskmanager.network.memory.buffer-debloat.enabled=true;SET state.checkpoints.num-retained= 3; 
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;set execution.checkpointing.min-pause = '180000';
set 'table.exec.sink.upsert-materialize' = 'NONE';
set execution.checkpointing.max-concurrent-checkpoints=1;set akka.ask.timeout = '1200s';
set web.timeout = '500000';
set heartbeat.timeout=500000;SET 'connector.mysql-cdc.max-connection-attempts' = '5';
SET 'connector.mysql-cdc.connection-attempts-timeout' = '1200s';SET restart-strategy='fixed-delay';
SET restart-strategy.fixed-delay.attempts='50';
SET restart-strategy.fixed-delay.delay='1min';
SET execution.checkpointing.timeout='40min';SET state.backend='rocksdb';
SET state.backend.incremental=true;set high-availability='zookeeper';
set high-availability.storageDir='hdfs://cluster/tmp/flink/ha-yarn';
set high-availability.zookeeper.quorum='bigdata-093:2181,bigdata-094:2181,bigdata-ds-12-195:2181,bigdata-ds-12-198:2181,bigdata-ds-12-199:2181';
set high-availability.zookeeper.path.root='/flink_yarn';
set yarn.application-attempts='10';CREATE CATALOG cdc_catalog WITH (
'type' = 'hive',
'default-database' = 'flink_cdc',
'hive-conf-dir' = '/opt/apps/apache-hive-2.1.1-bin/conf'
);
-- 使用刚创建的catalog
use catalog cdc_catalog;
-- 选择flink_cdc库
use flink_cdc;drop table if exists source_account_holiday;
create table if not exists source_account_holiday(
`id` int primary key not enforced
,workday date
,week int
,next_workday date
,create_time timestamp
,update_time timestamp
) with (
'connector'='mysql-cdc',
'hostname'='10.100.xx.xx',
'port'='3306',
'server-time-zone'='Asia/Shanghai',
'server-id'='6066-6070', -- 注意同一个实例,id不要重复,数字范围要大于并行度
'username'='xxx',
'password'='xxx',
'debezium.snapshot.mode'='initial',
'database-name'='xd_account',
'table-name'='account_holiday',
'connect.timeout'='1000000'
);drop table if exists sink_account_holiday;
create table if not exists sink_account_holiday(
`id` int primary key not enforced
,workday date
,week int
,next_workday date
,create_time string -- 注意timestamp需转成string
,update_time string -- 注意timestamp需转成string
) with (
'connector' = 'hudi',
'path' = 'hdfs://cluster/tmp/flink/hudi/sink_account_holiday',
'hoodie.datasource.write.recordkey.field'='id', -- 设置主键
'table.type'='COPY_ON_WRITE',
'write.timezone'='Asia/Shanghai',
'hive_sync.enabled'='true',
'hive_sync.mode'='hms',
'hive_sync.metastore.uris'='thrift://bigdata-003:9083,thrift://bigdata-004:9083,thrift://bigdata-009:9083,thrift://bigdata-012:9083,thrift://bigdata-008:9083,thrift://bigdata-007:9083',
'hive_sync.db'='hudi', -- 同步到hive hudi库h_account_holiday,自动建表
'hive_sync.table'='h_account_holiday',
'hive_sync.username'='hive',
'hoodie.datasource.hive_sync.omit_metadata_fields'='true'
);

脚本
从source表写入sink表

insert into sink_account_holiday
select id
,workday 
,week 
,next_workday 
,date_format(create_time, 'yyyy-MM-dd HH:mm:ss') -- 注意timestamp需转成string
,date_format(update_time, 'yyyy-MM-dd HH:mm:ss') -- 注意timestamp需转成string
from source_account_holiday;

在这里插入图片描述
执行后注意看日志,成功会有Application ID 和 Job ID
在这里插入图片描述
可通过Application ID 和 Job ID查看任务运行情况
在这里插入图片描述

4 使用Flink SQL查看新接表

使用Flink SQL,可以实时看到数据更新

cd /opt/apps/flink-1.14.4/
./bin/sql-client.sh embedded -s yarn-session

embedded 内嵌模式

Flink SQL> CREATE CATALOG cdc_catalog WITH (
> 'type' = 'hive',
> 'default-database' = 'flink_cdc',
> 'hive-conf-dir' = '/opt/apps/apache-hive-2.1.1-bin/conf'
> );
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[INFO] Execute statement succeed.Flink SQL> use catalog cdc_catalog;
[INFO] Execute statement succeed.Flink SQL> show databases;Flink SQL> use hudi;
[INFO] Execute statement succeed.
Flink SQL> select * from h_account_holiday limit 10;

在这里插入图片描述

5 使用Hive查看新接表

前面初始化脚本必须配置同步到hive,hive查不了source和sink表,只能查同步到hive的表

hive> use hudi;
OK
Time taken: 2.406 seconds
hive> set role admin;
OK
Time taken: 0.093 seconds
hive> select * from h_account_holiday limit 10;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
44      2024-05-12      7       2024-05-13      2024-01-20 15:17:59     2024-01-20 15:17:59
45      2024-05-18      6       2024-05-20      2024-01-20 15:17:59     2024-01-20 15:17:59
89      2024-10-04      5       2024-10-08      2024-01-20 15:17:59     2024-01-20 15:17:59
110     2024-12-14      6       2024-12-16      2024-01-20 15:17:59     2024-01-20 15:17:59
112     2024-12-21      6       2024-12-23      2024-01-20 15:17:59     2024-01-20 15:17:59
115     2024-12-29      7       2024-12-30      2024-01-20 15:17:59     2024-01-20 15:17:59
91      2024-10-06      7       2024-10-08      2024-01-20 15:17:59     2024-01-20 15:17:59
93      2024-10-13      7       2024-10-14      2024-01-20 15:17:59     2024-01-20 15:17:59
50      2024-06-02      7       2024-06-03      2024-01-20 15:17:59     2024-01-20 15:17:59
95      2024-10-20      7       2024-10-21      2024-01-20 15:17:59     2024-01-20 15:17:59
Time taken: 0.147 seconds, Fetched: 10 row(s)

在这里插入图片描述

6 总结

使用这种方案,真正实现了湖仓一体,基本满足了实时和离线需求,且主要使用SQL,开发和维护成本较低。不过,该方案也有个问题,flink cdc 会挂,导致数据没更新,还是要多关注下。

参考链接:
https://blog.csdn.net/qq_32727095/article/details/123863620
https://zhuanlan.zhihu.com/p/471842018
https://zhuanlan.zhihu.com/p/526372429
https://blog.csdn.net/JH_Zhai/article/details/136042662
https://www.jianshu.com/p/0837ada9de76

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/326234.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件设计中的数字:7

“ 使软件更易理解的秘密:米勒法则” 小游戏 学习之前先一起玩一个小游戏。 3秒钟时间,看看下面的图片中有多少个小块? 3秒到了,数出来了吗?22个。 没数出来也没关系,我也没数出来o(╥﹏╥)o 现在&…

[AIGC] 压缩列表了解吗?快速列表 quicklist 了解吗?

文章目录 压缩列表了解吗?快速列表 quicklist 了解吗? 压缩列表了解吗? 压缩列表是 Redis 为了节约内存 而使用的一种数据结构,是由一系列特殊编码的连续内存快组成的顺序型数据结构。 一个压缩列表可以包含任意多个节点&#xf…

3D分子生成的定制扩散框架 MolDiff - 评测

MolDiff模型是一种考虑分子键生成的3D分子生成的新模型。MolDiff是清华大学智能产业研究院马剑竹课题组发表在PMLR 2023的工作,第一作者是Xingang Peng,文章题目为:《 Addressing the Atom-Bond Inconsistency Problem in 3D Molecule Genera…

Mac安装jadx

1、使用命令brew安装 : brew install jadx 输入完命令,等待安装完毕 备注(关于Homebrew ): Homebrew 是 MacOS 下的包管理工具,类似 apt-get/apt 之于 Linux,yum 之于 CentOS。如果一款软件发布时支持了 homebrew 安…

STM32 PWM 计数器模式和对齐

STM32 PWM 计数器模式和对齐 1. TIM高级定时器简介2. TIM计数模式2.1 向上计数2.2 向下计数2.3 中心对齐模式(向上/向下计数)2.4 重复计数 3. PWM输出模式3.1 举例看下PWM中心对齐模式,设置参数如下: 4. FOC中PWM相关设置说明4.1 …

Docker学习(带图详细)

一、安装docker 参考官方文档:https://docs.docker.com/engine/install/centos/ 查看系统版本 [rootlocalhost ~]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) [rootlocalhost ~]# [rootlocalhost ~]# uname -a Linux localhost.localdomai…

华为OD机试 - 密码输入检测(Java 2024 C卷 100分)

华为OD机试 2024C卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷C卷)》。 刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释、样例测试…

后端项目开发笔记

Maven打包与JDK版本不对应解决方法 我这里使用jdk8。 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configurat…

【适用全主题】WordPress原创插件:弹窗通知插件 支持内容自定义

内容目录 一、详细介绍二、效果展示1.部分代码2.效果图展示 三、学习资料下载 一、详细介绍 适用于所有WordPress主题的弹窗插件 一款WordPress原创插件&#xff1a;弹窗通知插件 支持内容自定义 二、效果展示 1.部分代码 代码如下&#xff08;示例&#xff09;&#xff1…

BGP学习二:BGP通告原则,BGP反射器,BGP路径属性细致讲解,新手小白无负担

目录 一.AS号 二.BGP路由生成 1.network 2.import-route引入 三.BGP通告原则 1.只发布最优且有效的路由 2.从EBGP获取的路由&#xff0c;会发布给所有对等体 3.水平分割原则 4.IBGP学习BGP默认不发送给EBGP&#xff0c;但如果也从IGP学习到了这条路由&#xff0c;就发…

4. 分布式链路追踪客户端工具包Starter设计

前言 本文将从零搭建分布式链路追踪客户端工具包的Starter&#xff0c;并将在后续文章中逐步丰富支持的场景。这里首先将搭建一个最基础的Starter&#xff0c;能提供的功能和1. 看完这篇文章我奶奶都懂Opentracing了一文中的示例demo类似。 相关版本依赖如下。 opentracing-…

生成ssl证书并配置到nginx

生成ssl证书并配置到nginx 安装证书生成工具 apt-get update apt install software-properties-common add-apt-repository ppa:certbot/certbot apt-get update apt-get install certbot python3-certbot-nginx生成证书 首先在新网上创建一个A链接&#xff0c;域名与服务器做…

vue3 自定义国际化、elementPlus 国际化

自定义国际化 1. 引入 vue-i18n 插件 pnpm install vue-i18nnext 2. 页面添加语言文件目录&#xff0c;添加自定义的语言文件 3.语言目录里添加 index.ts&#xff0c; 内容如下 import { createI18n } from "vue-i18n";// 自定义语言文件 import zhCN from "…

WordPress中插入视频的两种方法详解

最近我在建设WordPress网站的时候需要上传视频&#xff0c;我使用的是Hostease的主机安装的WordPress&#xff0c;随后在咨询了他们的技术支持后获得了一些解决方法。下面将介绍WordPress中插入视频的两种方法&#xff1a;本地上传和外部引用。 本地上传视频 使用WordPress的古…

【eclipse】如何在IDE里创建一个Java Web项目?

如何在eclipse中创建一个动态Web项目并成功运行&#xff1f; 一、 最终效果 懒得写那么多了…我也不知道该怎么写了&#xff0c;有点乱&#xff0c;有问题可以在评论里留言&#xff0c;我看到会解决的&#xff0c;在这个过程中也踩到了一些坑&#xff0c;但好在有CSDN帮助解决…

利用知识图谱提升RAG应用的准确性

文章目录 一、关于 GraphRAG二、Neo4j环境配置三、数据提取四、RAG混合检索1、非结构化数据检索器2、图谱检索器3、最终的检索器 五、定义RAG Chain 本文转载自&#xff1a;lucas大叔 : 利用知识图谱提升RAG应用的准确性 https://zhuanlan.zhihu.com/p/692595027 英文原文&…

网页版五子棋的自动化测试

目录 前言 一、主要技术 二、测试环境的准备部署 三、测试用例 四、执行测试 4.1、公共类设计 创建浏览器驱动对象 测试套件 释放驱动类 4.2、功能测试 登录页面 注册页面 游戏大厅页面 游戏房间页面 测试套件结果 4.3、界面测试 登录页面 注册页面 游戏大…

密码学《图解密码技术》 记录学习 第十五章

目录 十五章 15.1本章学习的内容 15.2 密码技术小结 15.2.1 密码学家的工具箱 15.2.2 密码与认证 15.2.3 密码技术的框架化 15.2.4 密码技术与压缩技术 15.3 虚拟货币——比特币 15.3.1 什么是比特币 15.3.2 P2P 网络 15.3.3地址 15.3.4 钱包 15.3.5 区块链 15.3.…

web安全之登录框渗透骚姿势,新思路

不管漏洞挖掘还是挖SRC&#xff0c;登录框都是重点关注对象&#xff0c;什么漏洞都有可能出现&#xff0c; 本篇文章做个总结&#xff0c;后面发现新思路后会继续更新 万能密码 or 弱口令 SQL注入 水平越权 垂直越权 逻辑漏洞 短信轰炸 邮箱轰炸 信息泄露 验证码DOS XSS万能密…