1 Hive基本概念
1.1 Hive定义
-
Hive:由 Facebook 开源用于解决海量结构化日志的数据统计工具。
Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并 提供类 SQL 查询功能。
利用MapReduce去查询数据文件中的某些内容,需要编写大量冗余的程序,而Hive可以通过其独有的HQL语言来转化成MapReduce程序,能够简化编程。
但是Hive只能处理结构化的数据文件,因此MapReduce能做的Hive并不一定能做,Hive能做的MapReduce一定能做。Hive的本质就是将HQL转化成MapReduce程序。
Hive其实也可以看作一个hadoop的客户端,提供给用户输入HQL的入口,客户端再将HQL转化成MapReduce程序,并交由YARN去执行得到结果。
-
Hive的优缺点:
优点:
(1)操作接口采用类 SQL 语法,提供快速开发的能力(简单、容易上手)。
(2)避免了去写 MapReduce,减少开发人员的学习成本。
(3)Hive 的执行延迟比较高,因此 Hive 常用于数据分析,对实时性要求不高的场合。
(4)Hive 优势在于处理大数据,对于处理小数据没有优势,因为 Hive 的执行延迟比较 高。
(5)Hive 支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。缺点:
1)Hive 的 HQL 表达能力有限
(1)迭代式算法无法表达(连续很多个MapReduce程序连接在一起,用HQL很难表达)
(2)数据挖掘方面不擅长,由于 MapReduce 数据处理流程的限制,效率更高的算法却 无法实现。
2)Hive 的效率比较低
(1)Hive 自动生成的 MapReduce 作业,通常情况下不够智能化
(2)Hive 调优比较困难,粒度较粗(Hive调优只能从HQL语句上进行调优,深层次的调优需要从Hadoop进行调优)
1.2 Hive架构原理(面试重点)
-
1)用户接口(Client):
CLI(command-line interface)
JDBC/ODBC(jdbc 访问 hive):通过java代码访问hive,跟java访问mysql差不多
WEBUI(浏览器访问 hive)
2)元数据(Meta store):
Hive中元数据用于存储映射关系;
Hive中元数据包括:表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、 表的类型(是否是外部表)、表的数据所在目录等;
Hive中元数据默认存储在自带的 derby 数据库中,推荐使用 MySQL 存储 Metastore
3)Hadoop 使用 HDFS 进行存储,使用 MapReduce 进行计算。
4)驱动器(Driver):
(1)解析器(SQL Parser):将 SQL 字符串转换成抽象语法树 AST,这一步一般都用第 三方工具库完成,比如 antlr;对 AST 进行语法分析,比如表是否存在、字段是否存在、SQL 语义是否有误。
(2)编译器(Physical Plan):将 AST 编译生成逻辑执行计划。
(3)优化器(Query Optimizer):对逻辑执行计划进行优化。
(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于 Hive 来 说,就是 MR/Spark。 -
Hive的运行机制:Hive 通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的 Driver, 结合元数据(MetaStore),将这些指令翻译成 MapReduce,提交到 Hadoop 中执行,最后,将 执行返回的结果输出到用户交互接口。
1.3 Hive跟数据库比较
- 查询语言:由于 SQL 被广泛的应用在数据仓库中,因此,专门针对 Hive 的特性设计了类 SQL 的查 询语言 HQL。熟悉 SQL 开发的开发者可以很方便的使用 Hive 进行开发。
- 数据更新:由于 Hive 是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive 中 不建议对数据的改写,所有的数据都是在加载的时候确定好的。而数据库中的数据通常是需 要经常进行修改的,因此可以使用 INSERT INTO … VALUES 添加数据,使用 UPDATE … SET 修 改数据。(实际上如果Hive要对数据进行更改,可以将数据下载下来进行更改再覆盖上传,但是当数据量大的时候速度将非常慢)
- 执行延迟:Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导 致 Hive 执行延迟高的因素是 MapReduce 框架。由于 MapReduce 本身具有较高的延迟,因此 在利用 MapReduce 执行 Hive 查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。 当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候, Hive 的并行计算显然能体现出优势。
- 数据规模:由于 Hive 建立在集群上并可以利用 MapReduce 进行并行计算,因此可以支持很大规模 的数据;对应的,数据库可以支持的数据规模较小。
1.4 安装
1.4.1 hive安装
-
官网下载所需版本Downloads (apache.org),这里使用3.1.2版本apache-hive-3.1.2-bin.tar.gz
-
解压到linux环境中
-
配置环境变量
vim /etc/profile.d/my_env.sh# 加入以下环境变量 #HIVE_HOME export HIVE_HOME=/opt/module/hive export PATH=$PATH:$HIVE_HOME/bin# 配置完后需要source一下该文件以更新环境变量 source /etc/profile.d/my_env.sh
-
解决日志 Jar 包冲突(可选)
mv $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.bak
-
初始化元数据库(用的是默认的derby数据库)
bin/schematool -dbType derby -initSchema
1.4.2 hive启动
-
启动zookeeper
-
启动hadoop
-
启动hive
bin/hive
-
hive默认的日志位置位置在/tmp/用户名/hive.log
小技巧:当hive报错时,可以查看hive.log,使用tail -f命令追踪当前日志文件,输入很多空行再执行刚刚报错的程序,能够有效帮助定位到报错信息。
1.4.3 hive简单使用
-
# 显示目前所有的数据库 show databases; # 显示目前所有表 show tables; # 创建新表,创建的表默认存储在/user/hive/warehouse/下 create table test(id string); # 向新表插入数据 insert into test values('1001'); # 查询表数据 select * from test;
-
derby数据库的缺点:查看元数据不方便,只能在HDFS中查看,并且当同时启用两个客户端时会报错,所以我们需要将 Hive 的元数据地址改为 MySQL。
1.4.4 mysql安装
-
官网下载,这里使用版本为mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar
-
检查当前系统是否安装过 MySQL(一般linux自带mysql)
rpm -qa|grep mariadb //如果存在通过如下命令卸载 sudo rpm -e --nodeps mariadb-libs
-
将 MySQL 安装包拷贝到/opt/software 目录下
-
解压 MySQL 安装包
tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar -C /opt/module/mysql
-
在安装目录下执行 rpm 安装,注意按照顺序安装
sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm
-
删除/etc/my.cnf 文件中 datadir 指向的目录下的所有内容
-
初始化数据库
sudo mysqld --initialize --user=mysql
-
查看临时生成的 root 用户的密码
sudo cat /var/log/mysqld.log 2022-07-27T06:38:14.218371Z 1 [Note] A temporary password is generated for root@localhost: WFd(?xG)L2W_
-
启动 MySQL 服务
sudo systemctl start mysqld
-
登录 MySQL 数据库
mysql -uroot -p
-
必须先修改 root 用户的密码,否则执行其他的操作会报错
mysql>set password = password("root");
-
修改 mysql 库下的 user 表中的 root 用户允许任意 ip 连接
mysql> update mysql.user set host='%' where user='root'; mysql> flush privileges;
1.5 Hive相关配置
1.5.1 Hive元数据配置到mysql
-
将 MySQL 的 JDBC 驱动mysql-connector-java-5.1.27-bin.jar拷贝到 Hive 的 lib 目录下
cp mysql-connector-java-5.1.27-bin.jar $HIVE_HOME/lib
-
配置 Metastore 到 MySQL
1)在$HIVE_HOME/conf 目录下新建 hive-site.xml 文件,添加以下内容<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration><!-- jdbc 连接的 URL --><property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value> </property><!-- jdbc 连接的 Driver--><property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value> </property> <!-- jdbc 连接的 username--><property><name>javax.jdo.option.ConnectionUserName</name><value>root</value></property><!-- jdbc 连接的 password --><property><name>javax.jdo.option.ConnectionPassword</name><value>root</value> </property><!-- Hive 元数据存储版本的验证,设置为false,因为默认使用的是derby数据库 --><property><name>hive.metastore.schema.verification</name><value>false</value> </property><!--元数据存储授权,设置为false,因为默认使用·的是derby数据库--><property><name>hive.metastore.event.db.notification.api.auth</name><value>false</value></property><!-- Hive 默认在 HDFS 的工作目录 --><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property> </configuration>
2)登陆 MySQL
mysql -uroot -proot
3)新建 Hive 元数据库
mysql> create database metastore; mysql> quit;
4) 初始化 Hive 元数据库
schematool -initSchema -dbType mysql -verbose
5)启动hive,能够发现前面创建的test表已经没有了,但是重新创建tes表之后,能够发现test表中的数据还存在,这是因为mysql或derby数据库只是用来存储元数据,即存储表跟HDFS路径的映射,当重新创建出这个映射之后,就又能够读取到HDFS中对应路径的数据。
6)通过navicat连接上mysql,查看metastore数据库,查看DBS表,表中只有一条信息,是hive中默认的数据库以及对应的HDFS路径
查看TBLS,能够查看hive数据库中的表,根据DBS和TBLS两个数据表,可以得到HDFS的对应路径。
1.5.2 使用元数据服务的方式访问 Hive
-
目前hive的使用方式只支持通过在hadoop102上进行访问,也就是只能本地进行连接,第三方无法跟hive进行交互,要使外部可以进行连接的话,需要启动一个元数据服务
-
操作步骤
(1)在 hive-site.xml 文件中添加如下配置信息,开放9083端口提供连接<!-- 指定存储元数据要连接的地址 --><property><name>hive.metastore.uris</name><value>thrift://hadoop102:9083</value></property>
(2)启动元数据服务(配置了1之后若不启动元数据服务的话,则本地连接也不可用,配置了元数据服务之后,本地连接也是通过9083端口都元数据服务进行连接)(前台进程,启动后当前窗口不可用)
hive --service metastore
1.5.3 使用JDBC方式访问hive
-
除了通过元数据服务访问hive,还有一种通过jdbc的方式访问的hive。需要配置jdbc相关服务hiveserver2,通过hiveserver2去连接hive,需要启动元数据服务和hiveserver2两个服务,因为通过jdbc访问hive的方式是这样的:客户端通过hive的jdbc驱动访问到hiveserver2,hiveserver2内部会连接到元数据服务metastrore,metastore则通过mysql的jdbc驱动访问到mysql,获取存储在mysql中的元数据。
-
操作步骤:
(1)在 hive-site.xml 文件中添加如下配置信息<!-- 指定 hiveserver2 连接的 host --><property><name>hive.server2.thrift.bind.host</name><value>hadoop102</value></property><!-- 指定 hiveserver2 连接的端口号 --><property><name>hive.server2.thrift.port</name><value>10000</value></property>
(2)在hadoop的core-site.xml添加如下配置并重启hadoop(原因是hadoop引入了一个安全伪装机制,使得hadoop 不允许上层系统直接将实际用户传递到hadoop层,而是将实际用户传递给一个超级代理,由此代理在hadoop上执行操作,避免任意客户端随意操作hadoop,这里配置这两个参数使得当前用户能够访问到hadoop?)
<!-- 配置超级用户F能够通过代理访问的主机节点 --> <property><name>hadoop.proxyuser.F.hosts</name><value>*</value> </property> <!-- 配置超级用户F允许代理的用户所属组 --> <property><name>hadoop.proxyuser.F.groups</name><value>*</value> </property>
(3)启动hiveserver2(需要先启动元数据服务)(前台进程,启动后当前窗口不可用)(启动时间较长,内部有多次校验)
bin/hive --service hiveserver2
(4)通过hive自带的beeline客户端利用jdbc方式连接hive(如果通过其他工具进行连接的话,需要有hive的jdbc驱动,并提供下面的url和用户名即可)```shell
bin/beeline -u jdbc:hive2://hadoop102:10000 -n F
1.5.4 元数据服务&hiveserver2脚本封装
-
由于元数据服务和hiveserver2服务都是前台进程,直接启动不方便,需要将进程以后台方式启动
-
后台方式启动知识:
nohup: 放在命令开头,表示不挂起,也就是关闭终端进程也继续保持运行状态
/dev/null:是 Linux 文件系统中的一个文件,被称为黑洞,所有写入改文件的内容 都会被自动丢弃
2>&1 : 表示将错误重定向到标准输出上(0表示标准输入,1表示标准输出,2表示错误输出)
&: 放在命令结尾,表示后台运行
一般会组合使用: nohup [xxx 命令操作]> file 2>&1 &,表示将 xxx 命令运行的结 果输出到 file 中,并保持命令启动的进程在后台运行。 -
脚本编写hiveservices.sh,并赋予权限
#!/bin/bash #日志目录 HIVE_LOG_DIR=$HIVE_HOME/logs if [ ! -d $HIVE_LOG_DIR ] # -d表示判断当前路径是否存在并是否是文件夹 thenmkdir -p $HIVE_LOG_DIR # 如果不存在则创建 fi #检查进程是否运行正常,参数 1 为进程名,参数 2 为进程端口 function check_process() {pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}') # 通过进程名$1来过滤进程,并得到进程号$2,其中grep -v grep为排除带grep的进程,awk用于根据空格进行切分得到的进程字符串ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1) # 通过端口号$2来过滤进程,并得到进程号&7,echo $pid[[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1 # 如果通过以上两种方式得到的两个进程号一样则返回1,否则返回0 } function hive_start() {metapid=$(check_process HiveMetastore 9083) # 检查元数据服务是否启动cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"[ -z "$metapid" ] && eval $cmd || echo "Metastroe 服务已启动" # -z判断当前进程id长度是否为0,如果为0表明不存在,就执行&&后面的命令来启动元数据服务,eval表示所有参数解析完毕再执行命令,如果不为0就执行||后面的命令,输出元数据服务已启动server2pid=$(check_process HiveServer2 10000)cmd="nohup --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"[ -z "$server2pid" ] && eval $cmd || echo "HiveServer2 服务已启动" } function hive_stop() {metapid=$(check_process HiveMetastore 9083)[ "$metapid" ] && kill $metapid || echo "Metastore 服务未启动" # 存在则杀死进程,否则提示元数据服务未启动server2pid=$(check_process HiveServer2 10000)[ "$server2pid" ] && kill $server2pid || echo "HiveServer2 服务未启动" } case $1 in # 脚本后跟的第一个参数不同,对应调用的函数也不同 "start")hive_start;; "stop")hive_stop;; "restart")hive_stopsleep 2hive_start;; "status")check_process HiveMetastore 9083 >/dev/null && echo "Metastore 服务运行正常" || echo "Metastore 服务运行异常"check_process HiveServer2 10000 >/dev/null && echo "HiveServer2 服务运行正常" || echo "HiveServer2 服务运行异常";; *)echo Invalid Args! # 如果是没有定义的参数则提示参数错误echo 'Usage: '$(basename $0)' start|stop|restart|status';; esac
chmod u+x hiveservices.sh
1.5.5 Hive其他配置
1)配置Hive运行日志存放位置
-
Hive 的 log 默认存放在/tmp/F/hive.log 目录下,在磁盘空间不足的时候会优先删除tmp目录下的文件,不安全
-
修改 hive 的 log 存放日志到/opt/module/hive/logs
(1)修改/opt/module/hive/conf/hive-log4j2.properties.template 文件名称为 hive-log4j2.properties
(2)在 hive-log4j2.properties 文件中修改 log 存放位置property.hive.log.dir = /opt/module/hive/logs
-
重启hive客户端后生效
2)配置hive在执行命令时会打印当前库和表头
-
在hive-site.xml中加入如下配置:
<property><name>hive.cli.print.header</name><value>true</value> </property><property><name>hive.cli.print.current.db</name><value>true</value></property>
-
重启hive客户端后生效
3)hive的一些其他参数可以在hive-default.xml.template下查看
4)查看所有的配置信息
hive>set;
5)参数的配置三种方式
-
配置文件方式:在hive-site.xml中中配置参数,用户自定义配置会覆盖默认配置。另外,Hive 也会读入 Hadoop 的配置,因为 Hive 是作为 Hadoop 的客户端启动的,Hive 的配置会覆盖 Hadoop 的配置。配置文件的设定对本 机启动的所有 Hive 进程都有效。
-
命令行参数方式:启动 Hive 时,可以在命令行添加-hiveconf param=value 来设定参数。例如
bin/hive -hiveconf mapred.reduce.tasks=10;
在hive客户端中可以查看参数的设置
hive (default)> set mapred.reduce.tasks;
注意:以上操作仅对本次 hive 启动有效。
利用-e参数可以不进入hive客户端执行一些命令,可以利用这个机制,来针对某个语句设置参数;# 本次参数设置只对后面的select语句生效 bin/hive -e "set param=value; select * from test;"
-
参数声明方式
# 可以在 HQL 中使用 SET 关键字设定参数 hive (default)> set mapred.reduce.tasks=5;
-
三种设置方式的优先级依次递增。即配置文件<命令行参数<参数声明。
1.6 hive常用命令
-
通过hive目录下的bin/hive -help命令可以查看hive的一些交互命令
bin/hive -help which: no hbase in (/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/module/jdk1.8.0_212/bin:/opt/ha/hadoop-3.1.3/bin:/opt/ha/hadoop-3.1.3/sbin:/opt/module/hive/bin:/home/F/.local/bin:/home/F/bin) Hive Session ID = bd6c34c8-5407-4948-98ac-2b06f06de9c3 usage: hive-d,--define <key=value> Variable substitution to apply to Hivecommands. e.g. -d A=B or --define A=B--database <databasename> Specify the database to use-e <quoted-query-string> SQL from command line-f <filename> SQL from files-H,--help Print help information--hiveconf <property=value> Use value for given property--hivevar <key=value> Variable substitution to apply to Hivecommands. e.g. --hivevar A=B-i <filename> Initialization SQL file-S,--silent Silent mode in interactive shell-v,--verbose Verbose mode (echo executed SQL to theconsole)
-
常用交互命令
# “-e”不进入 hive 的交互窗口执行 sql 语句, 有该命令可以通过脚本来执行hive语句 bin/hive -e "select id from student;" # “-f”执行脚本中 sql 语句,脚本后缀不重要 bin/hive -f /opt/module/hive/datas/hivef.sql # 执行文件中的 sql 语句并将结果写入文件中 bin/hive -f /opt/module/hive/datas/hivef.sql > /opt/module/datas/hive_result.txt
-
其他命令
# 在 hive命令窗口中如何查看 hdfs 文件系统 dfs -ls /; # 查看在 hive 中输入的所有历史命令 (1)进入到当前用户的根目录 /root 或/home/atguigu (2)查看. hivehistory 文件 cat .hivehistory
2 Hive数据类型
2.1 基本数据类型
-
对于 Hive 的 String 类型相当于数据库的 varchar 类型,该类型是一个可变的字符串,不 过它不能声明其中最多能存储多少个字符,理论上它可以存储 2GB 的字符数。
2.2 集合数据类型
-
Hive 有三种复杂数据类型 ARRAY、MAP 和 STRUCT。ARRAY 和 MAP 与 Java 中的 Array 和 Map 类似,而 STRUCT 与 C 语言中的 Struct 类似,它封装了一个命名字段集合,复杂数据 类型允许任意层次的嵌套。
-
案例实操:
在hive中创建新表test2create table test( name string, friends array<string>, children map<string, int>, address struct<street:string, city:string> ) row format delimited fields terminated by ',' # 定义列分隔符为, collection items terminated by '_' # 定义集合数据分隔符为_ map keys terminated by ':' # 定义map中key和value的分隔符为: lines terminated by '\n'; # 定义行分隔符为\n
准备测试数据test.txt
songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hui long guan_beijing yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao yang_beijing
上传test.txt到HDFS路径下的/user/hive/warehouse/test2,hive中查询test2的数据
# 查询array类型字段中的第一个数据 select friends[0] from test2 where name='songsong'; # 获取map类型字段中某一个key下对应的value select children['xiao song'] from test2 where name='songsong'; # 获取struct类型字段中的某一个成员变量 select address.street from test2 where name='songsong';
2.3 类型转换
- Hive 的原子数据类型是可以进行隐式转换的,类似于 Java 的类型转换,例如某表达式 使用 INT 类型,TINYINT 会自动转换为 INT 类型,但是 Hive 不会进行反向转化,例如,某表 达式使用 TINYINT 类型,INT 不会自动转换为 TINYINT 类型,它会返回错误,除非使用 CAST 操作。
1)隐式类型转换规则如下
(1)任何整数类型都可以隐式地转换为一个范围更广的类型,如 TINYINT 可以转换成 INT,INT 可以转换成 BIGINT。
(2)所有整数类型、FLOAT 和 STRING 类型(STRING类型的内容必须是数字)都可以隐式地转换成 DOUBLE。
(3)TINYINT、SMALLINT、INT 都可以转换为 FLOAT。
(4)BOOLEAN 类型不可以转换为任何其它的类型。
2)可以使用 CAST 操作显示进行数据类型转换:例如 CAST(‘1’ AS INT)将把字符串’1’ 转换成整数 1;如果强制类型转换失败,如执行 CAST(‘X’ AS INT),表达式返回空值 NULL。
3 Hive-DDL数据定义(增删改查数据库)
3.1 创建数据库
-
创建数据库命令
# []代表可选项 # IF NOT EXISTS不存在时才创建 # COMMENT database_comment指定数据库注释 # LOCATION hdfs_path指定数据库存放位置,默认是hdfs下的/user/hive/warehouse/数据库名.db # WITH DBPROPERTIES (property_name=property_value, ...)额外参数信息,如作者信息,创建时间等 CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] [LOCATION hdfs_path] [WITH DBPROPERTIES (property_name=property_value, ...)];
-
示例
create database if not exists hive; create database hive location 'hive';
3.2 查询数据库
-
常用命令
# 显示数据库 show databases; # 过滤显示查询的数据库 show databases like 'db_hive*' # 显示数据库信息 desc database db_hive; # 显示数据库详细信息,extended,可以查看一些扩展信息DBPROPERTIES desc database extended db_hive; # 切换数据库 use db_hive;
3.3 修改数据库
-
用户可以使用 ALTER DATABASE 命令为某个数据库的 DBPROPERTIES 设置键-值对属性值, 来描述这个数据库的属性信息。
alter database db_hive set dbproperties('createtime'='20170830');
-
修改只能修改DBPROPERTIES这种扩展信息,查看扩展信息的修改情况通过extended参数去查看
3.4 删除数据库
-
常用命令
# 删除非空数据库 drop database db_hive2; # 如果删除的数据库不存在,最好采用 if exists 判断数据库是否存在 drop database if exists db_hive2; # 如果数据库不为空,上面的命令删除不了,可以采用 cascade 命令,强制删除 drop database db_hive cascade;
3.5 创建表
-
建表语法
# [EXTERNAL]:创建外部表,HIVE中表有内外之分 # [IF NOT EXISTS]:不存在才创建 # [(col_name data_type [COMMENT col_comment], ...)]:列名,列类型,列注释 # [COMMENT table_comment]:表注释 # [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]:按字段进行分区,用于创建分区表 # [CLUSTERED BY (col_name, col_name, ...)]:用于创建分桶表 # [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]:设置按什么字段分桶,分几个桶 # [ROW FORMAT row_format]:定义行格式的限制 # [STORED AS file_format]:指定文件格式 # [LOCATION hdfs_path]:指定表的存储位置 # [TBLPROPERTIES (property_name=property_value, ...)]:表的额外属性 # [AS select_statement]:根据查询语句建表 CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [CLUSTERED BY (col_name, col_name, ...)] [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION hdfs_path] [TBLPROPERTIES (property_name=property_value, ...)] [AS select_statement]
-
默认创建的表都是所谓的管理表,有时也被称为内部表。因为这种表,Hive 会(或多或 少地)控制着数据的生命周期。Hive 默认情况下会将这些表的数据存储在由配置项hive.metastore.warehouse.dir(例如,/user/hive/warehouse)所定义的目录的子目录下。 当我们删除一个管理表时,Hive 也会删除这个表中数据(HDFS路径存储的数据)。管理表不适合和其他工具共享 数据。在元数据库中TBLS表中可以看到各个表的类型。如果删除的表是外部表的话,Hive 并非认为其完全拥有这份数据。删除该表并不会删除掉HDFS中的数据,不过描述表的元数据信息会被删除掉。一般建表除了中间数据和临时数据外都会选择外部表,更加安全,在使用共享数据的时候要构建外部表。
-
管理表和内部表的转换:注意:(‘EXTERNAL’=‘TRUE’)和(‘EXTERNAL’=‘FALSE’)为固定写法,区分大小写!
# 查询表的类型 desc formatted student2; # 修改内部表 student2 为外部表 alter table student2 set tblproperties('EXTERNAL'='TRUE'); # 修改外部表 student2 为内部表 alter table student2 set tblproperties('EXTERNAL'='FALSE');
-
用户在建表的时候可以自定义 SerDe 或者使用自带的 SerDe,如果未指定row format的话,往该表存数据的话将会使用自带的 SerDe,SerDe 是 Serialize/Deserilize 的简称, hive 使用 Serde 进行行对象的序列与反序列化。打开HDFS中与表数据对应的文件,由于使用的SerDe是自带的,所以如果要对该文件进行复用的话,难以将每一列的数据分割出来,因此时常自己定义SerDe。
# 对每个字段的分隔符设置为,对应的表数据文件则容易拿来复用 # 如果通过hadoop上传的文件要被准确检测为table的数据的话,上传的文件的内容分隔符需要为, create table test(id int, name string) row format delimited fields terminated by ',';
3.6 修改表
-
常用命令
# 修改表名 ALTER TABLE table_name RENAME TO new_table_name # 更新列信息 ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name] # 不修改列类型的话也需要加上原类型 alter table test3 change id stu_id string; # 增加和替换列,ADD 是代表新增一字段,字段位置在所有列后面(partition 列前),REPLACE 则是表示替换表中所有字段。 ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...) # 新增列 alter table test3 add columns (address string); # 与change参数不一样,replace是对整张表的列进行替换,如果replace后面跟的列数比表中的列数少的话,那么replace后得到的表就只会显示部分数据,当用replace或add增加列数到原来数量时,表中原来的数据仍存在,因为这些操作只是对元数据进行操作 alter table test3 replace columns (address string);
3.7 删除表
-
drop table dept;
4 Hive-DML数据操作
4.1 数据导入
-
向表中装载数据
#(1)load data:表示加载数据 #(2)local:表示从本地加载数据到 hive 表;否则从 HDFS 加载数据到 hive 表 #(3)inpath:表示加载数据的路径 #(4)overwrite:表示覆盖表中已有数据,否则表示追加 #(5)into table:表示加载到哪张表 #(6)table_name:表示具体的表 #(7)partition:表示上传到指定分区 load data [local] inpath '数据的 path' [overwrite] into table table_name [partition (partcol1=val1,…)]; # 从本地加载数据 load data local inpath './student.txt' into table student; # 从HDFS中加载数据,在HDFS路径中会发现根目录下的student.txt被移动到了/user/hive/warehouse/student下 load data inpath '/student.txt' into table student;
对比:
load方式导入数据,会修改元数据库中TABLE_PARAMS表中对应表的numFiles参数数据,而不会修改numRows参数数据;
与hadoop上传文件对比,hadoop上传文件操作不会修改元数据;
与insert操作对比,insert命令添加表数据后,会同时修改numFiles参数数据和numRows参数数据。 -
通过查询语句向表中插入数据(Insert)
# 基本插入 insert into table student_par values(1,'wangwu'),(2,'zhaoliu'); # 从另一张表查询后进行插入 insert overwrite table student_par select id, name from student where month='201709'; # 查询后插入 insert into student1 select * from student; # 查询后覆盖原表信息插入 insert overwrite table student1 select * from student; # 根据查询结果插入多张表from studentinsert overwrite table student partition(month='201707')select id, name where month='201709'insert overwrite table student partition(month='201706')select id, name where month='201709';from studentinsert into student1select * insert into student2select *;
-
查询语句中创建表并加载数据(As Select)
create table if not exists student3 as select * from student;
-
创建表时通过location指定加载数据路径
create external table if not exists student5(id int, name string) row format delimited fields terminated by '\t' location '/student;
-
import数据到指定hive表中,import的数据必须是export导出的数据
import table student2 from '/user/hive/warehouse/export/student';
4.2 数据导出
-
insert导出
# 将查询的结果导出到本地,不加local的话就是导出到HDFS路径,导出的文件中数据列之间的分隔符仍然是默认的,不会继承原表格的分隔符 insert overwrite local directory '/opt/module/hive/data/export/student' select * from student; # 将查询结果导出到本地,同时加上自定义的数据分隔符, insert overwrite local directory '/opt/module/hive/data/export/student' row format delimited fields terminated by ',' select * from student;
-
Hadoop命令导出到本地
hive (default)> dfs -get /user/hive/warehouse/student/student.txt opt/module/data/export/student3.txt;
-
Hive Shell命令导出
bin/hive -e 'select * from default.student;' > /opt/module/hive/data/export/student4.txt;
-
Export 导出到 HDFS 上
hive(default)> export table default.student to '/user/hive/warehouse/export/student';
-
Sqoop 导出到mysql(一般用于将分析得出的结果导出到mysql)
4.3 数据清空
-
清除表中数据
# 注意:Truncate 只能删除管理表,不能删除外部表中数据 hive (default)> truncate table student;
4.4 数据查询
-
文档https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
-
查询语法:
SELECT [ALL | DISTINCT] select_expr, select_expr, ... FROM table_reference [WHERE where_condition] [GROUP BY col_list] [ORDER BY col_list] [CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY col_list] ] [LIMIT number]
4.4.1 基本查询
-
hql不区分大小写;一般关键字顶格写;字段前有缩进;各子句一般要分行,关键字一般不能缩进和分行。
-
算数运算符
-
常用函数count、max、min、sum、avg
-
比较运算符
-
逻辑运算符
-
# 查询全表 select * from dept; # 查询某列 select dno from dept; # 查询某个字段并给这个字段起别名,as可省 select dno [as] d from dept; # 查询某个字段并将每个结果加1后显示 select dno+1 from dept; # 计数并对结果起别名 select count(*) cnt from dept; # 限制查询返回结果行数 select * from dept limit 2; # 限定查询条件,where子句中字段不能用别名 select * from dept where dept>1000; # =运算符,查找工资为5000的员工 select * from emp where sal=5000; # between运算符,查找工资区间为500到1000的员工,左闭又闭区间 select * from emp where sal between 500 and 1000; # is null,查询没有奖金的员工 select * from emp where comm is null; # in运算符,or运算符,查询工资为1500或5000的员工 select * from emp where sal in (1500,5000); select * from emp where sal=1500 or sal=5000; # like运算符,%代表零个或多个字符,_代表一个字符 # 查找名字以A开头的员工信息 select * from emp where enmae like 'A%'; # 查找名字中第二个字母为A的员工信息 select * from emp where ename like '_A%'; # 查找名字中带有A的员工信息 select * from emp where ename like '%A%'; # rlike运算符,后面跟一个正则表达式 # 查询名字中带有A的员工信息 select * from emp where ename rlike '[A]';
4.4.2 高级查询
-
# GROUP BY 语句通常会和聚合函数一起使用,按照一个或者多个列对结果进行分组,然后对每个组执行聚合操作。 # 查询部门的平均工资 select deptno, avg(sal) from emp group by deptno; # 查询平均工资大于2000的部门及部门平均工资,注意子查询语句后面要跟上别名,否则会报错 selectdeptno,avg_sal from (select deptno,avg(sal) avg_sal from emp group by deptno) t where avg_sal>1000; # having语句 # (1)where 后面不能写分组函数,而 having 后面可以使用分组函数。 # (2)having 只用于 group by 分组统计语句。 # 查询平均工资大于2000的部门及部门平均工资 select deptno,avg(sal) avg_sal group by deptno having avg_sal>1000;
-
JOIN关键字用于连接多张表,JOIN默认表示内连接:只有进行连接的两个表中都存在与连接条件相匹配的数据才会被保留下来。
左外连接:LEFT JOIN 操作符左边表中符合 WHERE 子句的所有记录将会被返回,左边表的数据全要,右边表只要满足条件的数据
右外连接:RIGHT JOIN 操作符右边表中符合 WHERE 子句的所有记录将会被返回,左边表只要满足条件的数据,右边表的数据全要
满外连接:FULL JOIN将会返回所有表中符合 WHERE 语句条件的所有记录。左右两边表的数据都要,如果任一表的指定字段没有符合条件的值的话,那么就使用 NULL 值替代。# 内连接 # 根据员工表和部门表中的部门编号相等,查询员工编号、员工名称和部门名称 select e.empno,e.ename,e.dname # 字段指明所属表的话效率会提升,两个表若拥有同一字段,则必须加上所属表 fromemp e # 使用表的别名简化查询 joindept d on e.deptno=d.deptno; # 左外连接 # 根据员工表和部门表中的部门编号相等,查询员工编号、员工名称和部门名称,如果没有部门名称则补充NULL select e.empno,e.ename,e.dname # 字段指明所属表的话效率会提升,两个表若拥有同一字段,则必须加上所属表 fromemp e # 使用表的别名简化查询 left joindept d on e.deptno=d.deptno; # 右外连接 # 根据员工表和部门表中的部门编号相等,查询员工编号、员工名称和部门名称,如果部门没有员工则补充NULL selecte.empno,e.ename,d.deptno fromemp e right joindept d on e.deptno=d.deptno # 满外连接 # 根据员工表和部门表中的部门编号相等,查询员工编号、员工名称和部门名称,如果部门没有员工则补充NULL,如果员工没有部门的也补充NULL selecte.empno,e.ename,nvl(e.deptno,d.deptno) #nvl:如果第一个字段为null就显示第二个字段 fromemp e full joindept d on e.deptno=d.deptno# 取左表独有数据 # 根据员工表和部门表中的部门编号相等,查询没有部门的员工信息 select e.empno,e.ename,e.deptno fromemp e # 使用表的别名简化查询 left joindept d on e.deptno=d.deptno where d.deptno is null;或 # 取出员工表中deptno不在部门表的记录 select e.empno,e.ename,e.deptno fromemp e where e.deptno not in (select deptnofrom dept);# 取右表独有数据 # 查询部门表中不存在任何员工的部门信息 select d.deptno, d.dname fromemp e right joindept d on e.deptno=d.deptno where e.deptno is null;或 # 取出部门表中deptno不在员工表的记录 select d.deptno,d.dname from dept d where d.deptno not in (select distinct e.deptnofrom emp e);# 取左右表独有数据 # 查询员工表中没有部门的员工和部门表中没有员工的部门 selecte.empno,e.ename,e.deptno,d.deptno,d.dname fromemp e full joindept d on e.deptno=d.deptno where e.deptno is null or d.deptno is null;或 # 使用union联合左表独有数据和右表独有数据 select * from (select e.empno,e.ename,e.deptno,d.deptno,d.dnamefromemp e left joindept don e.deptno=d.deptnowhere d.deptno is nullunion select e.empno,e.ename,e.deptno,d.deptno,d.dnamefromemp e right joindept don e.deptno=d.deptnowhere e.deptno is null)tmp;# 多表连接查询 # 查询所有员工所处的部门姓名和部门所属城市 select e.ename,d.dname,l.loc_name from emp e joindept d on e.deptno=d.deptno join location l on d.loc=l.loc;# 笛卡尔积:A表的每一条数据跟B表的数据每一条数据都会进行一次连接 select e.ename,d.dname from emp e join dept d;
-
排序
# Order By:全局排序,只有一个 Reducer # 按员工工资进行升序排序,默认升序ASC select ename,sal from emp order by sal;# 按照别名排序 select ename, sal*2 twosal from emp order by twosal;# 按照部门和工资升序排序,先按部门再按工资 select ename,deptno,sal from emp order by deptno,sal;# Sort By:对于大规模的数据集 order by 的效率非常低。在很多情况下,并不需要全局排序,此时可以使用 sort by。Sort by 为每个 reducer 产生一个排序文件。每个 Reducer 内部进行排序,对全局结果集来说不是排序。 # 设置reduce个数 set mapreduce.job.reduces=3; # 根据部门工资降序查看员工信息 select * from emp sort by deptno desc; # 将查询结果导出到文件中,会有三个文件,每个文件里都是有序的,三个文件里面的数据都是随机分配的 insert overwrite local directory '/opt/module/hive/data/sortby-result' select * from emp sort by sal desc;# Distribute By: 在有些情况下,我们需要控制某个特定行应该到哪个 reducer,通常是为了进行后续的聚集操作。distribute by 子句可以做这件事。distribute by 类似 MR 中 partition(自定义分区),进行分区,结合 sort by 使用。 # distribute by 的分区规则是根据分区字段的 hash 码与 reduce 的个数进行模除后,余数相同的分到一个区。 Hive 要求 DISTRIBUTE BY 语句要写在 SORT BY 语句之前。 # 设置reducer个数 set mapreduce.job.reduces=3; # 根据部门编号的hash码与 reduce 的个数进行模运算后,按照余数分区,再按员工编号降序排序 select * from emp distribute by deptno sort by sal;# 当 distribute by 和 sorts by 字段相同时,可以使用 cluster by 方式。cluster by 除了具有 distribute by 的功能外还兼具 sort by 的功能。但是排序只能是升序排序,不能指定排序规则为 ASC 或者 DESC。 # 根据部门编号的hash码与 reduce 的个数进行模运算后,按照余数分区,分区内再按部门编号升序排序 select * from emp cluster by deptno; 相当于 select * from emp distribute by deptno sort by deptno;
4.4.3 函数
-
系统内置函数
# 查看系统内置函数show functions;# 显示自带函数upper的用法desc function upper;# 详细显示自带的函数的用法desc function extended upper;
-
hive中的函数分为三种,一跟多指的是输入数据跟输出数据的行数
UDF:普通函数,一进一出
UDAF:聚合函数,多进一出
UDTF:炸裂函数,一进多出 -
常用内置函数举例
# NVL( value,default_value)。它的功能是如果 value 为 NULL,则 NVL 函数返回 default_value 的值,否则返回 value 的值,如果两个参数都为 NULL ,则返回 NULL。 # 查询员工的奖金,如果为NULL则返回-1 select nvl(comm, -1) from emp;# CASE WHEN THEN ELSE END,用于对字段进行条件判断, CASE后面跟字段,WHEN后面跟条件 THEN ELSE跟操作,END则语句结束 # 求出不同部门男女各多少人。 # 部门表 name dept_id sex 悟空 A 男 大海 A 男 宋宋 B 男 凤姐 A 女 婷姐 B 女 婷婷 B 女 selectdept_id,sum(case sex when '男' then 1 else 0 end) maleCount,sum(case sex when '女' then 1 else 0 end) femaleCount fromemp_sex group by dept_id;# if,第一个表达式为true则返回第二个值,第一个表达式为false则返回第三个值 # 求出不同部门男女各多少人。 selectdept_id,sum(if(sex='男',1,0)) maleCount,sum(if(sex='女',1,0)) femaleCount fromemp_sex group by dept_id;# concat,实现字符串或字段拼接 select concat(deptno,'-',dname) from dept; # concat_ws,:它是一个特殊形式的 CONCAT()。第一个参数是剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何 NULL 和空字符串。分隔符将被加到被连接的字符串之间; 注意: CONCAT_WS 的参数必须是"string or array<string> select concat_ws('-',ename,job) from emp; # COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生 Array 类型字段。 select collect_set(id) from student1; # 个人信息表 name constellation blood_type 孙悟空 白羊座 A 大海 射手座 A 宋宋 白羊座 B 猪八戒 白羊座 A 凤姐 射手座 A 苍老师 白羊座 B # 把星座和血型相同的人拼接在一起 select con_blood,concat_ws('|',collect_set(name)) # 用collect_set去重生成一个集合 from# 子查询,逗号连接星座跟血型(select concat(consetellation,',',blood_type) con_blood,namefrom person_info) t group by con_blood;# EXPLODE(col):将 hive 一列中复杂的 Array 或者 Map 结构拆分成多行。 # LATERAL VIEW侧写表,用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias 解释:用于和 split, explode 等 UDTF 一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。使用侧写表的原因是用了explode炸裂出来的数据要跟其他列进行关联,使用侧写表能够对其他列的数据进行复制 # 电影表 movie category 《疑犯追踪》 悬疑,动作,科幻,剧情 《Lie to me》 悬疑,警匪,动作,心理,剧情 《战狼 2》 战争,动作,灾难 # 将电影按照类别生成多行数据 select movie,category_name from movie_info lateral view explode(split(category, ",")) movie_info_tmp as category_name; # movie_info_tmp是侧写表别名,category_name是字段拆分后的列名
-
常用的日期函数
unix_timestamp() -- 返回当前或指定时间的时间戳 from_unixtime() -- 将时间戳转换成日期格式 current_date() -- 当前日期 currnet_timestamp() -- 当前日期加时间 to_date() -- 抽取日期 year() --获取年 month() -- 获取月 day() -- 获取日 hour() minute() second() weekofyear() -- 当前时间是一年中的第几周 dayofmonth() -- 当前时间是一个月的第几天 months_between() -- 两个日期之间的月份 add_months() -- 日期加减月 date_diff() -- 两个日期相差的天数 date_add() -- 日期加天数 date_sub() -- 日期减天数 last_day() -- 日期的当月的最后一天 date_format() -- 格式化日期
-
常用的数字函数
round() -- 四舍五入 ceil() -- 向上取整 floor() -- 向下取整
-
常用的字符串函数
upper() --转大写 lower() -- 转小写 length() -- 长度 trim() -- 前后去空格 lpad() -- 向左补齐到指定长度 rpad() -- 向右补齐到指定长度 regexp_replace() -- 正则表达式匹配字符串
-
常用的集合函数
size() -- 集合中元素的个数 map_keys() -- 返回map中的key map_values() -- 返回map中的value array_contains() -- 判断集合中是否包含元素 sort_array() -- 对array中的元素排序
-
Grouping Sets函数,用于多维分析,详情看p94
select a,b,sum(c) from t1 group by a,b grouping sets((a,b),a,b,()) 相当于 select a,b,sum(c) from t1 group by a,b union select a,null,sum(c) from t1 group by a union select null,b,sum(c) from t1 group by b union select null,null,sum(c) from t1
4.4.4 自定义函数
-
自定义函数,当 Hive 提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义 函数(UDF:user-defined function)。根据用户自定义函数类别分为以下三种:
(1)UDF(User-Defined-Function) 一进一出
(2)UDAF(User-Defined Aggregation Function) 聚集函数,多进一出 类似于:count/max/min
(3)UDTF(User-Defined Table-Generating Functions) 一进多出 如 lateral view explode()
官方文档地址为https://cwiki.apache.org/confluence/display/Hive/HivePlugins。
用户自定义函数的步骤为:
(1)继承 Hive 提供的类 org.apache.hadoop.hive.ql.udf.generic.GenericUDF ;org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
(2)实现类中的抽象方法
(3)把实现类打包成jar包上传至linux
(4)在hive的命令行窗口创建函数# 添加jar add jar jar包路径 # 创建function create [temporary] function [dbname.]function_name AS class_name;
-
自定义UDF函数
案例:自定义UDF函数实现计算给定字符串的长度
(1)创建maven项目,导入依赖<dependencies><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency> </dependencies>
(2)创建一个类继承GenericUDF类,并实现initialize(初始化方法,一般进行数据校验)、evaluate方法(处理数据的方法),getDisplayString方法用于展示执行计划(hive的explain关键词,可以以文字形式将hql语句的执行过程展示出来)时使用,一般可以不用写,返回空字符串即可。
package com.atguigu.hive; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; /** * 自定义 UDF 函数,需要继承 GenericUDF 类 * 需求: 计算指定字符串的长度 */ public class MyStringLength extends GenericUDF {/**** @param arguments 输入参数类型的鉴别器对象* @return 返回值类型的鉴别器对象* @throws UDFArgumentException*/@Overridepublic ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {// 判断输入参数的个数if(arguments.length !=1){throw new UDFArgumentLengthException("Input Args Length Error!!!");}// 判断输入参数的类型if(!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){throw new UDFArgumentTypeException(0,"Input Args Type Error!!!");} //函数本身返回值为 int,需要返回 int 类型的鉴别器对象return PrimitiveObjectInspectorFactory.javaIntObjectInspector;}/*** 函数的逻辑处理* @param arguments 输入的参数* @return 返回值* @throws HiveException*/@Overridepublic Object evaluate(DeferredObject[] arguments) throws HiveException {if(arguments[0].get() == null){return 0;}return arguments[0].get().toString().length();}@Overridepublic String getDisplayString (String[] children) {return "";} }
(3)将maven项目打成jar包: Maven-Lifecycle-package
(4)把jar包上传至hive目录下的lib目录
(5)重启hive或者把jar包添加到hive的classpath中hive (default)> add jar jar包路径
(6)创建临时函数与开发好的 java class 关联
llhive (default)> create temporary function my_len as "com.atguigu.hive.MyStringLength";
(7)即可在 hql 中使用自定义的函数
hive (default)> select ename,my_len(ename) ename_len from emp;
-
自定义UDTF函数DeveloperGuide UDTF - Apache Hive - Apache Software Foundation
案例:自定义一个 UDTF 实现将一个任意分割符的字符串切割成独立的单词
(1)创建maven项目并导入依赖<dependencies><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency> </dependencies>
(2)创建类实现GenericUDTF类,并实现initialize方法(初始化方法)、process方法(处理数据)和close方法(收尾方法)
package com.atguigu.udtf; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.ArrayList; import java.util.List; public class MyUDTF extends GenericUDTF {private ArrayList<String> outList = new ArrayList<>();@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {//1.定义输出数据的默认列名,可以被别名覆盖,List类型,说明可以使用该自定义UDTF函数产生多列数据List<String> fieldNames = new ArrayList<>();// 定义输出数据的类型List<ObjectInspector> fieldOIs = new ArrayList<>();//2.添加输出数据的列名和类型fieldNames.add("lineToWord");fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);}@Overridepublic void process(Object[] args) throws HiveException {//1.获取原始数据String arg = args[0].toString();//2.获取数据传入的第二个参数,此处为分隔符String splitKey = args[1].toString();//3.将原始数据按照传入的分隔符进行切分String[] fields = arg.split(splitKey);//4.遍历切分后的结果,并写出for (String field : fields) {//集合为复用的,首先清空集合outList.clear();//将每一个单词添加至集合outList.add(field);//将集合内容写出forward(outList);}}@Overridepublic void close() throws HiveException {} }
(3)将maven项目打成jar包
(4)把jar包上传至hive目录下的lib目录
(5)重启hive或者把jar包添加到hive的classpath中
(6)创建临时函数与开发好的java class相关联hive (default)> create temporary function myudtf as "com.atguigu.hive.MyUDTF";
(7)使用自定义的函数
hive (default)> select myudtf("hello,world,hadoop,hive",",");
4.4.5 窗口函数
-
窗口函数:为每一行数据开辟一个数据窗口,从而进行数据计算
# 窗口函数: OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化。
OVER函数可以利用一下关键字来控制数据窗口的大小
CURRENT ROW:当前行 n PRECEDING:往前 n 行数据 n FOLLOWING:往后 n 行数据 UNBOUNDED:起点, UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING 表示到后面的终点
在OVER函数前可以通过以下函数来对窗口内的数据进行计算
LAG(col,n,default_val):往前第 n 行数据 LEAD(col,n, default_val):往后第 n 行数据 NTILE(n):把有序窗口的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每一行,NTILE 返回此行所属的组的编号。注意:n 必须为 int 类型。
-
案例
案例数据jack,2017-01-01,10 tony,2017-01-02,15 jack,2017-02-03,23 tony,2017-01-04,29 jack,2017-01-05,46 jack,2017-04-06,42 tony,2017-01-07,50 jack,2017-01-08,55 mart,2017-04-08,62 mart,2017-04-09,68 neil,2017-05-10,12 mart,2017-04-11,75 neil,2017-06-12,80 mart,2017-04-13,94
(1)查询在 2017 年 4 月份购买过的顾客及总人数
select name,count(*) -- 不加over相当于在求每个人在4月份购买过几次 from business where substring(orderdate,1,7) = '2017-04' group by name; 结果:jack 1mart 4对比 select name,count(*) over() -- 加over即为题目所求,因为over是对group by生效后的数据产生作用,因此可以得出正确答案,group by后的数据只有两行,而over内不加参数,所以窗口内的数据是所有数据,即这两行数据 from business where substring(orderdate,1,7) = '2017-04' group by name; 结果:jack 2mart 2select name, from business where substring(orderdate,1,7) = '2017-04' group by name; 结果:jackmart
(2)查询顾客的购买明细及顾客月购买总额
select name,orderdate,cost,sum(cost) over(partition by name,month(orderdate)) -- 不加order by,相同name的每一行数据的窗口都是一样大的,sum求的窗口内数据的总和 from business; -- 得到每个顾客每个月每一笔消费以及这个月的消费总额
(3)上述的场景, 将每个顾客的 cost 按照日期进行累加
selectname,orderdate,cost,sum(cost) over(partiton by name order by orderdate) -- 如果order by被指定,且没有用rows between语句指定窗口范围的话,默认窗口范围为ROWS between UNBOUNDED PRECEDING and CURRENT ROW from bussiness;或者 selectname,orderdate,cost,sum(cost) over(partition by name ROWS between UNBOUNDED PRECEDING and CURRENT ROW) from bussiness;
需要注意的是,若使用order by来限定窗口为ROWS between UNBOUNDED PRECEDING and CURRENT ROW时,如果order by后跟的字段的值有两个或者更多个是一样的,hive会将这几条数据视为在同一行,这样看起来好像在遇到这样的数据时,窗口范围会变大
比如 数据 id 1 2 3 3 4 5 select id,sum(id) over(order by id) from t; 会得到结果 1 3 9 9 13 18 而不是结果 1 3 6 9 13 18 因为在遇到数据3时,把两个3视为在同一行,遇到3时窗口包含的数据为1 2 3 3
(4)查询每个顾客上次的购买时间
selectname,lag(orderdate, 1) over(partition by name order by orderdate) last_orderdate from bussiness;
(5)查询前 20%时间的订单信息
select * from (select name,orderdate,cost, ntile(5) over(order by orderdate) sorted from business) t where sorted = 1;
-
其他常用的窗口函数
RANK() --排序,排序相同时会重复,总数不会变,即有两个第一名时,名次值为1 1 3 4 DENSE_RANK() --排序,排序相同时会重复,总数会减少,即有两个第一名时,名次为 1 1 2 3 ROW_NUMBER() --会根据顺序计算
5 分区表和分桶表
5.1 分区表
-
分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。一般分区都是按天进行分区。
# 创建分区表,创建分区表要用partitioned by给定一个分区字段,注意:分区字段不能跟表中字段一样,可以将分区字段看作表的伪列。 create table dept_partition( deptno int, dname string, loc string) partitioned by (day string) row format delimited fields terminated by ' ';# 加载数据到表中时需要指定分区信息,如果没有指定分区信息的话,会自动产生一个默认分区HIVE_DEFAULT_PARTITION,数据将会存在默认分区内 load data local inpath './data/dept1.txt' into table dept_partition partition(day='2022-9-3'); load data local inpath './data/dept2.txt' into table dept_partition partition(day='2022-9-4'); load data local inpath './data/dept3.txt' into table dept_partition partition(day='2022-9-5');
建好的分区表对应一个目录,每个分区又对应着这个目录下的一个目录,分区字i段值即分区目录名,由于分区字段值不存储在表中(相当于提前在文件系统中将数据分好类),因此根据分区字段去筛选表中数据会比用其他字段筛选数据效率高,因为用其他字段筛选数据需要进行全表扫描,而利用分区字段筛选数据能够有效避免全表扫描。
-
分区表的增删改查
# 单分区查询 select * from dept_partition where day='2022-9-3'; # 多分区联合查询 select * from dept_partition where day='2022-9-3' union select * from dept_partition where day='2022-9-4' union select * from dept_partition where day='2022-9-5'; hive (default)> select * from dept_partition where day='2022-9-3' or day='2022-9-4' or day='2022-9-5'# 增加分区 # 增加一个分区 alter table dept_partition add partition(day='2022-9-6'); # 增加多个分区,分区用空格隔开 alter table dept_partition add partition(day='2022-9-6') partition(day='2022-9-7');# 删除分区 # 删除一个分区 alter table dept_partition drop partition (day='2022-9-6') # 删除多个分区,分区用逗号隔开 alter table dept_partition drop partition (day='2022-9-6'), partition(day='2022-9-7');# 查看有多少分区 show partitions dept_partition;# 查看分区表结构 desc formatted dept_partition;
-
当按天进行分区后,分区的数据还很大时,需要进行二级分区,即要设置两个分区字段,这样建好的表就对应一个目录,这个目录下根据分区字段day的值有多个一级分区目录,每个一级分区目录下根据分区字段hour又有多个二级分区目录
# 创建二级分区表,设置两个分区字段 create table dept_partition2(deptno int, dname string, loc string)partitioned by (day string, hour string)row format delimited fields terminated by ' ';# 加载数据 load data local inpath './data/dept1.txt' into table dept_partition2 partition(day='2022-9-5',hour='11'); load data local inpath './data/dept3.txt' into table dept_partition2 partition(day='2022-9-5',hour='12'); load data local inpath './data/dept3.txt' into table dept_partition2 partition(day='2022-9-5',hour='13');
-
往普通表内存数据,是可以先向HDFS路径上传文件,再创建表(因为普通表只有表的元数据);但是对于分区表,如果想直接创建一个目录作为分区目录,并往里上传文件数据是不行的,分区表并不会读取到这个自己手动创建的分区目录,原因是这样子操作并不会修改到元数据,hive元数据库中PARTITIONS表中会存有分区表的分区信息,手动创建的分区目录并不会修改这个分区信息(分区表不只要表的元数据,还要有分区的元数据)。要让手动创建的分区目录下的数据能够被查询到,即让分区表和数据产生关联的方式有以下三种:
# 执行修复分区命令,以下命令会检测HDFS目录结构去把元数据库中的元数据信息补充完整 msck repair table dept_partition;# 添加分区 alter table dept_partition2 add partition(day='2022-9-6');# 创建分区目录后 load 数据到分区,也就是可以手动创建分区目录,但是里面的数据要用load方式加载进去 load data local inpath '/opt/module/hive/data/dept1.txt' into table dept_partition partition(day='2022-9-6');
-
前面都是明确指定分区给分区表插入数据,称为静态分区。在关系型数据库中,对分区表 Insert 数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive 中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用 Hive 的动态分区,需要进行相应的配置。使用hive的动态分区,可以根据插入数据的某一字段,自动的将数据存储到相应的分区内。
# 需求:根据某个字段值进行分区 # 创建一个分区表,将deptno字段作为分区字段 create table dept_no_par( dname string, loc string ) partitioned by (deptno int) row format delimited fields terminated by ' ';# 静态分区 insert into table dept_no_par partition(deptno="70") select dname,loc,deptno from dept; # 开启动态分区功能(默认就是true) set hive.exec.dynamic.partition=true; # 设置为非严格模式(动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。) set hive.exec.dynamic.partition.mode=nonstrict; # 在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。默认 1000 set hive.exec.max.dynamic.partitions=1000; # 在每个执行 MR 的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。 set hive.exec.max.dynamic.partitions.pernode=100; # 整个 MR Job 中,最大可以创建多少个 HDFS 文件。默认 100000 set hive.exec.max.created.files=100000; # 当有空分区生成时,是否抛出异常。一般不需要设置。默认 false set hive.error.on.empty.partition=false;# 将dept表的数据按照deptno字段,插入到dept_no_par表中的不同分区中 insert into table dept_no_par partition(deptno) select dname,loc,deptno from dept; # 在严格模式下执行会报错 相当于 insert into table dept_no_par select dname,loc,deptno from dept; # hive3.0新特性:默认使用最后一个字段作为分区字段,在严格模式下执行不会报错,底层自动将动态分区的模式设置成了飞严格模式
5.2 分桶表
-
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围划分。分桶是将数据集分解成更容易管理的若干部分的另一个技术。分区针对的是数据的存储路径;分桶针对的是数据文件。
# 创建分桶表,需要指定某一个表字段作为分桶字段,以及分桶个数 create table stu_buck(id int, name string) clustered by(id) into 4 buckets row format delimited fields terminated by ' ';# 查看表结构 desc formatted stu_buck;# 加载数据到分桶表,由于需要走MR任务,所以最好确保每一台机器上都有这个txt文件,或者直接上传这个txt文件到HDFS上 load data local inpath './data/stu.txt' into table stu_buck;
创建的分桶表对应一个目录,这个目录下有多个分桶目录,Hive 的分桶采用对分桶字段的值进行哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
-
分桶表操作注意事项:
(1)reduce 的个数设置为-1,让 Job 自行决定需要用多少个 reduce 或者将 reduce 的个数设置为大于等于分桶表的桶数,否则无法将数据进行正确分桶
(2)从 hdfs 中 load 数据到分桶表中,避免本地文件找不到问题
(3)不要使用本地模式 -
对于非常大的数据集,有时用户需要使用的是一个具有代表性的查询结果而不是全部结果。Hive 可以通过对表进行抽样来满足这个需求。
语法: TABLESAMPLE(BUCKET x OUT OF y)# 抽样查询,将整个数据集分成4分,从第二分开始抽样,注意:x 的值必须小于等于 y 的值,否则 select * from stu_buck tablesample(bucket 2 out of 4 on id);
6 压缩和存储
6.1 Hadoop压缩配置
- 见hadoop6.4数据压缩
- 用的更多的还是Snappy和LZO,虽然Snappy不能分片,但Snappy压缩和解压缩速度更快,可以通过限制文件大小从而解决不能分片导致的任务运行过慢的问题
6.2 开启Map输出阶段压缩(MR引擎)
-
开启 map 输出阶段压缩可以减少 job 中 map 和 Reduce task 间数据传输量。具体配置如 下
(1)开启 hive 中间传输数据压缩功能hive (default)>set hive.exec.compress.intermediate=true;
(2)开启 mapreduce 中 map 输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
(3)设置 mapreduce 中 map 输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
(4)执行查询语句
hive (default)> select count(ename) name from emp;
6.3 开启 Reduce 输出阶段压缩
-
当 Hive 将 输 出 写 入 到 表 中 时 , 输出内容同样可以进行压缩。属性 hive.exec.compress.output控制着这个功能。用户可能需要保持默认设置文件中的默认值false, 这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这 个值为 true,来开启输出结果压缩功能。具体操作如下
(1)开启 hive 最终输出数据压缩功能hive (default)>set hive.exec.compress.output=true;
(2)开启 mapreduce 最终输出数据压缩
hive (default)>set mapreduce.output.fileoutputformat.compress=true;
(3)设置 mapreduce 最终数据输出压缩方式
hive (default)> set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
(4)设置 mapreduce 最终数据输出压缩为块压缩
hive (default)> set mapreduce.output.fileoutputformat.compress.type=BLOCK;
(5)测试一下输出结果是否是压缩文件
hive (default)> insert overwrite local directory '/opt/module/data/distribute-result' select * from emp distribute by deptno sort by empno desc;
6.4 Hive文件存储格式
-
行式存储在物理上是按照逻辑表每行数据的顺序进行存储,而列式存储在物理上是按照逻辑表每列数据的顺序进行存储。
行式存储的特点:查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列 的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度 更快。
列式存储的特点:因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的 数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算 法。 -
Hive 支持的存储数据的格式主要有:TEXTFILE(默认) 、SEQUENCEFILE、ORC、PARQUET。TEXTFILE 和 SEQUENCEFILE 的存储格式都是基于行存储的; ORC 和 PARQUET 是基于列式存储的。
-
TEXTFILE:默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用, 但使用 Gzip 这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作。
-
ORC:ORC格式是列式存储,但跟上面所说的最普通的列式存储有些许不同。前面说列式存储是存完一个列再存下一个列,但是在大数据场景下,如果真的是存完一个列在存下一个列的话,即使是每个列分文件存储,文件也会特别大,不好进行处理。所以ORC是对表数据进行分段,对每一段再进行列式存储。每个 Orc 文件由 1 个或多个 stripe 组成,每个 stripe 一般为 HDFS 的块大小,每一个 stripe 包含多条记录,这些记录按照列进行独立存储。
(1)由于每个列的数据分布在多个Stripe,在找数据的时候需要跨Stripe进行查找,因此ORC文件存储了一个FileFooter,用于记录每个Stripe的索引,可以用于定位想要的数据在哪几个Stripe中。
(2)每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后 往前读。
(3)每个 Stripe 里有三部分组成,分别是 Index Data,Row Data,Stripe Footer:
1)Index Data:记录了在当前Stripe中某一列的数据的行索引,也就是在这个Stripe中这一列的数据是从哪一行开始到哪一行结束。
2)Row Data:存的是具体的数据,对每个 列进行了编码,每个列以Data Stream 的形式来存储,同时也存储了每个列的元数据信息。
3)Stripe Footer:记录了每一个Stream的类型,长度等信息。
每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column 的数据类型信息等; -
Parquet:Parquet也是列式存储,跟ORC一样也是对数据进行分块再进行列式存储。Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的 数据和元数据,因此 Parquet 格式文件是自解析的。 Parquet文件格式包含以下结构:
(1)行组(Row Group):每一个行组包含一定的行数,在一个 HDFS 文件中至少存储一 个行组,类似于 orc 的 stripe 的概念。
(2)列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连 续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的 算法进行压缩。 (原理上跟orc的data stream差不多)
(3)页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块 的不同页可能使用不同的编码方式。
通常情况下,在存储 Parquet 数据的时候会按照 Block 大小设置行组的大小,由于一般 情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一 个 Mapper 任务处理,增大任务执行并行度。
-
TEXTFILE、ORC、PARQUERT对比
存储上压缩性能: ORC > Parquet > textFile
查询性能:速度相近
6.5 存储和压缩结合
-
ORC文件格式支持的压缩,默认压缩方式为ZLIB,要指定关于 ORCFile 的参数,都是在 HQL 语句的 TBLPROPERTIES 字段里面出现
Key Default Notes orc.compress ZLIB high level compression (one of NONE, ZLIB, SNAPPY) orc.compress.size 262,144 number of bytes in each compression chunk orc.stripe.size 268,435,456 number of bytes in each stripe orc.row.index.stride 10,000 number of rows between index entries (must be >= 1000) orc.create.index true whether to create row indexes orc.bloom.filter.columns “” comma separated list of column names for which bloom filter should be created orc.bloom.filter.fpp 0.05 false positive probability for bloom filter (must >0.0 and <1.0) -- 创建一个表格,文件格式为orc,压缩方式为zlib,ZLIB 比 Snappy 压缩的还小。原因是 ZLIB 采用的是 deflate 压缩算法。比 snappy 压缩的 压缩率高。 create table log_orc_zlib( track_time string,url string, session_id string, referer string,ip string, end_user_id string, city_id string ) row format delimited fields terminated by '\t' stored as orc -- 创建一个表格,文件格式为orc,压缩方式为snappy create table log_orc_snappy( track_time string,url string, referer string, ip string,end_user_id string, city_id string ) row format delimited fields terminated by '\t' stored as orc tblproperties("orc.compress"="SNAPPY");
-
在实际的项目开发当中,hive 表的数据存储格式一般选择:orc 或 parquet。压缩方式一 般选择 snappy,lzo。
7 企业级调优
7.1 执行计划
-
Hive通过Explain关键字查询HQL的执行计划
-
执行计划示例
-- 没有生成mr任务的执行计划 hive (default)> explain select * from emp; Explain STAGE DEPENDENCIES:Stage-0 is a root stage -- 描述阶段依赖STAGE PLANS:Stage: Stage-0 Fetch Operator -- 抓取操作limit: -1 Processor Tree:TableScan alias: emp -- 表别名,没有的话就是本身Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONESelect Operatorexpressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int)outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5,_col6, _col7Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONEListSink
-
有生成mr任务的执行计划
-
查看详细的执行计划:加多一个extended关键字,展示的信息基本差不多,可能会多一些压缩之类的细节信息
select expalin extended select * from emp
7.2 Fetch抓取
-
Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如:SELECT * FROM employees;在这种情况下,Hive 可以简单地读取 employee 对应的存储目录下的文件, 然后输出查询结果到控制台。在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 hive 默认是 minimal,该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走 mapreduce。
<property><name>hive.fetch.task.conversion</name><value>more</value><description> Expects one of [none, minimal, more]. Some select queries can be converted to single FETCH task minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins. 1. none : disable hive.fetch.task.conversion 2. minimal : SELECT STAR, FILTER on partition columns, LIMIT only 3. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)</description> </property>
-
(1)把 hive.fetch.task.conversion 设置成 none,然后执行查询语句,都会执行 mapreduce 程序。 hive (default)> set hive.fetch.task.conversion=none; hive (default)> select * from emp; hive (default)> select ename from emp; hive (default)> select ename from emp limit 3; (2)把 hive.fetch.task.conversion 设置成 more,然后执行查询语句,如下查询方式都不 会执行 mapreduce 程序。 hive (default)> set hive.fetch.task.conversion=more; hive (default)> select * from emp; hive (default)> select ename from emp; hive (default)> select ename from emp limit 3;
7.3 本地模式
-
大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过, 有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能 会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机 器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置 hive.exec.mode.local.auto 的值为 true,来让 Hive 在适当的时候自动 启动这个优化。(默认情况是关闭的)
set hive.exec.mode.local.auto=true; //开启本地 mr //设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,默认 为 134217728,即 128M set hive.exec.mode.local.auto.inputbytes.max=50000000; //设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默 认为 4 set hive.exec.mode.local.auto.input.files.max=10;
7.4 表的优化
7.4.1 Map Join
-
当小表join大表时,可能某个key在大表中数据很多,容易引起数据倾斜,且shuffle阶段数据传输消耗大量资源,这种情况可以用map join来进行解决,在map阶段进行连接,从而节省了资源。
-- 开启map join设置(默认开启) set hive.auto.convert.join = true; 默认为 true -- 设置大表小表的阈值,默认25M以下就是小表 set hive.mapjoin.smalltable.filesize = 25000000;
-
map join的原理:将小表加载到分布式缓存中,在map阶段,大表的数据与缓存中的小表数据进行连接
-
新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放 在左边和右边已经没有区别。(旧版在编写sql时需要把小表写在左边,才可以使用 map join 让小的维度表 先进内存)
7.4.2 大表Join大表
-
空key过滤:大表Join大表时,有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同 的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下, 这些 key 对应的数据是异常数据(比如空值),我们需要在 SQL 语句中进行过滤(先过滤再join)。
-- 不过滤空值 insert overwrite table jointable select n.* from nullidtable n left join bigtable o on n.id = o.id; -- 过滤空值 insert overwrite table jointable select n.* from (select * from nullidtable where id is not null) n left join bigtable o on n.id = o.id;
空key过滤使用场景:
(1)非inner join:inner join场景默认过滤
(2)不需要字段为null的 -
空key转换:有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地 分不到不同的 reducer 上。
-- 不做操作,可能会导致数据倾斜 -- nullidtable:有空key的大表,bigtable:大表 set mapreduce.job.reduces = 5; -- 设置reduce个数 insert overwrite table jointable select n.* from nullidtable n left join bigtable b on n.id = b.id; -- left join时,左表的空key会被送到一个reduce任务中,造成数据倾斜-- 对id为空的数据赋予随机值进行打乱,消除数据倾斜 set mapreduce.job.reduces = 5; insert overwrite table jointable select n.* from nullidtable n full join bigtable o on nvl(n.id,rand()) = o.id;
-
SMB(Sort Merge Bucket Join):有时候没有那么多空key,为了解决数据倾斜的问题,或者想要优化任务执行的效率,可以将数据表设置成分桶表,对对应桶的数据进行join操作,使得大表join大表转化成小表join小表。
-- 创建分桶表1,桶的个数不要超过可用 CPU 的核数 create table bigtable_buck1( id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) clustered by(id) sorted by(id) into 6 buckets row format delimited fields terminated by '\t'; load data local inpath '/opt/module/data/bigtable' into table -- 创建分桶表2,桶的个数不要超过可用 CPU 的核数 create table bigtable_buck2( id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) clustered by(id) sorted by(id) into 6 buckets row format delimited fields terminated by '\t'; load data local inpath '/opt/module/data/bigtable' into table-- 设置参数 set hive.optimize.bucketmapjoin = true; set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;-- 分桶连接 insert overwrite table jointable select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from bigtable_buck1 s join bigtable_buck2 b on b.id = s.id;
7.4.3 Map端聚合(Group by优化)
-
Group by操作会把相同key的数据发送到一个reduce任务中进行聚合计算,容易引发数据倾斜。可以开启Map端聚合,在Map端先进行部分数据的聚合,缓解数据倾斜。
-- 开启map端聚合 set hive.map.aggr = true -- 是否在 Map 端进行聚合,默认为 True set hive.groupby.mapaggr.checkinterval = 100000 -- 在 Map 端进行聚合操作的条目数目 set hive.groupby.skewindata = true -- 有数据倾斜的时候进行负载均衡(默认是 false)
当选项设定为 true,Map端聚合实际上生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出 结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果 是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二 个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证 相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
7.4.4 Count(distinct)去重优化
-
数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成, 一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换,但是需要注意 group by 造成 的数据倾斜问题.
set mapreduce.job.reduces = 5; -- 设置reduce个数 select count(distinct id) from a; -- 只会开启一个reduce来去重 select count(id) from (select id from bigtable group by id) a; -- select id from bigtable group by id相当于进行了并行的去重操作
7.4.5 笛卡尔积优化
- 尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用 1 个reducer 来完成笛卡尔积。
7.4.6 行列过滤(谓词下推)
-
列处理:在 SELECT 中,只拿需要的列,如果有分区,尽量使用分区过滤,少用 SELECT *。
-
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面,那么就会先全表关联,之后再过滤
hive (default)> select o.id from bigtable b join bigtable o on o.id = b.id where o.id <= 10; -- 先进行表关联再过滤数据,虽然理论上是会先进行全表关联后再过滤数据,但是sql中底层对这种sql进行了谓词下推优化,会先进行表数据的过滤再进行表关联,并且这里由于过滤条件用的是id,跟表关联字段一致,会使得在执行计划中,对表o和表b都先利用条件id<=10进行数据过滤;如果过滤条件用的字段不是id的话,就只会对o表的数据进行数据过滤。谓词下推语法在太长的sql中容易失效。 hive (default)> select b.id from bigtable b join (select id from bigtable where id <= 10) o on b.id = o.id; -- 先进行过滤再进行表关联,其实相当于手动的谓词下推
7.4.7 合理设置map数和reduce数
-
- 通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。 主要的决定因素有:input 的文件总个数,input 的文件大小,集群设置的文件块大小。
- 是不是 map 数越多越好? 答案是否定的。如果一个任务有很多小文件(远远小于块大小 128m),则每个小文件也会被当做一个块,用一个 map 任务来完成,而一个 map 任务启动和初始化的时间远远大 于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 map 数是受限的。
- 是不是保证每个 map 处理接近 128m 的文件块,就高枕无忧了?答案也是不一定。比如有一个 127m 的文件,正常会用一个 map 去完成,但这个文件只 有一个或者两个小字段,却有几千万的记录(因为map是一行调用一次map方法,这样也会很慢),如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定也比较耗时。
针对上面的问题 2 和 3,我们需要采取两种方式来解决:即减少 map 数和增加 map 数;
-
当 input 的文件都很大,任务逻辑复杂,map 执行非常慢的时候,可以考虑增加 Map 数, 来使得每个 map 处理的数据量减少,从而提高任务的执行效率。增加 map 的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式, 调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。
-- 调整前 hive (default)> select count(*) from emp; Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1-- 调整后 hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100; hive (default)> select count(*) from emp; Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1
-
当小文件太多导致map数量太多影响任务完成效率,可以考虑小文件合并。
-- 合并小文件,在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat 具有对小文件进行合 并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。 set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;-- 避免小文件生成 SET hive.merge.mapfiles = true; -- 在 map-only 任务结束时合并小文件,默认 true SET hive.merge.mapredfiles = true; -- 在 map-reduce 任务结束时合并小文件,默认 false SET hive.merge.size.per.task = 268435456; -- 合并文件的大小上限,默认 256M SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于该值时,启动一个独立的 map-reduce 任务进行文件 merge
-
过少的reduce会影响任务执行效率,可以通过设置合理的reduce数量来提高任务完成效率。在设置 reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 reduce 数; 使单个 reduce 任务处理数据量大小要合适;
(1)过多的启动和初始化 reduce 也会消耗时间和资源;
(2)另外,有多少个 reduce,就会有多少个输出文件,如果生成了很多个小文件,那 么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;-- 调整reduce方式一,通过reduce个数计算公式调整 N=min(参数 2,总输入数据量/参数 1) -- 参数1,每个 Reduce 处理的数据量默认是 256MB hive.exec.reducers.bytes.per.reducer=256000000 -- 参数2,每个任务最大的 reduce 数,默认为 1009 hive.exec.reducers.max=1009 -- 调整reduce个数方式二,在 hadoop 的 mapred-default.xml 文件中修改 设置每个 job 的 Reduce 个数 set mapreduce.job.reduces = 15;
7.4.8 并行执行
-
Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽 样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下, Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能 并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行 时间缩短。不过,如果有更多的阶段可以并行执行,那么 job 可能就越快完成。
set hive.exec.parallel=true;//打开任务并行执行 set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为8。
7.4.9 严格模式
-
Hive 可以通过设置防止一些危险操作:
1)分区表不使用分区过滤
将 hive.strict.checks.no.partition.filter 设置为 true 时,对于分区表,除非 where 语句中含 有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有 进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2)使用 order by 没有 limit 过滤
将 hive.strict.checks.orderby.no.limit 设置为 true 时,对于使用了 order by 语句的查询,要 求必须使用 limit 语句。因为 order by 为了执行排序过程会将所有的结果数据分发到同一个 Reducer 中进行处理,强制要求用户增加这个 LIMIT 语句可以防止 Reducer 额外执行很长一 段时间。
3)笛卡尔积
将 hive.strict.checks.cartesian.product 设置为 true 时,会限制笛卡尔积的查询。对关系型数 据库非常了解的用户可能期望在 执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语 句,这样关系数据库的执行优化器就可以高效地将 WHERE 语句转化成那个 ON 语句。不幸 的是,Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情 况。
7.4.10 JVM重用
- 见Hadoop
7.4.11 压缩
- 见6.3
8 Tez数据引擎
-
Tez 是一个 Hive 的运行引擎,性能优于 MR。用 Hive 直接编写 MR 程序,假设有四个有依赖关系的 MR 作业,上图中,绿色是 Reduce Task,云状表示写屏蔽,需要将中间结果持久化写到 HDFS。Tez 可以将多个有依赖的作业转换为一个作业,这样只需写一次 HDFS,且中间节点较少, 从而大大提升作业的计算性能。
-
修改Hive执行引擎为Tez
-- 拷贝tez安装包到集群并解压 [atguigu@hadoop102 software]$ mkdir /opt/module/tez [atguigu@hadoop102 software]$ tar -zxvf /opt/software/tez-0.10.1- SNAPSHOT-minimal.tar.gz -C /opt/module/tez-- 上传tez依赖到HDFS [atguigu@hadoop102 software]$ hadoop fs -mkdir /tez [atguigu@hadoop102 software]$ hadoop fs -put /opt/software/tez-0.10.1- SNAPSHOT.tar.gz /tez-- 新建tez-site.xml [atguigu@hadoop102 software]$ vim $HADOOP_HOME/etc/hadoop/tez-site.xml <?xml version="1.0" encoding="UTF-8"? <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>tez.lib.uris</name> <value>${fs.defaultFS}/tez/tez-0.10.1-SNAPSHOT.tar.gz</value> </property> <property> <name>tez.use.cluster.hadoop-libs</name> <value>true</value> </property> <property> <name>tez.am.resource.memory.mb</name> <value>1024</value> </property> <property> <name>tez.am.resource.cpu.vcores</name> <value>1</value> </property> <property> <name>tez.container.max.java.heap.fraction</name> <value>0.4</value> </property> <property> <name>tez.task.resource.memory.mb</name> <value>1024</value> </property> <property> <name>tez.task.resource.cpu.vcores</name> <value>1</value> </property> </configuration>-- 修改环境变量,添加tez的环境信息 [atguigu@hadoop102 software]$ vim $HADOOP_HOME/etc/hadoop/shellprofile.d/tez.sh hadoop_add_profile tez function _tez_hadoop_classpath {hadoop_add_classpath "$HADOOP_HOME/etc/hadoop" after hadoop_add_classpath "/opt/module/tez/*" after hadoop_add_classpath "/opt/module/tez/lib/*" after }-- 修改hive的计算引擎 [atguigu@hadoop102 software]$ vim $HIVE_HOME/conf/hive-site.xml <property> <name>hive.execution.engine</name> <value>tez</value> </property> <property> <name>hive.tez.container.size</name> <value>1024</value> </property>-- 解决日志jar包冲突 [atguigu@hadoop102 software]$ rm /opt/module/tez/lib/slf4j-log4j12- 1.7.10.jar
9 实际案例
- 表数据结构
-
统计视频观看数top10
SELECTvideoId, views FROM gulivideo_orc ORDER BY views DESC LIMIT 10;
-
统计视频类别热度top10
由于一个视频可能对应多个类别,需要先对视频表进行列转行操作,即展开后再进行统计select catgory_name, count(videoId) hot from (select videoId, catgory_name from gulivideo_orc lateral view explode(split(category)) gulivide_orc_tmp as catgory_name) t group by category_name order by hot limit 10
-
统计视频观看数最高的20个视频所属类别以及类别包含的top20视频的个数
SELECT t2.category_name, COUNT(t2.videoId) video_sum FROM ( SELECT t1.videoId, category_name FROM ( SELECTvideoId, views , categoryFROM gulivideo_orc ORDER BY views DESC LIMIT 20) t1 lateral VIEW explode(t1.category) t1_tmp AS category_name) t2 GROUP BY t2.category_name
-
统计视频观看数top50所关联视频的所属类别排序
SELECTt6.category_name, t6.video_sum,rank() over(ORDER BY t6.video_sum DESC ) rk FROM( SELECTt5.category_name, COUNT(t5.relatedid_id) video_sumFROM (SELECTt4.relatedid_id, category_nameFROM ( SELECTt2.relatedid_id , t3.categoryFROM ( SELECT relatedid_id FROM(SELECTvideoId, views, relatedidFROM gulivideo_orc ORDER BY views DESC LIMIT 50)t1lateral VIEW explode(t1.relatedid) t1_tmp AS relatedid_id)t2 JOIN gulivideo_orc t3 ON t2.relatedid_id = t3.videoId) t4lateral VIEW explode(t4.category) t4_tmp AS category_name) t5 GROUP BY t5.category_name ORDER BY video_sum DESC) t6
-
统计每个类别中的视频热度top10,以music为例
SELECTt1.videoId, t1.views, t1.category_name FROM (SELECTvideoId, views, category_nameFROM gulivideo_orc lateral VIEW explode(category) gulivideo_orc_tmp AS category_name)t1 WHERE t1.category_name = "Music" ORDER BY t1.views DESC LIMIT 10
-
统计每个类别视频观看数top10
SELECTt2.videoId, t2.views, t2.category_name, t2.rk FROM ( SELECTt1.videoId, t1.views, t1.category_name,rank() over(PARTITION BY t1.category_name ORDER BY t1.views DESC ) rk FROM( SELECTvideoId, views, category_nameFROM gulivideo_orc lateral VIEW explode(category) gulivideo_orc_tmp AS category_name)t1)t2 WHERE t2.rk <=10
-
统计上传视频数最多的用户top10以及他们上传的视频观看数在前20的视频
SELECTt2.videoId, t2.views, t2.uploader FROM ( SELECTuploader, videosFROM gulivideo_user_orc ORDER BY videos DESC LIMIT 10 ) t1 JOIN gulivideo_orc t2 ON t1.uploader = t2.uploader ORDER BY t2.views DESC LIMIT 20