通过写文件方式写入 Hive 数据

通过写文件方式写入 Hive 数据

Hive最简单的写入数据方式就是通过Hive Jdbc写入Hive数据,但这并不是写入Hive最高效的方法。

Hive通过读取相关Hdfs的文件来获取数据信息,而通过直接写入Hdfs文件数据达到写入Hive数据的效果,这是目前最高效的方法。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

通用写法

最通用的写法就是通过Serializer配合StandardStructObjectInspector序列化数据,再通过RecordWriter写入数据,它适用于几乎目前所有的文件类型。

StandardStructObjectInspector用于描述表结构和字段类型。

Serializer有多种实现,分别对应每种Hadoop文件格式的序列化器,例如:ParquetHiveSerDe、AvroSerDe、OrcSerde等。

RecordWriter创建需要HiveOutputFormat,HiveOutputFormat也是有多种Hadoop文件格式的实现的,例如:OrcOutputFormat、HiveIgnoreKeyTextOutputFormat、MapredParquetOutputFormat,用于写入相应格式的数据。

通过StorageFormatDescriptor可以快速的获取相应文件格式的Serializer、HiveOutputFormat,只需要StorageFormatFactory#get(formatType)即可创建一个对应文件格式类型的StorageFormatDescriptor,StorageFormatDescriptor也是有各种数据格式类型实现的,例如TextFileStorageFormatDescriptor、ParquetFileStorageFormatDescriptor等等。

StorageFormatDescriptor的getSerde()、getOutputFormat()、getInputFormat()等方法,可以获取Serializer和HiveOutputFormat。

当然你也可以通过Table API获取StorageDescriptor从而获取相应的OutputFormat和Serializer。

@Test
public void test2()throws ClassNotFoundException, IllegalAccessException, InstantiationException,
HiveException, IOException, SerDeException {Configuration configuration = new Configuration();configuration.set("fs.defaultFS", "");StorageDescriptor sd = Table.getEmptyTable(null, null).getSd();SerDeInfo serDeInfo = new SerDeInfo();HashMap<String, String> parameters = new HashMap<>();parameters.put(serdeConstants.SERIALIZATION_FORMAT, "1");serDeInfo.setParameters(parameters);serDeInfo.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());sd.setInputFormat(SequenceFileInputFormat.class.getName());sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());StorageFormatFactory storageFormatFactory = new StorageFormatFactory();sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());// 通过格式类型获取StorageFormatDescriptor,这里一般有TEXT、AVRO、PARQUET、ORC这几种,可通过IOConstants查看StorageFormatDescriptor storageFormatDescriptor =storageFormatFactory.get(IOConstants.TEXTFILE);sd.setInputFormat(storageFormatDescriptor.getInputFormat());sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());String serdeLib = storageFormatDescriptor.getSerde();if (serdeLib != null) {sd.getSerdeInfo().setSerializationLib(serdeLib);}SerDeInfo serdeInfo = sd.getSerdeInfo();Properties tableProperties = new Properties();//        tableProperties.put(serdeConstants.FIELD_DELIM, (byte) 1);tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");//        tableProperties.setProperty(serdeConstants.COLLECTION_DELIM, "");//        tableProperties.setProperty(serdeConstants.MAPKEY_DELIM, "");Serializer recordSerDe =(Serializer) (Class.forName(serdeInfo.getSerializationLib()).newInstance());SerDeUtils.initializeSerDe((Deserializer) recordSerDe, configuration, tableProperties, null);Class<? extends OutputFormat> outputFormatClz =HiveFileFormatUtils.getOutputFormatSubstitute(Class.forName(storageFormatDescriptor.getOutputFormat()));HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();// 这里对应hive相应的表、分区路径、还有一个随机的文件名Path path =new Path( ".../hive/warehouse/table_name/pt_day=12/pt_hour=12/test");JobConf jobConf = new JobConf(configuration);jobConf.setMapOutputCompressorClass(GzipCodec.class);jobConf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC,GzipCodec.class.getName());FileSinkOperator.RecordWriter recordWriter =HiveFileFormatUtils.getRecordWriter(jobConf,outputFormat,recordSerDe.getSerializedClass(),false,tableProperties,path,Reporter.NULL);ObjectInspector intInspector =ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);StandardListObjectInspector intListInspector =ObjectInspectorFactory.getStandardListObjectInspector(intInspector);StandardStructObjectInspector standardStructObjectInspector =ObjectInspectorFactory.getStandardStructObjectInspector(new ArrayList<>(List.of("address")),new ArrayList<>(Arrays.asList(intListInspector)));Object[] instance = new Object[1];ArrayList<Integer> address = new ArrayList<>();for (int i = 5; i < 10; i++) {address.add(i * i);}instance[0] = address;Writable serialize = recordSerDe.serialize(instance, standardStructObjectInspector);recordWriter.write(serialize);recordWriter.close(false);
}

