Writing Hive Data by Writing Files Directly
The simplest way to write data into Hive is through Hive JDBC, but that is not the most efficient option.
Hive reads its data from files on HDFS, so writing those HDFS files directly achieves the same result as writing through Hive and is currently the most efficient approach.
The General Approach
The most general approach is to serialize each row with a Serializer combined with a StandardStructObjectInspector, and then write the serialized records with a RecordWriter. It works for almost every file format currently supported.
StandardStructObjectInspector describes the table structure and the column types.
Serializer has one implementation per Hadoop file format, for example ParquetHiveSerDe, AvroSerDe, OrcSerde, and so on.
Creating a RecordWriter requires a HiveOutputFormat. HiveOutputFormat likewise has one implementation per Hadoop file format, for example OrcOutputFormat, HiveIgnoreKeyTextOutputFormat and MapredParquetOutputFormat, each writing data in the corresponding format.
A StorageFormatDescriptor is a quick way to obtain the Serializer and HiveOutputFormat for a given file format: StorageFormatFactory#get(formatType) creates the StorageFormatDescriptor for that format type. StorageFormatDescriptor again has an implementation per data format, such as TextFileStorageFormatDescriptor, ParquetFileStorageFormatDescriptor, and so on.
Its getSerde(), getOutputFormat() and getInputFormat() methods return the Serializer, HiveOutputFormat and InputFormat class names.
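As a rough sketch of that lookup, reusing the TEXTFILE format type that the full example below also uses:

@Test
public void testStorageFormatDescriptorLookup() {
    StorageFormatFactory factory = new StorageFormatFactory();
    // Pick the descriptor by format type: TEXT, AVRO, PARQUET, ORC, ... (see IOConstants).
    StorageFormatDescriptor descriptor = factory.get(IOConstants.TEXTFILE);
    String inputFormat = descriptor.getInputFormat();
    String outputFormat = descriptor.getOutputFormat();
    // getSerde() can return null for some formats, hence the null check in the full example below.
    String serdeLib = descriptor.getSerde();
}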
Alternatively, you can obtain the table's StorageDescriptor through the Table API and read the corresponding OutputFormat and Serializer from it.
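A minimal sketch of that alternative (the metastore URI, database and table names below are placeholders, not taken from this article); the complete write path based on StorageFormatFactory follows in the test after it:

@Test
public void testReadStorageDescriptorFromMetastore() throws Exception {
    HiveConf hiveConf = new HiveConf();
    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore-host:9083"); // placeholder URI
    IMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
    org.apache.hadoop.hive.metastore.api.Table table = client.getTable("default", "table_name");
    StorageDescriptor sd = table.getSd();
    String outputFormatClass = sd.getOutputFormat();           // the table's OutputFormat class name
    String serdeLib = sd.getSerdeInfo().getSerializationLib(); // the table's SerDe class name
    String location = sd.getLocation();                        // the table (or partition) directory on HDFS
    client.close();
}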
@Test
public void test2() throws ClassNotFoundException, IllegalAccessException, InstantiationException,
        HiveException, IOException, SerDeException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "");
    StorageDescriptor sd = Table.getEmptyTable(null, null).getSd();
    SerDeInfo serDeInfo = new SerDeInfo();
    HashMap<String, String> parameters = new HashMap<>();
    parameters.put(serdeConstants.SERIALIZATION_FORMAT, "1");
    serDeInfo.setParameters(parameters);
    serDeInfo.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
    sd.setInputFormat(SequenceFileInputFormat.class.getName());
    sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());
    StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
    sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
    // Get the StorageFormatDescriptor by format type; the usual types are TEXT, AVRO, PARQUET and ORC (see IOConstants).
    StorageFormatDescriptor storageFormatDescriptor = storageFormatFactory.get(IOConstants.TEXTFILE);
    sd.setInputFormat(storageFormatDescriptor.getInputFormat());
    sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
    String serdeLib = storageFormatDescriptor.getSerde();
    if (serdeLib != null) {
        sd.getSerdeInfo().setSerializationLib(serdeLib);
    }
    SerDeInfo serdeInfo = sd.getSerdeInfo();
    Properties tableProperties = new Properties();
    // tableProperties.put(serdeConstants.FIELD_DELIM, (byte) 1);
    tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
    // tableProperties.setProperty(serdeConstants.COLLECTION_DELIM, "");
    // tableProperties.setProperty(serdeConstants.MAPKEY_DELIM, "");
    Serializer recordSerDe =
            (Serializer) (Class.forName(serdeInfo.getSerializationLib()).newInstance());
    SerDeUtils.initializeSerDe((Deserializer) recordSerDe, configuration, tableProperties, null);
    Class<? extends OutputFormat> outputFormatClz =
            HiveFileFormatUtils.getOutputFormatSubstitute(Class.forName(storageFormatDescriptor.getOutputFormat()));
    HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
    // The path is the Hive table's (partition) directory plus a random file name.
    Path path = new Path(".../hive/warehouse/table_name/pt_day=12/pt_hour=12/test");
    JobConf jobConf = new JobConf(configuration);
    jobConf.setMapOutputCompressorClass(GzipCodec.class);
    jobConf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC,
            GzipCodec.class.getName());
    FileSinkOperator.RecordWriter recordWriter =
            HiveFileFormatUtils.getRecordWriter(
                    jobConf,
                    outputFormat,
                    recordSerDe.getSerializedClass(),
                    false,
                    tableProperties,
                    path,
                    Reporter.NULL);
    ObjectInspector intInspector =
            ObjectInspectorFactory.getReflectionObjectInspector(
                    Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    StandardListObjectInspector intListInspector =
            ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
    StandardStructObjectInspector standardStructObjectInspector =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                    new ArrayList<>(List.of("address")),
                    new ArrayList<>(Arrays.asList(intListInspector)));
    Object[] instance = new Object[1];
    ArrayList<Integer> address = new ArrayList<>();
    for (int i = 5; i < 10; i++) {
        address.add(i * i);
    }
    instance[0] = address;
    Writable serialize = recordSerDe.serialize(instance, standardStructObjectInspector);
    recordWriter.write(serialize);
    recordWriter.close(false);
}
Other Approaches
Text Format
The following writes a data file for a Text-format Hive table through TextOutputFormat. The example table has two columns, "id" and "address", where "address" is an ARRAY<INT> column.
@Test
public void testWriteMap() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        OrcSerde orcSerde = new OrcSerde();
        Object[] instance = new Object[2];
        instance[0] = 1;
        ArrayList<Integer> address = new ArrayList<>();
        for (int i = 5; i < 10; i++) {
            address.add(i * i);
        }
        instance[1] = address;
        ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
        StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        new ArrayList<>(List.of("id", "address")),
                        new ArrayList<>(Arrays.asList(intInspector, intListInspector)));
        // This example reuses OrcSerde to produce a Writable; a real text table would normally
        // use LazySimpleSerDe (see the general example above).
        Writable serialize = orcSerde.serialize(instance, standardStructObjectInspector);
        TextOutputFormat<Object, Object> objectObjectTextOutputFormat = new TextOutputFormat<>();
        Path path = new Path(".../hive/warehouse/table_name/partition/file");
        try {
            JobConf entries = new JobConf(configuration);
            RecordWriter<Object, Object> recordWriter =
                    objectObjectTextOutputFormat.getRecordWriter(null, entries, path.toString(), Reporter.NULL);
            recordWriter.write(NullWritable.get(), serialize);
            recordWriter.close(Reporter.NULL);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
ORC Format
Writing ORC is almost identical to the Text example above; the only change is obtaining the RecordWriter from OrcOutputFormat. The example below writes the same "id"/"address" row (an INT plus an ARRAY<INT>) through OrcSerde.
@Test
public void testWriteMap() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        OrcSerde orcSerde = new OrcSerde();
        Object[] instance = new Object[2];
        instance[0] = 1;
        ArrayList<Integer> address = new ArrayList<>();
        for (int i = 5; i < 10; i++) {
            address.add(i * i);
        }
        instance[1] = address;
        ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
        StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        new ArrayList<>(List.of("id", "address")),
                        new ArrayList<>(Arrays.asList(intInspector, intListInspector)));
        Writable serialize = orcSerde.serialize(instance, standardStructObjectInspector);
        OrcOutputFormat orcOutputFormat = new OrcOutputFormat();
        Path path = new Path(".../hive/warehouse/table_name/partition/file");
        try {
            JobConf entries = new JobConf(configuration);
            RecordWriter recordWriter =
                    orcOutputFormat.getRecordWriter(null, entries, path.toString(), Reporter.NULL);
            recordWriter.write(NullWritable.get(), serialize);
            recordWriter.close(Reporter.NULL);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
Parquet Format
Parquet describes the table structure with a MessageType, stores the column types and values in Groups, and finally writes the data out through a ParquetWriter.
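Before the nested examples, here is a minimal sketch of that pipeline with only primitive columns; the local path and column names are made up for illustration:

@Test
public void testWriteFlatParquet() throws IOException {
    // Hypothetical flat schema: id INT plus a binary "name" column (no logical type annotation).
    MessageType schema = Types.buildMessage()
            .optional(PrimitiveType.PrimitiveTypeName.INT32).named("id")
            .optional(PrimitiveType.PrimitiveTypeName.BINARY).named("name")
            .named("Pair");
    SimpleGroup row = new SimpleGroup(schema);
    row.add("id", 1);
    row.add("name", "alice");
    ParquetWriter<Group> writer =
            ExampleParquetWriter.builder(new Path("/tmp/flat_example.parquet"))
                    .withConf(new Configuration())
                    .withType(schema)
                    .build();
    writer.write(row);
    writer.close();
}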
Writing MAP<STRING, STRING> data
The data looks like this:
id: 100
address
  key_value
    key: key0
    value: value0
  key_value
    key: key1
    value: value1
  key_value
    key: key2
    value: value4
The Parquet schema looks like this:
message Pair {
  optional int32 id;
  optional group address (MAP) {
    repeated group key_value {
      optional binary key;
      optional binary value;
    }
  }
}
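As an aside, the same schema can also be produced by parsing the schema string above with MessageTypeParser instead of the fluent builder used below (a small sketch; the parsed MessageType could replace the builder-produced one):

MessageType parsed = MessageTypeParser.parseMessageType(
        "message Pair {\n"
                + "  optional int32 id;\n"
                + "  optional group address (MAP) {\n"
                + "    repeated group key_value {\n"
                + "      optional binary key;\n"
                + "      optional binary value;\n"
                + "    }\n"
                + "  }\n"
                + "}");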
The code is as follows:
@Test
public void testWriteIdWithMap1() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "");
        try {
            Path path = new Path(".../hive/warehouse/table_name/partition/file");
            Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
            String name = "address";
            // Note: the names passed to named() here must be exactly "key" and "value".
            PrimitiveType keyType = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("key");
            PrimitiveType valueType = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("value");
            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");
            messageTypeBuilder.optionalMap().key(keyType).value(valueType).named(name);
            MessageType pari = messageTypeBuilder.named("Pair");
            SimpleGroup simpleGroup = new SimpleGroup(pari);
            ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                            .withConf(configuration)
                            .withType(pari)
                            .withDictionaryEncoding(false)
                            .withRowGroupSize(134217728L)
                            .build();
            simpleGroup.add(0, 100);
            Group mapGroup = simpleGroup.addGroup(1);
            for (int i = 0; i < 3; i++) {
                Group entry0 = mapGroup.addGroup(0);
                entry0.add(0, "key" + i);
                entry0.add(1, "value" + i * i);
            }
            parquetWriter.write(simpleGroup);
            parquetWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
Writing ARRAY<ARRAY<INT>> data
@Test
public void testWriteIdWithArrayArray2() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        try {
            Path path = new Path(".../hive/warehouse/table_name/partition/file");
            Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
            PrimitiveType named = Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("address");
            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");
            messageTypeBuilder.optionalList().optionalListElement().element(named).named("address").named("address");
            MessageType pari = messageTypeBuilder.named("Pair");
            SimpleGroup simpleGroup = new SimpleGroup(pari);
            ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                            .withConf(configuration)
                            .withType(pari)
                            .withDictionaryEncoding(false)
                            .withRowGroupSize(134217728L)
                            .build();
            simpleGroup.add(0, 100);
            // add the outer list group
            Group address = simpleGroup.addGroup(1);
            for (int i = 0; i < 5; i++) {
                // add one entry to the outer list
                Group listGroup = address.addGroup(0);
                // the entry is itself a list group
                Group sublist = listGroup.addGroup(0);
                for (int j = 5; j < 10; j++) {
                    // add one element to the inner list
                    Group subListGroup = sublist.addGroup(0);
                    subListGroup.add(0, i * i);
                }
            }
            parquetWriter.write(simpleGroup);
            parquetWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}