Series
Logistics Real-Time Data Warehouse: Building the Ingestion Pipeline
Logistics Real-Time Data Warehouse: Warehouse Setup
Logistics Real-Time Data Warehouse: Warehouse Setup (DIM)
Logistics Real-Time Data Warehouse: Warehouse Setup (DWD), Part 1
Logistics Real-Time Data Warehouse: Warehouse Setup (DWD), Part 2
Logistics Real-Time Data Warehouse: Warehouse Setup (DWS), Part 1
Contents
- Series
- Preface
- I. Writing the Code
  - 1. File Creation
    - 1. Main program
    - 2. Entity classes
    - 3. Custom trigger
    - 4. Custom aggregate function
    - 5. Add query methods to HbaseUtil
    - 6. JedisUtil utility class
    - 7. DimUtil utility class: cache-aside dimension lookups
    - 8. Modify the TmsConfigDimBean entity class
    - 9. Pass the op field through
    - 10. Thread-pool utility class
    - 11. Async join function DimAsyncFunction
    - 12. ClickHouseUtil utility class
    - 13. DimSinkFunction
    - 14. TransientSink annotation
  - 2. Main Program
  - 3. Windowed Aggregation
    - 1. MyTriggerFunction
    - 2. MyAggregationFunction
  - 4. Dimension Association
    - 1. DimAsyncFunction
    - 2. DimJoinFunction
    - 3. ThreadPoolUtil
    - 4. DimUtil
    - 5. JedisUtil
    - 6. HbaseUtil
    - 7. DwsBoundOrgSortDayBean
    - 8. Adding the foreign-key fields
    - 9. MyBroadcastProcessFunction
    - 10. DimSinkFunction
  - 5. Writing to ClickHouse
    - 1. ClickHouseUtil
    - 2. TransientSink
- II. Testing the Code
  - 1. Program Startup
  - 2. Modify Kafka Partitions
  - 3. ClickHouse Tables
    - 1. Create the database
    - 2. Create the table
    - 3. Materialized view
  - 4. Check the Results
- Summary
Preface
In this post we count each organization's daily number of sorting operations, enrich the results with city and province dimensions, and write them to the corresponding ClickHouse table. The statistics must refresh every ten seconds.
The overall flow is shown in the figure below.
I. Writing the Code
1. File Creation
1. Main program
2. Entity classes
3. Custom trigger
4. Custom aggregate function
5. Add query methods to HbaseUtil
6. JedisUtil utility class
7. DimUtil utility class: cache-aside dimension lookups
8. Modify the TmsConfigDimBean entity class
9. Pass the op field through
10. Thread-pool utility class
11. Async join function DimAsyncFunction
12. ClickHouseUtil utility class
13. DimSinkFunction
When dimension data is updated, delete the matching entries from Redis.
14. TransientSink annotation
Some fields help the computation but should not be written to ClickHouse. We mark them with a custom annotation; at write time we read each field's TransientSink annotation and skip the field whenever the annotation is present.
These are all the Java files this post creates or modifies.
2. Main Program
The tasks DwsBoundOrgSortDay needs to perform are shown in the following flowchart.
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwsBoundOrgSortDayBean;
import com.atguigu.tms.realtime.utils.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class DwsBoundOrgSortDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read data from Kafka
        String topic = "tms_dwd_bound_sort";
        String groupId = "dws_tms_dwd_bound_sort";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each record: JSON string -> entity bean
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> beanDS = kafkaStrDS.map(
                new MapFunction<String, DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean map(String jsonStr) throws Exception {
                        DwdBoundSortBean dwdBoundSortBean = JSON.parseObject(jsonStr, DwdBoundSortBean.class);
                        return DwsBoundOrgSortDayBean.builder()
                                .orgId(dwdBoundSortBean.getOrgId())
                                .sortCountBase(1L)
                                // Shift 8 hours so the one-day windows align with UTC+8 day boundaries
                                .ts(dwdBoundSortBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                    }
                });

        // Assign watermarks and extract the event-time field
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withWatermarkDS = beanDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsBoundOrgSortDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsBoundOrgSortDayBean>() {
                            @Override
                            public long extractTimestamp(DwsBoundOrgSortDayBean boundOrgSortDayBean, long recordTimestamp) {
                                return boundOrgSortDayBean.getTs();
                            }
                        }));
        // withWatermarkDS.print("###");

        // Key by organization id
        KeyedStream<DwsBoundOrgSortDayBean, String> keyedDS =
                withWatermarkDS.keyBy(DwsBoundOrgSortDayBean::getOrgId);

        // One-day tumbling event-time window
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> windowDS =
                keyedDS.window(TumblingEventTimeWindows.of(Time.days(1L)));

        // Attach the custom trigger
        WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<>());

        // Aggregate
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsBoundOrgSortDayBean>() {
                    @Override
                    public DwsBoundOrgSortDayBean add(DwsBoundOrgSortDayBean value, DwsBoundOrgSortDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setSortCountBase(accumulator.getSortCountBase() + 1);
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsBoundOrgSortDayBean, DwsBoundOrgSortDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsBoundOrgSortDayBean> elements, Collector<DwsBoundOrgSortDayBean> out) throws Exception {
                        for (DwsBoundOrgSortDayBean element : elements) {
                            // Window start time
                            long stt = context.window().getStart();
                            // Shift the window start back 8 hours and format it as the statistics date
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                });

        // Dimension association (organization, city, province) via async I/O
        // Join the organization dimension to get the organization name
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setOrgName(dimInfoJsonObj.getString("org_name"));
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        sortDayBean.setJoinOrgId(orgParentId != null ? orgParentId : sortDayBean.getOrgId());
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Fill in the city id
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withOrgNameDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getJoinOrgId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Join the region dimension: use the city id to get the city name and its province id
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityNameAndProvinceIdDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setCityName(dimInfoJsonObj.getString("name"));
                        sortDayBean.setProvinceId(dimInfoJsonObj.getString("parent_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Join the region dimension again: use the province id to get the province name
        SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withProvinceDS = AsyncDataStream.unorderedWait(
                withCityNameAndProvinceIdDS,
                new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {
                        sortDayBean.setProvinceName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {
                        return Tuple2.of("id", sortDayBean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS);

        withProvinceDS.print(">>>>");

        // Write the joined result to ClickHouse
        withProvinceDS.addSink(ClickHouseUtil.getJdbcSink(
                "insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}
Now let's implement the other files in the order the main program calls them.
3. Windowed Aggregation
The operators before the window step introduce nothing new, so we go straight to the window-related classes.
1.MyTriggerFunction
The custom trigger:
package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Custom trigger: fires the window computation every 10 seconds (event time)
public class MyTriggerFunction<T> extends Trigger<T, TimeWindow> {
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueStateDescriptor<Boolean> valueStateDescriptor =
                new ValueStateDescriptor<>("isFirstState", Boolean.class);
        ValueState<Boolean> isFirstState = ctx.getPartitionedState(valueStateDescriptor);
        Boolean isFirst = isFirstState.value();
        if (isFirst == null) {
            // First element in the window: mark it in state
            isFirstState.update(true);
            // Register a timer at the next 10-second boundary after the event time
            ctx.registerEventTimeTimer(timestamp - timestamp % 10000L + 10000L);
        } else if (isFirst) {
            isFirstState.update(false);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // time is the event time at which the registered timer fires
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        long end = window.getEnd();
        if (time < end) {
            // Chain the next 10-second timer as long as it still falls inside the window
            if (time + 10000L < end) {
                ctx.registerEventTimeTimer(time + 10000L);
            }
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
2.MyAggregationFunction
The custom aggregate function:
package com.atguigu.tms.realtime.app.func;

import org.apache.flink.api.common.functions.AggregateFunction;

// Base aggregate function where input, accumulator, and output share one type;
// subclasses only need to implement add()
public abstract class MyAggregationFunction<T> implements AggregateFunction<T, T, T> {
    @Override
    public T createAccumulator() {
        return null;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        return null;
    }
}
4. Dimension Association
1.DimAsyncFunction
The async I/O function:
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DimJoinFunction;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
    private String tableName;
    private ExecutorService executorService;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = ThreadPoolUtil.getInstance();
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        // Take a thread from the pool and issue the lookup asynchronously
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                // Derive the lookup condition (primary or foreign key) from the stream element
                Tuple2<String, String> keyNameAndValue = getCondition(obj);
                // Fetch the dimension object for that condition
                JSONObject dimInfoJsonObj = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, tableName, keyNameAndValue);
                // Copy the dimension attributes onto the stream element
                if (dimInfoJsonObj != null) {
                    join(obj, dimInfoJsonObj);
                }
                // Emit the enriched element downstream
                resultFuture.complete(Collections.singleton(obj));
            }
        });
    }
}
2.DimJoinFunction
The methods that DimAsyncFunction requires its subclasses to implement are pulled out into the DimJoinFunction interface.
package com.atguigu.tms.realtime.beans;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;

public interface DimJoinFunction<T> {
    // Copy dimension attributes onto the stream element
    void join(T obj, JSONObject dimInfoJsonObj);

    // Return the lookup key name and value for the element
    Tuple2<String, String> getCondition(T obj);
}
3.ThreadPoolUtil
The utility class that creates the thread pool used by the async I/O functions:
package com.atguigu.tms.realtime.utils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {
    private static volatile ThreadPoolExecutor poolExecutor;

    // Double-checked locking on the volatile field; no need to also synchronize the method
    public static ThreadPoolExecutor getInstance() {
        if (poolExecutor == null) {
            synchronized (ThreadPoolUtil.class) {
                if (poolExecutor == null) {
                    poolExecutor = new ThreadPoolExecutor(
                            4,                      // core pool size
                            20,                     // maximum pool size
                            300, TimeUnit.SECONDS,  // idle timeout for non-core threads
                            new LinkedBlockingDeque<>(Integer.MAX_VALUE));
                }
            }
        }
        return poolExecutor;
    }
}
4.DimUtil
During dimension association we fetch dimension rows from HBase. To speed up lookups we add Redis as a cache-aside layer: read Redis first, fall back to HBase on a miss, and backfill Redis with the result. The flow is shown in the figure below.
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;

@Slf4j
public class DimUtil {
    public static JSONObject getDimInfo(String namespace, String tableName, Tuple2<String, String> nameAndValue) {
        // Key name and value of the lookup condition
        String keyName = nameAndValue.f0;
        String keyValue = nameAndValue.f1;
        // Redis key under which this dimension row is cached
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        // Redis client
        Jedis jedis = null;
        // Dimension data read from Redis
        String dimJsonStr = null;
        // Result to return
        JSONObject dimJsonObj = null;
        try {
            // Check the cache first
            jedis = JedisUtil.getJedis();
            dimJsonStr = jedis.get(redisKey);
            if (StringUtils.isNotEmpty(dimJsonStr)) {
                // Cache hit
                dimJsonObj = JSON.parseObject(dimJsonStr);
            } else {
                // Cache miss: query HBase by primary or foreign key
                if ("id".equals(keyName)) {
                    dimJsonObj = HbaseUtil.getRowByPrimaryKey(namespace, tableName, nameAndValue);
                } else {
                    dimJsonObj = HbaseUtil.getRowByForeignKey(namespace, tableName, nameAndValue);
                }
                // Backfill the cache with a 24-hour TTL
                if (dimJsonObj != null && jedis != null) {
                    jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());
                }
            }
        } catch (Exception e) {
            log.error("Exception while reading dimension data from Redis", e);
        } finally {
            if (jedis != null) {
                System.out.println("Closing the Jedis client");
                jedis.close();
            }
        }
        return dimJsonObj;
    }

    // Delete cached dimension data from Redis
    public static void delCached(String tableName, Tuple2<String, String> keyNameAndValue) {
        String keyName = keyNameAndValue.f0;
        String keyValue = keyNameAndValue.f1;
        String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;
        Jedis jedis = null;
        try {
            jedis = JedisUtil.getJedis();
            // del (not decr) removes the cached entry
            jedis.del(redisKey);
        } catch (Exception e) {
            log.error("Exception while clearing cached dimension data from Redis", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
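Before wiring DimUtil into the async function, it can be smoke-tested in isolation. A minimal sketch, assuming HBase and Redis are up and that dim_base_organ contains a row with id 1 (the id values here are hypothetical):

package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;

// Throwaway smoke test; the key values are hypothetical sample data
public class DimUtilSmokeTest {
    public static void main(String[] args) {
        // Primary-key lookup: hits Redis first, falls back to HBase, then backfills the cache
        JSONObject byPk = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, "dim_base_organ", Tuple2.of("id", "1"));
        System.out.println(byPk);
        // Running it twice shows the second call served from Redis
        System.out.println(DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, "dim_base_organ", Tuple2.of("id", "1")));
    }
}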
5.JedisUtil
The Jedis client used to connect to Redis.
First add the dependency to the pom:
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>
package com.atguigu.tms.realtime.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtil {
    private static JedisPool jedisPool;

    static {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(1000);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(5);
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setMaxWaitMillis(2000L);
        poolConfig.setTestOnBorrow(true);
        jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);
    }

    public static Jedis getJedis() {
        System.out.println("Creating a Jedis client");
        return jedisPool.getResource();
    }

    public static void main(String[] args) {
        // Connectivity check: should print PONG
        Jedis jedis = getJedis();
        String pong = jedis.ping();
        System.out.println(pong);
    }
}
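If the main() check above fails, first make sure the Redis server itself answers (assuming redis-cli is installed on the node):

redis-cli -h hadoop102 -p 6379 ping

A healthy server replies PONG.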
6. HbaseUtil
Earlier we implemented table creation and inserts in HbaseUtil; now we add the query methods.
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseUtil {
    private static Connection conn;

    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", TmsConfig.hbase_zookeeper_quorum);
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Create a table
    public static void createTable(String nameSpace, String tableName, String... families) {
        Admin admin = null;
        try {
            if (families.length < 1) {
                System.out.println("At least one column family is required");
                return;
            }
            admin = conn.getAdmin();
            // Check whether the table already exists
            if (admin.tableExists(TableName.valueOf(nameSpace, tableName))) {
                System.out.println(nameSpace + ":" + tableName + " already exists");
                return;
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
            // Attach the column families
            for (String family : families) {
                ColumnFamilyDescriptor familyDescriptor =
                        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
                builder.setColumnFamily(familyDescriptor);
            }
            admin.createTable(builder.build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Insert a row into HBase
    public static void putPow(String namespace, String tableName, Put put) {
        BufferedMutator mutator = null;
        try {
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(namespace, tableName));
            params.writeBufferSize(5 * 1024 * 1024);
            params.setWriteBufferPeriodicFlushTimeoutMs(3000L);
            mutator = conn.getBufferedMutator(params);
            mutator.mutate(put);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (mutator != null) {
                try {
                    mutator.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Look up one row by primary key (row key)
    public static JSONObject getRowByPrimaryKey(String namespace, String tableName, Tuple2<String, String> rowKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        String rowKeyName = rowKeyNameAndKey.f0;
        String rowKeyValue = rowKeyNameAndKey.f1;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Result result = table.get(new Get(Bytes.toBytes(rowKeyValue)));
            Cell[] cells = result.rawCells();
            if (cells.length > 0) {
                dimJsonObj = new JSONObject();
                dimJsonObj.put(rowKeyName, rowKeyValue);
                for (Cell cell : cells) {
                    dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
                            Bytes.toString(CellUtil.cloneValue(cell)));
                }
            } else {
                System.out.println("No matching dimension data found in the HBase table");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;
    }

    // Look up one row by foreign key (single-column value filter scan)
    public static JSONObject getRowByForeignKey(String namespace, String tableName, Tuple2<String, String> foreignKeyNameAndKey) {
        Table table = null;
        JSONObject dimJsonObj = null;
        try {
            table = conn.getTable(TableName.valueOf(namespace, tableName));
            Scan scan = new Scan();
            String foreignKeyName = foreignKeyNameAndKey.f0;
            String foreignKeyValue = foreignKeyNameAndKey.f1;
            SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                    Bytes.toBytes("info"), Bytes.toBytes(foreignKeyName),
                    CompareOperator.EQUAL, Bytes.toBytes(foreignKeyValue));
            // Skip rows that do not contain the filtered column at all
            singleColumnValueFilter.setFilterIfMissing(true);
            scan.setFilter(singleColumnValueFilter);
            ResultScanner scanner = table.getScanner(scan);
            Result result = scanner.next();
            if (result != null) {
                Cell[] cells = result.rawCells();
                if (cells.length > 0) {
                    dimJsonObj = new JSONObject();
                    dimJsonObj.put("id", Bytes.toString(result.getRow()));
                    for (Cell cell : cells) {
                        dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
                                Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                } else {
                    System.out.println("No matching dimension data found in the HBase table");
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return dimJsonObj;
    }
}
7. DwsBoundOrgSortDayBean
The entity class containing the fields we write to ClickHouse.
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DwsBoundOrgSortDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // Organization ID used to join the province info; not written to ClickHouse
    @TransientSink
    String joinOrgId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Number of sorting operations
    Long sortCountBase;

    // Timestamp
    Long ts;
}
8. Adding the Foreign-Key Fields
The code above needs each dimension table's foreign-key fields, so we modify the MySQL config table and add a foreign_keys column.
DROP TABLE IF EXISTS `tms_config_dim`;
CREATE TABLE `tms_config_dim` (
  `source_table` varchar(200) NOT NULL COMMENT 'source table',
  `sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table',
  `sink_family` varchar(200) DEFAULT NULL COMMENT 'sink table column family',
  `sink_columns` varchar(200) DEFAULT NULL COMMENT 'sink table columns',
  `sink_pk` varchar(256) DEFAULT NULL COMMENT 'primary key field',
  `foreign_keys` varchar(256) DEFAULT NULL COMMENT 'foreign key lookup fields',
  PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='TMS real-time config table';
Then re-import the config data; a sample row is shown below.
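For reference, a re-imported config row looks roughly like this. The column values below are illustrative only, not the full seed script:

-- Illustrative row; the real seed script covers every dimension table
INSERT INTO tms_config_dim
(source_table, sink_table, sink_family, sink_columns, sink_pk, foreign_keys)
VALUES ('base_organ', 'dim_base_organ', 'info',
        'id,org_name,org_level,region_id,org_parent_id', 'id', 'region_id,org_parent_id');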
Then run DimApp once to sync the data; see the DIM-layer post for the details.
9. MyBroadcastProcessFunction
When we built the DIM layer, MyBroadcastProcessFunction's processElement method filtered the foreign-key columns out; now we need them, so we add them back.
Add the following snippet just before the record is forwarded:
// Prepare for Redis cache invalidation (forward the op type and the foreign-key k-v pairs)
String op = jsonObj.getString("op");
if ("u".equals(op)) {
    afterJsonObj.put("op", op);
    // Foreign-key names configured for this dimension table
    String foreignKeys = tmsConfigDimBean.getForeignKeys();
    // JSON object holding this row's foreign-key names and values
    JSONObject foreignjsonObj = new JSONObject();
    if (StringUtils.isNotEmpty(foreignKeys)) {
        String[] foreignNameArr = foreignKeys.split(",");
        for (String foreignName : foreignNameArr) {
            // Pre-update image of the row
            JSONObject before = jsonObj.getJSONObject("before");
            String foreignKeyBefore = before.getString(foreignName);
            // Record the pre-update value; if the foreign key itself changed,
            // this is the stale value whose cache entry must be purged
            foreignjsonObj.put(foreignName, foreignKeyBefore);
        }
    }
    afterJsonObj.put("foreign_key", foreignjsonObj);
}
The complete code:
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.TmsConfigDimBean;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

// Custom class handling both the main stream and the broadcast stream
public class MyBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor;
    private Map<String, TmsConfigDimBean> configMap = new HashMap<>();
    private String username;
    private String password;

    public MyBroadcastProcessFunction(MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor, String[] args) {
        this.mapStateDescriptor = mapStateDescriptor;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        this.username = parameterTool.get("mysql-username", "root");
        this.password = parameterTool.get("mysql-password", "000000");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Preload the config table over JDBC
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://hadoop102:3306/tms_config?useSSL=false&useUnicode=true" +
                "&user=" + username + "&password=" + password +
                "&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
        Connection conn = DriverManager.getConnection(url);
        PreparedStatement ps = conn.prepareStatement("select * from tms_config.tms_config_dim");
        ResultSet rs = ps.executeQuery();
        ResultSetMetaData metaData = rs.getMetaData();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                Object columnValue = rs.getObject(i);
                jsonObj.put(columnName, columnValue);
            }
            TmsConfigDimBean tmsConfigDimBean = jsonObj.toJavaObject(TmsConfigDimBean.class);
            configMap.put(tmsConfigDimBean.getSourceTable(), tmsConfigDimBean);
        }
        rs.close();
        ps.close();
        conn.close();
        super.open(parameters);
    }

    @Override
    public void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // Name of the business table this change belongs to
        String table = jsonObj.getString("table");
        // Broadcast state
        ReadOnlyBroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // Look up the config entry for this table, falling back to the preloaded map
        TmsConfigDimBean tmsConfigDimBean;
        if ((tmsConfigDimBean = broadcastState.get(table)) != null
                || (tmsConfigDimBean = configMap.get(table)) != null) {
            // A config entry exists, so this record is dimension data;
            // "after" holds the affected business row
            JSONObject afterJsonObj = jsonObj.getJSONObject("after");
            // Data masking
            switch (table) {
                // Mask employee info
                case "employee_info":
                    String empPassword = afterJsonObj.getString("password");
                    String empRealName = afterJsonObj.getString("real_name");
                    String idCard = afterJsonObj.getString("id_card");
                    String phone = afterJsonObj.getString("phone");
                    empPassword = DigestUtils.md5Hex(empPassword);
                    empRealName = empRealName.charAt(0) +
                            empRealName.substring(1).replaceAll(".", "\\*");
                    // id_card values are randomly generated and do not follow the standard
                    // format, so the validation below stays commented out
                    // idCard = idCard.matches("(^[1-9]\\d{5}(18|19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)")
                    //         ? DigestUtils.md5Hex(idCard) : null;
                    phone = phone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phone) : null;
                    afterJsonObj.put("password", empPassword);
                    afterJsonObj.put("real_name", empRealName);
                    afterJsonObj.put("id_card", idCard);
                    afterJsonObj.put("phone", phone);
                    break;
                // Mask courier info
                case "express_courier":
                    String workingPhone = afterJsonObj.getString("working_phone");
                    workingPhone = workingPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(workingPhone) : null;
                    afterJsonObj.put("working_phone", workingPhone);
                    break;
                // Mask truck driver info
                case "truck_driver":
                    String licenseNo = afterJsonObj.getString("license_no");
                    licenseNo = DigestUtils.md5Hex(licenseNo);
                    afterJsonObj.put("license_no", licenseNo);
                    break;
                // Mask truck info
                case "truck_info":
                    String truckNo = afterJsonObj.getString("truck_no");
                    String deviceGpsId = afterJsonObj.getString("device_gps_id");
                    String engineNo = afterJsonObj.getString("engine_no");
                    truckNo = DigestUtils.md5Hex(truckNo);
                    deviceGpsId = DigestUtils.md5Hex(deviceGpsId);
                    engineNo = DigestUtils.md5Hex(engineNo);
                    afterJsonObj.put("truck_no", truckNo);
                    afterJsonObj.put("device_gps_id", deviceGpsId);
                    afterJsonObj.put("engine_no", engineNo);
                    break;
                // Mask truck model info
                case "truck_model":
                    String modelNo = afterJsonObj.getString("model_no");
                    modelNo = DigestUtils.md5Hex(modelNo);
                    afterJsonObj.put("model_no", modelNo);
                    break;
                // Mask user address info
                case "user_address":
                    String addressPhone = afterJsonObj.getString("phone");
                    addressPhone = addressPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(addressPhone) : null;
                    afterJsonObj.put("phone", addressPhone);
                    break;
                // Mask user info
                case "user_info":
                    String passwd = afterJsonObj.getString("passwd");
                    String realName = afterJsonObj.getString("real_name");
                    String phoneNum = afterJsonObj.getString("phone_num");
                    String email = afterJsonObj.getString("email");
                    passwd = DigestUtils.md5Hex(passwd);
                    if (StringUtils.isNotEmpty(realName)) {
                        realName = DigestUtils.md5Hex(realName);
                        afterJsonObj.put("real_name", realName);
                    }
                    phoneNum = phoneNum.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")
                            ? DigestUtils.md5Hex(phoneNum) : null;
                    email = email.matches("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$")
                            ? DigestUtils.md5Hex(email) : null;
                    afterJsonObj.put("birthday",
                            DateFormatUtil.toDate(afterJsonObj.getInteger("birthday") * 24 * 60 * 60 * 1000L));
                    afterJsonObj.put("passwd", passwd);
                    afterJsonObj.put("phone_num", phoneNum);
                    afterJsonObj.put("email", email);
                    break;
            }
            // Drop dimension attributes we do not need
            String sinkColumns = tmsConfigDimBean.getSinkColumns();
            filterColum(afterJsonObj, sinkColumns);
            // Attach the sink table name
            String sinkTable = tmsConfigDimBean.getSinkTable();
            afterJsonObj.put("sink_table", sinkTable);
            // Attach the row key field
            String sinkPk = tmsConfigDimBean.getSinkPk();
            afterJsonObj.put("sink_pk", sinkPk);
            // Prepare for Redis cache invalidation (forward the op type and the foreign-key k-v pairs)
            String op = jsonObj.getString("op");
            if ("u".equals(op)) {
                afterJsonObj.put("op", op);
                // Foreign-key names configured for this dimension table
                String foreignKeys = tmsConfigDimBean.getForeignKeys();
                // JSON object holding this row's foreign-key names and values
                JSONObject foreignjsonObj = new JSONObject();
                if (StringUtils.isNotEmpty(foreignKeys)) {
                    String[] foreignNameArr = foreignKeys.split(",");
                    for (String foreignName : foreignNameArr) {
                        // Pre-update image of the row
                        JSONObject before = jsonObj.getJSONObject("before");
                        String foreignKeyBefore = before.getString(foreignName);
                        // Record the pre-update value; if the foreign key itself changed,
                        // this is the stale value whose cache entry must be purged
                        foreignjsonObj.put(foreignName, foreignKeyBefore);
                    }
                }
                afterJsonObj.put("foreign_key", foreignjsonObj);
            }
            // Forward the dimension record
            out.collect(afterJsonObj);
        }
    }

    private void filterColum(JSONObject afterJsonObj, String sinkColumns) {
        String[] fieldArr = sinkColumns.split(",");
        List<String> fieldList = Arrays.asList(fieldArr);
        Set<Map.Entry<String, Object>> entrySet = afterJsonObj.entrySet();
        entrySet.removeIf(entry -> !fieldList.contains(entry.getKey()));
    }

    @Override
    public void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        JSONObject jsonObj = JSON.parseObject(jsonStr);
        // Broadcast state
        BroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // Operation type applied to the config table
        String op = jsonObj.getString("op");
        if ("d".equals(op)) {
            String sourceTable = jsonObj.getJSONObject("before").getString("source_table");
            broadcastState.remove(sourceTable);
            configMap.remove(sourceTable);
        } else {
            TmsConfigDimBean configDimBean = jsonObj.getObject("after", TmsConfigDimBean.class);
            String sourceTable = configDimBean.getSourceTable();
            broadcastState.put(sourceTable, configDimBean);
            configMap.put(sourceTable, configDimBean);
        }
    }
}
10. DimSinkFunction
Add the cache-invalidation code:
// If the dimension data changed, purge its cached copies from Redis
if ("u".equals(op)) {
    // Purge the primary-key cache entry for this dimension row
    DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
    // Purge the foreign-key cache entries for this dimension row
    Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();
    for (Map.Entry<String, Object> entry : set) {
        DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
    }
}
The complete code:
package com.atguigu.tms.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;

import java.util.Map;
import java.util.Set;

public class DimSinkFunction implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // Pull out the sink table name and row key field, then drop the helper attributes
        String sinkTable = jsonObj.getString("sink_table");
        String sinkPk = jsonObj.getString("sink_pk");
        jsonObj.remove("sink_table");
        jsonObj.remove("sink_pk");
        String op = jsonObj.getString("op");
        jsonObj.remove("op");
        JSONObject foreignKeyJsonObj = jsonObj.getJSONObject("foreign_key");
        jsonObj.remove("foreign_key");
        // Turn the remaining key-value pairs into one HBase Put
        Set<Map.Entry<String, Object>> entrySet = jsonObj.entrySet();
        Put put = new Put(jsonObj.getString(sinkPk).getBytes());
        for (Map.Entry<String, Object> entry : entrySet) {
            if (!sinkPk.equals(entry.getKey())) {
                put.addColumn("info".getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());
            }
        }
        System.out.println("Inserting a row into the HBase table");
        HbaseUtil.putPow(TmsConfig.HBASE_NAMESPACE, sinkTable, put);
        // If the dimension data changed, purge its cached copies from Redis
        if ("u".equals(op)) {
            // Purge the primary-key cache entry for this dimension row
            DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));
            // Purge the foreign-key cache entries for this dimension row
            Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();
            for (Map.Entry<String, Object> entry : set) {
                DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));
            }
        }
    }
}
5. Writing to ClickHouse
1. ClickHouseUtil
First add the required dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
ClickHouseUtil
package com.atguigu.tms.realtime.utils;

import com.atguigu.tms.realtime.beans.TransientSink;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ClickHouseUtil {
    // Build a JDBC SinkFunction for the given insert statement
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
                        // Bind the object's fields to the ? placeholders:
                        // reflect over the class of the current stream element
                        Field[] fieldsArr = obj.getClass().getDeclaredFields();
                        // Walk the fields, skipping any marked with @TransientSink
                        int skipNum = 0;
                        for (int i = 0; i < fieldsArr.length; i++) {
                            Field field = fieldsArr[i];
                            // Fields annotated with @TransientSink are not written to ClickHouse
                            TransientSink transientSink = field.getAnnotation(TransientSink.class);
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }
                            // Allow access to private fields
                            field.setAccessible(true);
                            try {
                                Object fieldValue = field.get(obj);
                                ps.setObject(i + 1 - skipNum, fieldValue);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(3000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(TmsConfig.CLICKHOUSE_DRIVER)
                        .withUrl(TmsConfig.CLICKHOUSE_URL)
                        .build());
        return sinkFunction;
    }
}
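ClickHouseUtil (and the HBase code earlier) reads its connection constants from TmsConfig, which this post never shows. A minimal sketch of the relevant fields; the namespace, quorum, and URL values here are assumptions to adapt to your own cluster:

package com.atguigu.tms.realtime.commom;

// Hypothetical sketch of the constants referenced in this post; values are assumptions
public class TmsConfig {
    // HBase namespace holding the dim_* tables
    public static final String HBASE_NAMESPACE = "tms_realtime";
    // ZooKeeper quorum used by the HBase client
    public static final String hbase_zookeeper_quorum = "hadoop102,hadoop103,hadoop104";
    // JDBC driver class bundled with clickhouse-jdbc 0.3.2
    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    // ClickHouse HTTP endpoint, writing to the tms_realtime database
    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/tms_realtime";
}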
2.TransientSink
package com.atguigu.tms.realtime.beans;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Custom annotation marking bean fields that should not be written to ClickHouse
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}
II. Testing the Code
1. Program Startup
The pipeline needs the following processes running:
HDFS, ZooKeeper, Kafka, HBase, Redis, ClickHouse, plus the OdsApp, DwdBoundRelevantApp, DimApp, and DwsBoundOrgSortDay jobs. DimApp only needs to run once to bring the dimension data up to date.
2. Modify Kafka Partitions
The tms_dwd_bound_sort topic must have 4 partitions to match the job's parallelism of 4; with fewer partitions, some source subtasks receive no data, their watermarks never advance, and the windowed aggregation never fires.
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_bound_sort --partitions 4
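You can verify the partition count afterwards with:

kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic tms_dwd_bound_sort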
3. ClickHouse Tables
1. Create the database
DROP DATABASE IF EXISTS tms_realtime;
CREATE DATABASE IF NOT EXISTS tms_realtime;
USE tms_realtime;
2. Create the table
DROP TABLE IF EXISTS dws_bound_org_sort_day_base;
CREATE TABLE IF NOT EXISTS dws_bound_org_sort_day_base
(
    `cur_date`        Date   COMMENT 'statistics date',
    `org_id`          String COMMENT 'organization ID',
    `org_name`        String COMMENT 'organization name',
    `city_id`         String COMMENT 'city ID',
    `city_name`       String COMMENT 'city name',
    `province_id`     String COMMENT 'province ID',
    `province_name`   String COMMENT 'province name',
    `sort_count_base` UInt64 COMMENT 'sorting count',
    `ts`              UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);
3. Materialized view
argMaxState(sort_count_base, ts) keeps, for each key, the count carried by the row with the largest ts, so the repeated ten-second flushes from Flink collapse into the latest value per day and organization; the query side reads it back with argMaxMerge.
DROP VIEW IF EXISTS dws_bound_org_sort_day;
CREATE MATERIALIZED VIEW IF NOT EXISTS dws_bound_org_sort_day
(
    `cur_date`      Date,
    `org_id`        String,
    `org_name`      String,
    `city_id`       String,
    `city_name`     String,
    `province_id`   String,
    `province_name` String,
    `sort_count`    AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192
AS
SELECT
    cur_date, org_id, org_name, city_id, city_name, province_id, province_name,
    argMaxState(sort_count_base, ts) AS sort_count
FROM dws_bound_org_sort_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;
4. Check the Results
Once the jobs are running, start generating data; after the generator finishes, query the results in ClickHouse with:
clickhouse-client -m
The -m flag enables multi-line statements, so a query can span several lines and is executed when you end it with a semicolon.
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxMerge(sort_count) AS sort_count
FROM dws_bound_org_sort_day
GROUP BY
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name
LIMIT 10;
Summary
That wraps up this part of the DWS-layer build. To make the files easier to manage, I have open-sourced the project on GitHub.
Project address: https://github.com/lcc-666/tms-parent