其他写法

Text格式

通过TextOutputFormat写入Text格式的Hive表数据文件,以下是一张拥有"id", "address"字段的表,而map是一个Map类型的字段

@Test
public void testWriteMap() {UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");admin.doAs((PrivilegedAction<Void>)() -> {Configuration configuration = new Configuration();configuration.set("fs.defaultFS", "...");OrcSerde orcSerde = new OrcSerde();Object[] instance = new Object[2];instance[0] = 1;ArrayList<Integer> address = new ArrayList<>();for (int i = 5; i < 10; i++) {address.add(i * i);}instance[1] = address;ObjectInspector intInspector =ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,ObjectInspectorFactory.ObjectInspectorOptions.JAVA);StandardListObjectInspector intListInspector =ObjectInspectorFactory.getStandardListObjectInspector(intInspector);StandardStructObjectInspector standardStructObjectInspector =ObjectInspectorFactory.getStandardStructObjectInspector(new ArrayList<>(List.of("id", "address")),new ArrayList<>(Arrays.asList(intInspector, intListInspector)));Writable serialize =orcSerde.serialize(instance, standardStructObjectInspector);TextOutputFormat<Object, Object> objectObjectTextOutputFormat =new TextOutputFormat<>();Path path =new Path(".../hive/warehouse/table_name/partition/file");try {JobConf entries = new JobConf(configuration);RecordWriter<Object, Object> recordWriter =objectObjectTextOutputFormat.getRecordWriter(null, entries, path.toString(), Reporter.NULL);recordWriter.write(NullWritable.get(), serialize);recordWriter.close(Reporter.NULL);} catch (IOException e) {throw new RuntimeException(e);}return null;});
}

ORC格式

ORC格式的写入和Text相似,不多说,只示范Map类型写入

写入MAP<STRING, MAP<STRING, STRING>>类型数据
@Test
public void testWriteMap() {UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");admin.doAs((PrivilegedAction<Void>)() -> {Configuration configuration = new Configuration();configuration.set("fs.defaultFS", "...");OrcSerde orcSerde = new OrcSerde();Object[] instance = new Object[2];instance[0] = 1;ArrayList<Integer> address = new ArrayList<>();for (int i = 5; i < 10; i++) {address.add(i * i);}instance[1] = address;ObjectInspector intInspector =ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,ObjectInspectorFactory.ObjectInspectorOptions.JAVA);StandardListObjectInspector intListInspector =ObjectInspectorFactory.getStandardListObjectInspector(intInspector);StandardStructObjectInspector standardStructObjectInspector =ObjectInspectorFactory.getStandardStructObjectInspector(new ArrayList<>(List.of("id", "address")),new ArrayList<>(Arrays.asList(intInspector, intListInspector)));Writable serialize =orcSerde.serialize(instance, standardStructObjectInspector);OrcOutputFormat orcOutputFormat = new OrcOutputFormat();Path path =new Path(".../hive/warehouse/table_name/partition/file");try {JobConf entries = new JobConf(configuration);RecordWriter recordWriter =orcOutputFormat.getRecordWriter(null, entries, path.toString(), Reporter.NULL);recordWriter.write(NullWritable.get(), serialize);recordWriter.close(Reporter.NULL);} catch (IOException e) {throw new RuntimeException(e);}return null;});
}

