二百二十九、离线数仓——离线数仓Hive从Kafka、MySQL到ClickHouse的完整开发流程

一、目的

为了整理离线数仓开发的全流程,算是温故知新吧

离线数仓的数据源是Kafka和MySQL数据库,Kafka存业务数据,MySQL存维度数据

采集工具是Kettle和Flume,Flume采集Kafka数据,Kettle采集MySQL数据

离线数仓是Hive

目标数据库是ClickHouse

任务调度器是海豚

二、数据采集

(一)Flume采集Kafka数据

1、Flume配置文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 1200000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2、用海豚调度Flume任务

#!/bin/bash
source /etc/profile

/usr/local/hurys/dc_env/flume/flume190/bin/flume-ng agent -n a1 -f /usr/local/hurys/dc_env/flume/flume190/conf/queue.properties

3、目标路径

(二)Kettle采集MySQL维度数据

1、Kettle任务配置

2、用海豚调度Kettle任务

#!/bin/bash
source /etc/profile

/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/mysql_to_hdfs/ -trans=23_MySQL_to_HDFS_tb_radar_lane level=Basic >>/home/log/kettle/23_MySQL_to_HDFS_tb_radar_lane_`date +%Y%m%d`.log 

3、目标路径

三、ODS层

(一)业务数据表

use hurys_dc_ods;create external table  if not exists  ods_queue(queue_json  string
)
comment '静态排队数据表——静态分区'
partitioned by (day string)
stored as SequenceFile
;
--刷新表分区
msck repair table ods_queue;
--查看表分区
show partitions ods_queue;
--查看表数据
select * from ods_queue;

(二)维度数据表

use hurys_dc_basic;create  external  table  if not exists  tb_device_scene(id        int      comment '主键id',device_no string   comment '设备编号',scene_id  string   comment '场景编号'
)
comment '雷达场景表'
row format delimited fields terminated by ','
stored as  textfile  location '/data/tb_device_scene'
tblproperties("skip.header.line.count"="1") ;
--查看表数据
select * from hurys_dc_basic.tb_device_scene;

四、DWD层

(一)业务数据清洗

1、业务数据的JSON有多层

--1、静态排队数据内部表——动态分区  dwd_queue
create  table  if not exists  dwd_queue(device_no    string          comment '设备编号',lane_num     int             comment '车道数量',create_time  timestamp       comment '创建时间',lane_no      int             comment '车道编号',lane_type    int             comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',queue_count  int             comment '排队车辆数',queue_len    decimal(10,2)   comment '排队长度(m)',queue_head   decimal(10,2)   comment '排队第一辆车距离停止线距离(m)',queue_tail   decimal(10,2)   comment '排队最后一辆车距离停止线距离(m)'
)
comment '静态排队数据表——动态分区'
partitioned by (day string)
stored as orc
;
--动态插入数据with t1 as(
selectget_json_object(queue_json,'$.deviceNo')   device_no,get_json_object(queue_json,'$.createTime') create_time,get_json_object(queue_json,'$.laneNum')    lane_num,get_json_object(queue_json,'$.queueList')  queue_list
from hurys_dc_ods.ods_queue)
insert  overwrite  table  hurys_dc_dwd.dwd_queue partition(day)
selectt1.device_no,t1.lane_num,substr(create_time,1,19)                                               create_time ,get_json_object(list_json,'$.laneNo')                                  lane_no,get_json_object(list_json,'$.laneType')                                lane_type,get_json_object(list_json,'$.queueCount')                              queue_count,cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2))       queue_len,cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2))       queue_head,cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))       queue_tail,date(t1.create_time) day
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,'\\[|\\]','') ,   --将json数组两边的中括号去掉'\\}\\,\\{','\\}\\;\\{'),  --将json数组元素之间的逗号换成分号'\\;') --以分号作为分隔符(split函数以分号作为分隔))list_queue as list_json
where  device_no is not null  and create_time is not null and  get_json_object(list_json,'$.queueLen') between 0 and 500
and  get_json_object(list_json,'$.queueHead')  between 0 and 500 and  get_json_object(list_json,'$.queueTail')  between 0 and 500 and  get_json_object(list_json,'$.queueCount') between 0 and 100
group by t1.device_no, t1.lane_num, substr(create_time,1,19), get_json_object(list_json,'$.laneNo'), get_json_object(list_json,'$.laneType'), get_json_object(list_json,'$.queueCount'), cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2)), cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2)), cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2)), date(t1.create_time)
;
--查看分区
show partitions dwd_queue;
--查看数据
select * from dwd_queue
where day='2024-03-11';
--删掉表分区
alter table hurys_dc_dwd.dwd_queue drop partition (day='2024-03-11');

