Fink CDC数据同步(六)数据入湖Hudi

数据入湖Hudi

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

配置

将hudi相关jar包放在flink安装目录的lib下

hudi-flink1.16-bundle-0.13.0.jar

hudi-hadoop-mr-0.13.0.jar

hudi-hive-sync-0.13.0.jar

确保/etc/profile配置了hadoop和hive的环境变量

#HADOOP_HOME
export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop
export HADOOP_COMMON_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_HDFS_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_YARN_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_MAPRED_HOME=/usr/hdp/3.1.5.0-152/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`#HIVE HOME
export HIVE_HOME=/usr/hdp/3.1.5.0-152/hive
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/sbin

测试插入hudi表

set sql-client.execution.result-mode = tableau;
set execution.checkpointing.interval=30sec;
SET table.sql-dialect=default;CREATE TABLE hudi_test(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi',  -- 连接器指定hudi'path' = 'hdfs://bigdata101:8020/hudi/hudi_test',  -- 数据存储地址'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);INSERT INTO hudi_test VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

MySql数据写入Hudi表

建hudi表

create table hudi_user(id string not null,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id'
);

将MySql映射表的数据插入hudi表,此时会生成一个flink任务

insert into ods.hudi_user select * from mysql_user;

流式查询

上面的查询方式是非流式查询,流式查询会生成一个flink作业,并且实时显示数据源变更的数据。

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称

是否必填

默认值

备注

read.streaming.enabled

FALSE

FALSE

设置为true,开启stream query

read.start-commit

FALSE

the latest commit

Instant time的格式为:’yyyyMMddHHmmss’

read.streaming_skip_compaction

FALSE

FALSE

是否不消费compaction commit,消费compaction commit会出现重复数据

clean.retain_commits

FALSE

10

当开启change log mode,保留的最大commit数量。如果checkpoint interval为5分钟,则保留50分钟的change log

建表:

create table hudi_user_read_streaming(id int not null ,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id','read.streaming.enabled' = 'true',  -- 默认值false,设置为true,开启stream query'read.start-commit' = '20231008134557', -- start-commit之前提交的数据不显示,'read.streaming.check-interval' = '4'  -- 检查间隔,默认60s);insert into hudi_user_read_streaming select * from mysql_user;select * from hudi_user_read_streaming;

此时,执行select 语句就会生成一个flink 作业

源端变更数据会实时展示出来


 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/254043.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信创ARM架构QT应用开发环境搭建

Linux ARM架构QT应用开发环境搭建 前言交叉工具链Ubuntu上安装 32 位 ARM 交叉工具链Ubuntu上安装 64 位 ARM 交叉工具链 交叉编译 QT 库下载 QT 源码交叉编译 QT 源码 Qt Creator交叉编译配置配置 Qt Creator Kits创建一个测试项目 小结 前言 有没有碰到过这种情况&#xff1…

一文讲透Python函数中的形式参数和实际参数

函数参数包括形式参数和实际参数,简称形参和实参。其中形式参数即是在定义函数时函数后面括号中的参数列表(parameterlist),比如上一个帖子的示例中的width, length;实际参数则是调用函数时函数后面括号中的参数值&…

Docker配置Portainer容器管理界面

目录 一、Portainer 简介 优点: 缺点: 二、环境配置 1. 拉取镜像 2. 创建启动容器 三、操作测试 1. 进入容器 2. 拉取镜像并部署 3. 访问测试 一、Portainer 简介 Portainer 是一个开源的轻量级容器管理界面,用于管理 Docker 容器…

开源免费的物联网网关 IoT Gateway

1. 概述 物联网网关,也被称为IOT网关,是一种至关重要的网络设备。在物联网系统中,它承担着连接和控制各种设备的重要任务,将这些设备有效地连接到云端、本地服务器或其他设备上。它既能够在广域范围内实现互联,也能在…

Docker部署前端项目

某次阿里云的自动流水线失败了,代码本地跑起来莫得问题,错误日志提示让我跑一下npm run build ,但是俺忽然发现,我跑了,文件打包好了,但是往哪里运行呢?这涉及到要构建一个环境供打包文件部署吧…

RedissonClient妙用-分布式布隆过滤器

目录 布隆过滤器介绍 布隆过滤器的落地应用场景 高并发处理 多个过滤器平滑切换 分析总结 布隆过滤器介绍 布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是…

unity-ios-解决内购商品在Appstore上面已配置,但在手机测试时却无法显示的问题

自己这几天用 unity 2021 xcode 14.2 开发ios内购,appstore上面内购商品都已经配置好了,但是在手机里就是不显示,最后才发现必需得满足以下条件才行: 1. Appstore后台 -> 内购商品 -> 商品状态必需为『准备提交』以上状态…

Docker部署Grafana+Promethus监控Mysql和服务器

一、Grafana部署所需资源 Grafana 需要最少的系统资源: 建议的最小内存:512 MB建议的最低 CPU:1 官方文档:https://grafana.com/docs/grafana/latest/getting-started/build-first-dashboard/ 可以看到,我的这台服务…

放假--寒假自学版 day1(补2.5)

fread 函数: 今日练习 C语言面试题5道~ 1. static 有什么用途?(请至少说明两种) 1) 限制变量的作用域 2) 设置变量的存储域 2. 引用与指针有什么区别? 1) 引用必须被初始化,指针不必。 2) 引用初始…

Android中设置Toast.setGravity()了后没有效果

当设置 toast.setGravity()后,弹窗依旧从原来的位置弹出,不按设置方向弹出 类似以下代码: var toast Toast.makeText(this, R.string.ture_toast, Toast.LENGTH_SHORT)toast.setGravity(Gravity.TOP, 0, 0)//设置toast的弹出方向为屏幕顶部…

【Java八股面试系列】JVM-常见参数设置

目录 堆内存相关 显式指定堆内存–Xms和-Xmx 显式新生代内存(Young Generation) 显式指定永久代/元空间的大小 垃圾收集相关 垃圾回收器 GC 日志记录 处理 OOM JDK监控和故障处理工具总结 堆内存相关 Java 虚拟机所管理的内存中最大的一块,Java 堆是所有线…

汇编笔记 01

小蒟蒻的汇编自学笔记,如有错误,望不吝赐教 文章目录 笔记编辑器,启动!debug功能CS & IPmovaddsub汇编语言寄存器的英文全称中英对照表muldivandor 笔记 编辑器,启动! 进入 debug 模式 debug功能 …

Arm发布新的人工智能Cortex-M处理器

Arm发布了一款新的Cortex-M处理器,旨在为资源受限的物联网(IoT)设备提供先进的人工智能功能。这款新的Cortex-M52声称是最小的、面积和成本效率最高的处理器,采用了Arm Helium技术,使开发者能够在单一工具链上使用简化…

动漫风博客介绍页面源码

动漫风博客介绍页面源码,HTML源码,图片背景有淡入切换特效 蓝奏云:https://wfr.lanzout.com/iIDZu1nrmjve

Python调用matlab程序

matlab官网:https://ww2.mathworks.cn/?s_tidgn_logo matlab外部语言和库接口,包括 Python、Java、C、C、.NET 和 Web 服务。 matlab和python的版本 安装依赖配置 安装matlab的engine 找到matlab的安装目录:“xxx\ extern\engines\python…

【python量化交易】qteasy使用教程01 - 安装方法及初始化配置

qteasy教程1 - 安装方法及初始化配置 qteasy教程1 - 安装方法及初始配置qteasy安装前的准备工作1, 创建安装环境2,安装MySQL数据库 (可选)安装pymysql 3,创建tushare账号并获取API token (可选)4,安装TA-lib (可选)WindowsMac OSL…

Windows自动化实现:系统通知和任务栏图标自定义

文章目录 Windows自动化的三个小工具系统通知任务栏图标使用pystray实现使用infi.systray实现 Windows自动化的三个小工具 系统通知 import win10toastwin10toast.ToastNotifier().show_toast("eee", "休息一下", icon_path"icon.ico", durati…

uniapp中使用EelementPlus

uniapp的强大是非常震撼的,一套代码可以编写到十几个平台。这个可以在官网上进行查询uni-app官网。主要还是开发小型的软件系统,使用起来非常的方便、快捷、高效。 uniapp中有很多自带的UI,在创建项目的时候,就可以自由选择。而E…

深度神经网络中的BNN和DNN:基于存内计算的原理、实现与能量效率

前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家:https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言引言内存计算体系结构深度神经网络(DNN)随机梯度的优…

自然语言处理(NLP)——使用Rasa创建聊天机器人

1 基本概念 1.1 自然语言处理的分类 IR-BOT:检索型问答系统 Task-bot:任务型对话系统 Chitchat-bot:闲聊系统 1.2 任务型对话Task-Bot:task-oriented bot 这张图展示了一个语音对话系统(或聊天机器人)的基本组成部分和它们之间的…