Flink 实时数仓(九)【DWS 层搭建(三)交易域汇总表创建】

前言

        今天立秋,任务是完成 DWS 剩余的表,不知道今天能不能做完,欲速则不达,学不完就明天继续,尽量搞懂每一个需求;

1、交易域下单各窗口汇总表

任务:从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数新增下单用户数,封装为实体类,写入 ClickHouse。

1.1、思路分析

首先,现在 DWS 这张表依赖于下单事务事实表,而下单事务事实表又依赖于订单预处理表,订单预处理表正是我们所讨论的会发生数据迟到的表(因为订单明细活动表和订单明细优惠券表要和主表进行 left join);

这个需求和昨天的交易域支付各窗口汇总表几乎一样,只不过这次我们不再对数据做去重了,因为上游订单明细活动表和订单明细优惠券表 left join 迟到不会影响我们表中的字段数据(表中字段的数据全部来自订单明细),所以下面我们在对 order_detail_id 分组后,我们只需要等到第一条数据来(不管乱序不乱序,因为我们要的字段在主表里,而撤回流写入 kafka 的null我们可以在分组前转 json 流的时候直接过滤掉),后面的数据直接丢弃即可(都是 left join 活动和优惠券的数据,不需要);

1.2、代码实现

1.2.1、创建 ck 表及 Bean

​
create table if not exists dws_trade_order_window
(stt                          DateTime,edt                          DateTime,order_unique_user_count      UInt64,order_new_user_count         UInt64,order_activity_reduce_amount Decimal(38, 20),order_coupon_reduce_amount   Decimal(38, 20),order_origin_total_amount    Decimal(38, 20),ts                           UInt64
) engine = ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt);​

 表中的 order_activity_reduce_amount 和 order_coupon_reduce_amount 字段并不需要订单明细活动表和订单明细优惠券表参与,因为 order_detail 和 order_info 表中都有关于优惠券和活动减免金额的字段;所以我们不需要考虑去重,因为对于这个需求,并不需要担心因为 订单明细活动表和订单明细优惠券表 的 left join 数据迟到,它俩迟到对这张表字段并不影响;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;@Data
@AllArgsConstructor
@Builder
public class TradeOrderBean {// 窗口起始时间String stt;// 窗口关闭时间String edt;// 下单独立用户数Long orderUniqueUserCount;// 下单新用户数Long orderNewUserCount;// 下单活动减免金额Double orderActivityReduceAmount;// 下单优惠券减免金额Double orderCouponReduceAmount;// 下单原始金额Double orderOriginalTotalAmount;// 时间戳Long ts;
}

1.2.2、读取下单事务事实表并转为 JSON 流

// TODO 2. 读取 kafka dwd_trade_order_detailString groupId = "dws_trade_order_window";DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));// TODO 3. 转为 JSON 对象SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSONObject.parseObject(value);out.collect(jsonObject);} catch (Exception e) {// 可以选择输出到侧输出流e.printStackTrace();}}});

1.2.3、分组第一次去重

去重一共分为两步,第一步按照最细粒度 order_detail_id 进行,但是我们要求的是用户数,所以其实这里这一步只是为了多练习一下数据有撤回流时数据重复以及完整性的问题;

这里用的富函数版本的 filter 算子,因为涉及状态、ttl(上下午)等

// TODO 4. 第一次去重(根据 order_detail_id 进行分组)KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));// TODO 5. 针对 order_detail_id 进行去重(保留第一条数据即可,因为要使用状态编程所以使用Rich)SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {// 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)private ValueState<String> state;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);stateDescriptor.enableTimeToLive(ttlConfig);state = getRuntimeContext().getState(stateDescriptor);}@Overridepublic boolean filter(JSONObject value) throws Exception {String data = state.value();if (data == null) {state.update("1"); //  随便存就行return true;}return false;}});

1.2.4、提取事件时间生成水位线

这里依然选择提取 create_time 字段,更贴近事件时间;

这里的定时器可以设置 5s 或者稍微久一点,因为我们上游订单预处理表在生成的过程中需要 join,我们当时给了 5s,也就是说我们能保证 5s 数据就能完整,所以这里的状态保存 5 s我们就能保证数据肯定是完整的,订单明细活动表和订单明细优惠券表的 left join 肯定已经完成了,不会有迟到数据了。而事实上,当第一条数据来的时候我们就会输出,因为我们所需要的字段数据是在 order_detail 中的,是订单预处理表的主表(订单明细表),而我们这里的下单事务事实表又是直接依赖于订单预处理表,所以订单明细活动表和订单明细优惠券表 的 left join 即使迟到,也不会受影响。

