点一下关注吧!!!非常感谢!!持续更新!!!
Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据挖掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(正在更新…)
# 章节内容
上节我们完成了如下的内容:
- 广告业务 测试
- FlumeAgent 加载ODS、DWD层
导入数据
加载ADS层
ad_show_page
sh /opt/wzk/hive/ads_load_ad_show_page.sh 2020-07-21
执行结果如下图所示:
查看Hive中的数据:
hive use ads;
select * from ads_ad_show_place limit 5;
运行结果如下图所示:
继续导入其他数据:
sh /opt/wzk/hive/ads_load_ad_show_page.sh 2020-07-22
sh /opt/wzk/hive/ads_load_ad_show_page.sh 2020-07-23
sh /opt/wzk/hive/ads_load_ad_show_page.sh 2020-07-24
...省略
最终的Hive的数据量如下所示:
select count(*) from ads_ad_show_place;
对应图片为:
ad_show_page_window
sh /opt/wzk/hive/ads_load_ad_show_page_window.sh 2020-07-21
执行结果如下所示:
查看Hive中的数据如下:
hive use ads;
select * from ads_ad_show_place_window limit 5;
执行结果如下图所示:
继续加载其他的数据:
sh /opt/wzk/hive/ads_load_ad_show_page_window.sh 2020-07-22
sh /opt/wzk/hive/ads_load_ad_show_page_window.sh 2020-07-23
sh /opt/wzk/hive/ads_load_ad_show_page_window.sh 2020-07-24
...省略
Hive中的数据总数如下:
select count(*) from ads_ad_show_place_window;
运行结果如下所示:
导出数据
执行步骤
- 在MySQL创建对应的表
- 创建配置文件(JSON)
- 执行命令,使用JSON配置文件,测试
- 编写执行脚本(Shell)
- Shell脚本的测试
MySQL
drop table if exists dwads.ads_ad_show_place;
create table dwads.ads_ad_show_place(ad_action tinyint,hour varchar(2),place varchar(20),product_id int,cnt int,dt varchar(10)
);
执行结果如下图所示:
DataX
配置文件
vim /opt/wzk/datax/ads_ad_show_place.json
写入的内容如下所示:
{"job":{"setting":{"speed":{"channel":1}},"content":[{"reader":{"name":"hdfsreader","parameter":{"path":"/user/hive/warehouse/ads.db/ads_ad_show_place/dt=$do_date/*","defaultFS":"hdfs://h121.wzk.icu:9000","column":[{"index":0,"type":"string"},{"index":1,"type":"string"},{"index":2,"type":"string"},{"index":3,"type":"string"},{"index":4,"type":"string"},{"type":"string","value":"$do_date"}],"fileType":"text","encoding":"UTF-8","fieldDelimiter":","}},"writer":{"name":"mysqlwriter","parameter":{"writeMode":"insert","username":"hive","password":"hive@wzk.icu","column":["ad_action","hour","place","product_id","cnt","dt"],"preSql":["delete from ads_ad_show_placewhere dt='$do_date'"],"connection":[{"jdbcUrl":"jdbc:mysql://h122.wzk.icu:3306/dwads?useUnicode=true&characterEncoding=utf-8","table":["ads_ad_show_place"]}]}}}]}
}
写入内容如下所示:
DataX 简介
DataX 是由阿里巴巴开源的分布式离线数据同步工具,用于解决不同存储系统之间的数据传输问题。它被设计为一种高效、稳定、易扩展的工具,能够适应多种复杂的数据同步需求。
核心特点
支持多种数据源:
- 关系型数据库: MySQL, PostgreSQL, Oracle, SQL Server, DB2 等。
- NoSQL 数据库: MongoDB, HBase 等。
- 大数据存储系统: Hive, MaxCompute (ODPS), HDFS。
- 其他: FTP 文件、ElasticSearch 等。
高性能:
- 基于多线程的并行架构,能充分利用机器的多核性能。
- 支持分片传输,提高数据传输的吞吐量。
灵活性和易用性:
- 配置文件化:使用 JSON 格式的配置文件定义任务,简单直观。
- 支持任务调度,可与调度工具集成实现定时任务。
- 兼容多种数据格式和传输协议。
扩展性:
- 插件式架构,开发者可以通过编写 Reader 和 Writer 插件支持新的数据源。
开源与社区支持:
- 基于 Apache 2.0 开源协议,开发者可以自由使用和修改。
- 拥有活跃的社区和丰富的文档支持。
组成部分
Reader:
- 负责从数据源中读取数据。
- 示例:MySQLReader, HdfsReader。
Writer:
- 负责将数据写入目标存储。
- 示例:MySQLWriter, HdfsWriter。
Framework:
- DataX 的核心调度引擎,负责 Reader 和 Writer 的协调工作。
- 提供错误处理、数据校验、性能优化等功能。
Transform:
- 用于对传输的数据进行处理和转换(可选)。
- 例如数据格式的转换、字段的增删改等。
工作流程
初始化:
- 加载用户配置的 JSON 文件,解析 Reader 和 Writer 的配置。
- 准备任务上下文。
读取数据:
- Reader 读取源数据并以批量的方式输出。
转换数据:
- 可选步骤,Transform 模块对数据进行处理。
写入数据:
- Writer 接收 Reader 输出的数据并将其写入目标存储。
任务管理与监控:
- DataX 提供实时的任务运行日志和统计信息,包括速度、成功率、错误信息等。
执行导出
vim /opt/wzk/hive/ads_ad_show_place.sh
写入的内容如下所示:
#!/bin/bash
source /etc/profile
JSON=/opt/wzk/datax
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" $JSON/ads_ad_show_place.json
写入结果如下图:
执行脚本可以得到结果:
sh /opt/wzk/hive/ads_ad_show_place.sh 2020-07-21
执行过程如下图所示:
查看结果
执行结束
查看数据库的结果如下所示: