前言
今天立秋,任务是完成 DWS 剩余的表,不知道今天能不能做完,欲速则不达,学不完就明天继续,尽量搞懂每一个需求;
1、交易域下单各窗口汇总表
任务:从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数和新增下单用户数,封装为实体类,写入 ClickHouse。
1.1、思路分析
首先,现在 DWS 这张表依赖于下单事务事实表,而下单事务事实表又依赖于订单预处理表,订单预处理表正是我们所讨论的会发生数据迟到的表(因为订单明细活动表和订单明细优惠券表要和主表进行 left join);
这个需求和昨天的交易域支付各窗口汇总表几乎一样,只不过这次我们不再对数据做去重了,因为上游订单明细活动表和订单明细优惠券表 left join 迟到不会影响我们表中的字段数据(表中字段的数据全部来自订单明细),所以下面我们在对 order_detail_id 分组后,我们只需要等到第一条数据来(不管乱序不乱序,因为我们要的字段在主表里,而撤回流写入 kafka 的null我们可以在分组前转 json 流的时候直接过滤掉),后面的数据直接丢弃即可(都是 left join 活动和优惠券的数据,不需要);
1.2、代码实现
1.2.1、创建 ck 表及 Bean
create table if not exists dws_trade_order_window
(stt DateTime,edt DateTime,order_unique_user_count UInt64,order_new_user_count UInt64,order_activity_reduce_amount Decimal(38, 20),order_coupon_reduce_amount Decimal(38, 20),order_origin_total_amount Decimal(38, 20),ts UInt64
) engine = ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt);
表中的 order_activity_reduce_amount 和 order_coupon_reduce_amount 字段并不需要订单明细活动表和订单明细优惠券表参与,因为 order_detail 和 order_info 表中都有关于优惠券和活动减免金额的字段;所以我们不需要考虑去重,因为对于这个需求,并不需要担心因为 订单明细活动表和订单明细优惠券表 的 left join 数据迟到,它俩迟到对这张表字段并不影响;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;@Data
@AllArgsConstructor
@Builder
public class TradeOrderBean {// 窗口起始时间String stt;// 窗口关闭时间String edt;// 下单独立用户数Long orderUniqueUserCount;// 下单新用户数Long orderNewUserCount;// 下单活动减免金额Double orderActivityReduceAmount;// 下单优惠券减免金额Double orderCouponReduceAmount;// 下单原始金额Double orderOriginalTotalAmount;// 时间戳Long ts;
}
1.2.2、读取下单事务事实表并转为 JSON 流
// TODO 2. 读取 kafka dwd_trade_order_detailString groupId = "dws_trade_order_window";DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));// TODO 3. 转为 JSON 对象SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSONObject.parseObject(value);out.collect(jsonObject);} catch (Exception e) {// 可以选择输出到侧输出流e.printStackTrace();}}});
1.2.3、分组第一次去重
去重一共分为两步,第一步按照最细粒度 order_detail_id 进行,但是我们要求的是用户数,所以其实这里这一步只是为了多练习一下数据有撤回流时数据重复以及完整性的问题;
这里用的富函数版本的 filter 算子,因为涉及状态、ttl(上下午)等
// TODO 4. 第一次去重(根据 order_detail_id 进行分组)KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));// TODO 5. 针对 order_detail_id 进行去重(保留第一条数据即可,因为要使用状态编程所以使用Rich)SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {// 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)private ValueState<String> state;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);stateDescriptor.enableTimeToLive(ttlConfig);state = getRuntimeContext().getState(stateDescriptor);}@Overridepublic boolean filter(JSONObject value) throws Exception {String data = state.value();if (data == null) {state.update("1"); // 随便存就行return true;}return false;}});
1.2.4、提取事件时间生成水位线
这里依然选择提取 create_time 字段,更贴近事件时间;
这里的定时器可以设置 5s 或者稍微久一点,因为我们上游订单预处理表在生成的过程中需要 join,我们当时给了 5s,也就是说我们能保证 5s 数据就能完整,所以这里的状态保存 5 s我们就能保证数据肯定是完整的,订单明细活动表和订单明细优惠券表的 left join 肯定已经完成了,不会有迟到数据了。而事实上,当第一条数据来的时候我们就会输出,因为我们所需要的字段数据是在 order_detail 中的,是订单预处理表的主表(订单明细表),而我们这里的下单事务事实表又是直接依赖于订单预处理表,所以订单明细活动表和订单明细优惠券表 的 left join 即使迟到,也不会受影响。
// TODO 6. 提取事件时间生成 watermarkSingleOutputStreamOperator<JSONObject> jsonWithWmDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return DateFormatUtil.toTs(element.getString("create_time"));}}));
1.2.5、按照 uid 分组第二次去重
// TODO 7. 按照 user_id 分组KeyedStream<JSONObject, String> keyedByUidDS = jsonWithWmDS.keyBy(json -> json.getString("user_id"));// TODO 8. 提取独立下单用户SingleOutputStreamOperator<TradeOrderBean> tradeOrderDS = keyedByUidDS.map(new RichMapFunction<JSONObject, TradeOrderBean>() {private ValueState<String> lastOrderDtState;@Overridepublic void open(Configuration parameters) throws Exception {lastOrderDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-order-dt", String.class));}@Overridepublic TradeOrderBean map(JSONObject value) throws Exception {String lastOrderDt = lastOrderDtState.value();String curDt = value.getString("create_time").split(" ")[0];// 下单独立用户数long orderUniqueUserCount = 0;// 下单新用户数long orderNewUserCount = 0;if (lastOrderDt == null) {orderUniqueUserCount = 1L;orderNewUserCount = 1L;lastOrderDtState.update(curDt);} else if (!lastOrderDt.equals(curDt)) {orderUniqueUserCount = 1L;lastOrderDtState.update(curDt);}// 取出下单件数和单价Integer sku_num = value.getInteger("sku_num");Double order_price = value.getDouble("order_price");return new TradeOrderBean("", "",orderUniqueUserCount,orderNewUserCount,value.getDouble("split_activity_amount"),value.getDouble("split_coupon_amount"),sku_num * order_price,null); // ts 后面开窗的时候都会给当前时间,到时候再补充}});
1.2.6、开窗聚合写出到 clickhouse
开窗为了时效性,聚合是因为我们的指标是一天的统计量,所以对一天的数据需要进行聚合;同样使用增量聚合函数 + 全量聚合函数来补全字段;
// TODO 9. 开窗聚合SingleOutputStreamOperator<TradeOrderBean> resultDS = tradeOrderDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunction<TradeOrderBean>() {@Overridepublic TradeOrderBean reduce(TradeOrderBean value1, TradeOrderBean value2) throws Exception {value1.setOrderUniqueUserCount(value1.getOrderUniqueUserCount() + value2.getOrderUniqueUserCount());value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());value1.setOrderOriginalTotalAmount(value1.getOrderOriginalTotalAmount() + value2.getOrderOriginalTotalAmount());value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());value1.setOrderCouponReduceAmount(value1.getOrderCouponReduceAmount() + value2.getOrderCouponReduceAmount());value1.setOrderActivityReduceAmount(value1.getOrderActivityReduceAmount() + value2.getOrderActivityReduceAmount());return value1;}}, new AllWindowFunction<TradeOrderBean, TradeOrderBean, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<TradeOrderBean> values, Collector<TradeOrderBean> out) throws Exception {TradeOrderBean next = values.iterator().next();next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));next.setTs(System.currentTimeMillis());out.collect(next);}});// TODO 10. 写入 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_order_window values(?,?,?,?,?,?,?,?)"));// TODO 11. 启动任务env.execute("DwsTradeOrderWindow");
2、交易域用户-SPU粒度下单各窗口汇总表
这应该是目前为止最难的一个需求了,从 Kafka 订单明细主题读取数据(dwd_trade_order_detail),过滤 null 数据(因为下单事务事实表依赖于的订单预处理表,而这张表的生成需要 left join 活动和优惠券表,所以会形成回撤流)并按照唯一键对数据去重,关联维度信息(因为 order_info 和 order_detail 中都没有 spu 信息),按照维度(user,spu,trademark,category)分组,统计各维度各窗口的订单数和订单金额,将数据写入 ClickHouse 交易域品牌-品类-用户-SPU粒度下单各窗口汇总表;
2.1、需求分析
可以看到,我们的建表语句中不仅保留了我们需求中要求的用户和sku_id,我们还同时保留了品牌、品类等字段,为的是之后可以扩展更多的需求而不用再去创建;其实我们设置可以将这个需求的粒度再做小一点,做到用户-sku粒度,同时保留 spu信息,这样我们就可以实现更多细粒度的需求了;
我们需要关联 6 张维表:首先根据 sku_id 去读取 sku_info ,得到该 sku_id 的 spu_id、3个category_id以及trademark_id ,然后再去分别关联这 5 张表(通过 category3 去关联 category2,再用category2去获取category1);
create table if not exists dws_trade_user_spu_order_window
(stt DateTime,edt DateTime,trademark_id String,trademark_name String,category1_id String,category1_name String,category2_id String,category2_name String,category3_id String,category3_name String,user_id String,spu_id String,spu_name String,order_count UInt64,order_amount Decimal(38, 20),ts UInt64
) engine = ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt, spu_id, spu_name, user_id);
我们在创建该表的 Java Bean 的时候需要额外补充一些字段——sku_id 和 一个 Set 类型的 orderset,因为没有 sku_id 字段就无法关联任一维表,而 order_set 是我们将来做去重的重要依据;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;import java.util.Set;@Data
@AllArgsConstructor
@Builder
public class TradeUserSpuOrderBean {// 窗口起始时间String stt;// 窗口结束时间String edt;// 品牌 IDString trademarkId;// 品牌名称String trademarkName;// 一级品类 IDString category1Id;// 一级品类名称String category1Name;// 二级品类 IDString category2Id;// 二级品类名称String category2Name;// 三级品类 IDString category3Id;// 三级品类名称String category3Name;// 订单 ID@TransientSinkSet<String> orderIdSet;// sku_id@TransientSinkString skuId;// 用户 IDString userId;// spu_idString spuId;// spu 名称String spuName;// 下单次数Long orderCount;// 下单金额Double orderAmount;// 时间戳Long ts;
}
在上面的 Java Bean 中我们用到了构造者模式,因为我们在把数据转为 JavaBean 类型的数据流的时候,很多字段需要关联维表才能得到,而初始化构造太烦人了,所以我们通过构造者模式来不断丰富对象的属性值;
表中的 order_count 字段是指对某一品类商品下单的次数,也就是指用户在不同 order_id 中下单相同的 spu 数;而用户在一个 order_id 中下单的多个相同 spu 是只能计数为 1 的;
所以考虑到一个 order_id 中的 spu_id 可能重复:
而且 sku_id 也可能发生重复(可能一个订单中的一个商品买了多件,虽然 order_id 和 sku_id 肯定相同,但是每件用了不同的券,有的平台会视为一条(sku_id相同,sku_num=2),但是有的平台可能会视为两条sku_id相同,sku_num=1的数据)。
所以我们必须做去重,那应该怎么去重?
- 读取数据源(下单事务事实表)
- 转为 JSON 流
- 按照唯一键去重(除去上游表 left join 导致的回撤流的重复数据)
- 转为 JavaBean 类型的流
- 关联维表(只补充 tm_id,spu_id 和 category3_id)
- 添加水位线
- 分组(按照维度 user,spu,trademark,category)
- 开窗
- 创建一个 set 集合存储 order_id
- 聚合(根据 order_id 进行去重,set 的大小就是 order_count)
- 关联维表(补充 tm_name,spu_name,category的name 以及 category2 和 category1 的 id 和 name)
为什么我们要在开窗聚合前后去关联两次维表呢?
- 首先,在开窗前我们只关联了分组所必须依赖的字段
- 开窗后的字段都是一些对分组没有影响的字段,所以聚合后再去关联不仅省去了没必要的数据传输,而且聚合后的数据量更小,关联时读取 hbase 的数据量小(数据量小就减少了关联的次数)
2.2、代码实现
2.2.1、读取下单事务事实表 + 转换JSON + 分组 + 去重
这部分和上面的需求一样,直接拿来:
// TODO 2. 读取 kafka dwd_trade_order_detailString groupId = "dws_trade_trademark_category_user_order_window";DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));// TODO 3. 转为 JSON 格式SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSONObject.parseObject(value);out.collect(jsonObject);} catch (Exception e) {// 可以选择输出到侧输出流e.printStackTrace();}}});// TODO 4. 第一次去重(根据 order_detail_id 进行分组)KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));// TODO 5. 针对 order_detail_id 去重上游 left join 导致的重复数据(保留第一条数据即可,因为要使用状态编程所以使用Rich)SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {// 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)private ValueState<String> state;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);stateDescriptor.enableTimeToLive(ttlConfig);state = getRuntimeContext().getState(stateDescriptor);}@Overridepublic boolean filter(JSONObject value) throws Exception {String data = state.value();if (data == null) {state.update("1"); // 随便存就行return true;}return false;}});
2.2.2、转为 Java Bean 类型的流
// TODO 6. 将流转为 JavaBean 类型SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuDS = filterDS.map(line -> {HashSet<String> orderIds = new HashSet<>();orderIds.add(line.getString("order_id"));return TradeUserSpuOrderBean.builder().skuId(line.getString("sku_id")).userId(line.getString("user_id")).orderAmount(line.getDouble("split_total_amount")).orderIdSet(orderIds).ts(DateFormatUtil.toTs(line.getString("create_time"), true)).build();});
2.2.3、维表查询工具类封装
之后我们要将数据流和维表进行关联,那我们就必须去 Phoenix 查询数据,所以我们这里封装一个工具类:
/*** 适用于任何 JDBC 方式访问的数据库中的任何查询语句*/
public class JdbcUtil {/*** @param connection 连接对象* @param sql sql语句* @param clz 返回类型对象* @param underScoreToCamel 是否下划线转为驼峰* @param <T> 返回类型* @return 结果集列表*/public static <T> List<T> queryList(Connection connection, String sql, Class<T> clz, boolean underScoreToCamel) throws SQLException, InstantiationException, IllegalAccessException, InvocationTargetException {// 创建集合用于存放结果ArrayList<T> result = new ArrayList<>();// 预编译 SQLPreparedStatement preparedStatement = connection.prepareStatement(sql);// 执行查询ResultSet resultSet = preparedStatement.executeQuery();// 获取查询的元数据信息ResultSetMetaData metaData = resultSet.getMetaData();int columnCount = metaData.getColumnCount();// 遍历结果集while (resultSet.next()) {T t = clz.newInstance();for (int i = 0; i < columnCount; i++) {// 获取列名和列值String columnName = metaData.getColumnName(i);Object value = resultSet.getObject(columnName);// 判断是否需要进行下划线与驼峰命名的转换if (underScoreToCamel) {columnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName.toLowerCase());}// 赋值BeanUtils.setProperty(t, columnName, value);}resultSet.close();preparedStatement.close();result.add(t);}return result;}
}
public class DimUtil {public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {// 拼接 SQLString querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";// 查询数据List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);// 返回结果return queryList.get(0);}}
现在有个问题是:Phoenix 查询数据的速度每条平均 7 ms 左右,这显然不能满足大数据的场景,关于 HBase 的查询数据流程我们很熟悉:
- 客户端向 zookeeper 查询元数据表所在的 regionServer 地址
- zookeeper 返回 metastore 地址
- 客户端去 regionServer 查询出 metastore 信息并保存
- 根据 metastore 中的信息去 regionServer 去读取数据同时缓存一份到读缓存
所以我们说 HBase 的连接是一个重量级的连接,过程很长很繁琐,那我们就得考虑怎么优化这个查询速度了;
2.2.4、旁路缓存
为了提高查询效率,我们需要对数据做一个缓存,但是并不是把所有维表缓存起来,那样就没有必要把海量数据存到 HBase 了,我们只缓存热点数据;
旁路缓存注意事项:
-
缓存要设过期时间,不然冷数据会常驻缓存,浪费资源
-
要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存(在写入 Phoenix 前判断 Maxwell 传过来的数据中 type 字段是否为 update,如果是删除缓存中对应的数据)
关于 Redis 的表设计:
- 数据类型:String
- Key:String(TableName+id)使用 String 做 key 只能存一条数据,方便管理每条数据的生命周期
public class JedisUtil {private static JedisPool jedisPool;private static void initJedisPool() {JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(100);poolConfig.setMaxIdle(5);poolConfig.setMinIdle(5);poolConfig.setBlockWhenExhausted(true);poolConfig.setMaxWaitMillis(2000);poolConfig.setTestOnBorrow(true);jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);}public static Jedis getJedis() {if (jedisPool == null) {initJedisPool();}// 获取Jedis客户端Jedis jedis = jedisPool.getResource();return jedis;}public static void main(String[] args) {Jedis jedis = getJedis();String pong = jedis.ping();System.out.println(pong);}}
重写 DimUtil:
public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {// 先查询 RedisJedis jedis = JedisUtil.getJedis();String redisKey = "DIM:"+tableName+":"+key;String dimJsonStr = jedis.get(redisKey);if (dimJsonStr != null){// 重置过期时间jedis.expire(redisKey,24 * 60 * 60);// 归还连接jedis.close();// 返回维表数据return JSONObject.parseObject(dimJsonStr);}// 拼接 SQLString querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";// 查询数据List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);// 写入到 RedisJSONObject dimInfo = queryList.get(0);jedis.set(redisKey,dimInfo.toJSONString());// 设置过期时间jedis.expire(redisKey,24 * 60 * 60);// 归还连接jedis.close();// 返回结果return dimInfo;}public static void deleteDimInfo(String tableName,String key){// 获取连接Jedis jedis = JedisUtil.getJedis();// 删除数据jedis.del("DIM"+tableName+":"+key);// 归还连接jedis.close();}
修改 DimSinkFunction 的 invoke 方法(如果是 update 数据就去删除旧的缓存) :
@Overridepublic void invoke(JSONObject value, Context context) throws Exception {// 获取连接DruidPooledConnection connection = druidDataSource.getConnection();// 写出数据(需要知道写出的表名、字段)String sinkTable = value.getString("sinkTable");JSONObject data = value.getJSONObject("data");// 获取数据类型String type = value.getString("type");if (type!=null && type.equals("update")) {// Phoenix 都是大写DimUtil.deleteDimInfo(sinkTable.toUpperCase(),data.getString("id"));}// 如果插入数据失败 invoke 方法抛出的 Exception 会导致程序停止PhoenixUtil.upsertValues(connection,sinkTable,data);// 归还连接connection.close();}
2.2.5、异步 IO 优化查询速度
在Flink 流处理过程中,经常需要和外部系统进行交互,如通过维度表补全事实表中的维度字段。
默认情况下,在Flink 算子中,单个并行子任务只能以同步方式与外部系统交互:将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种方式将大量时间耗费在了等待结果上。
为了提高处理效率,可以有两种思路。
(1)增加算子的并行度,但需要耗费更多的资源。
(2)异步 IO。
Flink 在1.2中引入了Async I/O,将IO操作异步化。在异步模式下,单个并行子任务可以连续发送多个请求,按照返回的先后顺序对请求进行处理,发送请求后不需要阻塞式等待,省去了大量的等待时间,大幅提高了流处理效率,解决了与外部系统交互时网络延迟成为系统瓶颈的问题。
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,因此单个并行子任务可以连续发送多个请求,从而提高并发效率。对于涉及网络IO的操作,可以显著减少因为请求等待带来的性能损耗。
构建一个线程池工具类:
public class ThreadPoolUtil {private static ThreadPoolExecutor threadPoolExecutor;private ThreadPoolUtil(){}// 单例模式public ThreadPoolExecutor getThreadPoolExecutor() {if (threadPoolExecutor == null){synchronized (ThreadPoolUtil.class){if (threadPoolExecutor == null){threadPoolExecutor = new ThreadPoolExecutor(4,20,100,TimeUnit.SECONDS,new LinkedBlockingDeque<>());}}}return threadPoolExecutor;}
}
2.2.6、编写异步函数
这个异步函数类我们使用了泛型,为了方便扩展之后的其它需求(不同流的数据类型不一样),并且添加了 2 个抽象方法:
- getKey:因为我们的流中的数据(读取自下单事务事实表)已经携带了读取 Phoenix 维表所需要的 id 主键,但是我们又不能在代码中写死,所以我们通过这个抽象方法来让调用者实现获取主键字段的方法
- addAttribute():同样由于不同流的数据类型不同,我们把给流字段补充这个操作(从 hbase 读到 jsonObject 数据返回)交给调用者去完成
在异步函数中我们来创建 Phoenix 客户端线程去读取数据;
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {private DruidDataSource dataSource;private ThreadPoolExecutor threadPoolExecutor;private String tableName;public DimAsyncFunction(){}public DimAsyncFunction(String tableName) {this.tableName = tableName;}@Overridepublic void open(Configuration parameters) throws Exception {// 创建 phoenix 连接池dataSource = DruidDSUtil.createDataSource();threadPoolExecutor = ThreadPoolUtil.getThreadPoolExecutor();}@Overridepublic void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {threadPoolExecutor.execute(new Runnable() {@Overridepublic void run() {// 获取连接DruidPooledConnection connection = null;try {connection = dataSource.getConnection();// 查询维表 获取维度信息String key = getKey(input);JSONObject dimInfo = DimUtil.getDimInfo(connection, tableName, key);// 将维度信息补充至当前数据if (dimInfo != null){addAttribute(input,dimInfo);}// 归还连接connection.close();// 将结果写出resultFuture.complete(Collections.singletonList(input));} catch (Exception e) {System.out.println("关联维表失败: "+input+","+tableName);e.printStackTrace();}}});}@Overridepublic void timeout(T input, ResultFuture<T> resultFuture) throws Exception {System.out.println("timeout"+input);}public abstract String getKey(T input);public abstract void addAttribute(T pojo,JSONObject dimInfo);
}
2.2.7、使用异步IO关联 sku_info
我们的关联分为两步:
- 在开窗聚合前只关联 sku_info 只补充维度字段(spu_id、user_id、category3_id、tm_id),为的是减少不必要的数据传输,而且聚合后主流数据量变小,这时候再去关联其它维表字段可以减少查询的数量
- 在聚合后关联 spu、tm、category3、category2、category1 补充其它字段(spu_name、tm_name、category3_name、category2_id、category2_name、category1_id、category1_name)
// TODO 7. 关联 sku_info 维表补充 spu_id、tm_id、category3_idSingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithSkuDS = AsyncDataStream.unorderedWait(tradeUserSpuDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SKU_INFO") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getSkuId();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setSpuId(dimInfo.getString("SPU_ID"));pojo.setTrademarkId(dimInfo.getString("TM_ID"));pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));}},100,TimeUnit.SECONDS);
2.2.8、提取水位线
// TODO 8. 提取事件时间并生成 watermarkSingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithWmDS = tradeUserSpuWithSkuDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeUserSpuOrderBean>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TradeUserSpuOrderBean>() {@Overridepublic long extractTimestamp(TradeUserSpuOrderBean element, long recordTimestamp) {return element.getTs();}}));
2.2.9、分组开窗聚合
按照粒度分组,之后增量聚合把相同粒度的数据的订单总数和订单金额进行累加,在全量聚合中再补充窗口起止时间、ck版本时间戳、订单总数等字段;
// TODO 9. 分组开窗聚合KeyedStream<TradeUserSpuOrderBean, Tuple4<String, String, String, String>> keyStream = tradeUserSpuWithWmDS.keyBy(line -> Tuple4.of(line.getSpuId(), line.getCategory3Id(), line.getTrademarkId(), line.getUserId()));SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceDS = keyStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunction<TradeUserSpuOrderBean>() {@Overridepublic TradeUserSpuOrderBean reduce(TradeUserSpuOrderBean value1, TradeUserSpuOrderBean value2) throws Exception {value1.getOrderIdSet().addAll(value2.getOrderIdSet());value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());return value1;}}, new WindowFunction<TradeUserSpuOrderBean, TradeUserSpuOrderBean, Tuple4<String, String, String, String>, TimeWindow>() {@Overridepublic void apply(Tuple4<String, String, String, String> stringStringStringStringTuple4, TimeWindow window, Iterable<TradeUserSpuOrderBean> input, Collector<TradeUserSpuOrderBean> out) throws Exception {TradeUserSpuOrderBean next = input.iterator().next();next.setTs(System.currentTimeMillis());next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));next.setOrderCount((long) next.getOrderIdSet().size());out.collect(next);}});
2.2.10、异步IO关联其它维表并写出到 ck
// TODO 10. 关联 spu、tm、category 维表的 name 字段以及 category2 和 category1 的 id 和 name 字段// TODO 10.1 关联 SpuSingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithSpuDS = AsyncDataStream.unorderedWait(reduceDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SPU_INFO") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getSpuId();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setSpuName(dimInfo.getString("SPU_NAME"));}},100, TimeUnit.SECONDS);// TODO 10.2 关联 tmSingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceWithSpuDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_TRADEMARK") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getTrademarkId();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setTrademarkName(dimInfo.getString("TM_NAME"));}},100, TimeUnit.SECONDS);// TODO 10.3 关联 category3SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY3") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getCategory3Id();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setCategory3Name(dimInfo.getString("NAME"));pojo.setCategory2Id("CATEGORY2_ID");}},100, TimeUnit.SECONDS);// TODO 10.4 关联 category2SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY2") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getCategory2Id();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setCategory2Name(dimInfo.getString("NAME"));pojo.setCategory1Id("CATEGORY1_ID");}},100, TimeUnit.SECONDS);// TODO 10.5 关联 category1SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY1") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getCategory1Id();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setCategory1Name(dimInfo.getString("NAME"));}},100, TimeUnit.SECONDS);// TODO 11. 写出到 clickhousereduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_order_window values " +"(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));// TODO 12. 启动env.execute("DwsTradeUserSpuOrderWindow");
总结
至此,DWS 层这两张表终于完成,耗时一天,第二张表是目前为止最复杂的一张,也是 DWS 层第一次关联维表;
DWS 层的需求只剩两个,明天应该一上午就能完成,毕竟和这个需求差不多;ADS 层就简单了,明天下午应该就能结束;
关于异步IO,我把链接放这,之后复习再来查看:异步 I/O | Apache Flink