CDP集成Hudi实战-spark shell

[〇]关于本文

本文主要解释spark shell操作Hudi表的案例

软件版本
Hudi1.0.0
Hadoop Version3.1.1.7.3.1.0-197
Hive Version3.1.3000.7.3.1.0-197
Spark Version3.4.1.7.3.1.0-197
CDP7.3.1

[一]使用Spark-shell

1-配置hudi Jar包

[root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar   cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 418.2MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 304.8MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 365.0MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 406.1MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 472.7MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 447.1MB/s   00:00
[root@cdp73-1 ~]#

2-进入Spark-shell

spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

3-初始化项目

// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"

4-创建表

首次提交将自动初始化表,如果指定的基本路径中尚不存在该表。

5-导入数据

// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.embed.timeline.server", "false").option("hoodie.table.name", tableName).mode(Overwrite).save(basePath)

【映射到Hudi写操作】​​​​​​​Hudi提供了多种写操作——包括批量和增量写操作——以将数据写入Hudi表,这些操作具有不同的语义和性能。当未配置记录键(请参见下面的键)时,将选择bulk_insert作为写操作,这与Spark的Parquet数据源的非默认行为相匹配。

6-查询数据

// spark-shell
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table").show()

7-更新数据

// Lets read data from target Hudi table, modify fare column for rider-D and update it. 
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)updatesDf.write.format("hudi").option("hoodie.datasource.write.operation", "upsert").option("hoodie.embed.timeline.server", "false").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.table.name", tableName).mode(Append).save(basePath)

8-合并数据

// spark-shell
val adjustedFareDF = spark.read.format("hudi").load(basePath).limit(2).withColumn("fare", col("fare") * 10)
adjustedFareDF.write.format("hudi").
option("hoodie.embed.timeline.server", "false").
mode(Append).
save(basePath)
// Notice Fare column has been updated but all other columns remain intact.
spark.read.format("hudi").load(basePath).show()

9-删除数据

/ spark-shell
// Lets  delete rider: rider-D
val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")deletesDF.write.format("hudi").option("hoodie.datasource.write.operation", "delete").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.table.name", tableName).option("hoodie.embed.timeline.server", "false").mode(Append).save(basePath)

​​​​​​​

10-数据索引

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table_index"
val basePath = "hdfs:///tmp/hudi_indexed_table"val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").option("hoodie.datasource.write.partitionpath.field", "city").option("hoodie.table.name", tableName).option("hoodie.write.record.merge.mode", "COMMIT_TIME_ORDERING").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option("hoodie.embed.timeline.server", "false").mode(Overwrite).save(basePath)// Create record index and secondary index for the table
spark.sql(s"CREATE TABLE hudi_indexed_table USING hudi LOCATION '$basePath'")
// Create bloom filter expression index on driver column
spark.sql(s"CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity')");
// It would show bloom filter expression index 
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
// Query on driver column would prune the data using the idx_bloom_driver index
spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S'");// Create column stat expression index on ts column
spark.sql(s"CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd')");
// Shows both expression indexes 
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
// Query on ts column would prune the data using the idx_column_ts index
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-09-24'");// To create secondary index, first create the record index
spark.sql(s"SET hoodie.metadata.record.index.enable=true");
spark.sql(s"CREATE INDEX record_index ON hudi_indexed_table (uuid)");
// Create secondary index on rider column
spark.sql(s"CREATE INDEX idx_rider ON hudi_indexed_table (rider)");// Expression index and secondary index should show up
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
// Query on rider column would leverage the secondary index idx_rider
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E'");// Update a record and query the table based on indexed columns
spark.sql(s"UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts = '1697516137' WHERE rider = 'rider-A'");
// Data skipping would be performed using column stat expression index
spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-10-17'");
// Data skipping would be performed using bloom filter expression index
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N'");
// Data skipping would be performed using secondary index
spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B'");// Drop all the indexes
spark.sql(s"DROP INDEX secondary_index_idx_rider on hudi_indexed_table");
spark.sql(s"DROP INDEX record_index on hudi_indexed_table");
spark.sql(s"DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table");
spark.sql(s"DROP INDEX expr_index_idx_column_ts on hudi_indexed_table");
// No indexes should show up for the table
spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");spark.sql(s"SET hoodie.metadata.record.index.enable=false");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/503235.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