2、业务数据的JSON只有一层

--2、转向比数据内部表——动态分区  dwd_turnratio
create  table  if not exists  dwd_turnratio(device_no       string        comment '设备编号',cycle           int           comment '转向比数据周期' ,create_time     timestamp     comment '创建时间',volume_sum      int           comment '指定时间段内通过路口的车辆总数',speed_avg       decimal(10,2) comment '指定时间段内通过路口的所有车辆速度的平均值',volume_left     int           comment '指定时间段内通过路口的左转车辆总数',speed_left      decimal(10,2) comment '指定时间段内通过路口的左转车辆速度的平均值',volume_straight int           comment '指定时间段内通过路口的直行车辆总数',speed_straight  decimal(10,2) comment '指定时间段内通过路口的直行车辆速度的平均值',volume_right    int           comment '指定时间段内通过路口的右转车辆总数',speed_right     decimal(10,2) comment '指定时间段内通过路口的右转车辆速度的平均值',volume_turn     int           comment '指定时间段内通过路口的掉头车辆总数',speed_turn      decimal(10,2) comment '指定时间段内通过路口的掉头车辆速度的平均值'
)
comment '转向比数据表——动态分区'
partitioned by (day string)   --分区字段不能是表中已经存在的数据,可以将分区字段看作表的伪列。
stored as orc                 --表存储数据格式为orc
;
--动态插入数据
--解析json字段、去重、非空、volumeSum>=0
--speed_avg、speed_left、speed_straight、speed_right、speed_turn 等字段保留两位小数
--0<=volume_sum<=1000、0<=speed_avg<=150、0<=volume_left<=1000、0<=speed_left<=100、0<=volume_straight<=1000
--0<=speed_straight<=150、0<=volume_right<=1000、0<=speed_right<=100、0<=volume_turn<=100、0<=speed_turn<=100
with t1 as(
selectget_json_object(turnratio_json,'$.deviceNo')        device_no,get_json_object(turnratio_json,'$.cycle')           cycle,get_json_object(turnratio_json,'$.createTime')      create_time,get_json_object(turnratio_json,'$.volumeSum')       volume_sum,cast(get_json_object(turnratio_json,'$.speedAvg')     as decimal(10,2))    speed_avg,get_json_object(turnratio_json,'$.volumeLeft')      volume_left,cast(get_json_object(turnratio_json,'$.speedLeft')    as decimal(10,2))    speed_left,get_json_object(turnratio_json,'$.volumeStraight')  volume_straight,cast(get_json_object(turnratio_json,'$.speedStraight')as decimal(10,2))    speed_straight,get_json_object(turnratio_json,'$.volumeRight')     volume_right,cast(get_json_object(turnratio_json,'$.speedRight')   as decimal(10,2))    speed_right ,case when  get_json_object(turnratio_json,'$.volumeTurn')  is null then 0 else get_json_object(turnratio_json,'$.volumeTurn')  end as   volume_turn ,case when  get_json_object(turnratio_json,'$.speedTurn')   is null then 0 else cast(get_json_object(turnratio_json,'$.speedTurn')as decimal(10,2))   end as   speed_turn
from hurys_dc_ods.ods_turnratio)
insert overwrite table hurys_dc_dwd.dwd_turnratio partition (day)
selectt1.device_no,cycle,substr(create_time,1,19)              create_time ,volume_sum,speed_avg,volume_left,speed_left,volume_straight,speed_straight ,volume_right,speed_right ,volume_turn,speed_turn,date(create_time) day
from t1
where device_no is not null and volume_sum between 0 and 1000 and speed_avg between 0 and 150 and volume_left  between 0 and 1000
and speed_left between 0 and 100 and volume_straight between 0 and 1000 and speed_straight between 0 and 150
and volume_right between 0 and 1000 and speed_right between 0 and 100 and volume_turn between 0 and 100 and speed_turn between 0 and 100
group by t1.device_no, cycle, substr(create_time,1,19), volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight, volume_right, speed_right, volume_turn, speed_turn, date(create_time)
;
--查看分区
show partitions dwd_turnratio;
--查看数据
select * from hurys_dc_dwd.dwd_turnratio
where day='2024-03-11';
--删掉表分区
alter table hurys_dc_dwd.dwd_turnratio drop partition (day='2024-03-11');

