第一章、大数据概述
人类的行为及产生的事件的一种记录称之为数据。
1、大数据时代的特征,并结合生活实例谈谈带来的影响。
(一)特征
1、Volume 规模性:数据量大。
2、Velocity高速性:处理速度快。数据的生成和响应快
摩尔定律:每两年,数据量增加一倍
1秒定律:响应时间时间控制在1秒以内
3、Variety多样化:数据种类繁多。
大数据是由结构化、非结构化、半结构化数据组成的,结构化数据仅占10%左右;非结构化数据,它们与人类信息密切相关;
4、Value价值性:价值密度低。
价值密度低,商业价值高;
通过各类大量数据中挖掘有价值的信息;
5、Veracity真实性
真实有效,为个性化推荐和精准营销提供了保证;
(二)、大数据的影响
- 1、科学研究
- 2、思维方式——全样非抽样,效率非精确,相关性
- 3、改变人们的生活方式——安全监测、智能交通、股票分析、疫情监测
- 4、带动相关的大数据产业诞生和发展
- 5、人才培养和就业
-
2、大数据时代的思维方式的特征。
整体性:大数据思维强调整体性,能够更高效地完成复杂的数据统计和分析。
互联性:大数据思维具有量化互联的特征,通过信息全面定量采集和互通,打通信息间隔阂。
价值性:大数据思维具有价值化特征,能够渗透至各个领域及行业的不同维度。
创新性:大数据分析技术为获取事物之间的相关关系提供了极大的便利,使“预测”成为大数据最核心的价值。
3、大数据关键技术
数据采集与预处理、数据存储和管理、数据处理与分析、数据安全和隐私保护。
大数据两大核心关键技术:分布式存储+分布式处理
4、大数据计算模式
5、简述大数据、云计算、物联网的概念以及之间的关系。
- 、大数据:指无法在一定时间内用常规软件工具对其内容进行抓取、管理和处理的数据集合;
- 云计算:实现了通过网络提供可伸缩的、廉价的分布式计算能力,用户只需要在具备网络接入条件的地方,就可以随时随地获得所需的各种IT资源
- 物联网:物物相连的互联网,是互联网的延伸,它利用局部网络或互联网等通信技术把传感器、控制器、机器、人员和物等通过新的方式联在一起,形成人与物、物与物相联,实现信息化和远程管理控制
4.关系:
大数据实验一: Linux系统安装和使用_linux安装实验结果分析-CSDN博
第二章、大数据处理架构Hadoop
1、Hadoop概述
Hadoop是一个能够对大量数据进行分布式存储和处理的软件框架,并且是以一种可靠、高效、可伸缩的方式进行处理的。
Hadoop是主从模式(中心化模式)的架构。
主从模式(Master-Slaves)就是中心化模式,表示有一个主节点来作为管理者,管理协调下属一批从节点工作。
2、Hadoop的核心
HDFS是分布式文件存储系统(Hadoop Distributed File System) 、也是Hadoop的核心。
3、Hadoop特性
高可靠性、高效性、高可扩展性、高容错性、成本低、运行在Linux平台上支持多种编程语言。
4、Hadoop项目结构
5、Hadoop的安装与使用
大数据实验2.Hadoop 集群搭建(单机/伪分布式/分布式)_hadoop搭建的实验目的及要求-CSDN博客
第三章、分布式文件系统HDFS
为什么需要分布式存储?
1、数据量太大,解决能存问题;2、提升网络传输、磁盘读写、CPU、内存等各方面的综合提升。分布式组合在一起可以达到1+1>2的效果
1、HDFS
(1)HDFS概念
HDFS(Hadoop Distributed File System)为Hadoop中的用于存储和处理海量数据的分布式文件系统,支持流式数据访问、存储和处理超大文件,并运行于廉价的普通机器组成的服务器集群上。
(2)存储物理机结构-集群
分布式文件系统把文件分布存储到多个计算机节点上,成千上万的计算机节点构成计算机集群
(3)分布式文件系统的体系结构
分布式文件系统在物理结构上是由计算机集群中的多个节点构成的,这些节点分为两类:
1、“主节点”(Master Node)或称“名称结点”(NameNode):
负责管理HDFS整个文件系统,负责文件操作的执行,管理数据节点和文件块的映射关系。
HDFS系统的主角色,是一个独立的进程。
2、“从节点”(Slave Node)或称“数据节点”(DataNode):
负责数据的读写和存储
HDFS系统的从角色,是一个独立的进程。
此外,还有主角色辅助角色:SecondaryNameNode,它主要帮助NameNode完成元数据整理工作
1)数据存储
2)数据块
1、定义:Block,HDFS默认一个块大小是128MB,一个文件被分成多个块,以块作为存储单位。
2、HDFS使用数据块的好处:
- 支持大规模文件存储:按块分发到各个计算机结点
- 简化系统设计:固定块大小简化了存储管理和元数据管理
- 适合数据备份:每个文件块都可以冗余存储到多个节点上,大大提高了系统的容错性和可用性
3)nameNode的数据结构
nameNode负责管理分布式文件系统的命名空间Namespace,保存了两个核心数据结构:
1、FsImage:维护文件和文件夹的元数据,包含文件的复制等级、访问权限、块大小以及组成文件的块等,一般是GB以上。
2、EditLog:记录了所有针对文件的创建、删除、重命名等操作
4)nameNode的使用
- 启动:将FsImage文件中的内容加载到内存中,之后再执行EditLog(只读)操作
- 更新:更新FsImage文件并创建一个新的空白EditLog
- 正常运行态:(读写)文件系统更新操作先写入到EditLog,而不是直接写入、更新FsImage
名称节点运行期间EditLog不断变大的问题会导致名称节点重启时缓慢
5)SecondaryNameNode
用来保存NameNode中对HDFS元数据信息的备份,并减少名称节点重启的时间。SecondaryNameNode一般是单独运行在一台机器上
作用:
EditLog和FsImage的合并操作;缩小EditLog;也成为名称节点的〝检查点〞
6)DataNode
负责数据的存储和读取,会根据客户端或者是名称节点的调度来进行数据的存储和检索,并且向名称节点定期发送自己所存储的块的列表。
每个数据节点中的数据会被保存在各自节点对应的文件系统中。
2、HDFS存储原理
(1)数据写入
- 第一个副本:放置在上传文件的数据节点;如果是集群外提交,则随机挑选一台磁盘不太满、CPU不太忙的节点
- 第二个副本:放置在与第一个副本不同的机架的节点上
- 第三个副本:与第一个副本相同机架的
- 其他节点上更多副本:随机节点
(2)数据复制
1、流水线复制
2、过程:
(1)文件写入本地,分为多个块
(2)针对每一个块:
- 向名称节点发出写请求,返回一个数据节点列表;
- 把数据写入第一个数据节点中,同时传递列表;
- 第一个数据节点接受并写入本地,向第二个数据节点发出连接请求,并传递接受的数据和列表;
- 第二个数据节点执行上一步操作,依次循环,直到完成所有副本的复制;
3、冗余存储的优势: 1、加快读数据时的传输速度; 2、容易检查数据传输错误;3、保证数据可靠性
(3)数据读取
- 从名称节点获得数据块不同副本的存放位置列表;
- 列表中包含了副本所在的数据节点,可以调用API来确定客户端和这些数据节点所属的机架ID;
- 当发现某个数据块副本对应的机架ID和客户端对应的机架ID相同时,就优先选择该副本读取数据,如果没有发现,就随机选择一个副本读取数据
(4)数据错误和恢复
(1)名称节点出错:
当名称节点出错时,获取其他文件系统中备份的元数据信息放到第二名称节点上,将其作为名称节点使用;
(2)数据节点出错:
每个数据节点会定期向名称节点发送“心跳”信息;
当数据节点发生故障,或者网络发生断网时,名称节点就无法收到来自一些数据节点的心跳息;
这些数据节点就会被标记为“宕机”,相关数据块都会被标记为“不可读”,名称节点不会再给它们发送任何I/O请求;
问题:由于一些数据节点的不可用,会导致一些数据块的副本数量小于冗余因子
解决方案:名称节点会定期检查,发现某个数据块的副本数量小于冗余因子,就会启动数据冗余复制,为它生成新的副本
HDFS和其它分布式文件系统的最大区别就是可以调整冗余数据的位置
(3)数据出错:
网络传输和磁盘错误等因素,都会造成数据错误
解决方案:会采用md5和sha1对数据块进行校验,以确定读取到正确的数据
校验过程:
1、在文件被创建时,客户端就会对每一个文件块进行信息摘录,并把这些信息写入到同一个路径的隐藏文件里面。
2、当客户端读取文件的时候,会先读取隐藏文件。
3、利用该隐藏文件信息对每个读取的数据块进行校验。
4、如果校验出错,客户端就会请求到另外一个数据节点读取该文件块,并且向名称节点报告这个文件块有错误,名称节点会定期检查并且重新复制这个块的FsImage和Editlog数据进行恢复。
3、HDFS编程实践
大数据实验3: HDFS基础编程-CSDN博客
第四章、分布式数据库HBase
1、HBase概述
谷歌开发的分布式数据存储系统,GFS作为底层支撑,利用MapReduce处理海量数据。
(1)HBase的特点
分布式处理效率极高、支持动态伸缩、成本低,适合读操作不适合写操作。
一个高可靠、高性能、面向列、可伸缩的分布式数据库,主要用来存储非结构化和半结构化的松散数据。
(2)HBase和BigTable的底层技术对应关系
(3)HBase与传统的关系数据库的区别
2、HBase数据模型
(1)数据模型概述
1、一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳。
2、每个值是一个未经解释的字符串,没有数据类型,需要用户自行类型转换。
3、用户在表中存储数据,每一行都有一个可排序的行键和任意多的列。
4、列族:
①表在水平方向由一个或者多个列族组成,一个列族中可以包含任意多个列,同一个列族里面的数据存储在一起;
②支持动态扩展;
5、执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,以时间戳进行索引不同版本
(2)数据模型相关概念
1、表:HBase采用表来组织数据,表由行和列组成,列划分为若干个列族
2、行:每个HBase表都由若干行组成,每个行由行键(row key)来标识
3、列族:一个HBase表被分组成许多“列族”(Column Family)的集合,它是基本的访问控制单元4、列限定符:列族里的数据通过列限定符(列名)来定位
5、单元格:在HBase表中,通过行、列族和列限定符确定一个“单元格”(cell),单元格中存储的数据没有数据类型,总被视为字节数组byte[]
6、时间戳:一个单元格中根据时间戳降序排序,最新版本最先被读取
(3)数据坐标
HBase中需要根据行键、列族、列限定符来确定一个单元格,使用时间戳确定数据版本,因此,可以视为一个“四维坐标”,即[行键, 列族, 列限定符, 时间戳]。
(不加时间戳,优先访问最新版本)
(4)概念视图
稀疏的映射表;每一行都有相同的列;
(5)物理视图
物理存储层面采用基于列族的存储方式,同一列族的数据(包括时间戳)存储在一起。
(6)面向列的存储
1、列式存储的优势:
①扩展性强;
②降低I/O开销,数据处理速度快;
③支持大量用户并发查询;
2、列式存储的局限性:
①元组重构代价高;
3、HBase访问接口
4、HBase编程实践
(1)HBase的安装
下载安装包hbase-2.2.2-bin.tar.gz
解压安装包hbase-2.2.2-bin.tar.gz至路径 /usr/local
配置系统环境变量,将hbase下的bin目录添加到系统变量PATH中
HBase 2.1.x 2.2.x. 2.3.x 2.4.x
(2)HBase配置
1、三种运行模式:单机模式、伪分布式模式、分布式模式
2、运行先决条件:JDK、Hadoop(单机模式不需要,伪分布式模式和分布式模式需要).SSH
3、修改相关配置文件 hbase/conf中的hbase-env.sh和hbase-site.xmi
(3)HBase启动
1、伪分布式模式下启动关闭Hadoop和HBase的顺序: 启动Hadoop—>启动HBase—>关闭HBase—>关闭Hadoop
2、执行对应bin/sbin文件夹下的start-*.sh或者stop-*.sh,
$cd /usr/local/hadoop
$sbin/start-dfs.sh
$cd /usr/local/hbase
$bin/start-hbase.sh
3、验证:jps
建议每次安装好软件后将sbin和bin路径加入到PATH变量中,任意目录下直接执行命令 start-XXX.sh等命令
执行hbase shell 命令进入HbaseShell后才能执行Hbase命令:
(4)HBase常用Shell命令
1、create:创建表
2、list:列出HBase中所有的表信息
3、put:向表、行、列指定的单元格添加数据,一次只能为一个表的一行数据的一个列添加一个数据
4、scan:浏览表的相关信息
5、 get:通过表名、行、列、时间戳、时间范围和版本号来获得相应单元格的值
6、enable/disable:使表有效或无效
7、drop:删除表
第5章、 分布式数据库HBase
1、HBase的实现原理
(1)表和Region
1、Region
(1)每个Region默认大小是10G:最佳大小取决于单台服务器的有效处理能力
(2)同一个Region不会被分拆到多个Region服务器
(3)每个Region服务器存储10-1000个Region
(4)开始只有一个Region,数据插入,后来不断分裂
(5)Region:包含了分配的行键区间的所有数据,是负载均衡和数据分发的基本单位;
2、HBase功能组件:
(2)HBase功能组件
1、库函数:链接到每个客户端
2、一个Master主服务器
3、许多个Region服务器
1、主服务器Master:负责管理和维护HBase表的分区信息,维护Region服务器列表,分配Region,负载均衡。
2、Region服务器:负责存储和维护分配给自己的Region,处理来自客户端的读写请求。
3、客户端:不是直接从Master主服务器上读取数据,而是在获得Region的存储位置信息后,直接从Region服务器上读取数据。
4、通过Zookeeper来获得Region位置信息,大多数客户端甚至从来不和Master通信,这种设计方式使得Master负载很小。
(3)Region的定位
1、META表:又名元数据表,存储了Region和Region服务器的映射关系,保存在内存中:
HBase表很大时,.META.表也会被分裂成多个Region
2、-ROOT-表:记录所有元数据的具体位置;
-ROOT-表只有唯一一个Region,名字是在程序中被写死的
3、Zookeeper文件:记录了-ROOT-表的位置
4、客户端访问数据时的“三级寻址”:
寻址过程客户端只需要询问Zookeeper服务器,不需要连接Master服务器;
为了加速寻址,客户端会缓存查询过的位置信息,同时,Region位置可能会更新,遇到缓存失效问题,此时需要再次“三级寻址”替换原来缓存的位置信息
2、HBase运行机制
(1)HBase系统架构
1、客户端:
包含访问HBase的接口,同时在缓存中维护着已经访问过的Region位置信息,用来加快后续数据访问过程。
2、Zookeeper:
一个很好的集群管理工具,感知机器的服务状态,负责Master服务器分配管理;提供配置维护、域名服务、分布式同步、组服务等。
(HBase自带简化Zookeeper,也可以独立安装Zookeeper。)
3、Master:
管理表和Region:
①表的增加、删除、修改、查询等操作;
②region的调整更新和region服务器的负载均衡以及故障处理
4、Region服务器:
是HBase中最核心的模块,负责维护分配给自己的Region,并响应用户的读写请求。
(2)Region服务器工作原理
Region服务器向HDFS文件系统中读写数据
1)基本组成
1、Region:一个表的若干行
2、Store:对应了一个列族的存储
3、HLog:
操作日志文件,在每次用户操作写入MemStore的同时,也会写一份数据到HLog文件中,HLog文件定期会滚动出新的,并删除旧的文件(已持久化到StoreFile中的数据)
2)用户读写数据工程
1、用户写入数据时,被分配到相应Region服务器去执行;
2、数据和操作记录被写入到MemStore 和Hlog中;
3、当用户读取数据时,Region服务器会首先访问MemStore缓存,如果找不到,再去磁盘上面的StoreFile中寻找;
3)缓存的刷新
1、系统会周期性地把MemStore缓存里的内容刷写到磁盘的新StoreFile文件中,清空缓存,并在Hlog里面写入一个标记;
2、每个Region服务器都有一个自己的HLog 文件,每次启动都检查该文件,确认最近缓存刷新之后是否发生新的写入操作;
3、如有新的写入操作,将其写入缓存,刷新缓存,写入新StoreFile,清空Hlog继续服务;
4) StoreFile的合并
1、StoreFile,数量太多,影响查找速度,需要合并;
2、合并操作比较耗费资源,只有数量达到一个阈值才启动合并;
(3)Store工作原理
1、Region服务器的核心;
2、每一个Store对应一个列族;
3、多个StoreFile合并成一个;
4、单个StoreFile过大时,又触发分裂操作,1个父Region被分裂成两个子Region重新分配;
(4)HLog工作原理
1、HLog保证系统可以在出错后恢复,避免缓存数据的丢失;
2、region服务器故障时:
① master首先会分析对应的Hlog文件,根据不同的region对象对HLog数据进行拆分
②将失效的region分配到可用的region服务器上,并把相关的HLog数据发送的对应的region服务器上;
3、共用日志优缺点:
①优点:提高对表的写操作性能;
②缺点:恢复时需要分拆日志
3、HBase Java编程实践
HBase常用Java API及应用实例
HBase是Java编写的,它的原生的API也是Java开发的,但可以使用Java或其他语言调用API来访问HBase。
- 首先要在工程中导入一下jar包:
- 导入hbase安装目录中的lib文件中的所有jar包,
- 导入"client-facing-thirdparty"目录中所有jar文件,
例:任务要求:创建表、插入数据、浏览数据
创建一个学生信息表,用来存储学生姓名(姓名作为行键,并且假设姓名不会重复)以及考试成绩,其中,考试成绩是一个列族,分别存储了各个科目的考试成绩。逻辑视图如表4-18所示。
(1)代码大体框架:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;public class Test { public static Configuration configuration; // Hadoop配置信息public static Connection connection; // Hadoop连接对象 public static Admin admin; // HBase管理员对象public static void main(String[] args) throws IOException {init(); // 初始化连接createTable("student", new String[]{"score"}); // 创建表insertData("student", "zhangsan", "score", "English", "69"); // 插入数据insertData("student", "zhangsan", "score", "Math", "86"); // 插入数据insertData("student", "zhangsan", "score", "Computer", "77"); // 插入数据getData("student", "zhangsan", "score", "English"); // 获取数据close(); // 关闭连接}public static void init() { // 初始化连接方法// ... 实现初始化连接的代码}public static void close() { // 关闭连接方法// ... 实现关闭连接的代码}public static void createTable(String tableName, String[] columnFamilies) { // 创建表方法// ... 实现创建表的代码}public static void insertData(String tableName, String rowKey, String columnFamily, String column, String value) { // 标记14: 插入数据方法// ... 实现插入数据的代码}public static void getData(String tableName, String rowKey, String columnFamily, String column) { // 标记15: 获取数据方法// ... 实现获取数据的代码}
}
(2)代码实例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;public class Test { public static Configuration configuration;public static Connection connection; public static Admin admin;public static void main(String[] args) throws IOException {init(); // 初始化连接createTable("student", new String[]{"score"}); // 创建表insertData("student", "zhangsan", "score", "English", "69"); // 插入数据insertData("student", "zhangsan", "score", "Math", "86");insertData("student", "zhangsan", "score", "Computer", "77");getData("student", "zhangsan", "score", "English"); // 获取数据close(); // 关闭连接}public static void init() throws IOException { // 初始化连接方法configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum", "your_zookeeper_quorum"); // 设置ZooKeeper地址configuration.set("hbase.zookeeper.property.clientPort", "2181"); // 设置ZooKeeper客户端端口connection = ConnectionFactory.createConnection(configuration);admin = connection.getAdmin();}public static void close() throws IOException { // 关闭连接方法if (admin != null) {admin.close();}if (connection != null) {connection.close();}}public static void createTable(String tableName, String[] columnFamilies) throws IOException { // 创建表方法TableName table = TableName.valueOf(tableName);if (admin.tableExists(table)) {System.out.println("Table already exists.");return;}HTableDescriptor tableDescriptor = new HTableDescriptor(table);for (String columnFamily : columnFamilies) {tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));}admin.createTable(tableDescriptor);System.out.println("Table created.");}public static void insertData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException { // 插入数据方法Table table = connection.getTable(TableName.valueOf(tableName));Put put = new Put(Bytes.toBytes(rowKey));put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));table.put(put);table.close();System.out.println("Data inserted.");}public static void getData(String tableName, String rowKey, String columnFamily, String column) throws IOException { // 获取数据方法Table table = connection.getTable(TableName.valueOf(tableName));Get get = new Get(Bytes.toBytes(rowKey));get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));Result result = table.get(get);byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column));if (value != null) {System.out.println("Value: " + Bytes.toString(value));} else {System.out.println("Value not found.");}table.close();}
}
(2)、建立连接
//建立连接
public static void init() {configuration = HBaseConfiguration.create(); // 创建HBase配置对象configuration.set("hbase.rootdir", "hdfs://localhost:9000/hbase"); // 设置HBase的根目录为HDFS上的指定路径try {connection = ConnectionFactory.createConnection(configuration); // 根据配置创建HBase连接admin = connection.getAdmin(); // 从连接中获取管理员对象} catch (IOException e) { // 捕获可能发生的IO异常e.printStackTrace(); // 打印异常堆栈信息}
}
(3)关闭连接
//关闭连接
public static void close(){ try{ if(admin != null){ admin.close(); } if(null != connection){ connection.close(); } }catch (IOException e){ e.printStackTrace(); }
}
(4)创建表
创建一个学生信息表,用来存储学生姓名(姓名作为行键,并且假设姓名不会重复)以及考试成绩,其中,考试成绩是一个列族,分别存储了各个科目的考试成绩。逻辑视图如表4-18所示。
/*创建表*/public static void createTable(String myTableName, String[] colFamily)
throws IOException { // 定义创建表的方法TableName tableName = TableName.valueOf(myTableName); // 将字符串转换为TableName对象if (admin.tableExists(tableName)) { // 检查表是否已经存在System.out.println("table exists!"); // 如果表存在,打印提示信息} else { // 如果表不存在,执行下面的代码HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); // /*创建表*/ 创建HTableDescriptor对象for (String str : colFamily) { // 遍历列族数组HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str); // 为每个列族创建HColumnDescriptor对象hTableDescriptor.addFamily(hColumnDescriptor); // 将列族添加到表描述符中}admin.createTable(hTableDescriptor); // 调用Admin对象的createTable方法创建表}
}
(5)添加数据
/*** 插入数据到HBase表* * @param tableName 表名,指定要插入数据的表* @param rowKey 行键,指定要插入数据的行* @param colFamily 列族,指定数据所在的列族* @param col 列限定符,指定具体的列* @param val 数据,要插入的值*/
public static void insertData(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName)); // 获取表的引用Put put = new Put(Bytes.toBytes(rowKey)); // 创建Put对象,并将行键转换为字节put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val)); // 添加列族、列和值table.put(put); // 执行插入操作table.close(); // 关闭表的引用
}
(6)浏览数据
/* 获取某单元格数据 */
public static void getData(String tableName, String rowKey, String colFamily, String col) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName));// 获取表的引用Get get = new Get(Bytes.toBytes(rowKey)); // 创建Get对象,并将行键转换为字节get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col)); // 添加需要获取数据的列族和列限定符// 获取数据Result result = table.get(get);// 格式化并输出获取到的数据if (result.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col)) != null) {String valueStr = new String(result.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col)));System.out.println("Value: " + valueStr); // 输出获取到的数据} else {System.out.println("Value not found."); // 如果没有找到数据,则输出提示信息}table.close(); // 关闭表的引用
}
(7)插入行
要插入一行数据到HBase表中:可用insertRow
方法,该方法接受四个参数:
表名、行键、列信息数组(列族名和列限定符的组合),以及相应的值数组。
public static void insertRow(String tableName, String rowKey, String[] cols, String[] values) {// cols : 列族名:列限定符for (int i = 0; i < cols.length; i++) {String colInfo[] = cols[i].split(":"); // 分割列族名和列限定符insertData(tableName, rowKey, colInfo[0], colInfo[1], values[i]); // 插入数据}
}
实例:
public static void insertData(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName));Put put = new Put(Bytes.toBytes(rowKey));put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));table.put(put);table.close();
}
第六章、MapReduce(1)
作为Hadoop的核心组件之一 ,MapReduce是一种编程模型,用于大规模数据集的并行运算。MapReduce是一个分布式、并行处理运算程序的编程框架,
其核心功能是:将用户编写的业务逻辑代码和自身的组件整合成一个完整的分布式运算程序 并发运行在一个hadoop集群上;
其核心思想是:将大数据处理任务分解为两个主要步骤:Map和Reduce。
MapReduce = Map + Reduce
1、MapReduce概述
(1)分布式并行编程
分布式并行编程:程序可运行在大规模计算机集群上,可以并行地执行大规模数据处理任务。
MapReduce :谷歌提出的分布式并行编程模型,Hadoop MapReduce是它的开源实现。
(2)MApReduce模型
1、可将MApReduce复杂的、运行于大规模集群上的并行计算过程,高度地抽象成两个函数:
Map函数 和 Reduce函数
2、特点:
(1)分而治之:
1)将大规模数据分解为多个小数据分别分配给多个Map任务并行处理;
2)Reduce整合多个中间结果;
(2)计算向数据靠拢:分发应用程序而非传输数据,降低传输开销;
3、MapReduce应用程序:可采用Java但不局限于Java进行开发。
(3)MapReduce的体系结构
1、概述:
主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
MapReduce采用Master/Slave架构,包括一个Master和若干个Slave。
1)Master上运行JobTracker,JobTracker负责整个数据处理作业调度、任务分配和处理等;
2)若干个Slave运行TaskTracker,TaskTracker负责分配到的任务的具体执行;
结点说明:
- Client
- 用户编写的
MapReduce
程序通过Client
提交到JobTracker
端, - 用户可通过
Client
提供的一些接口查看作业运行状态。
- JobTracker
JobTracker
负责资源监控和作业调度;
JobTracker
监控所有TaskTracker
与Job
的健康状况,一旦发现失败,就将相应的任务转移到其他节点;JobTracker
会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源。
- TaskTracker
TaskTracker
负责分配到的任务的具体执行
TaskTracker
会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker
,同时接收JobTracker
发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker
使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task
获取到一个slot
后才有机会运行,而Hadoop
调度器的作用就是将各个TaskTracker
上的空闲slot
分配给Task
使用。
注:slot 分为Map slot
和Reduce slot
两种,分别供Map Task
和Reduce Task
使用。
- Task
- Task分为
Map Task
和Reduce Task
两种,均由TaskTracker
启动。
2、Map函数
功能:负责在分解后的每一个小数据集上执行处理操作;
3、Reduce函数
功能:负责在将所有小数据集上的处理结果整合得到最终的处理结果;
2、MapReduce工作流程
(1)工作特点
1、MapReduce工作流程的核心特点:
处理数据规模较大,计算向数据靠拢
2、具体特点
• 不同的Map任务之间不会进行通信
• 不同的Reduce任务之间也不会发生任何信息交换
• 所有Map任务完成后才会执行Reduce任务
• 所有的数据交换都是通过MapReduce框架自身去实现的,无需用户显式交换
(2)处理单位Split
MapReduce 任务的处理单位是split
HDFS 以固定大小的block 为基本单位,block是一个物理概念。
split是一个逻辑概念,它只包含一些元数据,比如数据起始位置、数据长度、数据所在节点等。
(3)shuffle机制
Shuffle
机制是指分布式计算中用于重新分配数据并进行合并的过程。
1、shuffle-Map端
(1)每个Map任务分配一个缓存保存结果,默认大小100MB缓存,设置溢写比0.8;
(2)缓存数据持久化存储,生成溢写文件;
-分区,分发给不同Reduce
(3)Map任务全部结束之前进行归并Combiner,保证溢写文件少于3个;
所有Map任务结束,JobTracker通知Reduce任务领取Map输出数据;
(4)举例
输入: 两个键值对<“a”,1>和<“a”,1>
合并:后为<“a”,2>;
归并:后为<“a”,<1,1>>,逻辑可改写;
2、shuffle-Reduce端
(1)Reduce任务通过RPC向JobTracker询问,Map任务已完成则领取数据。
(2)Reduce领取不同Map的数据,缓存,归并,再合并、写入磁盘
(3)当领取数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce
(4)多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
(4)MapReduce应用程序执行过程
1、Map 阶段:
在 MapReduce框架中,任务首先会被划分为多个 Map 任务,每个 Map 任务负责处理输入数据的一部分。
在处理过程中,Map 任务会生成键值对(key-value pairs),其中键用于标识数据的分类,值则是实际的数据。
Map任务会将生成的键值对暂存在内存中的缓冲区中,直到缓冲区达到一定大小或者达到一定数量的键值对。
2、Shuffle 阶段:
- 一旦 Map 任务的缓冲区填满,或者 Map 阶段结束,就会触发 Shuffle 阶段。
- Shuffle 阶段的主要任务是将 Map 任务输出的键值对按照键的哈希值重新分配到不同的 Reduce 任务上。
- 这意味着具有相同键的键值对将被分配到同一个 Reduce 任务上,以便进行合并和聚合操作。
(1)输入数据准备:
每个 Map 任务都会生成一些键值对(key-value pairs)作为输出,这些输出通常存储在内存中的缓冲区中。
输出的键值对通常代表着从输入数据中提取的信息,以便后续的数据处理。
(2)数据重分配:
当进行数据重分配时,Shuffle 阶段的两个主要过程是分区(Partitioning)和排序(Sorting)。
① 分区(Partitioning):
哈希函数应用:
在分区过程开始时,系统会对每个键值对中的键(key)应用哈希函数,生成一个哈希值。
哈希函数将键映射到一组可能的哈希值中的一个,以便后续的分区和分配操作。
分区规则:
分区规则确定了如何将哈希值映射到不同的 Reduce 任务上。
典型的分区规则是使用模运算将哈希值映射到一组预定义的分区编号中。
这确保了具有相同哈希值的键值对被分配到同一个 Reduce 任务上,从而保证了具有相同键的键值对被发送到同一个 Reduce 任务进行处理。
分区数量:
分区数量通常由用户指定或根据系统默认值确定。
分区数量的选择取决于数据量、集群配置和任务性能需求等因素。
通常情况下,分区数量应该足够大,以便在数据重分配过程中实现负载均衡,避免某些 Reduce 任务负载过重或者负载不均衡的情况。
② 排序(Sorting):
分区内排序:
在分区之后,每个分区中的键值对可能需要进行排序操作。
排序操作确保了在 Reduce 任务处理数据时,数据以有序的方式进行操作,从而简化了后的 聚合和合并操作。
排序算法:
框架可能会使用各种排序算法对分区中的键值对进行排序,例如快速排序、归并排序等。
排序算法的选择通常取决于数据量、内存限制、性能需求等因素。
性能考虑:
在排序过程中,性能和内存占用是重要考虑因素。
框架可能会采取一些优化策略,如外部排序(External Sorting)来处理大量数据,或者使用 内存加速来提高排序速度。
③ 分区、排序优化策略:
局部性原理:尽可能地在同一台机器上进行分区和排序操作,以减少网络开销和数据传输时间。
数据压缩:在数据传输过程中对数据进行压缩,减少网络传输的数据量,提高传输效率。
并行处理:利用多核处理器和并行计算能力来加速分区和排序操作,提高整体性能。
通过分区和排序操作,Shuffle 阶段可以有效地将数据重新组织和分配到不同的 Reduce 任务上,并确保数据以有序的方式进行处理。
3)数据传输和合并:
数据传输:
一旦数据被分区和排序,就会开始将数据从 Map 任务所在的节点传输到相应的 Reduce 任务所在的节点。
这个过程涉及网络通信,数据会通过网络传输到目标节点上。
合并和聚合:
一旦数据到达目标节点,Reduce 任务会将来自不同节点的数据合并到本地,并进行必要的聚合操作。
合并和聚合操作可能包括计算总数、求和、平均值等,以便生成最终的输出结果。
3、 Reduce 阶段:
- 在 Shuffle 阶段完成后,每个 Reduce 任务将接收到分配给它的键值对数据。
- Reduce 任务会对接收到的键值对进行合并、排序和聚合等操作,生成最终的输出结果。
(1)数据接收:
在Shuffle阶段完成后,每个Reduce任务会接收到一个或多个分区的数据。这些数据包含了经过分区和排序后的键值对。
(2) 数据处理:
Reduce任务会对接收到的键值对进行处理。
合并(Merging):如果数据被分配到多个Reduce任务,每个任务可能会接收到来自不同Mapper任务的相同键的多个值。在合并阶段,Reduce任务会将相同键的值进行合并,以便后续的聚合操作。
排序(Sorting):在合并之后,数据可能会被进一步排序,以确保同一键的值按照特定的顺序进行处理。排序操作可以简化后续的聚合操作,并使得输出结果更易于理解和处理。
聚合(Aggregation):根据具体的业务逻辑,Reduce任务可能会对数据进行聚合操作,例如计算总和、平均值、最大值、最小值等统计量,或者执行其他自定义的聚合操作。
其他操作:除了合并、排序和聚合之外,Reduce任务可能还会执行其他必要的数据处理操作,例如过滤、转换等。
(3)输出生成:
经过数据处理后,Reduce任务会生成最终的输出结果。这些输出结果通常以键值对的形式组织,并可以存储到文件系统中,或者通过网络传输给其他任务或应用程序。
3、实例分析:WordCount
设计思路
4、MapReduce编程实践
编程准备-为项目添加JAR
为了编写一个MapReduce程序,一般需要向Java项目中添加以下JAR包:
(1)“/usr/local/hadoop/share/hadoop/common”目录下的hadoop-common-3.1.3.jar和haoop-nfs-3.1.3.jar,以及该目录下的lib中的JAR包;
(2)“/usr/local/hadoop/share/hadoop/mapreduce”目录下和lib下的所有JAR包
(1)编程步骤:
分析问题,确定Map和Reduce过程的输入和输出key-value含义和数据类型
实践:
任务要求:对于指定目录的文件中单词进行统计,假设该目录下有两个文件内容如下:
1、Map处理逻辑:
- Map输入类型为<行号, 一行文本>
- 期望的Map输出类型为<单词,出现次数>
- Map输入类型最终确定为<Object,Text>
- Map输出类型最终确定为<Text,IntWritable>
// 定义Mapper类,它将输入的文本数据转换为单词计数的键值对
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {// 定义一个静态的IntWritable对象,其值为1,用于表示单词出现的次数private final static IntWritable one = new IntWritable(1);// 定义一个Text对象,用于存储处理后的单词private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// 使用StringTokenizer对输入的文本行进行分词StringTokenizer itr = new StringTokenizer(value.toString());// 遍历分词后的单词while (itr.hasMoreTokens()) {// 获取一个单词并更新word对象word.set(itr.nextToken());// 将单词和计数(1)作为键值对输出context.write(word, one);}}
}
2、Reduce处理逻辑:
- Reduce输入类型为<单词, 次数列表>
- 期望的Reduce输出类型为<单词,出现次数> 最终结果
- Reduce函数的输入类型为<key, Iterable<IntWritable>>
- Reduce函数的输出类型为 <Text,IntWritable>
// 定义Reducer类,用于处理Mapper输出的中间键值对,并生成最终结果
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {// 定义一个IntWritable对象用于存储累加的和private IntWritable result = new IntWritable();// Reducer的reduce方法,用于处理具有相同key的值的集合public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 初始化计数器sumint sum = 0;// 遍历所有具有相同key的值for (IntWritable val : values) {// 将每个值加到sum上sum += val.get();}// 将累加的和设置给result对象result.set(sum);// 写出最终的键值对,key是单词,value是该单词出现的次数context.write(key, result);}
}
完整代码:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;// 定义WordCount类,它包含了MapReduce作业的Mapper和Reducer类
public class WordCount {// 定义Mapper类,用于处理输入数据并生成中间键值对public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {// 定义一个静态的IntWritable对象,其值为1,用于表示单词出现的次数private final static IntWritable one = new IntWritable(1);// 定义一个Text对象,用于存储处理后的单词private Text word = new Text();// Mapper的map方法,处理输入数据并生成中间键值对public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// 使用StringTokenizer对输入的文本行进行分词StringTokenizer itr = new StringTokenizer(value.toString());// 遍历分词后的单词while (itr.hasMoreTokens()) {// 获取一个单词并更新word对象word.set(itr.nextToken());// 将单词和计数(1)作为键值对输出context.write(word, one);}}}// 定义Reducer类,用于处理Mapper输出的中间键值对,并生成最终结果public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {// 定义一个IntWritable对象用于存储累加的和private IntWritable result = new IntWritable();// Reducer的reduce方法,用于处理具有相同key的值的集合public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 初始化计数器sumint sum = 0;// 遍历所有具有相同key的值for (IntWritable val : values) {// 将每个值加到sum上sum += val.get();}// 将累加的和设置给result对象result.set(sum);// 写出最终的键值对,key是单词,value是该单词出现的次数context.write(key, result);}}// 主函数,用于配置和启动MapReduce作业public static void main(String[] args) throws Exception {// 创建Configuration对象Configuration conf = new Configuration();// 解析命令行参数String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 检查参数数量是否正确if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}// 创建Job对象,设置作业的配置信息Job job = Job.getInstance(conf, "word count");// 设置作业的Jar文件job.setJarByClass(WordCount.class);// 设置Mapper类job.setMapperClass(MyMapper.class);// 设置Reducer类job.setReducerClass(MyReducer.class);// 设置输出键值对的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 设置输入路径FileInputFormat.addInputPath(job, new Path(otherArgs[0]));// 设置输出路径FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));// 运行作业并等待其完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
第七章、MapReduce(2)
1、MapReduce输入输出类型
(1)Hadoop封装的类型
Hadoop重新定义Java中常用的数据类型,并让它们具有序列化的特点。
(2)文件的合并和去重
对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。
2.1Map处理逻辑:
1、 Map输入类型为<行号, 一行文本>
2、期望的Map输出类型为<一行文本,null>
3、Map输入类型最终确定为<Object,Text>
4、Map输出类型最终确定为<Text,NullWritable>
2.2 Reduce处理逻辑
1、Map输入类型为<行文本,null列表>
2、期望的Map输出类型为<行文本,null>
3、Reduce输入类型为<Text,Iterable<NullWritable>>
4、Reduce输出类型为<Text,NullWritable>
(3)自定义数据类型,实现接口
实现org.apache.hadoop.io.Writable/WritableComparable接口
Writable接口
重写其中的方法:write、readFields和toString
public class StudentScore implements Writable{String stuId; String name; int score;public void write(DataOutput out) throws IOException{out.writeUTF(stuId); out.writeUTF(name); out.writeInt(score);}public void readFields(DataIntput in) throws IOException{this.stuId=in.readUTF(); this.name=in.readUTF(); this.score=in.readInt ();}public String toString() { return stuId+","+name+","+score;}
2、MapReduce的具体应用
MapReduce可以很好地应用于各种计算问题:
关系代数运算(选择、投影、并、交、差、连接)
分组与聚合运算(WordCount)
矩阵-向量乘法
矩阵乘法
用MapReduce实现关系的自然连接
(1)Map过程:
把R1的每个元 组<oi, ad>转换成键值对 <oi, <R1,ad>>,键是共有属性OI的值,把关系R1包含到值中;
类似地,把来自R2的每个元组<oi, in>,转换成一个键值对 <oi, <R2,in>>
输入:<Object,Text>
输出:<Text, Text>
import org.apache.hadoop.fs.FileSplit;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;// 定义 Mapper 类
class MyMapper extends Mapper<Object, Text, Text, Text> {// 定义输出键值对的键和值,都使用 Text 类型private Text outKey = new Text();private Text outValue = new Text();// 定义 map 方法,它是 Mapper 的核心方法public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// 将输入的文本值转换为字符串String curline = value.toString();// 找到逗号分隔符的位置,用于分割键和值int ind = curline.indexOf(',');// 设置输出的键为逗号之前的部分outKey.set(curline.substring(0, ind));// 获取当前处理的数据分片所属的文件路径// 这里利用了 context 对象的 getInputSplit 方法String path = ((FileSplit) context.getInputSplit()).getPath().toString();// 设置输出的值为文件路径加上逗号之后的剩余部分// 这样做是为了保留原始数据,并将其与来源文件关联outValue.set(path + curline.substring(ind, curline.length()));// 写入中间的键值对,以便后续的 Shuffle 和 Reduce 阶段处理context.write(outKey, outValue);}
}
(2)Reduce过程
所有具有相同key(oi值)的元组被发送到同一个Reduce任务中,Reduce任务是把来自关系R1和R2的、具有相同属性oi值(Key)的元组进行合并<oi,ad,in>;
输入:<Text,Iterable<Text>>
列表中包含两类元素ad和in,需要区分,所以列表中元素需要包含文件名信息
输出:<Text,NullWritable>
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;// 定义 Reducer 类
class MyReducer extends Reducer<Text, Text, Text, NullWritable> {// 定义输出的键,使用 Text 类型private Text outKey = new Text();// 定义输出的值,使用 NullWritable 类型,这里表示不输出值,只输出键private NullWritable outValue = NullWritable.get();// 定义 reduce 方法,它是 Reducer 的核心方法public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {// 初始化两个字符串,分别存储订单数据和商品数据String orderStr = "", itemStr = "";// 遍历所有具有相同键的值for (Text v : values) {// 将值转换为字符串String str = v.toString();// 找到逗号分隔符的位置,用于分割文件路径和数据int ind = str.indexOf(',');// 提取文件路径String path = str.substring(0, ind);// 根据文件路径判断数据来源,分别提取订单数据和商品数据if (path.indexOf("order") != -1) {// 如果路径包含 "order",则提取订单数据orderStr = str.substring(ind + 1);} else {// 否则,提取商品数据itemStr += str.substring(ind + 1) + " ";}}// 将商品数据字符串按空格分割,得到该订单中所有商品数据String[] itemStrs = itemStr.split(" ");// 遍历所有商品数据for (String itemStr : itemStrs) {// 设置输出的键为 "key,orderStr,itemStr" 的格式outKey.set(key + "," + orderStr + "," + itemStr);// 写入输出结果,这里只输出键,不输出值context.write(outKey, outValue);}}
}
MapReduce作业的入口点:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;// 定义MR1类,包含MapReduce作业的入口点main方法
public class MR1 {public static void main(String[] args) throws Exception {// 创建Hadoop配置对象Configuration conf = new Configuration(); // 设置HDFS的默认文件系统地址conf.set("fs.defaultFS", "hdfs://localhost:9000");// 设置HDFS的Java类实现conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); // 获取Job对象,用于配置MapReduce作业Job job = Job.getInstance(conf, "connection"); // 设置作业名称为"connection"// 设置作业使用的jar包,即当前类所在的jar包job.setJarByClass(MR1.class); // 设置Mapper类为自定义的MyMapper类job.setMapperClass(MyMapper.class); // 设置Reducer类为自定义的MyReducer类job.setReducerClass(MyReducer.class); // 设置Mapper输出的值的类型为Textjob.setMapOutputValueClass(Text.class); // 设置作业输出的键的类型为Textjob.setOutputKeyClass(Text.class); // 设置作业输出的值的类型为NullWritable,表示不输出值job.setOutputValueClass(NullWritable.class); // 设置输入数据的HDFS路径FileInputFormat.addInputPath(job, new Path("/tmp1")); // 设置输出结果的HDFS路径FileOutputFormat.setOutputPath(job, new Path("/out1"));// 运行作业,并等待作业完成// 如果作业成功完成,返回0;否则,返回1System.exit(job.waitForCompletion(true) ? 0 : 1); }
}
3、再探Hadoop
第八章、数据仓库Hive
1、Hive概述
(1)数据仓库
一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合(静态数据),用于支持管理决策;体系结构一般包含数据源、数据存储和管理、数据服务、数据应用。
(2)Hive概述
Hive是一个构建于Hadoop顶层的数据仓库工具
某种程度上可以看作是用户编程接口,本身不存储和处理数据
依赖分布式文件系统HDFS存储数据
依赖分布式并行计算模型MapReduce(或者Spark等)处理数据
定义了简单的类SQL 查询语言——HiveQL
用户可以通过编写的HiveQL语句运行MapReduce任务
提供了对数据抽取、转换、加载、存储、查询分析等工具,应用广泛
(3)Hive与Hadoop生态系统中其他组件的关系
(4)与传统数据库的对比
(5)与HBase数据库的对比
2、Hive工作原理
Hive中SQL查询的MapReduce作业转化过程:
输入SQL语句
抽象语法树(AST Tree),结构复杂查询块(QueryBlock):一条SQL语法的组成单元,包含输入源、计算过程、输出;
执行操作树(OperatorTree),包含很多逻辑操作符,如SelectOperator等,可转换为MR中的操作;
最终,Hive中驱动模块的执行器执行MR任务
3、Hive编程实践
Hive应用实例:WordCount