web实操9——session

概念 数据保存在服务器HttpSession对象里。 session也是域对象,有setAttribute和getAttribute方法 快速入门 代码 获取session和塞入数据: 获取session获取数据: 请求存储: 请求获取: 数据正常打印&#xff1a…

常用LabVIEW算法及应用

在LabVIEW项目中,算法的应用是提高系统性能、实现特定功能、完成复杂任务的核心。LabVIEW作为一种图形化编程语言,允许用户通过直观的图形编程来实现各种复杂的算法。这些算法广泛应用于控制系统、数据采集、信号处理、图像处理、机器学习等领域。了解常…

AI Agent 开发共学招募 | 来 Sui 上探索自治智能的边界

Agent 一词源自拉丁语 “Agere”,意为“行动(to do)”。在大语言模型(LLM)的语境下,Agent 指的是能够感知环境、进行决策并执行任务的智能实体。 与传统的 RPA 相比,后者只能在预设的条件下执行…

安卓NDK视觉开发——手机拍照文档边缘检测实现方法与库封装

一、项目创建 创建NDK项目有两种方式,一种从新创建整个项目,一个在创建好的项目添加NDK接口。 1.创建NDK项目 创建 一个Native C项目: 选择包名、API版本与算法交互的语言: 选择C版本: 创建完之后,可…

计算机网络 —— 网络编程实操(1)(UDP)

计算机网络 —— 网络编程实操(UDP) 套接字端口套接字的定义为什么需要套接字? 套接字的分类1. 按照通信协议分类2. 按照地址族(Address Family)分类3. 按照通信模式分类 socket APIsockaddr结构 使用接口套接字初始化…

【HarmonyOS-ArkTS语言】面向对象【合集】

目录 😋环境配置:华为HarmonyOS开发者 🎯学习小目标: 📖实验步骤及方法: 1.在entry/src/main/ets/utils下创建MyClass.ets和MyConfig.ets文件​编辑 2.在MyConfig.ets中创建Interface Config 和enum l…

Excel | 空格分隔的行怎么导入excel?

准备工作:windows,一个记事本程序和微软的Excel软件。 打开记事本,选中所有内容,按CtrlA全选,然后复制(CtrlC)。 在Excel中,定位到你想粘贴的单元格,按CtrlV进行粘贴。粘贴后,你会在…

HTML 显示器纯色亮点检测工具

HTML 显示器纯色亮点检测工具 相关资源文件已经打包成html等文件,可双击直接运行程序,且文章末尾已附上相关源码,以供大家学习交流,博主主页还有更多Html相关程序案例,秉着开源精神的想法,望大家喜欢&#…

idea项目导入gitee 码云

1、安装gitee插件 IDEA 码云插件已由 gitosc 更名为 gitee。 1 在码云平台帮助文档http://git.mydoc.io/?t153739上介绍的很清楚,推荐前两种方法, 搜索码云插件的时候记得名字是gitee,gitosc已经搜不到了。 2、使用码云托管项目 如果之…

python学习笔记—13—while和for循环

1. while循环 (1) 代码 1. 示例 i 0 while i < 100:print(f"第{i}次循环")i 1 2. 计算从1加到100的和 i 1 sum 0 while i < 100:sum ii 1 print(f"1加到100的和是{sum}") 3. 使用while循环无限次猜测随机产生的数字&#xff0c;直到猜对&am…

计算机网络与服务器