Parquet格式

Parquest通过MessageType表示表结构,用group存储数据类型和数据,最后通过ParquetWriter写入数据

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

写入MAP<STRING, MAP<STRING, STRING>>类型数据

数据如下:

id: 100
addresskey_valuekey: key0value: value0key_valuekey: key1value: value1key_valuekey: key2value: value4

格式如下:

message Pair {optional int32 id;optional group address (MAP) {repeated group key_value {optional binary key;optional binary value;}}
}

代码如下:

@Test
public void testWriteIdWithMap1() {UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");admin.doAs((PrivilegedAction<Void>)() -> {Configuration configuration = new Configuration();configuration.set("fs.defaultFS", "");try {Path path =new Path(".../hive/warehouse/table_name/partition/file");Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();String name = "address";// 注意这里的named后面必须是key、valuePrimitiveType keyType =Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("key");PrimitiveType valueType =Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("value");messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");messageTypeBuilder.optionalMap().key(keyType).value(valueType).named(name);MessageType pari = messageTypeBuilder.named("Pair");SimpleGroup simpleGroup = new SimpleGroup(pari);ParquetWriter<Group> parquetWriter =ExampleParquetWriter.builder(path).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0).withCompressionCodec(CompressionCodecName.UNCOMPRESSED).withConf(configuration).withType(pari).withDictionaryEncoding(false).withRowGroupSize(134217728L).build();simpleGroup.add(0, 100);Group mapGroup = simpleGroup.addGroup(1);for (int i = 0; i < 3; i++) {Group entry0 = mapGroup.addGroup(0);entry0.add(0, "key" + i);entry0.add(1, "value" + i * i);}parquetWriter.write(simpleGroup);parquetWriter.close();} catch (IOException e) {throw new RuntimeException(e);}return null;});
}
写入ARRAY<ARRAY<INT>>类型数据
@Test
public void testWriteIdWithArrayArray2() {UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");admin.doAs((PrivilegedAction<Void>)() -> {Configuration configuration = new Configuration();configuration.set("fs.defaultFS", "...");try {Path path =new Path(".../hive/warehouse/table_name/partition/file");Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();PrimitiveType named =Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("address");messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");messageTypeBuilder.optionalList().optionalListElement().element(named).named("address").named("address");MessageType pari = messageTypeBuilder.named("Pair");SimpleGroup simpleGroup = new SimpleGroup(pari);ParquetWriter<Group> parquetWriter =ExampleParquetWriter.builder(path).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0).withCompressionCodec(CompressionCodecName.UNCOMPRESSED).withConf(configuration).withType(pari).withDictionaryEncoding(false).withRowGroupSize(134217728L).build();simpleGroup.add(0, 100);// add groupGroup address = simpleGroup.addGroup(1);for (int i = 0; i < 5; i++) {// group add list entryGroup listGroup = address.addGroup(0);// add groupGroup sublist = listGroup.addGroup(0);for (int j = 5; j < 10; j++) {// group add list entryGroup subListGroup = sublist.addGroup(0);subListGroup.add(0, i * i);}}parquetWriter.write(simpleGroup);parquetWriter.close();} catch (IOException e) {throw new RuntimeException(e);}return null;});
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/411202.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nerfstudio半离线配置踩坑记录

安装torch2.1.2 with cuda11.8 由于清华镜像源&#xff08;包括阿里源和豆瓣源&#xff09;都没有torch2.1.2cu118的包&#xff0c;因此只能从pytorch官网下载。 服务器上直接通过下面pip的方式安装会由于网络原因中断&#xff0c;无奈只能在本地先把torch的包下载下来再上传到…