// TODO 6. 提取事件时间生成 watermarkSingleOutputStreamOperator<JSONObject> jsonWithWmDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return DateFormatUtil.toTs(element.getString("create_time"));}}));

1.2.5、按照 uid 分组第二次去重

// TODO 7. 按照 user_id 分组KeyedStream<JSONObject, String> keyedByUidDS = jsonWithWmDS.keyBy(json -> json.getString("user_id"));// TODO 8. 提取独立下单用户SingleOutputStreamOperator<TradeOrderBean> tradeOrderDS = keyedByUidDS.map(new RichMapFunction<JSONObject, TradeOrderBean>() {private ValueState<String> lastOrderDtState;@Overridepublic void open(Configuration parameters) throws Exception {lastOrderDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-order-dt", String.class));}@Overridepublic TradeOrderBean map(JSONObject value) throws Exception {String lastOrderDt = lastOrderDtState.value();String curDt = value.getString("create_time").split(" ")[0];// 下单独立用户数long orderUniqueUserCount = 0;// 下单新用户数long orderNewUserCount = 0;if (lastOrderDt == null) {orderUniqueUserCount = 1L;orderNewUserCount = 1L;lastOrderDtState.update(curDt);} else if (!lastOrderDt.equals(curDt)) {orderUniqueUserCount = 1L;lastOrderDtState.update(curDt);}// 取出下单件数和单价Integer sku_num = value.getInteger("sku_num");Double order_price = value.getDouble("order_price");return new TradeOrderBean("", "",orderUniqueUserCount,orderNewUserCount,value.getDouble("split_activity_amount"),value.getDouble("split_coupon_amount"),sku_num * order_price,null); // ts 后面开窗的时候都会给当前时间,到时候再补充}});

1.2.6、开窗聚合写出到 clickhouse

开窗为了时效性,聚合是因为我们的指标是一天的统计量,所以对一天的数据需要进行聚合;同样使用增量聚合函数 + 全量聚合函数来补全字段;

// TODO 9. 开窗聚合SingleOutputStreamOperator<TradeOrderBean> resultDS = tradeOrderDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunction<TradeOrderBean>() {@Overridepublic TradeOrderBean reduce(TradeOrderBean value1, TradeOrderBean value2) throws Exception {value1.setOrderUniqueUserCount(value1.getOrderUniqueUserCount() + value2.getOrderUniqueUserCount());value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());value1.setOrderOriginalTotalAmount(value1.getOrderOriginalTotalAmount() + value2.getOrderOriginalTotalAmount());value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());value1.setOrderCouponReduceAmount(value1.getOrderCouponReduceAmount() + value2.getOrderCouponReduceAmount());value1.setOrderActivityReduceAmount(value1.getOrderActivityReduceAmount() + value2.getOrderActivityReduceAmount());return value1;}}, new AllWindowFunction<TradeOrderBean, TradeOrderBean, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<TradeOrderBean> values, Collector<TradeOrderBean> out) throws Exception {TradeOrderBean next = values.iterator().next();next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));next.setTs(System.currentTimeMillis());out.collect(next);}});// TODO 10. 写入 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_order_window values(?,?,?,?,?,?,?,?)"));// TODO 11. 启动任务env.execute("DwsTradeOrderWindow");

2、交易域用户-SPU粒度下单各窗口汇总表

        这应该是目前为止最难的一个需求了,从 Kafka 订单明细主题读取数据(dwd_trade_order_detail),过滤 null 数据(因为下单事务事实表依赖于的订单预处理表,而这张表的生成需要 left join 活动和优惠券表,所以会形成回撤流)并按照唯一键对数据去重,关联维度信息(因为 order_info 和 order_detail 中都没有 spu 信息),按照维度(user,spu,trademark,category)分组,统计各维度各窗口的订单数和订单金额,将数据写入 ClickHouse 交易域品牌-品类-用户-SPU粒度下单各窗口汇总表;

2.1、需求分析

        可以看到,我们的建表语句中不仅保留了我们需求中要求的用户和sku_id,我们还同时保留了品牌、品类等字段,为的是之后可以扩展更多的需求而不用再去创建;其实我们设置可以将这个需求的粒度再做小一点,做到用户-sku粒度,同时保留 spu信息,这样我们就可以实现更多细粒度的需求了;

        我们需要关联 6 张维表:首先根据 sku_id 去读取 sku_info ,得到该 sku_id 的 spu_id、3个category_id以及trademark_id ,然后再去分别关联这 5 张表(通过 category3 去关联 category2,再用category2去获取category1);

