接 [离线数仓] 总结一、数据采集
5.8 数仓开发之ODS层
ODS层的设计要点如下:
(1)ODS层的表结构设计依托于从业务系统同步过来的数据结构。
(2)ODS层要保存全部历史数据,故其压缩格式应选择压缩比率,较高的,此处选择gzip。
CompressedStorage - Apache Hive - Apache Software Foundation
You can import text files compressed with Gzip or Bzip2 directly into a table stored as TextFile. The compression will be detected automatically and the file will be decompressed on-the-fly during query execution. 您可以直接将使用 Gzip 或 Bzip2 压缩的文本文件导入到存储为 TextFile 的表中。系统会自动检测压缩格式,并在查询执行时即时解压缩文件。
大数据场景需要海量的数据,因为数据量足够大,分析出来的结果更即准确。
(3)ODS层表名的命名规范为:ods_表名_单分区增量全量标识(inc/full)。
全量表数据的采集是为了状态的同步;
增量数据的采集是为了行为的同步;
为了辨识增量同步的表和全量同步的表用inc和full做标记。
5.8.1 日志表
(1)ODS 层简介
-- ODS 层 Operate Data Store
-- 存储从MySQL业务数据库和日志服务器的日志文件中采集到的数据
-- 日志数据:JSON格式
-- 业务数据:
-- 全量:DataX,TSV格式,"fieldDelimiter": "\t",
-- 增量:Maxwell,JSON格式
-- 汇总数据:希望用最少得资源存储更多的数据
-- 压缩:列式存储压缩起来比较方便,因为行存不能保证一行的数据类型是一致的,不同类型的数据采用的压缩算法和效率不一样, 列式存储可以保证一列的数据类型一致,因此列式存储在压缩效率上会高一些。
-- gzip:Hadoop默认支持,压缩率极高,压缩速率(压缩、解压缩)低
-- lzo:Hadoop默认不支持,需要额外的jar包,压缩率高,压缩速率居中,支持切片(额外索引)
-- snappy:Hadoop默认不支持,需要额外配置,压缩率低,压缩速率极高
-- 压缩方式的选择:gzip,ODS层主要功能为存储,不需要计算,因此对压缩速率要求不高,反而对压缩率要求高(更少的空间存更多的数据)
-- 数据格式尽可能保持不变
-- 数据压缩格式尽可能保持不变(采集通道采用gzip压缩,这里也选择gzip压缩)
-- 命名规范
-- 在数据仓库中,表其实都是放置在一起的,从逻辑上进行区分,进行分层
-- 表从名称上区分每一层
-- 分层标记:ods_ + 同步数据的表名 + 全量(_full)/ 增量(_inc) 标识
-- 日志表
/*
表的数据是同步的日志数据:
页面浏览日志:JSON
APP启动日志:JSON
命名:ods_log_inc
建表语句:
EXTERNAL,创建外部表,目的是在测试阶段可能会频繁修改表结构来验证问题,
如果使用内部表,删除表的时候会删除数据,因此为了避免重复上传测试数据,采用外部表,外部表在删除表的时候数据不会删除。
生产中可以使用内部表。
PARTITIONED BY:底层采用很多文件进行保存大量数据,一旦文件大数据多时会影响查询效率,可以通过建立分区的操作,提高查询效率。hive的分区表,实际是在表目录中创建不同的子目录,子目录中保存的数据减少,可以快速定位查询到需要的数据,从而提高查询效率。
分区表,存在分区字段,这个字段不是数据字段,而是用于文件目录的划分,不会存储到数据文件中。虽然分区字段只是用来区分子目录的,但是在管理表的时候会当作字段来处理,因此在插入数据到分区表的时候需要补充分区字段。
create table test_part( id int) partitioned by (`dt` string);
insert into table test_part values (1, '2022-06-08');
这样写容易让人产生一个表有两个字段的错觉,可以换成下面的写法:
insert into table test_part partition (dt = '2022-06-08') values (1);
这样就指明了分区字段和数据字段。
静态分区:分区字段的值为固定值
动态分区:分区字段取决于查询结果,怎么实现?
分区字段不赋值,查询字段在最后增加一个额外的字段用于分区操作。
insert into table test_part partition (dt) select 2, '2022-06-09'
*/
-- 测试
drop table if exists test_part;
create table test_part( id int) partitioned by (`dt` string);
insert into table test_part values (1, '2022-06-08');
insert into table test_part partition (dt = '2022-06-09') values (2);
insert overwrite table test_part select * from test_part;
-- 默认情况下,hive没有开启动态分区处理(strict),需要设置为非严格模式(nonstrict)
-- set hive.exec.dynamic.partition.mode=strict;
set hive.exec.dynamic.partition.mode=nonstrict;
insert into table test_part partition (dt) select 2, '2022-06-10';
-- 严格限定查询语句中必须带分区字段的筛选条件,否则不能执行Query,因为数据量太大。
set hive.mapred.mode=strict;
-- set hive.mapred.mode=nonstrict;
-- 不带分区不能查询,Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.no.partition.filter to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features.
select * from test_part;
-- 带分区字段的筛选可以查询
select * from test_part where dt='2022-06-09';
-- SerDe
-- log:JSON
-- 默认情况下,Hive表无法解析JSON格式,
建表时需要加上ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe',才能正常解析JSON
-- 如果Hive表可以解析JSON格式的数据,那么一般就称之为JSON表
(1)如果JSON属性和表的字段一致,那么可以正常解析
(2)如果JSON属性少于表的字段,那么存在的属性可以正常解析,不存在的字段设置为空null
(3)如果JSON属性多于表的字段,那么多于属性不做解析
(4)JSON属性和表的字段会进行不区分大小写的解析。源JSON文件,格式不一致。
虽然txt文件中的文本内容与hive 表的字段没有一一对应,但是查询的时候也不会报错,能正常解析的就正常解析,解析不了的用“null”补充。
更换第3行”id“和”name“的位置之后,重新查询。
{"name": "Lily","id": 1003,"age": 32,"tel": 123}
查询结果,第3行正常解析,根据名称解析,和位置无关。
test_log.id test_log.name test_log.age
1001 Seven 30
1002 Lucky NULL
1003 Lily 32
1003 Lily 32
1003 Lily NULL
正常解析第四行,说明不区分大小写。
DROP TABLE IF EXISTS test_log;
CREATE EXTERNAL TABLE IF NOT EXISTS test_log(
id bigint,
name string,
age int
) COMMENT '日志数据'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
LOCATION 'obs://bigdata-test1233/seven/warehouse313/gmall/test_log';
-- 建表完成之后,将编写好的json文本复制到表路径下,就可以正常查询了
select * from test_log;
(2)ODS层日志表建表语句
/*
EXTERNAL -- 外部表
LOCATION -- 指定存储位置
日志数据格式:
-- 页面浏览日志:JSON中包含有JSON
JSON表中存在JSON嵌套的情况,一般会将最外层的JSON对象的属性作为JSON表的字段
common
actions
displays
page
err
ts
-- APP启动日志:JSON 中嵌套JSON
common
start
err
ts
-- 表的字段类型应该采用特殊类型:array,map,struct
*/
(3)array:使用[] 表示数组
定义时加泛型:ids array<int>
访问时使用[] 加索引的方式访问数组元素。
select ids,ids[1] ids_1,ids[3], -- 超出索引长度,不会报错,而是用null补充array(ids[1],ids[2]) new_array, -- 可以取出数组的一部分组成新的数组array_contains(ids, 'e') is_exist -- 判断元素是否存在于数组中:array_contains(数组名,元素)
from (select `array`('a', 'b', 'c') ids) t;
select ids,
ids[1] ids_1,
ids[3], -- 超出索引长度,不会报错,而是用null补充
array(ids[1],ids[2]) new_array, -- 可以取出数组的一部分组成新的数组
array_contains(ids, 'e') is_exist -- 判断元素是否存在于数组中:array_contains(数组名,元素)
from (select `array`('a', 'b', 'c') ids) t;
ids ids_1 _c2 new_array is_exist
["a","b","c"] b NULL ["b","c"] false
(4)map:{"k":"v"}
select dat
from (select map('a','b','c','d') dat) t;dat
{"a":"b","c":"d"}
Time taken: 0.55 seconds, Fetched: 1 row(s)
-- map 定义时泛型约束,dat map<string, string>
-- map数据的访问,不能直接通过key进行操作,比如 dat.a,会报错:
select dat,
dat.a
from ( select map('a','b','c','d') dat) t;SemanticException [Error 10042]: Line 2:7 . Operator is only supported on struct or list of struct types 'a'
-- map数据的访问,采用类似数组的方式,比如:
select dat,dat['a']
from ( select map('a','b','c','d') dat) t;
-- 如果key不存在,那么直接返回null
select dat,dat['a'],dat[0], -- 返回nullmap_keys(dat), --- 返回 ["a","c"]map_values(dat) -- 返回 ["b","d"]
from ( select map('a','b','c','d') dat) t;
(5)struct
-- struct 定义时指定属性和类型:obj struct<id: int, name:string>
-- 构建结构体的函数:struct(),会将所有数据作为属性值存储,属性名用col1...coln 代替
select struct('a', 'b', 'c');
-- 返回:{"col1":"a","col2":"b","col3":"c"}
-- named_struct():参数必须是偶数
select named_struct('a', 'b', 'c', 'd');
-- 返回:{"a":"b","c":"d"}
-- 结构体通过 . 的方式获取属性值
select obj,obj.a
from (select named_struct('a', 'b', 'c', 'd') obj) t ;
-- map 和struct 的区别:
(1)泛型,可以根据数据的类型来选择使用map还是struct,如果数据类型要求一致,选择map,如果数据类型不一致,选择struct。
(2)struct中的属性名称是固定的,只要约束后就不能变化
(3)map中的key的数量不是固定的,可以动态改变;而结构体的属性不能变化
1)建表语句
create database gmall;
use gmall;
DROP TABLE IF EXISTS ods_log_inc;
CREATE EXTERNAL TABLE ods_log_inc
(
`common` STRUCT<ar :STRING,
ba :STRING,
ch :STRING,
is_new :STRING,
md :STRING,
mid :STRING,
os :STRING,
sid :STRING,
uid :STRING,
vc :STRING> COMMENT '公共信息',
`page` STRUCT<during_time :STRING,
item :STRING,
item_type :STRING,
last_page_id :STRING,
page_id :STRING,
from_pos_id :STRING,
from_pos_seq :STRING,
refer_id :STRING> COMMENT '页面信息',
`actions` ARRAY<STRUCT<action_id:STRING,
item:STRING,
item_type:STRING,
ts:BIGINT>> COMMENT '动作信息',
`displays` ARRAY<STRUCT<display_type :STRING,
item :STRING,
item_type :STRING,
`pos_seq` :STRING,
pos_id :STRING>> COMMENT '曝光信息',
`start` STRUCT<entry :STRING,
first_open :BIGINT,
loading_time :BIGINT,
open_ad_id :BIGINT,
open_ad_ms :BIGINT,
open_ad_skip_ms :BIGINT> COMMENT '启动信息',
`err` STRUCT<error_code:BIGINT,
msg:STRING> COMMENT '错误信息',
`ts` BIGINT COMMENT '时间戳'
) COMMENT '活动信息表'
PARTITIONED BY (`dt` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
LOCATION '/warehouse/gmall/ods/ods_log_inc/'
TBLPROPERTIES ('compression.codec'='org.apache.hadoop.io.compress.GzipCodec');
DROP TABLE IF EXISTS ods_log_inc;
CREATE EXTERNAL TABLE ods_log_inc
(`common` STRUCT<ar :STRING,ba :STRING,ch :STRING,is_new :STRING,md :STRING,mid :STRING,os :STRING,sid :STRING,uid :STRING,vc :STRING> COMMENT '公共信息',`page` STRUCT<during_time :STRING,item :STRING,item_type :STRING,last_page_id :STRING,page_id :STRING,from_pos_id :STRING,from_pos_seq :STRING,refer_id :STRING> COMMENT '页面信息',`actions` ARRAY<STRUCT<action_id:STRING,item:STRING,item_type:STRING,ts:BIGINT>> COMMENT '动作信息',`displays` ARRAY<STRUCT<display_type :STRING,item :STRING,item_type :STRING,`pos_seq` :STRING,pos_id :STRING>> COMMENT '曝光信息',`start` STRUCT<entry :STRING,first_open :BIGINT,loading_time :BIGINT,open_ad_id :BIGINT,open_ad_ms :BIGINT,open_ad_skip_ms :BIGINT> COMMENT '启动信息',`err` STRUCT<error_code:BIGINT,msg:STRING> COMMENT '错误信息',`ts` BIGINT COMMENT '时间戳'
) COMMENT '活动信息表'PARTITIONED BY (`dt` STRING)ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
LOCATION 'obs://bigdata-test1233/seven/warehouse313/gmall/ods/ods_log_inc/'
TBLPROPERTIES ('compression.codec'='org.apache.hadoop.io.compress.GzipCodec');
2)数据装载
-- hdfs
load data inpath '/origin_data/gmall/log/topic_log/2022-06-08' into table ods_log_inc partition(dt='2022-06-08');-- obs
load data inpath 'obs://bigdata-test1233/origin_data/gmall/log/topic_log/2022-06-08/' into table ods_log_inc partition(dt='2022-06-08');
3)每日数据装载脚本
(1)在~/bin目录下创建hdfs_to_ods_log.sh
$ vim hdfs_to_ods_log.sh
(2)编写如下内容
#!/bin/bash# 定义变量方便修改
APP=gmall# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fiecho ================== 日志日期为 $do_date ==================
sql="
load data inpath 'obs://bigdata-test1233/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log_inc partition(dt='$do_date');
"
hive -e "$sql"
(3)增加脚本执行权限
$ chmod +x hdfs_to_ods_log.sh
(4)脚本用法
$ hdfs_to_ods_log.sh 2022-06-08
5.8.2 业务表
1 活动信息表(全量表)
DROP TABLE IF EXISTS ods_activity_info_full;
CREATE EXTERNAL TABLE ods_activity_info_full
(
`id` STRING COMMENT '活动id',
`activity_name` STRING COMMENT '活动名称',
`activity_type` STRING COMMENT '活动类型',
`activity_desc` STRING COMMENT '活动描述',
`start_time` STRING COMMENT '开始时间',
`end_time` STRING COMMENT '结束时间',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间'
) COMMENT '活动信息表'
PARTITIONED BY (`dt` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/gmall/ods/ods_activity_info_full/'
TBLPROPERTIES ('compression.codec'='org.apache.hadoop.io.compress.GzipCodec');
2 活动规则表(全量表)
DROP TABLE IF EXISTS ods_activity_rule_full;
CREATE EXTERNAL TABLE ods_activity_rule_full
(
`id` STRING COMMENT '编号',
`activity_id` STRING COMMENT '活动ID',
`activity_type` STRING COMMENT '活动类型',
`condition_amount` DECIMAL(16, 2) COMMENT '满减金额',
`condition_num` BIGINT COMMENT '满减件数',
`benefit_amount` DECIMAL(16, 2) COMMENT '优惠金额',
`benefit_discount` DECIMAL(16, 2) COMMENT '优惠折扣',
`benefit_level` STRING COMMENT '优惠级别',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间'
) COMMENT '活动规则表'
PARTITIONED BY (`dt` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/gmall/ods/ods_activity_rule_full/'
TBLPROPERTIES ('compression.codec'='org.apache.hadoop.io.compress.GzipCodec');
3 一级品类表(全量表)
DROP TABLE IF EXISTS ods_base_category1_full;
CREATE EXTERNAL TABLE ods_base_category1_full
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '分类名称',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间'
) COMMENT '一级品类表'
PARTITIONED BY (`dt` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/gmall/ods/ods_base_category1_full/'
TBLPROPERTIES ('compression.codec'='org.apache.hadoop.io.compress.GzipCodec');
4 二级品类表(全量表)
DROP TABLE IF EXISTS ods_base_category2_full;
CREATE EXTERNAL TABLE ods_base_category2_full
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '二级分类名称',
`category1_id` STRING COMMENT '一级分类编号',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间'
) COMMENT '二级品类表'
PARTITIONED BY (`dt` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/warehouse/gmall/ods/ods_base_category2_full/'
TBLPROPERTIES ('compression.codec'='org.apache.hadoop.io.compress.GzipCodec');
5 三级品类表(全量表)
DROP TABLE IF EXISTS ods_base_category3_full;
CREATE EXTERNAL TABLE ods_base_category3_full
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '三级分类名称',
`category2_id` STRING COMMENT '二级分类编号',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间'
) COMMENT '三级品类表'
PARTITIONED BY (`dt` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'