SAP与生产制造MPM系统集成案例

一、需求介绍 某公司为保证企业内部生产管理系统的多项基础数据的同步更新&#xff0c;确保各模块间信息的一致性和准确性&#xff0c;对后续的生产计划和物料管理打下基础&#xff0c;该公司将MPM系统和SAP系统经过SAP PO中间件集成平台进行了集成。MPM全称为Manufacturing…

blender--二维平面图标变为三维网格

有时候我们希望把一些二维图片能变成三维网格&#xff0c;本案例我们就针对这一场景进行实现。 首先我们可以先去找一张需要的图片(注意&#xff0c;本例需要图片是svg格式)&#xff0c;我们可以在阿里巴巴矢量图标库等平台进行搜索&#xff0c;如图所示&#xff0c;找到需要的…

diffusion model(扩散模型)DDPM解析

DDPM 前向阶段 重复 2-5 步骤 x 0 ∼ q ( x 0 ) \mathbf{x}_0\sim q(\mathbf{x}_0) x0​∼q(x0​)从数据集中采样一张图片 t ∼ U n i f o r m ( { 1 , … , T } ) t\sim\mathrm{Uniform}(\{1,\ldots,T\}) t∼Uniform({1,…,T})&#xff0c;从 1~T 中随机挑选一个时间步 t ϵ …

三种tcp并发服务器实现程序

都需先进行tcp连接 1、多进程并发 2、多线程并发 3、IO多路复用并发 &#xff08;1&#xff09;select &#xff08;2&#xff09;epoll

SAP ERP与长城汽车EDI业务集成案例(SAP CPI平台)

一、项目背景 某智能座舱公司是国内领先的智能座舱领域科技公司&#xff0c;致力于成为智能网联行业变革的领导者和推动者&#xff0c;聚焦整车域控制器产品、智能网联软件产品和运营服务产品&#xff1b; 已建成首条先进的数智化域控制器生产线&#xff0c;为客户提供最优…

大刀阔斧改革之后,阅文距离“东方迪士尼”更近了吗?

当前&#xff0c;网文IP的确是“富矿”。中国社会科学院文学研究所发布的《2023中国网络文学发展研究报告》显示&#xff0c;截至2023年底&#xff0c;网络文学IP市场规模2605亿元&#xff0c;同比增长近百亿元。 近日&#xff0c;网文产业中的头部企业阅文集团也披露数据称&a…

Android U WMShell动画调用堆栈

本文主要简单介绍WMShell动画调用堆栈 代码环境&#xff1a;repo init -u https://mirrors.tuna.tsinghua.edu.cn/git/AOSP/platform/manifest -b android-14.0.0_r7 Systemserver侧 TAG: at com.android.server.wm.Transition.onTransactionReady(Transition.java:1575) TA…

爆改YOLOv8|利用分层特征融合策略MSBlock改进yolov8,暴力涨点

1&#xff0c;本文介绍 MSBlock 是一种分层特征融合策略&#xff0c;用于改进卷积神经网络中的特征融合效果。它通过分层次地融合不同尺度的特征图来提高网络的表达能力和性能。MSBlock 采用多尺度特征融合的方法&#xff0c;确保网络能够有效地捕捉不同层次和尺度的信息&…

Neo4j导入csv数据,并创建节点

Neo4j 是一种图数据库&#xff0c;特别适合管理和分析复杂的关系数据。 数据来源&#xff1a;http://openkg.cn/ 导入到 Neo4j 的合适场景&#xff1a; 需要在物种分类中查找层级关系&#xff08;如物种的科、属等&#xff09;。 需要进行关系查询和图结构的分析。 想在分类树…

【Axure高保真原型】输入框控制多选下拉列表选项