目录 架构体系及相关知识 三层架构&#xff1a; 四层架构&#xff1a; 常见的应用的模式&#xff1a; OSI模型 分层 数据链路层 TCP/IP模型 TCP和UDP都是传输层的协议 TCP三次握手、四次次分手 URL&HTTP协议详解 网址URL 结构化 报文行 报文头 空行 报文体…

Mybatis(day09)

Mybatis基础操作 功能列表&#xff1a; 查询 根据主键ID查询 条件查询新增更新删除 根据主键ID删除 根据主键ID批量删除 准备 实施前的准备工作&#xff1a; 准备数据库表创建一个新的 springboot 工程&#xff0c;选择引入对应的起步依赖&#xff08;mybatis、mysql 驱动、…

CSS Grid 布局示例(基本布局+代码属性描述)

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>CSS Grid 布局示例</title><style>.gri…

数据挖掘——数据预处理

数据挖掘——数据预处理 数据预处理数据预处理 ——主要任务数据清洗如何处理丢失的数据如何处理噪声数据如何处理不一致数据 数据集成相关分析相关系数(也成为皮尔逊相关系数)协方差 数据规约降维法&#xff1a;PCA主成分分析降数据——抽样法数据压缩 数据预处理 数据预处理…

我在广州学 Mysql 系列——有关数据表的插入、更新与删除相关练习

ℹ️大家好&#xff0c;我是练小杰&#xff0c;今天是星期四了&#xff0c;明天就是星期五了&#xff01;&#xff01;这周过得真快啊&#xff01;&#xff01; 本文将针对MYSQL数据表内容的插入&#xff0c;更新以及删除&#xff0c;相关命令的各种练习~~ 复习&#xff1a;&am…

OpenGL —— 流媒体播放器 - ffmpeg解码rtsp流,opengl渲染yuv视频(附源码,glfw+glad)

效果 说明 FFMpeg和OpenGL作为两大技术巨头,分别在视频解码和图形渲染领域发挥着举足轻重的作用。本文将综合两者实战视频播放器,大概技术流程为:ffmpeg拉取rtsp协议视频流,并经过解码、尺寸格式转换为yuv420p后,使用opengl逐帧循环渲染该yuv实时视频。 核心源码 vertexSh…

彻底学会Gradle插件版本和Gradle版本及对应关系

看完这篇&#xff0c;保你彻底学会Gradle插件版本和Gradle版本及对应关系&#xff0c;超详细超全的对应关系表 需要知道Gradle插件版本和Gradle版本的对应关系&#xff0c;其实就是需要知道Gradle插件版本对应所需的gradle最低版本&#xff0c;详细对应关系如下表格&#xff0…

【学习笔记】数据结构(十)

内部排序 文章目录 内部排序10.1 概述10.2 插入排序10.2.1 直接插入排序10.2.2 其他插入排序10.2.2.1 折半插入排序(Binary Insertion Sort)10.2.2.2 2-路插入排序&#xff08;Two-Way Insertion Sort&#xff09;10.2.2.3 表插入排序&#xff08;Table Insertion Sort&#xf…

论文解读 | NeurIPS'24 IRCAN:通过识别和重新加权上下文感知神经元来减轻大语言模型生成中的知识冲突...

点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入&#xff01; 点击 阅读原文 观看作者讲解回放&#xff01; 作者简介 史丹&#xff0c;天津大学博士生 内容简介 大语言模型&#xff08;LLM&#xff09;经过海量数据训练后编码了丰富的世界知识。最近的研究表明&#xff0c…

Python编程实例-特征向量与特征值编程实现

特征向量与特征值编程实现 文章目录 特征向量与特征值编程实现1、什么是特征向量2、特征向量背后的直觉3、为什么特征向量很重要?4、如何计算特征向量?4、特征向量Python实现5、可视化特征向量6、总结线性代数是许多高级数学概念的基石,广泛应用于数据科学、机器学习、计算机…