(二)维度数据清洗

create table if not exists  dwd_radar_lane(device_no         string  comment '雷达编号',lane_no           string  comment '车道编号',lane_id           string  comment '车道id',lane_direction    string  comment '行驶方向',lane_type         int     comment '车道类型 0渠化,1来向路段,2出口,3去向路段,4路口,5非路口路段,6其他',lane_length       float   comment '车道长度',lane_type_name    string  comment '车道类型名称'
)
comment '雷达车道信息表'
stored as orc
;
--create table if not exists  dwd_radar_lane  stored as orc as
--加载数据
insert overwrite table  hurys_dc_dwd.dwd_radar_lane
select
device_no, lane_no, lane_id, lane_direction, lane_type,lane_length ,case when lane_type='0' then '渠化'when lane_type='1' then '来向路段'when lane_type='2' then '出口'when lane_type='3' then '去向路段'end as lane_type_name
from hurys_dc_basic.tb_radar_lane
where lane_length is not null
group by device_no, lane_no, lane_id, lane_direction, lane_type, lane_length
;
--查看表数据
select * from hurys_dc_dwd.dwd_radar_lane;

五、DWS层

create  table  if not exists  dws_statistics_volume_1hour(device_no        string         comment '设备编号',scene_name       string         comment '场景名称',lane_no          int            comment '车道编号',lane_direction   string         comment '车道流向',section_no       int            comment '断面编号',device_direction string         comment '雷达朝向',sum_volume_hour  int            comment '每小时总流量',start_time       timestamp      comment '开始时间'
)
comment '统计数据流量表——动态分区——1小时周期'
partitioned by (day string)
stored as orc
;
--动态加载数据  --两个一起 1m41s 、 convert.join=false  1m43s、
--注意字段顺序  查询语句中字段顺序与建表字段顺序一致
insert  overwrite  table  hurys_dc_dws.dws_statistics_volume_1hour  partition(day)
selectdwd_st.device_no,dwd_sc.scene_name,dwd_st.lane_no,dwd_rl.lane_direction,dwd_st.section_no,dwd_rc.device_direction,sum(volume_sum) sum_volume_hour,concat(substr(create_time, 1, 14), '00:00') start_time,day
from hurys_dc_dwd.dwd_statistics as dwd_stright join hurys_dc_dwd.dwd_radar_lane as dwd_rlon dwd_rl.device_no=dwd_st.device_no and dwd_rl.lane_no=dwd_st.lane_noright join hurys_dc_dwd.dwd_device_scene as dwd_dson dwd_ds.device_no=dwd_st.device_noright join hurys_dc_dwd.dwd_scene as dwd_scon dwd_sc.scene_id = dwd_ds.scene_idright join hurys_dc_dwd.dwd_radar_config as dwd_rcon dwd_rc.device_no=dwd_st.device_no
where dwd_st.create_time is not null
group by dwd_st.device_no, dwd_sc.scene_name, dwd_st.lane_no, dwd_rl.lane_direction, dwd_st.section_no, dwd_rc.device_direction, concat(substr(create_time, 1, 14), '00:00'), day
;
--查看分区
show partitions dws_statistics_volume_1hour;
--查看数据
select * from hurys_dc_dws.dws_statistics_volume_1hour
where day='2024-02-29';

六、ADS层

这里的ADS层,其实就是用Kettle把Hive的DWS层结果数据同步到ClickHouse中,也是一个Kettle任务而已

这样用海豚进行调度每一层的任务,整个离线数仓流程就跑起来了

七、海豚调度任务(除了2个采集任务外)