create table if not exists dws_trade_user_spu_order_window
(stt            DateTime,edt            DateTime,trademark_id   String,trademark_name String,category1_id   String,category1_name String,category2_id   String,category2_name String,category3_id   String,category3_name String,user_id        String,spu_id         String,spu_name       String,order_count    UInt64,order_amount   Decimal(38, 20),ts             UInt64
) engine = ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt, spu_id, spu_name, user_id);

        我们在创建该表的 Java Bean 的时候需要额外补充一些字段——sku_id 和 一个 Set 类型的 orderset,因为没有 sku_id 字段就无法关联任一维表,而 order_set 是我们将来做去重的重要依据;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;import java.util.Set;@Data
@AllArgsConstructor
@Builder
public class TradeUserSpuOrderBean {// 窗口起始时间String stt;// 窗口结束时间String edt;// 品牌 IDString trademarkId;// 品牌名称String trademarkName;// 一级品类 IDString category1Id;// 一级品类名称String category1Name;// 二级品类 IDString category2Id;// 二级品类名称String category2Name;// 三级品类 IDString category3Id;// 三级品类名称String category3Name;// 订单 ID@TransientSinkSet<String> orderIdSet;// sku_id@TransientSinkString skuId;// 用户 IDString userId;// spu_idString spuId;// spu 名称String spuName;// 下单次数Long orderCount;// 下单金额Double orderAmount;// 时间戳Long ts;
}

在上面的 Java Bean 中我们用到了构造者模式,因为我们在把数据转为 JavaBean 类型的数据流的时候,很多字段需要关联维表才能得到,而初始化构造太烦人了,所以我们通过构造者模式来不断丰富对象的属性值; 

表中的 order_count 字段是指对某一品类商品下单的次数,也就是指用户在不同 order_id 中下单相同的 spu 数;而用户在一个 order_id 中下单的多个相同 spu 是只能计数为 1 的;

所以考虑到一个 order_id 中的 spu_id 可能重复:

 而且 sku_id 也可能发生重复(可能一个订单中的一个商品买了多件,虽然 order_id 和 sku_id 肯定相同,但是每件用了不同的券,有的平台会视为一条(sku_id相同,sku_num=2),但是有的平台可能会视为两条sku_id相同,sku_num=1的数据)。

所以我们必须做去重,那应该怎么去重?

  • 读取数据源(下单事务事实表)
  • 转为 JSON 流
  • 按照唯一键去重(除去上游表 left join 导致的回撤流的重复数据)
  • 转为 JavaBean 类型的流
  • 关联维表(只补充 tm_id,spu_id 和 category3_id)
  • 添加水位线
  • 分组(按照维度 user,spu,trademark,category)
  • 开窗
    • 创建一个 set 集合存储 order_id
  • 聚合(根据 order_id 进行去重,set 的大小就是 order_count)
  • 关联维表(补充 tm_name,spu_name,category的name 以及 category2 和 category1 的 id 和 name)

为什么我们要在开窗聚合前后去关联两次维表呢?

  • 首先,在开窗前我们只关联了分组所必须依赖的字段
  • 开窗后的字段都是一些对分组没有影响的字段,所以聚合后再去关联不仅省去了没必要的数据传输,而且聚合后的数据量更小,关联时读取 hbase 的数据量小(数据量小就减少了关联的次数)

2.2、代码实现

2.2.1、读取下单事务事实表 + 转换JSON + 分组 + 去重

这部分和上面的需求一样,直接拿来:

        // TODO 2. 读取 kafka dwd_trade_order_detailString groupId = "dws_trade_trademark_category_user_order_window";DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));// TODO 3. 转为 JSON 格式SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSONObject.parseObject(value);out.collect(jsonObject);} catch (Exception e) {// 可以选择输出到侧输出流e.printStackTrace();}}});// TODO 4. 第一次去重(根据 order_detail_id 进行分组)KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));// TODO 5. 针对 order_detail_id 去重上游 left join 导致的重复数据(保留第一条数据即可,因为要使用状态编程所以使用Rich)SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {// 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)private ValueState<String> state;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);stateDescriptor.enableTimeToLive(ttlConfig);state = getRuntimeContext().getState(stateDescriptor);}@Overridepublic boolean filter(JSONObject value) throws Exception {String data = state.value();if (data == null) {state.update("1"); //  随便存就行return true;}return false;}});