今天和大家分享输入框控制多选下拉列表选项选项的原型模板&#xff0c;效果包括&#xff1a; 点击下拉框可以弹出选项列表&#xff0c;点击可以切换选中或取消选中 根据选中项在外框出自动生成标签&#xff0c;可以自适应调整高度 下拉列表的选项由左侧多行输入框里的内容控制…

数据结构—— 再探二叉树

1. TOP-K问题 TOP-K问题&#xff1a;求数据结合中前K个最大或者最小的数据 比如&#xff1a;专业前10名、世界500强、富豪榜、游戏中前100的活跃玩家等 思路&#xff1a; 1. 用数据集合中前K个数据来建堆&#xff1a; …

WEB服务器-Nginx源码安装及相关配置

一、web服务的常用种类 Apache HTTP Server 简介&#xff1a;Apache是一款广泛使用的Web服务器软件&#xff0c;支持多种操作系统&#xff0c;包括Linux。​​​​​​​特点&#xff1a; 支持多个虚拟主机。 模块化架构&#xff0c;可以根据需要加载不同的模块。 强大的安全…

多态(虚构的整体,具体的个体)(多态的基本概念/多态的原理剖析/纯虚函数和抽象类/虚析构和纯虚析构)

多态的基本概念 #define _CRT_SECURE_NO_WARNINGS #include<iostream> using namespace std; // 多态的基本概念 // 多态分为静态多态和动态多态 // 静态多态&#xff1a; 函数重载还运算符重载属于静态多态&#xff0c;服用函数名 // 动态多态&#xff1a; 派生派和虚函…

VUE使用websocket

在之前搭建好的项目的基础上新版security demo&#xff08;二&#xff09;前端-CSDN博客 目录 一、代码改造 1、后端改造 2、VUE使用websocket 3、测试 二、按用户推送 1、完整代码如下 1.1、前端 1.2、后端&#xff1a; 2、测试 一、代码改造 1、后端改造 &#x…

逆波兰表达式

简介 介绍逆波兰表达式之前&#xff0c;先介绍一下运算种类。 中缀运算与后缀运算 中缀运算是一种常用的算术和逻辑公式表示方法&#xff0c;其中操作符位于两个运算数之间。例如&#xff0c;在表达式 “3 2” 中&#xff0c;加号&#xff08;&#xff09;是操作符&#xf…

算法设计:实验一分治与递归

【实验目的】 深入理解分治法的算法思想&#xff0c;应用分治法解决实际的算法问题。 【实验内容与要求】 设有n2k个运动员要进行网球循环赛。现要设计一个满足以下要求的比赛日程表&#xff1a; 1.每个选手必须与其他n-1个选手各赛一次&#xff1b;2.每个选手一天只能赛一…

Mysql 集群技术

目录 一 Mysql 在服务器中的部署方法 1.1 在Linux下部署mysql 1.1.1 安装依赖性并解压源码包&#xff0c;源码编译安装mysql&#xff1a; 1.1.2 部署mysql 二 mysql的组从复制 2.1 配置mastesr和salve 测试结果 2.2 当有数据时添加slave2 2.3 延迟复制 2.4 慢查询日志…

【C++ | 设计模式】简单工厂模式的详解与实现

1.简单工厂模式概述 简单工厂模式&#xff08;Simple Factory Pattern&#xff09;是一种创建型设计模式&#xff0c;它定义了一个工厂类&#xff0c;由这个类根据提供的参数决定创建哪种具体的产品对象。简单工厂模式将对象的创建逻辑集中到一个工厂类中&#xff0c;从而将对…

Python-进阶-Excel基本操作

文章目录 Excel 基本操作1. 概述2. 写入2.1 使用 xlwt2.2 使用 XlsxWriter 3. 读取4. 修改 Excel 基本操作 1. 概述 在数据处理方面&#xff0c;Python 一直扮演着重要的角色&#xff0c;对于 Excel 操作&#xff0c;它有着完整且成熟的第三方库&#xff0c;使用也较为简单。…