(一)delete_stale_data(根据删除策略删除ODS层原始数据)

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
day_30_ago_date=`date -d "30 day ago " +%Y-%m-%d`

#静态排队数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=${day_30_ago_date}
fi

#轨迹数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_track/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_track/day=${day_30_ago_date}
fi

#动态排队数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_queue_dynamic/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_queue_dynamic/day=${day_30_ago_date}
fi

#区域数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_area/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_area/day=${day_30_ago_date}
fi

#事件数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_event/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_event/day=${day_30_ago_date}
fi

#删除表分区
hive -e "
use hurys_dc_ods;

alter table hurys_dc_ods.ods_area drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_event drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_queue drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_queue_dynamic drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_track drop partition (day='$day_30_ago_date')
"

(二)flume(Flume采集Kafka业务数据)

(三)create_database_table(自动创建Hive和ClickHouse的库表)

1、创建Hive库表

#! /bin/bash
source /etc/profile

hive -e "
source  1_dws.sql
"

2、创建ClickHouse库表

#! /bin/bash
source /etc/profile

clickhouse-client --user default --password hurys@123 -d default --multiquery <1_ads.sql

(四)hive_dws(DWS层任务)

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dws;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=2000;
    
            
insert  overwrite  table  hurys_dc_dws.dws_statistics_volume_1hour  partition(day='$yesdate')
select
       dwd_st.device_no,
       dwd_sc.scene_name,
       dwd_st.lane_no,
       dwd_rl.lane_direction,
       dwd_st.section_no,
       dwd_rc.device_direction,
       sum(volume_sum) sum_volume_hour,
       concat(substr(create_time, 1, 14), '00:00') start_time
from hurys_dc_dwd.dwd_statistics as dwd_st
    right join hurys_dc_dwd.dwd_radar_lane as dwd_rl
              on dwd_rl.device_no=dwd_st.device_no and dwd_rl.lane_no=dwd_st.lane_no
    right join hurys_dc_dwd.dwd_device_scene as dwd_ds
              on dwd_ds.device_no=dwd_st.device_no
    right join hurys_dc_dwd.dwd_scene as dwd_sc
              on dwd_sc.scene_id = dwd_ds.scene_id
    right join hurys_dc_dwd.dwd_radar_config as dwd_rc
              on dwd_rc.device_no=dwd_st.device_no
where dwd_st.create_time is not null  and  day= '$yesdate'
group by dwd_st.device_no, dwd_sc.scene_name, dwd_st.lane_no, dwd_rl.lane_direction, dwd_st.section_no, dwd_rc.device_direction, concat(substr(create_time, 1, 14), '00:00')    
"

(五)hive_basic(维度表基础库)

#! /bin/bash
source /etc/profile

hive -e "
set hive.vectorized.execution.enabled=false;

use hurys_dc_basic
"

(六)dolphinscheduler_log(删除海豚日志文件)

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

cd  /usr/local/hurys/dc_env/dolphinscheduler/dolphin/logs/

rm -rf dolphinscheduler-api.$yesdate*.log
rm -rf dolphinscheduler-master.$yesdate*.log
rm -rf dolphinscheduler-worker.$yesdate*.log

(七)Kettle_Hive_to_ClickHouse(Kettle采集Hive的DWS层数据同步到ClickHouse的ADS层中)

#!/bin/bash
source /etc/profile

/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/hive_to_clickhouse/ -trans=17_Hive_to_ClickHouse_ads_avg_volume_15min level=Basic >>/home/log/kettle/17_Hive_to_ClickHouse_ads_avg_volume_15min_`date +%Y%m%d`.log 

(八)Kettle_MySQL_to_HDFS(Kettle采集MySQL维度表数据到HDFS中)

(九)hive_dwd(DWD层任务)

1、业务数据的JSON有多层

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dwd;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=1500;

with t1 as(
select
       get_json_object(queue_json,'$.deviceNo')   device_no,
       get_json_object(queue_json,'$.createTime') create_time,
       get_json_object(queue_json,'$.laneNum')    lane_num,
       get_json_object(queue_json,'$.queueList')  queue_list
from hurys_dc_ods.ods_queue
where date(get_json_object(queue_json,'$.createTime')) = '$yesdate'
    )