2.2.2、转为 Java Bean 类型的流

        // TODO 6. 将流转为 JavaBean 类型SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuDS = filterDS.map(line -> {HashSet<String> orderIds = new HashSet<>();orderIds.add(line.getString("order_id"));return TradeUserSpuOrderBean.builder().skuId(line.getString("sku_id")).userId(line.getString("user_id")).orderAmount(line.getDouble("split_total_amount")).orderIdSet(orderIds).ts(DateFormatUtil.toTs(line.getString("create_time"), true)).build();});

2.2.3、维表查询工具类封装

之后我们要将数据流和维表进行关联,那我们就必须去 Phoenix 查询数据,所以我们这里封装一个工具类:

/*** 适用于任何 JDBC 方式访问的数据库中的任何查询语句*/
public class JdbcUtil {/*** @param connection 连接对象* @param sql sql语句* @param clz 返回类型对象* @param underScoreToCamel 是否下划线转为驼峰* @param <T> 返回类型* @return 结果集列表*/public static <T> List<T> queryList(Connection connection, String sql, Class<T> clz, boolean underScoreToCamel) throws SQLException, InstantiationException, IllegalAccessException, InvocationTargetException {// 创建集合用于存放结果ArrayList<T> result = new ArrayList<>();// 预编译 SQLPreparedStatement preparedStatement = connection.prepareStatement(sql);// 执行查询ResultSet resultSet = preparedStatement.executeQuery();// 获取查询的元数据信息ResultSetMetaData metaData = resultSet.getMetaData();int columnCount = metaData.getColumnCount();// 遍历结果集while (resultSet.next()) {T t = clz.newInstance();for (int i = 0; i < columnCount; i++) {// 获取列名和列值String columnName = metaData.getColumnName(i);Object value = resultSet.getObject(columnName);// 判断是否需要进行下划线与驼峰命名的转换if (underScoreToCamel) {columnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName.toLowerCase());}// 赋值BeanUtils.setProperty(t, columnName, value);}resultSet.close();preparedStatement.close();result.add(t);}return result;}
}
public class DimUtil {public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {// 拼接 SQLString querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";// 查询数据List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);// 返回结果return queryList.get(0);}}

现在有个问题是:Phoenix 查询数据的速度每条平均 7 ms 左右,这显然不能满足大数据的场景,关于 HBase 的查询数据流程我们很熟悉:

  • 客户端向 zookeeper 查询元数据表所在的 regionServer 地址
  • zookeeper 返回 metastore 地址
  • 客户端去 regionServer 查询出 metastore 信息并保存
  • 根据 metastore 中的信息去 regionServer 去读取数据同时缓存一份到读缓存

所以我们说 HBase 的连接是一个重量级的连接,过程很长很繁琐,那我们就得考虑怎么优化这个查询速度了;

2.2.4、旁路缓存

为了提高查询效率,我们需要对数据做一个缓存,但是并不是把所有维表缓存起来,那样就没有必要把海量数据存到 HBase 了,我们只缓存热点数据;

旁路缓存注意事项:

  • 缓存要设过期时间,不然冷数据会常驻缓存,浪费资源

  • 要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存(在写入 Phoenix 前判断 Maxwell 传过来的数据中 type 字段是否为 update,如果是删除缓存中对应的数据)

关于 Redis 的表设计:

  • 数据类型:String
  • Key:String(TableName+id)使用 String 做 key 只能存一条数据,方便管理每条数据的生命周期
public class JedisUtil {private static JedisPool jedisPool;private static void initJedisPool() {JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(100);poolConfig.setMaxIdle(5);poolConfig.setMinIdle(5);poolConfig.setBlockWhenExhausted(true);poolConfig.setMaxWaitMillis(2000);poolConfig.setTestOnBorrow(true);jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);}public static Jedis getJedis() {if (jedisPool == null) {initJedisPool();}// 获取Jedis客户端Jedis jedis = jedisPool.getResource();return jedis;}public static void main(String[] args) {Jedis jedis = getJedis();String pong = jedis.ping();System.out.println(pong);}}

重写 DimUtil:

