1.表层面设计优化
1.1 表分区
分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。
使用场景:在进行模型设计的时候如果考虑到表数据量很大,则需要对该表进行分区操作,在实际的项目中,分区一般都是按照日期进行分区
如果现在要向一个分区表中加载数据,则选择使用静态分区,一般增量抽取到的数据进行加载的时候都是用的是静态分区。
如果获取到的数据要按照某一列的值保存到多个分区中,则需要使用动态分区
1.2 表分桶
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围划分。
分桶将整个数据内容安装某列属性值得hash值进行区分,如按照name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件。分桶是将数据集分解成更容易管理的若干部分的另一个技术。分区针对的是数据的存储路径;分桶针对的是数据文件。
使用场景:1.如果表的数据量比较大,在加载数据的时候想要启用多个reduce则可以使用分桶表。
2.如果表中的数据量比较大,并且经常按照某一列进行表关联操作,则建议使用分桶表,提高关联效率。
3.如果分桶表之后还要提高关联效率,则可以在分桶表上对每一桶的数据进行排序。
建表的时候分桶排序:
create table 表名(
.......
)clustered by(分桶列) sorted by (排序列) into n buckets
....
4.如果要经常对表中的数据进行抽样查询,则也可以使用分桶表。
1.3 文件存储格式
文件存储格式 | 缺点 | 优点 | 加载数据方式 |
textfile 不支持切片压缩 | 磁盘消耗大 | 直接将本地的文件加载的hdfs中,加载数据的速度最高 | 1.-put 2.load data 3.insert into |
sequencefile 二进制文件,可压缩可切片 | 读写速度慢 | 支持切片2.可以进行数据的压缩 | insert into |
rcfile 文件的存储方式,支持切片,支持压缩 | insert into | ||
orc 压缩和文件处理的效率都比rcfile高很多 | 高效的数据压缩 高效的数据处理的速度 | insert into | |
parquet 一种文件存储格式 | 不支持数据的修改操作 | 高效的数据压缩 高效的数据处理的速度 | insert into |
TextFile
- 默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大。
- 可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,压缩后的文件不支持split,Hive不会对数据进行切分,从而无法对数据进行并行操作。
- 并且在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。
SequenceFile
- SequenceFile是Hadoop API提供的一种二进制文件支持,以key-value的形式序列化到文件中,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。
- 压缩数据文件可以节省磁盘空间,但Hadoop中有些原生压缩文件的缺点之一就是不支持分割。支持分割的文件可以并行的有多个mapper程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。Sequence File是可分割的文件格式,支持Hadoop的block级压缩。
- SequenceFile支持三种压缩选择:
NONE
,RECORD
,BLOCK
。RECORD是默认选项,Record压缩率低,通常BLOCK会带来较RECORD更好的压缩性能,一般建议使用BLOCK压缩。 - 优势是文件和hadoop api中的MapFile是相互兼容的。
RCFile
- 存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:
- RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低;
- 像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取;
- 数据追加:RCFile不支持任意方式的数据写操作,仅提供一种追加接口,这是因为底层的 HDFS当前仅仅支持数据追加写文件尾部。
ORCFile(Optimized Row Columnar)
- 存储方式:数据按行分块 每块按照列存储。
- 压缩快、快速列存取,提高Hive的读取、写入和处理数据的性能。
- 效率比rcfile高,是rcfile的优化版本。
- 能够兼容多种计算引擎
Parquet
是一种列式数据存储格式,可以兼容多种计算引擎,如MapRedcue 和Spark等,对多层嵌套的数据结构提供了良好的性能支持,是目前Hive 生产环境中数据存储的主流选择之一。
建议:在实际的项目中,如果贴源层加载一些本地的文件,则需要使用textfile,数据仓库中间的一些层建议使用orc或者parquet,如果项目只是用hive的计算框架则选择orc,如果项目中用到了多种计算框架则建议使用parquet,最上层的应用层一般会将数据导出到共享层,并且数据量不大所以直接使用textfile。
1.4 压缩格式
【输出结果来设置压缩格式】
--SQL语句的最终输出结果是否压缩:
set hive.exec.compress.output=true;
--输出结果的压缩格式(以snappy为例):
开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec
=org.apache.hadoop.io.compress.SnappyCodec;
【保存文件的时候设置压缩格式】
---建表语句设置压缩格式:
create table table_name(
...
)stored as orc
tblproperties ("orc.compress"="snappy");
【对单个MR的中间结果进行压缩】
单个MR的中间结果指的是Mapper输出的数据,压缩该数据库降低Shuffle阶段的IO压力,配置以下参数:
--开启MapReduce中间数据压缩功能
set mapreduce.map.output.compress=true;
--设置MapReduce中间数据的压缩方式(以snappy为例)
set mapreduce.map.output.compress.codec
=org.apache.hadoop.io.compress.SnappyCodec;
【对单条SQL语句的中间结果进行压缩】
一个SQL数据可能通过MR进行计算,单条SQL语句的中间结果指两个MR之间的临时数据,配置以下参数:
--是否对两个MR之间的临时数据进行压缩
set hive.exec.compress.intermediate=true;
--设置两个MR之间的压缩格式(以snappy为例)
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
压缩格式 | 算法 | 文件扩展名 | 是否可切分 |
---|---|---|---|
Gzip | DEFLATE | .gz | 否 |
bzip2 | bzip2 | .bz2 | 是 |
LZO | LZO | .lzo | 是 |
Snappy | Snappy | .snappy | 否 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示:
压缩格式 | 对应的编码/解码器 |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较:
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
压缩方式选择时重点考虑:解压缩速度、压缩率、压缩后是否可以支持切片
压缩格式 | 是否支持切片 | 解压缩速度 | 压缩率 |
---|---|---|---|
snappy | no | 最快 | 很差 |
lzo | yes | 很快 | 很高 |
bzip2 | yes | 最慢 | 最高 |
gzip | no | 一般 | 很高 |
使用场景:
2.2.1 Gzip压缩
优点: 压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样;大部分Linux系统都自带Gzip命令,使用方便。
缺点:不支持Split。
应用场景:当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件。
2.2.2 Bzip2压缩
优点:支持Split;具有很高的压缩率,比Gzip压缩率都高;Hadoop本身自带,使用方便。
缺点:压缩/解压速度慢。
应用场景:在数据仓库中处理数据的时候一般很少选择,但是它可以使用在一些对文件进行归档保存
2.2.3 Lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持Split,是Hadoop中最流行的压缩格式;可以在Linux系统下安装lzop命令,使用方便。
缺点:压缩率比Gzip要低一些;Hadoop本身不支持,需要安装;在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为Lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,Lzo优点越越明显。
2.2.4 Snappy压缩
优点:高速压缩速度和合理的压缩率。
缺点:不支持Split;压缩率比Gzip要低;Hadoop本身不支持,需要安装。
应用场景:一般在中间层文件存储的时候会使用,或者是mapreduce中中间临时文件的压缩可以使用,因为他有高效的压缩解压速度, 所以一般会配合orc或者parquet一起使用。
压缩参数配置
设置map后输出压缩
1.开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;2.开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;3.设置mapreduce中map输出数据的压缩方式
set mapreduce.map.outout.compress.codec=
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.Lz4Codec
设置reduce后输出压缩
1.开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;2.开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;3.设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.Lz4Codec4.设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
2.语法和参数的层面
2.1.列裁剪
列裁剪(只查询需要的字段,千万不要直接写 select * from)列裁剪就是在查询时只读取需要的列。当列很多或者数据量很大时,如果select所有的列或者不指定分区,导致的全列扫描和全表扫描效率都很低。
设置列裁剪的参数:set hive.optimize.cp=true
2.2 分区裁剪
在查询数据的时候只选择所需要的分区进行过滤,减少数据的读入量,提高查询效率。
设置分区裁剪的参数:set hive.optimize.pruner=true --开启分区裁剪(默认开启)
2.3.group by优化
1.开启map端聚合
很多操作不一定要在reduce中进行,可以在map端完成聚合操作,然后reduce中直接输出结果即可。
设置开启map聚合参数:set hive.map.aggr=true; ---开启map端聚合操作
设置map端聚合的条数:set hive.groupby.mapaggr.checkinterval=100000
2.解决数据倾斜问题
默认情况下,在进行分组聚合的时候,相同的键的数据会进入到同一个reduce中进行处理,如果分组的时候某一个值有大量的重复的数据,则会导致某一个reduce任务量会很大,从而导致数据倾斜。
解决办法:开启负载均衡 set hive.groupby.skewindata=true; --默认为false
开启负载均衡之后,系统在分组的时候会启动两个mapreduce程序
第一个mapreduce程序会先在map端将数据按照key平均分配,可能会出现相同的键的数据进入不同的reduce中,然后在reduce中进行初步聚合操作
然后将第一个mapreduce中reduce聚合的结果交给第二个mapreduce进行处理,然后在map端将相同的键分配到同一个reduce进行最终的聚合操作,最后输出结果。
3.去重优化:
去重方式:distinct和group by都可以
在数据量比较大的情况下建议使用group by来代替distinct。
4.排序优化:
order by:全局排序,不管数据量多大,都会启动一个reduce进行数据的处理,所以效率比较低。
sort by:局部排序,会将每一个reduce中的数据进行排序,不能保证全局的数据是有序的,一般配合distribute by一起使用。
cluster by:和sort by是一样的,但是不能进行降序排序。
优化建议:1.尽量避免全局排序,不要一开始就对表中的数据进行全局排序,如果要进行排序,最好将排序的操作放到最后面。
2.如果要查询表中的前n条数据,则建议使用sort by配合limit一起使用。
5.hive中为了提高查询效率,尽量每次获取条的条数的时候不要去查询文件,可以直接从元数据中获取条的条数信息。
参数设置:
set hive.compute.query.using.stats=true; --在查询表的条数的时候直接从元数据中获取(默认开启)
6.表join优化:
原则:将大表转换成小表进行关联
表的关联方式:
1.map side join:在map端完成关联操作,不需要启动reduce程序。
使用场景:关联的表中有一个小表则可以使用该种方式(如果有小表进行关联系统默认会使用map side join)
在hive中默认的小表为小于25M的表,当然可以通过参数来设置小表默认大小:
set hive.mapjoin.smalltable.filesize=25000000
设置让系统自动识别小表:
set hive.auto.convert.join=true --系统默认识别小表
select /*+mapjoin(小表名)*/ * from 表 a join 表 b on ......
2.bucket map join:分桶关联
使用场景:大表和大表关联的时候可以使用,最好关联的表按照关联的列进行分桶
--设置开启分桶关联:
set hive.optimize.bucketmapjoin=true; ----默认false
3. sort merge bucket map join:
在bucket map join的基础上将每一桶的数据先进行排序,然后再进行关联操作,可以大大的减少表关联次数,提高查询效率。
--设置开启排序分桶关联:
set hive.optimize.bucketmapjoin.sortedmerge=true; ---默认关闭false
注意:如果大表和大表关联的时候,表有分桶并且也有排序,则关联效率最高的。
8.谓词下推:PPD
主要思想就是尽量将过滤条件放在map端执行,这样就可以减少后续的数据的数据量,从而降低了数据传输的IO消耗,提高查询效率。
设置开启谓词下推的参数:
set hive.optimize.ppd=true; ---默认就是开启
注意:如果要准确的来测试谓词下推的情况,则必须关闭CBO优化器
--关闭CBO优化器的参数
set hive.cbo.enable=false; --默认true开启
谓词:代表sql语句中的过滤条件
主要测试谓词写在on后面和写在where后面的谓词是否会下推。
表关联的时候表主要分为两种:
1.保留表:在进行表关联的时候,如果要保留该表的全部数据则叫做保留表,例如:左连接的左边
2.空行表: 在进行表关联的时候,如果匹配不到的数据使用空行代替的表,叫做空行表,例如:左连接的右边
内连接:如果两张表进行的是内连接,则不管CBO优化器是否打开,也不管条件在on后面还是where后面,都可以实现谓词下推。
左外|右外连接:
情况1:过滤条件写在where后面,并且过滤的字段属于保留表,可以进行谓词下推
情况2:过滤条件写在where后面,但是过滤的字段属于空行表,不会进行谓词下推,但是CBO优化器打开可以进行谓词下推
情况3:过滤条件写在on后面,并且过滤字段属于保留表,不管CBO优化器是否打开,都不会进行谓词下推
情况4:过滤条件写在on后面,但是过滤字段属于空行表,可以进行谓词下推
建议:如果进行的是左连接,则右表的过滤条件尽量写在on后面,左表的条件尽量写在where后面。
全外连接:
情况1:过滤条件写在where后面,关闭CBO优化器则不会进行下推,打开CBO优化器会进行下推
情况2:过滤条件写在on后面,不可以进行谓词下推
9.导致数据倾斜的问题:
1.数据分组时候重复数据太多,则可能会导致数据侵袭则可以开启负载均衡
2.数据处理的时候null值太多,在处理的数据的时候空值就会被分配到一个reduce中,则也可能导致数据倾斜。
一般在一些日志文件中null值会比较多,因为在记录日志的时候没有抓取到的数据则都保存为null。
解决办法:1.可以给null值在处理的时候设置一个随机数,则就可以将空值的数据随机分配到不同的reduce中处理
2.如果null值对后续的数据分析没有用则可以提前过滤掉。
3.数据类型不相同也可能会导致数据倾斜。
如果有两张表进行关联操作,关联的列的数据类型一个是数字类型一个是字符串类型,则会导致所有的字符串类型全部会被分配到一个reduce中则导致数据倾斜。
解决办法:提前将数据的数据类型进行统一。
10.合理的控制map和reduce的数量
1.map的数量的控制
map数量=文件的大小/split片的大小(128M)
1.map端输入合并小文件:用来减少map的数量
如果在map端有大量的小文件输入,则会导致系统会分配大量的map任务来进行梳理数据,就会造成资源的浪费,所以建议将小文件进行合并
参数的设置:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端输入,不合并
2.map端输出合并
set hive.merge.mapfiles=true; -- 是否合并Map输出文件, 默认值为真
3.通过设置split的大小来控制map的数量,split越大map数量就越小,split越小则map数量就越多。
split默认的大小128M
设置split大小:
set mapred.max.split.size=256000000 --集群默认值
set mapred.min.split.size=10000000 --集群默认值
set dfs.blocksize=134217728 设置block块的大小,默认128M
4.直接通过参数的方式来设置map的数量
set mapred.map.tasks=n; 直接设置map的数量
注意:设置的n必须要大于默认的map数量才会生效。
2.reduce数量的控制:
默认情况下reduce的数量由每一个reduce梳理的数量来决定的。
set hive.exec.reducers.bytes.per.reducer=256000000 --默认256M
设置每一个mapreduce中最多启动的reduce的数量
set hive.exec.reducers.bytes.per.reducer=1009
直接设置reduce的数量:
set mapred.reduce.tasks=n;
set mapreduce.job.reduces=m;
最终的reduce的数量max(n,m)
reduce端进行合并输出
set hive.merge.mapredfiles=true; -- 是否合并Reduce 端输出文件,默认值为假
set hive.merge.size.per.task=25610001000; -- 合并文件的大小,默认值为 256000000
以下情况只会启动一个reduce
1.使用order by排序
2.使用聚合函数的时候没有使用groupby
3.如果进行的是笛卡尔积操作也会只启动一个reduce。
3.hive架构层面优化:
1.本地化执行
Hive在集群上查询时,默认是在集群上多台机器上运行,需要多个机器进行协调运行,这种方式很好的解决了大数据量的查询问题。但是在Hive查询处理的数据量比较小的时候,其实没有必要启动分布式模式去执行,因为以分布式方式执行设计到跨网络传输、多节点协调等,并且消耗资源。对于小数据集,可以通过本地模式,在单台机器上处理所有任务,执行时间明显被缩短。
set hive.exec.mode.local.auto=true; -- 打开hive自动判断是否启动本地模式的开关
set hive.exec.mode.local.auto.input.files.max=4; -- map任务数最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map输入文件最大大小
2.JVM重用
Hive 语句最终会转换为一系列的 MapReduce 任务,每一个MapReduce 任务是由一系列的Map Task 和 Reduce Task 组成的,默认情况下,MapReduce 中一个 Map Task 或者 Reduce Task 就会启动一个 JVM 进程,一个 Task 执行完毕后,JVM进程就会退出。
这样如果任务花费时间很短,又要多次启动 JVM 的情况下,JVM的启动、关闭时间会变成一个比较大的消耗,这时,可以通过重用 JVM 来解决。
set mapred.job.reuse.jvm.num.tasks=5;
JVM也是有缺点的,开启JVM重用会一直占用使用到的 task 的插槽,以便进行重用,直到任务完成后才会释放。
如果某个不平衡的job中有几个 reduce task 执行的时间要比其他的 reduce task 消耗的时间要多得多的话,那么保留的插槽就会一直空闲却无法被其他的 job 使用,直到所有的 task 都结束了才会释放。
3.并行执行
有的查询语句,hive会将其转化为一个或多个阶段,包括:MapReduce 阶段、抽样阶段、合并阶段、limit 阶段等。默认情况下,
一次只执行一个阶段。但是,如果某些阶段不是互相依赖,是可以并行执行的。多阶段并行是比较耗系统资源的。
set hive.exec.parallel=true; -- 可以开启并发执行。
set hive.exec.parallel.thread.number=16; -- 同一个sql允许最大并行度,默认为8。