insert  overwrite  table  hurys_dc_dwd.dwd_queue partition(day='$yesdate')
select
        t1.device_no,
        t1.lane_num,
        substr(create_time,1,19)                                               create_time ,
        get_json_object(list_json,'$.laneNo')                                  lane_no,
        get_json_object(list_json,'$.laneType')                                lane_type,
        get_json_object(list_json,'$.queueCount')                              queue_count,
        cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2))       queue_len,
        cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2))       queue_head,
        cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))       queue_tail
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\\\[|\\\\]','') ,      --将json数组两边的中括号去掉
                                 '\\\\}\\\\,\\\\{','\\\\}\\\\;\\\\{'),   --将json数组元素之间的逗号换成分号
                   '\\\\;')   --以分号作为分隔符(split函数以分号作为分隔)
          )list_queue as list_json
where  device_no is not null  and  get_json_object(list_json,'$.queueLen') between 0 and 500 and  get_json_object(list_json,'$.queueHead')  between 0 and 500 and  get_json_object(list_json,'$.queueTail')  between 0 and 500 and  get_json_object(list_json,'$.queueCount') between 0 and 100
group by t1.device_no, t1.lane_num, substr(create_time,1,19), get_json_object(list_json,'$.laneNo'), get_json_object(list_json,'$.laneType'), get_json_object(list_json,'$.queueCount'), cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2)), cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2)), cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))
"

2、业务数据的JSON单层

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dwd;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=1500;

with t1 as(
select
        get_json_object(turnratio_json,'$.deviceNo')        device_no,
        get_json_object(turnratio_json,'$.cycle')           cycle,
        get_json_object(turnratio_json,'$.createTime')      create_time,
        get_json_object(turnratio_json,'$.volumeSum')       volume_sum,
        cast(get_json_object(turnratio_json,'$.speedAvg')     as decimal(10,2))    speed_avg,
        get_json_object(turnratio_json,'$.volumeLeft')      volume_left,
        cast(get_json_object(turnratio_json,'$.speedLeft')    as decimal(10,2))    speed_left,
        get_json_object(turnratio_json,'$.volumeStraight')  volume_straight,
        cast(get_json_object(turnratio_json,'$.speedStraight')as decimal(10,2))    speed_straight,
        get_json_object(turnratio_json,'$.volumeRight')     volume_right,
        cast(get_json_object(turnratio_json,'$.speedRight')   as decimal(10,2))    speed_right ,
        case when  get_json_object(turnratio_json,'$.volumeTurn')  is null then 0 else get_json_object(turnratio_json,'$.volumeTurn')  end as   volume_turn ,
        case when  get_json_object(turnratio_json,'$.speedTurn')   is null then 0 else cast(get_json_object(turnratio_json,'$.speedTurn')as decimal(10,2))   end as   speed_turn
from hurys_dc_ods.ods_turnratio
where date(get_json_object(turnratio_json,'$.createTime')) = '$yesdate'
)
insert overwrite table hurys_dc_dwd.dwd_turnratio partition (day='$yesdate')
select
       t1.device_no,
       cycle,
       substr(create_time,1,19)              create_time ,
       volume_sum,
       speed_avg,
       volume_left,
       speed_left,
       volume_straight,
       speed_straight ,
       volume_right,
       speed_right ,
       volume_turn,
       speed_turn
from t1
where device_no is not null and volume_sum between 0 and 1000 and speed_avg between 0 and 150 and volume_left  between 0 and 1000 and speed_left between 0 and 100 and volume_straight between 0 and 1000 and speed_straight between 0 and 150 and volume_right between 0 and 1000 and speed_right between 0 and 100 and volume_turn between 0 and 100 and speed_turn between 0 and 100
group by t1.device_no, cycle, substr(create_time,1,19), volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight, volume_right, speed_right, volume_turn, speed_turn
"

3、维度数据

#! /bin/bash
source /etc/profile

hive -e "
use hurys_dc_dwd;

set hive.vectorized.execution.enabled=false;

insert overwrite table hurys_dc_dwd.dwd_holiday
select
day, holiday,year
from hurys_dc_basic.tb_holiday
group by day, holiday, year
"

(十)hive_ods(ODS层任务)

#! /bin/bash
source /etc/profile