    public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {// 先查询 RedisJedis jedis = JedisUtil.getJedis();String redisKey = "DIM:"+tableName+":"+key;String dimJsonStr = jedis.get(redisKey);if (dimJsonStr != null){// 重置过期时间jedis.expire(redisKey,24 * 60 * 60);// 归还连接jedis.close();// 返回维表数据return JSONObject.parseObject(dimJsonStr);}// 拼接 SQLString querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";// 查询数据List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);// 写入到 RedisJSONObject dimInfo = queryList.get(0);jedis.set(redisKey,dimInfo.toJSONString());// 设置过期时间jedis.expire(redisKey,24 * 60 * 60);// 归还连接jedis.close();// 返回结果return dimInfo;}public static void deleteDimInfo(String tableName,String key){// 获取连接Jedis jedis = JedisUtil.getJedis();// 删除数据jedis.del("DIM"+tableName+":"+key);// 归还连接jedis.close();}

修改 DimSinkFunction 的 invoke 方法(如果是 update 数据就去删除旧的缓存) : 

    @Overridepublic void invoke(JSONObject value, Context context) throws Exception {// 获取连接DruidPooledConnection connection = druidDataSource.getConnection();// 写出数据(需要知道写出的表名、字段)String sinkTable = value.getString("sinkTable");JSONObject data = value.getJSONObject("data");// 获取数据类型String type = value.getString("type");if (type!=null && type.equals("update")) {// Phoenix 都是大写DimUtil.deleteDimInfo(sinkTable.toUpperCase(),data.getString("id"));}// 如果插入数据失败 invoke 方法抛出的 Exception 会导致程序停止PhoenixUtil.upsertValues(connection,sinkTable,data);// 归还连接connection.close();}

2.2.5、异步 IO 优化查询速度

在Flink 流处理过程中,经常需要和外部系统进行交互,如通过维度表补全事实表中的维度字段。

默认情况下,在Flink 算子中,单个并行子任务只能以同步方式与外部系统交互:将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种方式将大量时间耗费在了等待结果上。

为了提高处理效率,可以有两种思路。

(1)增加算子的并行度,但需要耗费更多的资源。

(2)异步 IO。

Flink 在1.2中引入了Async I/O,将IO操作异步化。在异步模式下,单个并行子任务可以连续发送多个请求,按照返回的先后顺序对请求进行处理,发送请求后不需要阻塞式等待,省去了大量的等待时间,大幅提高了流处理效率,解决了与外部系统交互时网络延迟成为系统瓶颈的问题。

异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,因此单个并行子任务可以连续发送多个请求,从而提高并发效率。对于涉及网络IO的操作,可以显著减少因为请求等待带来的性能损耗。

构建一个线程池工具类:

public class ThreadPoolUtil {private static ThreadPoolExecutor threadPoolExecutor;private ThreadPoolUtil(){}// 单例模式public ThreadPoolExecutor getThreadPoolExecutor() {if (threadPoolExecutor == null){synchronized (ThreadPoolUtil.class){if (threadPoolExecutor == null){threadPoolExecutor = new ThreadPoolExecutor(4,20,100,TimeUnit.SECONDS,new LinkedBlockingDeque<>());}}}return threadPoolExecutor;}
}

2.2.6、编写异步函数

这个异步函数类我们使用了泛型,为了方便扩展之后的其它需求(不同流的数据类型不一样),并且添加了 2 个抽象方法:

  • getKey:因为我们的流中的数据(读取自下单事务事实表)已经携带了读取 Phoenix 维表所需要的 id 主键,但是我们又不能在代码中写死,所以我们通过这个抽象方法来让调用者实现获取主键字段的方法
  • addAttribute():同样由于不同流的数据类型不同,我们把给流字段补充这个操作(从 hbase 读到 jsonObject 数据返回)交给调用者去完成

在异步函数中我们来创建 Phoenix 客户端线程去读取数据;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {private DruidDataSource dataSource;private ThreadPoolExecutor threadPoolExecutor;private String tableName;public DimAsyncFunction(){}public DimAsyncFunction(String tableName) {this.tableName = tableName;}@Overridepublic void open(Configuration parameters) throws Exception {// 创建 phoenix 连接池dataSource = DruidDSUtil.createDataSource();threadPoolExecutor = ThreadPoolUtil.getThreadPoolExecutor();}@Overridepublic void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {threadPoolExecutor.execute(new Runnable() {@Overridepublic void run() {// 获取连接DruidPooledConnection connection = null;try {connection = dataSource.getConnection();// 查询维表 获取维度信息String key = getKey(input);JSONObject dimInfo = DimUtil.getDimInfo(connection, tableName, key);// 将维度信息补充至当前数据if (dimInfo != null){addAttribute(input,dimInfo);}// 归还连接connection.close();// 将结果写出resultFuture.complete(Collections.singletonList(input));} catch (Exception e) {System.out.println("关联维表失败: "+input+","+tableName);e.printStackTrace();}}});}@Overridepublic void timeout(T input, ResultFuture<T> resultFuture) throws Exception {System.out.println("timeout"+input);}public abstract String getKey(T input);public abstract void addAttribute(T pojo,JSONObject dimInfo);
}

2.2.7、使用异步IO关联 sku_info 

我们的关联分为两步:

  1. 在开窗聚合前只关联 sku_info 只补充维度字段(spu_id、user_id、category3_id、tm_id),为的是减少不必要的数据传输,而且聚合后主流数据量变小,这时候再去关联其它维表字段可以减少查询的数量
  2. 在聚合后关联 spu、tm、category3、category2、category1 补充其它字段(spu_name、tm_name、category3_name、category2_id、category2_name、category1_id、category1_name)
// TODO 7. 关联 sku_info 维表补充 spu_id、tm_id、category3_idSingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithSkuDS = AsyncDataStream.unorderedWait(tradeUserSpuDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SKU_INFO") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getSkuId();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setSpuId(dimInfo.getString("SPU_ID"));pojo.setTrademarkId(dimInfo.getString("TM_ID"));pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));}},100,TimeUnit.SECONDS);

2.2.8、提取水位线

        // TODO 8. 提取事件时间并生成 watermarkSingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithWmDS = tradeUserSpuWithSkuDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeUserSpuOrderBean>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TradeUserSpuOrderBean>() {@Overridepublic long extractTimestamp(TradeUserSpuOrderBean element, long recordTimestamp) {return element.getTs();}}));

2.2.9、分组开窗聚合

        按照粒度分组,之后增量聚合把相同粒度的数据的订单总数和订单金额进行累加,在全量聚合中再补充窗口起止时间、ck版本时间戳、订单总数等字段;

// TODO 9. 分组开窗聚合KeyedStream<TradeUserSpuOrderBean, Tuple4<String, String, String, String>> keyStream = tradeUserSpuWithWmDS.keyBy(line -> Tuple4.of(line.getSpuId(), line.getCategory3Id(), line.getTrademarkId(), line.getUserId()));SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceDS = keyStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunction<TradeUserSpuOrderBean>() {@Overridepublic TradeUserSpuOrderBean reduce(TradeUserSpuOrderBean value1, TradeUserSpuOrderBean value2) throws Exception {value1.getOrderIdSet().addAll(value2.getOrderIdSet());value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());return value1;}}, new WindowFunction<TradeUserSpuOrderBean, TradeUserSpuOrderBean, Tuple4<String, String, String, String>, TimeWindow>() {@Overridepublic void apply(Tuple4<String, String, String, String> stringStringStringStringTuple4, TimeWindow window, Iterable<TradeUserSpuOrderBean> input, Collector<TradeUserSpuOrderBean> out) throws Exception {TradeUserSpuOrderBean next = input.iterator().next();next.setTs(System.currentTimeMillis());next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));next.setOrderCount((long) next.getOrderIdSet().size());out.collect(next);}});

2.2.10、异步IO关联其它维表并写出到 ck

// TODO 10. 关联 spu、tm、category 维表的 name 字段以及 category2 和 category1 的 id 和 name 字段// TODO 10.1 关联 SpuSingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithSpuDS = AsyncDataStream.unorderedWait(reduceDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SPU_INFO") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getSpuId();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setSpuName(dimInfo.getString("SPU_NAME"));}},100, TimeUnit.SECONDS);// TODO 10.2 关联 tmSingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceWithSpuDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_TRADEMARK") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getTrademarkId();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setTrademarkName(dimInfo.getString("TM_NAME"));}},100, TimeUnit.SECONDS);// TODO 10.3 关联 category3SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY3") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getCategory3Id();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setCategory3Name(dimInfo.getString("NAME"));pojo.setCategory2Id("CATEGORY2_ID");}},100, TimeUnit.SECONDS);// TODO 10.4 关联 category2SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY2") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getCategory2Id();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setCategory2Name(dimInfo.getString("NAME"));pojo.setCategory1Id("CATEGORY1_ID");}},100, TimeUnit.SECONDS);// TODO 10.5 关联 category1SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY1") {@Overridepublic String getKey(TradeUserSpuOrderBean input) {return input.getCategory1Id();}@Overridepublic void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {pojo.setCategory1Name(dimInfo.getString("NAME"));}},100, TimeUnit.SECONDS);// TODO 11. 写出到 clickhousereduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_order_window values " +"(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));// TODO 12. 启动env.execute("DwsTradeUserSpuOrderWindow");

总结

        至此,DWS 层这两张表终于完成,耗时一天,第二张表是目前为止最复杂的一张,也是 DWS 层第一次关联维表;

        DWS 层的需求只剩两个,明天应该一上午就能完成,毕竟和这个需求差不多;ADS 层就简单了,明天下午应该就能结束;

        关于异步IO,我把链接放这,之后复习再来查看:异步 I/O | Apache Flink

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/394189.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot中使用过滤器filter

过滤器Filter 在 Java 中&#xff0c;Filter&#xff08;过滤器&#xff09;是一种用于对请求进行预处理和后处理的机制。 工作原理&#xff1a; 当一个请求到达服务器时&#xff0c;会先经过一系列配置好的过滤器。过滤器可以检查请求的参数、头信息、请求体等内容&#xf…

【ARM】v8架构programmer guide(3)_ARMv8的寄存器

目录 4.ARMv8 registers 4.1 AArch64 特殊寄存器 4.1.1 Zero register 4.1.2 Stack pointer &#xff08;SP) 4.1.3 Program Counter &#xff08;PC) 4.1.4 Exception Link Register(ELR) 4.1.5 Saved Process Status Register &#xff08;SPSR&#xff09; 4.2 Proc…

PythonStudio 控件使用常用方式(十三)TScrollBox

PythonStudio是一个极强的开发Python的IDE工具&#xff0c;它使用的是Delphi的控件&#xff0c;常用的内容是与Delphi一致的。但是相关文档并一定完整。现在我试试能否逐步把它的控件常用用法写一点点&#xff0c;也作为PythonStudio的参考。 从1.2.1版开始&#xff0c;Python…

mac 2k显示器 配置

前言 今年5月份买了一个2k显示器&#xff0c;刚收到的时候发现只有一个1080 x 720&#xff08;HiDPI&#xff09;分辨率是人眼看起来比较舒服的&#xff0c;于是一直用着。但是直到开始写前端代码的时候&#xff0c;我才发现&#xff0c;网页在2k显示器和内建显示器的布局竟然…

Go语言中gin+gorm开发前端端分离博客时遇到的问题,gorm执行查询时如何选中特定字段?

代码获取 本篇文章的代码放在了Github上&#xff0c;可以免费获取。 https://github.com/zhangdapeng520/zdpgo_gin_examples 概述 在查询用户信息的时候&#xff0c;由于密码这个字段比较敏感&#xff0c;需要进行处理&#xff0c;不要返回给前端。 我一开始的解决方案是直…

SpringBoot+Mybatis 分页

无论多数据源,还是单数据源,分页都一样,刚开始出了点错,是因为PageHelper的版本问题 这里用的SpringBoot3 SpringBoot2应该是没有问题的 相关代码 dynamic-datasourceMybatis多数据源使用-CSDN博客 依赖 <?xml version"1.0" encoding"UTF-8"?&g…

CocosCreator使用 ProtoBuf WebSocket与服务器对接方法

在 Cocos Creator 中使用 .proto 文件和转换成 TypeScript&#xff08;TS&#xff09;两者各有其优缺点&#xff0c;具体选择取决于你的项目需求和团队的开发习惯。以下是两者的一些比较&#xff1a; 1、使用 .proto 文件的优点&#xff1a; 跨语言支持&#xff1a;Protocol B…

学习STM32(6)-- STM32单片机ADCDAC的应用

1 引 言 深入了解并掌握STM32F103单片机在模拟数字转换&#xff08;ADC&#xff09;和数字模拟转换&#xff08;DAC&#xff09;应用方面的功能和操作。学习如何配置STM32F103的ADC模块&#xff0c;实现模拟信号到数字信号的精确转换&#xff1b;同时&#xff0c;探索DAC模块…

著名人工智能新经济数字经济新能源新质生产力讲师培训师教授专家唐兴通分享人工智能社会学商业模式创新人工智能就业工作与教育学习出海跨境数字化转型数字营销数字销售

添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 2024 年是人工智能在工作中真正应用的一年。根据微软和领英进行的调查&#xff08;2024年5月&#xff09;&#xff0c;在过去六个月中&#xff0c;生成式人工智能的使用量几乎翻了一番&#xff0c;全球75%的…

Android RadioGroup实现多行显示,并保持单选

公司项目最近有个这样的需求&#xff0c;要求实现【多个文本&#xff0c;多行显示&#xff0c;且同时只能选中一个】。设计图效果如下&#xff1a; 看上去很简单&#xff0c;使用 RadioGroup LinearLayout RadioButton 快速实现&#xff1a; <RadioGroupandroid:id"…

[C++进阶数据结构]二叉搜索树

多态讲完了,我们来讲点轻松的(也许)。 我们之前讲过二叉树&#xff0c;而二叉树中&#xff0c;又有一种特殊的树称之为二叉搜索树。 一、二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为空…

中电信翼康济世数据中台基于Apache SeaTunnel构建数据集成平台经验分享

作者 | 中电信翼康工程师 代来 编辑 | Debra Chen 一. 引言 Apache SeaTunnel作为一个高性能、易用的数据集成框架&#xff0c;是快速落地数据集成平台的基石。本文将从数据中台战略背景、数据集成平台技术选型、降低Apache SeaTunnel使用门槛及未来展望几个方面&#xff0c…

【环绕字符串中唯一的子字符串】python刷题记录

R4-字符串 动态规划 class Solution:def findSubstringInWraproundString(self, s: str) -> int:dp[0]*26num1#dp初始化dp[ord(s[0])-ord(a)]1for c1,c2 in pairwise(s):if not (ord(c2)-ord(c1)-1)%26:num1else:num1dp[id]max(dp[id : ord(c2)-ord(a)],num)return sum(dp)p…

调用azure的npm实现outlook_api模拟查看邮件、发送邮件(实现web版接受outlook邮件第一步)

文章目录 ⭐前言⭐注册azure应用&#x1f496;添加权限 ⭐调用npm 实现收发邮件&#x1f496;安装依赖&#x1f496;创建appSettings.js 放置密钥&#x1f496;创建graphHelper.js封装功能&#x1f496;主文件index.js 对外暴露&#x1f496;效果 ⭐结束 ⭐前言 大家好&#x…

Flutter GPU 是什么?为什么它对 Flutter 有跨时代的意义?

Flutter 3.24 版本引入了 Flutter GPU 概念的新底层图形 API flutter_gpu &#xff0c;还有 flutter_scene 的 3D 渲染支持库&#xff0c;它们目前都是预览阶段&#xff0c;只能在 main channel 上体验&#xff0c;并且依赖 Impeller 的实现。 Flutter GPU 是 Flutter 内置的底…

2024最新Mysql锁机制与优化实践以及MVCC底层原理剖析

锁机制详解 锁是计算机协调多个进程或线程并发访问某一资源的机制。 在数据库中&#xff0c;除了传统的计算资源&#xff08;如CPU、RAM、I/O等&#xff09;的争用以外&#xff0c;数据也是一种供需要用户共享的资源。如何保证数据并发访问的一致性、有效性是所有数据库必须解…

3Done学习笔记

一、基本操作 1、旋转视角 使用左下角立方体选择&#xff1b; 右键可以拖动视角&#xff1b; 中间滑轮按住拖动整个舞台界面。 2、平移和旋转 右键选择移动&#xff0c;有两种方式。 第一种选择起始点&#xff0c;按照起始点位置移动到终止点。第二种直接根据轮盘旋转或…

【docker】docker容器部署常用服务

1、容器部署nginx&#xff0c;并且新增一个页面 docker run -d -p 81:80 --name nginx2 nginx docker exec -it nginx2 /bin/bashcd /usr/share/nginx/html/ echo "hello world">>hello.html2、容器部署redis&#xff0c;成功部署后向redis中添加一条数据 do…

【C/C++笔记】:易错难点3 (二叉树)

选择题 &#x1f308;eg1 一棵有15个节点的完全二叉树和一棵同样有15个节点的普通二叉树&#xff0c;叶子节点的个数最多会差多少个&#xff08;&#xff09;&#xff1f; 正确答案&#xff1a; C A. 3 B. 5 C. 7 D. 9 解析&#xff1a;普通二叉树的叶子节…

WPF学习笔记

WPF WPF&#xff08;Windows Presentation Foundation&#xff0c;Windows呈现基础&#xff09;是微软推出的基于Windows 的用户界面框架&#xff0c;属于.NET Framework 3.0的一部分。它提供了统一的编程模型、语言和框架&#xff0c;真正做到了分离界面设计人员与开发人员的…