hive -e "
use hurys_dc_ods;

msck repair table ods_queue;

msck repair table ods_turnratio;

msck repair table ods_queue_dynamic;

msck repair table ods_statistics;

msck repair table ods_area;

msck repair table ods_pass;

msck repair table ods_track;

msck repair table ods_evaluation;

msck repair table ods_event;
"

目前,整个离线数仓的流程大致就是这样,有问题的后面再完善!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/295135.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式任务调度框架XXL-JOB

1、概述 XXL-JOB是一个分布式任务调度平台&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线&#xff0c;开箱即用。 官方文档&#xff1a;https://www.xuxueli.com/xxl-job/#%E4%BA%8C%E3%80%81%E5%BF%AB%E9%80%9F%E…

ENSP DHCP 配置不同网段

配置不同网段。 配置&#xff1a;192.168.80.254 192.168.10.254 192.168.20.254 给绑定信息选择网卡&#xff0c;出端口编号改为2&#xff0c;勾选双向通道&#xff0c;点击增加。 接下来把vmnet 1和vmnet 2 都按上图所示。 打开路由器&#xff0c;就开始配置。

文本自动粘贴编辑器:支持自动粘贴并筛选手机号码,让信息处理更轻松

在信息时代的浪潮中&#xff0c;文本处理已成为我们日常工作与生活的重要组成部分。无论是商务沟通、社交互动还是个人事务处理&#xff0c;手机号码的筛选与粘贴都显得尤为关键。然而&#xff0c;传统的文本处理方式效率低下、易出错&#xff0c;已无法满足现代人的高效需求。…

MegaSeg Pro for Mac v6.3.1 注册激活版 音视频DJ混音工具

MegaSeg Pro for Mac是一款专业的DJ和广播自动化软件&#xff0c;旨在为音乐专业人士提供强大的音乐播放和演播功能。这款软件具有多种功能&#xff0c;包括强大的音乐库管理&#xff0c;支持导入和组织大量音乐文件&#xff0c;可以轻松管理你的音乐收藏。它支持广泛的音频格式…

将excel数据拆分成多个excel文件

一、背景&#xff1a; 平时在日常工作中&#xff0c;经常需要将excel的文件数据进行拆分&#xff0c;拆分成多个excel文件&#xff0c;然而用人工来处理这个既耗时&#xff0c;又费精力&#xff0c;眼睛会疲劳&#xff0c;时间长了操作上会出现失误&#xff0c;导致数据拆分错…

【嵌入式硬件】光耦

1.光耦作用 光耦一般用于信号的隔离。当两个电路的电源参考点不相关时,使用光耦可以保证在两边不共地的情况下,完成信号的传输。 2.光耦原理 光耦的原理图如下所示,其内部可以看做一个特殊的“三极管”; 一般的三极管是通过基极B和发射极E间的电流,去控制集电极C和发射极…

OpenHarmony实战:Combo解决方案之ASR芯片移植案例

本方案基于 OpenHarmony LiteOS-M 内核&#xff0c;使用 ASR582X 芯片的 DEV.WIFI.A 开发板进行开发移植。作为典型的 IOT Combo&#xff08;Wi-FiBLE&#xff09;解决方案&#xff0c;本文章介绍 ASR582X 的适配过程。 编译移植 目录规划 本方案的目录结构使用 Board 和 So…

Mac上怎么合并多张图片?

Mac上怎么合并多张图片&#xff1f;上班过的小伙伴都应该知道&#xff0c;合并拼接图片是一件非常重要且经常需要使用到的图片处理技术&#xff0c;将多张图片合并拼成一张之后能够展现出更多的图片内容。在Mac电脑上&#xff0c;合并多张图片是一项常见的任务&#xff0c;无论…

大话设计模式之抽象工厂模式

抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;是一种创建型设计模式&#xff0c;它提供了一种方式来创建一系列相关或依赖对象的家族&#xff0c;而无需指定其具体类。该模式通过提供一个抽象工厂接口&#xff0c;定义了一组可以创建不同类型对象的方法&#…

JavaScript_与html结合方式

JavaScript_语法 ECMAScript&#xff1a;客户端脚本语言的标准 1.基本语法 1.1 与html结合方式&#xff08;2种&#xff09; 1. 内部JS 定义<script>,标签体内容就是js代码 2. 外部JS 定义<script>,通过src属性引入外部的 js文件 注意&#xff1a; 1.<script>…

【opencv】教程代码 —videoio(2)将两个视频的每一帧逐一读取并计算其PSNR 和MSSIM...

本教程开始介绍的源代码将对每一帧执行PSNR测量&#xff0c;并且只对PSNR低于输入值的帧进行SSIM测量。为了可视化的目的&#xff0c;我们在OpenCV窗口中展示两幅图像&#xff0c;并将PSNR和MSSIM值打印到控制台。期望看到如下内容&#xff1a; video-input-psnr-ssim.cpp 将两…

【“状态机” 解析UART不定长度的协议帧】

【“状态机” 解析UART不定长度的协议帧】 1. 数据帧格式2. 状态机原理3. 代码实现 通信设计中考虑协议的灵活性&#xff0c;经常把协议设计成“不定长度”。如果一个系统接收上述“不定长度”的协议帧&#xff0c;将会有一个挑战–如何高效接收与解析。一个实例如下图&#xf…

Linux (Ubuntu)- mysql8 部署

1.基本部署 01》》先查看OS类型&#xff0c;如果是Ubuntu在往下边看 rootspray:/etc/mysql/mysql.conf.d# lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.6 LTS Release: …

Spring AOP + 自定义注解 实现公共字段的填充

Spring AOP 自定义注解 实现公共字段的填充 代码冗,不利于后期维护. 定义操作这些字段的方法类型 实现步骤&#xff1a; 自定义注解AutoFill,用于表示操作这些公共字段的方法自定义切面类AutoFillAspect,统一拦截&#xff0c;通过反射获取方法入参&#xff0c;并填充公共字段…

springboot之MybatisPlus

文章目录 一、ORM二、mybatis实际操作三、mybatis-plus 一、ORM 简单来说ORM就是一个能够帮我们把java中Bean类映射到数据库中。 使用mybatis-plus。 配置架包 <!-- MyBatisPlus依赖 --><dependency><groupId>com.baomidou</groupId><art…

多态--下

文章目录 概念多态如何实现的指向谁调谁&#xff1f;例子分析 含有虚函数类的大小是多少&#xff1f;虚函数地址虚表地址多继承的子类的大小怎么计算&#xff1f;练习题虚函数和虚继承 概念 优先使用组合、而不是继承; 继承会破坏父类的封装、因为子类也可以调用到父类的函数;…

MyBatis-Plus04(条件构造器)

条件构造器和常用接口 wrapper介绍 Wrapper &#xff1a; 条件构造抽象类&#xff0c;最顶端父类 AbstractWrapper &#xff1a; 用于查询条件封装&#xff0c;生成 sql 的 where 条件 QueryWrapper &#xff1a; 查询条件封装 UpdateWrapper &#xff1a; Update 条件封装 A…

源支付V7开源版2.99,修复各种提示错误

源支付V7开源版2.99&#xff0c;修复各种提示错误 加密说明&#xff1a;200拿来的&#xff0c;只有8.1这个文件加密&#xff0c;其他文件无任何加密&#xff0c;已修复各种提示错误 测试其他开源版安装提示错误&#xff0c;有几个文件是加密的 注&#xff1a;开发不易&#…

【更新】上市公司-ZF环保补贴、补助数据(2008-2022年)

01、数据简介 环保补贴&#xff0c;又称绿色补贴&#xff0c;是ZF在环保领域实施的一种特定补贴。它主要针对那些在经济主体意识上存在偏差或由于资金私有制而无法有效进行环保投资的企业。环保补贴的目的是解决环保问题&#xff0c;帮助企业改进环保设备和工艺&#xff0c;以…

职场新军—网络安全,这个行业的实际情况你知道吗?

作为IT行业的香饽饽&#xff0c;网络安全工程师可谓是金字塔尖上的摇钱树。这专业近几年的薪资一直狂飙&#xff0c;简直就是职业版的“月薪过万”&#xff01;但别急着入坑&#xff0c;毕竟不是每个学了网络安全的都能在金钱雨中舞蹈&#xff0c;有时候还得小心不被淘汰。 听…