Flink Real-Time Data Warehouse (6) [DWD Layer Construction (4): Trade, Interaction and User Domains]

Preface

        Today's task is to finish the remaining fact tables of the DWD layer. Autumn recruiting has started earlier than usual this year, so time is tight; from what I hear, headcount is still limited, and applying late would almost certainly mean rejection.

        It's also a weekend, though as far as I can remember I never really took a weekend off in college. After all, weekends are when the empty classrooms (and the seats with power outlets) are easiest to grab. Well, once everything settles down I'll take a proper break.

1. Trade Domain: Order Placement Transaction Fact Table

        Yesterday we created the order pre-processing table (it joins the order-related tables, namely order detail, order info, order detail activity and order detail coupon, together with the MySQL lookup dictionary table). The point was exactly this: later order-related fact tables should not have to repeat those joins and redo the same computation.

1.1 Implementation Approach

  • Read the order pre-processing data from the Kafka topic dwd_trade_order_pre_process
  • Filter out the order placement details: newly inserted records (type = 'insert')
  • Write the result to the Kafka order detail topic

1.2 Code Implementation

1.2.1 Read the Order Pre-processing Table

Here we only keep the useful fields; the others (for example old and order_status, which are only needed by the cancelled-order table) are simply dropped.

// TODO 2. Read the order pre-processing table
tableEnv.executeSql("create table dwd_trade_order_pre_process(" +
        "id string, order_id string, user_id string, sku_id string, sku_name string, " +
        "province_id string, activity_id string, activity_rule_id string, coupon_id string, " +
        "date_id string, create_time string, operate_date_id string, operate_time string, " +
        "source_id string, source_type_id string, source_type_name string, sku_num string, " +
        "split_original_amount string, split_activity_amount string, split_coupon_amount string, " +
        "split_total_amount string, `type` string" +
        ")" + MyKafkaUtil.getKafkaDDL("dwd_trade_order_pre_process", "trade_detail"));

1.2.2 Filter Out the Order Placement Data

The granularity of the order pre-processing table is one SKU per row, so when an order is placed, the number of new records equals the number of SKUs in that order.

// TODO 3. Filter out the order placement data, i.e. the newly inserted records
Table filterTable = tableEnv.sqlQuery("SELECT " +
        "id, order_id, user_id, sku_id, sku_name, sku_num, province_id, " +
        "activity_id, activity_rule_id, coupon_id, create_time, operate_date_id, operate_time, " +
        "source_id, source_type_id, source_type_name, " +
        "split_activity_amount, split_coupon_amount, split_total_amount " +
        "FROM dwd_trade_order_pre_process " +
        "WHERE `type` = 'insert'");
tableEnv.createTemporaryView("filter_table", filterTable);

1.2.3 Create the Order Detail Table

// TODO 4. Create the DWD-layer order detail table
tableEnv.executeSql("CREATE TABLE dwd_trade_order_detail (" +
        "id STRING, order_id STRING, user_id STRING, sku_id STRING, sku_name STRING, " +
        "sku_num STRING, province_id STRING, activity_id STRING, activity_rule_id STRING, " +
        "coupon_id STRING, create_time STRING, operate_date_id STRING, operate_time STRING, " +
        "source_id STRING, source_type_id STRING, source_type_name STRING, " +
        "split_activity_amount STRING, split_coupon_amount STRING, split_total_amount STRING" +
        ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_order_detail"));

1.2.4 Write the Data to Kafka

// TODO 5. Write the data out to Kafka
tableEnv.executeSql("INSERT INTO dwd_trade_order_detail SELECT * FROM filter_table");

2. Trade Domain: Cancelled Order Transaction Fact Table

This requirement is also very simple.

Approach

  • Read the pre-processing table (the order placement table above did not need the old and order_status fields, but here they must be kept)
  • Filter out the cancelled-order data
    • type = 'update'
    • order_status = '1003'
    • old['order_status'] is not null (this condition is optional)
  • Write the result to Kafka
public class DwdTradeCancelDetail {
    public static void main(String[] args) throws Exception {
        // TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // in production, set this to the number of partitions of the Kafka topic
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 Enable checkpointing
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // max number of concurrent checkpoints
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // fixed-delay restart: 3 attempts, 5 s apart

        // 1.2 Set the state backend
        env.setStateBackend(new HashMapStateBackend());

        // TODO 2. Read the order pre-processing table
        tableEnv.executeSql("create table dwd_trade_order_pre_process(" +
                "id string, order_id string, user_id string, order_status string, sku_id string, " +
                "sku_name string, province_id string, activity_id string, activity_rule_id string, " +
                "coupon_id string, date_id string, create_time string, operate_date_id string, " +
                "operate_time string, source_id string, source_type_id string, source_type_name string, " +
                "sku_num string, split_original_amount string, split_activity_amount string, " +
                "split_coupon_amount string, split_total_amount string, `type` string, " +
                "`old` map<string, string>" +
                ")" + MyKafkaUtil.getKafkaDDL("dwd_trade_order_pre_process", "trade_detail"));

        // TODO 3. Filter out the cancelled-order data
        Table filterTable = tableEnv.sqlQuery("SELECT " +
                "id, order_id, user_id, order_status, sku_id, sku_name, province_id, " +
                "activity_id, activity_rule_id, coupon_id, date_id, create_time, " +
                "operate_date_id, operate_time, source_id, source_type_id, source_type_name, " +
                "sku_num, split_original_amount, split_activity_amount, split_coupon_amount, " +
                "split_total_amount, `type`, `old` " +
                "FROM dwd_trade_order_pre_process " +
                "WHERE `type` = 'update' " +
                "AND `old`['order_status'] is not null " +
                "AND order_status = '1003'");
        tableEnv.createTemporaryView("filter_table", filterTable);

        // TODO 4. Create the DWD-layer cancelled-order table
        tableEnv.executeSql("CREATE TABLE dwd_trade_cancel_detail(" +
                "id STRING, order_id STRING, user_id STRING, order_status STRING, sku_id STRING, " +
                "sku_name STRING, province_id STRING, activity_id STRING, activity_rule_id STRING, " +
                "coupon_id STRING, date_id STRING, create_time STRING, operate_date_id STRING, " +
                "operate_time STRING, source_id STRING, source_type_id STRING, source_type_name STRING, " +
                "sku_num STRING, split_original_amount STRING, split_activity_amount STRING, " +
                "split_coupon_amount STRING, split_total_amount STRING, `type` STRING, " +
                "`old` map<string, string>" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_cancel_detail"));

        // TODO 5. Write out the data
        tableEnv.executeSql("INSERT INTO dwd_trade_cancel_detail SELECT * FROM filter_table");

        // TODO 6. Start the job
        env.execute("DwdTradeCancelDetail");
    }
}

3. Trade Domain: Payment Success Transaction Fact Table

        The driving table of the payment success fact table is not the order table or the order detail table but the payment table: the business system has a payment_info table in which each row records the payment status (success or failure) of one order.

        We should still join it with the order placement detail table. If we later want to analyze payment records, the payment fact table should have as fine a granularity and as rich a set of dimensions as possible; after joining with the order detail table the granularity becomes one SKU per row, and many SKU-related dimension keys (for example the category) become available.

3.1 Implementation Approach

1) Set the TTL

        The payment success fact table joins the payment_info data from the business database with the order detail table. Order detail records are generated at order time and flow into the order detail topic after a series of processing steps; a payment normally only has to complete within 15 minutes of placing the order, so the payment detail data can lag the order detail data by up to 15 minutes. Allowing for possible out-of-order data, the TTL is set to 15 min + 5 s.
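As a minimal sketch (using the same TableConfig call that the later classes in this post use for state TTL), the 15 min + 5 s retention could be set like this:

// join-state TTL for the payment-success job: 15 min + 5 s = 905 s
// requires: import java.time.Duration;
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(15 * 60 + 5));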

2) Get the Order Detail Data

        A user must place an order before a payment can succeed, so the payment success detail set is necessarily a subset of the order detail set; we therefore read directly from the DWD-layer order placement detail table.

3) Filter the Payment Table Data

        From it we take the payment type, the callback time (i.e. the payment success time), and the payment success timestamp.

        In production, when a user pays, a row is inserted into the payment table of the business database with the callback time and callback content still empty. The backend then calls a third-party payment API; if the payment succeeds, the callback info is non-empty, the payment row is updated to fill in the callback time and callback content, and payment_status is set to the success status code (1602 in this project). After a successful payment the row no longer changes. Therefore, any change whose operation type is update and whose status code is 1602 is a payment success record.

From the analysis above, a payment-success changelog record in the business database must satisfy two conditions:

(1) the payment_status field equals 1602;

(2) the operation type is update.

4) Build the MySQL Lookup Dictionary Table

        This is used to perform dimension degeneration on the payment type field.

5) Join the Three Tables into a Payment Success Wide Table and Write It to the Kafka Payment Success Topic

        The finest granularity of the payment success business process is one payment success record per SKU; payment_info shares that granularity, so it is used as the driving table.

(1) Every payment_info row necessarily has matching rows in the order detail table; the driving table has no rows of its own to preserve, so an inner join with the order detail table is used.

(2) The join with the dictionary table only retrieves the payment type name for payment_type; again the driving table has no rows of its own to preserve, so an inner join is used. The later joins with the dictionary table follow the same reasoning and are not repeated.
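For reference, MysqlUtil.getBaseDicLookUpDDL() is a project helper; a rough sketch of the kind of JDBC lookup DDL it presumably produces is shown below. The host, credentials and cache settings are assumptions for illustration, not the actual util:

// hypothetical illustration of a JDBC lookup table for base_dic
tableEnv.executeSql("CREATE TABLE base_dic (" +
        "dic_code STRING, dic_name STRING, " +
        "PRIMARY KEY (dic_code) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://hadoop102:3306/gmall'," +   // assumed host/database
        "  'table-name' = 'base_dic'," +
        "  'username' = 'root'," +                           // assumed credentials
        "  'password' = '000000'," +
        "  'lookup.cache.max-rows' = '10'," +                // cache lookups to reduce pressure on MySQL
        "  'lookup.cache.ttl' = '1 hour'" +
        ")");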

3.2 Code Implementation

3.2.1 Read the topic_db Data

 Note: this program consumes three tables, two of which come from Kafka and therefore need a consumer group id; since they live in the same program, we can simply give them the same group:

// TODO 2. Read the topic_db data (keep this consumer group id consistent with the one used below)
tableEnv.executeSql(MyKafkaUtil.getTopicDb("dwd_trade_pay_detail_suc"));
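MyKafkaUtil.getTopicDb(groupId) is also a project helper. A hypothetical sketch of the source table it presumably registers is shown below; the column list, broker address and startup mode are assumptions rather than the util's real code:

// hypothetical sketch of the topic_db source table produced by getTopicDb(groupId)
tableEnv.executeSql("CREATE TABLE topic_db (" +
        "`database` STRING, `table` STRING, `type` STRING, " +
        "`data` MAP<STRING, STRING>, `old` MAP<STRING, STRING>, " +
        "`pt` AS PROCTIME(), " +          // processing-time attribute used for lookup joins
        "`ts` STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'topic_db'," +
        "  'properties.bootstrap.servers' = 'hadoop102:9092'," +  // assumed broker address
        "  'properties.group.id' = 'dwd_trade_pay_detail_suc'," +
        "  'scan.startup.mode' = 'group-offsets'," +
        "  'format' = 'json'" +
        ")");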

3.2.2 Filter Out the Payment Success Data

// TODO 3. Filter out the payment success data
Table paymentInfo = tableEnv.sqlQuery("SELECT " +
        "data['user_id'] user_id, " +
        "data['order_id'] order_id, " +
        "data['payment_type'] payment_type, " +
        "data['callback_time'] callback_time, " +
        "pt proc_time " +          // processing-time attribute, used for the lookup join below
        "FROM topic_db " +
        "WHERE `database` = 'gmall' " +
        "AND `table` = 'payment_info' " +
        "AND `type` = 'update' " +
        "AND data['payment_status'] = '1602'");
tableEnv.createTemporaryView("payment_info", paymentInfo);

3.2.3 Consume the Order Placement Detail Fact Table

A sketch of this step: the column list simply mirrors the dwd_trade_order_detail sink created in 1.2.3, so the JSON fields written there map back by name, and the consumer group reuses this program's group id:

// TODO 4. Consume the order placement detail table written by the job in section 1
tableEnv.executeSql("create table dwd_trade_order_detail (" +
        "id string, order_id string, user_id string, sku_id string, sku_name string, " +
        "sku_num string, province_id string, activity_id string, activity_rule_id string, " +
        "coupon_id string, create_time string, operate_date_id string, operate_time string, " +
        "source_id string, source_type_id string, source_type_name string, " +
        "split_activity_amount string, split_coupon_amount string, split_total_amount string" +
        ")" + MyKafkaUtil.getKafkaDDL("dwd_trade_order_detail", "dwd_trade_pay_detail_suc"));

3.2.4 Read the MySQL Lookup Table

// TODO 5. Read the MySQL lookup table
tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

3.2.5 Join the 3 Tables

Join the payment success data, the order placement details and the dictionary table:

// TODO 6. Join the 3 tables
Table resultTable = tableEnv.sqlQuery("SELECT " +
        "od.id order_detail_id, " +
        "od.order_id, " +
        "od.user_id, " +
        "od.sku_id, " +
        "od.sku_name, " +
        "od.province_id, " +
        "od.activity_id, " +
        "od.activity_rule_id, " +
        "od.coupon_id, " +
        "pi.payment_type payment_type_code, " +
        "dic.dic_name payment_type_name, " +
        "pi.callback_time, " +
        "od.source_id, " +
        "od.source_type_id source_type_code, " +
        "od.source_type_name, " +
        "od.sku_num, " +
        "od.split_activity_amount, " +
        "od.split_coupon_amount, " +
        "od.split_total_amount split_payment_amount " +
        "from payment_info pi " +
        "join dwd_trade_order_detail od " +
        "on pi.order_id = od.order_id " +
        "join `base_dic` for system_time as of pi.proc_time as dic " +
        "on pi.payment_type = dic.dic_code");
tableEnv.createTemporaryView("result_table", resultTable);

3.2.6 Create the Kafka Payment Success Fact Table and Write the Data

// TODO 7. Create the Kafka payment success table
tableEnv.executeSql("CREATE TABLE dwd_trade_pay_detail_suc (" +
        "order_detail_id string, order_id string, user_id string, sku_id string, sku_name string, " +
        "province_id string, activity_id string, activity_rule_id string, coupon_id string, " +
        "payment_type_code string, payment_type_name string, callback_time string, " +
        "source_id string, source_type_code string, source_type_name string, sku_num string, " +
        "split_activity_amount string, split_coupon_amount string, split_payment_amount string, " +
        "primary key(order_detail_id) not enforced" +
        ")" + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_pay_detail_suc"));

// TODO 8. Write the data out
tableEnv.executeSql("INSERT INTO dwd_trade_pay_detail_suc SELECT * FROM result_table");

The primary key is added so that rows with the same key land in the same partition, which makes deduplication easier downstream (it can also be omitted).
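For context, MyKafkaUtil.getUpsertKafkaDDL(topic) presumably returns an upsert-kafka WITH clause along these lines; this is a hedged sketch of such a helper, and the broker address and formats are assumptions:

// hypothetical sketch of an upsert-kafka DDL helper
public static String getUpsertKafkaDDL(String topic) {
    return " WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = '" + topic + "'," +
            "  'properties.bootstrap.servers' = 'hadoop102:9092'," +
            "  'key.format' = 'json'," +      // the declared primary key becomes the Kafka record key
            "  'value.format' = 'json'" +
            ")";
}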

env.execute() is not needed at the end, because the job is submitted through tableEnv's statements; calling env.execute() here would only produce a warning that there are no operators to execute.

4. Trade Domain: Refund Transaction Fact Table

A refund (退单) means cancelling an order after it has been paid, whereas order cancellation (取消订单) means cancelling an order that has not been paid yet.

4.1 Approach

        A refund does not need to be joined with the order placement fact table, because the refund table itself already has SKU granularity (order_refund_info contains sku_id, which is enough to join the sku dimension table in the DIM layer later). What we do need to join is the order table, to pick up some analysis dimensions from it (for example province_id; in my view the activity id, coupon id and other dimension foreign keys could be added as well).

A refund also updates the order table (order_status changes from 1002 ['paid'] to 1005 ['refund in progress']), so while filtering the order table we can filter out the refund rows at the same time and save some work.

Note: we join the order table here because a refund is guaranteed to update the order table and the two changes happen almost simultaneously, so both can be read from topic_db in one pass. There is no need to read from the order pre-processing table (which itself depends on the order table, so going through it would be pointless).

So the rough steps are:

  • Create the topic_db table
  • Filter out the refund rows in the order table and the refund table
    • refund table filter (table = 'order_refund_info')
    • refund rows in the order table (order_status = '1005' & type = 'update' & the order_status field was actually updated)
  • Create the MySQL dictionary lookup table (to get the refund type and the refund reason type)
  • Join the 3 tables
  • Create the Kafka dwd_trade_order_refund table
  • Write the data

4.2 Implementation

4.2.1 Create the topic_db Table and Filter Out the Refund Table Data

// TODO 2. Read the topic_db data
tableEnv.executeSql(MyKafkaUtil.getTopicDb("dwd_trade_order_refund"));

// TODO 3. Filter out the refund table
Table refundTable = tableEnv.sqlQuery("SELECT " +
        "data['id'] id, " +
        "data['user_id'] user_id, " +
        "data['order_id'] order_id, " +
        "data['sku_id'] sku_id, " +
        "data['refund_type'] refund_type, " +
        "data['refund_num'] refund_num, " +
        "data['refund_amount'] refund_amount, " +
        "data['refund_reason_type'] refund_reason_type, " +
        "data['refund_reason_txt'] refund_reason_txt, " +
        "data['create_time'] create_time, " +
        "pt proc_time " +
        "FROM topic_db " +
        "WHERE `database` = 'gmall' " +
        "AND `table` = 'order_refund_info' ");
tableEnv.createTemporaryView("order_refund_info", refundTable);

4.2.2 Filter Out the Refund Rows in the Order Table

// TODO 4. Filter out the refund rows in the order table
Table orderInfoRefund = tableEnv.sqlQuery("SELECT " +
        "data['id'] id, " +
        "data['province_id'] province_id, " +
        "`old` " +
        "FROM topic_db " +
        "WHERE `database` = 'gmall' " +
        "AND `table` = 'order_info' " +
        "AND `type` = 'update' " +
        "AND data['order_status'] = '1005' " +
        "AND `old`['order_status'] is not null");
tableEnv.createTemporaryView("order_info_refund", orderInfoRefund);

4.2.3 Create the MySQL Lookup Dictionary Table

This is used to get the refund type and the refund reason type.

// TODO 5. Read the MySQL lookup table
tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

4.2.4 Join the 3 Tables

// TODO 6. Join the 3 tables
Table resultTable = tableEnv.sqlQuery("SELECT " +
        "ri.id, " +
        "ri.user_id, " +
        "ri.order_id, " +
        "ri.sku_id, " +
        "oi.province_id, " +
        "date_format(ri.create_time,'yyyy-MM-dd') date_id, " +
        "ri.create_time, " +
        "ri.refund_type, " +
        "type_dic.dic_name refund_type_name, " +
        "ri.refund_reason_type, " +
        "reason_dic.dic_name refund_reason_type_name, " +
        "ri.refund_reason_txt, " +
        "ri.refund_num, " +
        "ri.refund_amount " +
        "from order_refund_info ri " +
        "join order_info_refund oi " +
        "on ri.order_id = oi.id " +
        "join base_dic for system_time as of ri.proc_time as type_dic " +
        "on ri.refund_type = type_dic.dic_code " +
        "join base_dic for system_time as of ri.proc_time as reason_dic " +
        "on ri.refund_reason_type = reason_dic.dic_code");
tableEnv.createTemporaryView("result_table", resultTable);

4.2.5 Create the Refund Fact Table and Write the Data

// TODO 7. Create the Kafka refund fact table
tableEnv.executeSql("CREATE TABLE dwd_trade_order_refund (" +
        "id string, user_id string, order_id string, sku_id string, province_id string, " +
        "date_id string, create_time string, refund_type_code string, refund_type_name string, " +
        "refund_reason_type_code string, refund_reason_type_name string, refund_reason_txt string, " +
        "refund_num string, refund_amount string" +
        ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_order_refund"));

// TODO 8. Write the data
tableEnv.executeSql("INSERT INTO dwd_trade_order_refund SELECT * FROM result_table");

5. Trade Domain: Refund Payment Success Transaction Fact Table

The refund-payment-success business process touches the refund payment table (a new row), the order table (order status) and the refund table (refund status).

Main tasks

  • Extract the successful refund payments from the refund payment table and join the dictionary table to degenerate the payment type dimension
  • Extract the orders whose refund succeeded from the order table
  • Extract the refunded item details from the refund table

Notes

  • The refund payment table also has SKU granularity (it contains sku_id)
  • The order table is joined to get province_id and user_id
  • The refund table is joined to get the item's sku_num
  • The dictionary table is joined to degenerate the payment type of the refund payment table

5.1 Approach

The approach is the same as before:

  • Create the topic_db table
  • Filter the successful refund payments out of the refund payment table
    • refund_status = '0705'
  • Filter the refund-success rows out of the order table and the refund table
    • order table: order_status = '1006'
    • refund table: refund_status = '0705'
  • Create the dictionary lookup table (to degenerate the payment type field)
  • Join the 4 tables
    • join on order_id
  • Create the Kafka topic table and write the data

5.2 Implementation

Same pattern as before, so no further explanation.

public class DwdTradeRefundPaySuc {
    public static void main(String[] args) {
        // TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // in production, set this to the number of partitions of the Kafka topic
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 Enable checkpointing
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // max number of concurrent checkpoints
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // fixed-delay restart: 3 attempts, 5 s apart

        // 1.2 Set the state backend
        env.setStateBackend(new HashMapStateBackend());

        // 1.3 Set the state TTL
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));

        // TODO 2. Read the topic_db data
        tableEnv.executeSql(MyKafkaUtil.getTopicDb("dwd_trade_refund_pay_suc"));

        // TODO 3. Filter out the refund payment table
        Table refundPayment = tableEnv.sqlQuery("SELECT " +
                "data['id'] id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['payment_type'] payment_type, " +
                "data['callback_time'] callback_time, " +
                "data['total_amount'] total_amount, " +
                "pt proc_time " +
                "FROM topic_db " +
                "WHERE `database` = 'gmall' " +
                "AND `table` = 'refund_payment' " +
                "AND data['refund_status'] = '0705' " +
                "AND `type` = 'update' " +
                "AND `old`['refund_status'] is not null");
        tableEnv.createTemporaryView("refund_payment", refundPayment);

        // TODO 4. Filter the refund-success rows out of the order table
        Table orderRefundSuc = tableEnv.sqlQuery("SELECT " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['province_id'] province_id, " +
                "pt " +
                "FROM topic_db " +
                "WHERE `database` = 'gmall' " +
                "AND `table` = 'order_info' " +
                "AND `type` = 'update' " +
                "AND data['order_status'] = '1006' " +
                "AND `old`['order_status'] is not null");
        tableEnv.createTemporaryView("order_refund_suc", orderRefundSuc);

        // TODO 5. Filter the refund-success rows out of the refund table
        Table refundSuc = tableEnv.sqlQuery("SELECT " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['refund_num'] refund_num, " +
                "pt " +
                "FROM topic_db " +
                "WHERE `database` = 'gmall' " +
                "AND `table` = 'order_refund_info' " +
                "AND `type` = 'update' " +
                "AND data['refund_status'] = '0705' " +
                "AND `old`['refund_status'] is not null");
        tableEnv.createTemporaryView("order_refund_info", refundSuc);

        // TODO 6. Create the MySQL lookup table
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

        // TODO 7. Join the 4 tables
        Table resultTable = tableEnv.sqlQuery("select " +
                "rp.id, " +
                "oi.user_id, " +
                "rp.order_id, " +
                "rp.sku_id, " +
                "oi.province_id, " +
                "rp.payment_type, " +
                "dic.dic_name payment_type_name, " +
                "date_format(rp.callback_time,'yyyy-MM-dd') date_id, " +
                "rp.callback_time, " +
                "ri.refund_num, " +
                "rp.total_amount " +
                "from refund_payment rp " +
                "join order_refund_suc oi " +
                "on rp.order_id = oi.id " +
                "join order_refund_info ri " +
                "on rp.order_id = ri.order_id " +
                "and rp.sku_id = ri.sku_id " +
                "join base_dic for system_time as of rp.proc_time as dic " +
                "on rp.payment_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 8. Create the Kafka refund-payment-success fact table
        tableEnv.executeSql("create table dwd_trade_refund_pay_suc(" +
                "id string, user_id string, order_id string, sku_id string, province_id string, " +
                "payment_type_code string, payment_type_name string, date_id string, " +
                "callback_time string, refund_num string, refund_amount string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_refund_pay_suc"));

        // TODO 9. Write the data
        tableEnv.executeSql("INSERT INTO dwd_trade_refund_pay_suc SELECT * FROM result_table");
    }
}

6. Tool Domain: Coupon Transaction Fact Tables

Let's first look at the data in the coupon_use table of the business system:

        The granularity of this table is an order: each row records which user claimed a coupon and when, when the coupon was used to place an order (using_time is the usage time) and when that order was paid (used_time is the payment time), when the coupon expires, and so on.

6.1 Approach

        Every time the coupon-claim business process happens, a new row is inserted into coupon_use, so for coupon claims we only need to filter type = 'insert' records out of topic_db.

        For coupon use at order time and coupon use at payment time, besides filtering type = 'update', we only need to additionally check whether using_time (order) or used_time (payment) is non-null.

Summary

  • Coupon claim
    • type = 'insert'
  • Coupon use (order)
    • type = 'update' AND using_time is not null AND used_time is null
    • or type = 'update' AND data['coupon_status'] = '1402' AND old['coupon_status'] = '1401'
  • Coupon use (payment)
    • type = 'update' AND used_time is not null
    • or type = 'update' AND data['coupon_status'] = '1403' AND old['coupon_status'] = '1402'

6.2 Implementation

6.2.1 Tool Domain: Coupon Claim Transaction Fact Table

Very straightforward, so without further ado:

public class DwdToolCouponGet {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. State backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table `topic_db`(" +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_get"));

        // TODO 4. Read the coupon-use data and register it as a table
        Table resultTable = tableEnv.sqlQuery("select " +
                "data['id'], " +
                "data['coupon_id'], " +
                "data['user_id'], " +
                "date_format(data['get_time'],'yyyy-MM-dd') date_id, " +
                "data['get_time'], " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 5. Create the Kafka-connector table dwd_tool_coupon_get
        tableEnv.executeSql("create table dwd_tool_coupon_get (" +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "date_id string, " +
                "get_time string, " +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_get"));

        // TODO 6. Write the data into the Kafka-connector table
        tableEnv.executeSql("insert into dwd_tool_coupon_get select * from result_table");
    }
}

6.2.2 Tool Domain: Coupon Use (Order) Transaction Fact Table

public class DwdToolCouponOrder {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. State backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table `topic_db` (" +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`old` map<string, string>, " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_order"));

        // TODO 4. Read the coupon-use data and filter the coupon-use-at-order records
        Table couponUseOrder = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['coupon_id'] coupon_id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "date_format(data['using_time'],'yyyy-MM-dd') date_id, " +
                "data['using_time'] using_time, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'update' " +
                "and data['coupon_status'] = '1402' " +
                "and `old`['coupon_status'] = '1401'");
        tableEnv.createTemporaryView("result_table", couponUseOrder);

        // TODO 5. Create the Kafka-connector table dwd_tool_coupon_order
        tableEnv.executeSql("create table dwd_tool_coupon_order(" +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "order_id string, " +
                "date_id string, " +
                "order_time string, " +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_order"));

        // TODO 6. Write the data into the Kafka-connector table
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_order select " +
                "id, " +
                "coupon_id, " +
                "user_id, " +
                "order_id, " +
                "date_id, " +
                "using_time order_time, " +
                "ts from result_table");
    }
}

6.2.3 Tool Domain: Coupon Use (Payment) Transaction Fact Table

public class DwdToolCouponPay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. State backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table `topic_db` (" +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`old` string, " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_pay"));

        // TODO 4. Read the coupon-use data and filter the coupon-use-at-payment records
        Table couponUsePay = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['coupon_id'] coupon_id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "date_format(data['used_time'],'yyyy-MM-dd') date_id, " +
                "data['used_time'] used_time, " +
                "`old`, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'update' " +
                "and data['used_time'] is not null");
        tableEnv.createTemporaryView("coupon_use_pay", couponUsePay);

        // TODO 5. Create the Kafka-connector table dwd_tool_coupon_pay
        tableEnv.executeSql("create table dwd_tool_coupon_pay(" +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "order_id string, " +
                "date_id string, " +
                "payment_time string, " +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_pay"));

        // TODO 6. Write the data into the Kafka-connector table
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_pay select " +
                "id, " +
                "coupon_id, " +
                "user_id, " +
                "order_id, " +
                "date_id, " +
                "used_time payment_time, " +
                "ts from coupon_use_pay");
    }
}

7. Interaction Domain

7.1 Approach

7.2 Implementation

7.2.1 Interaction Domain: Favorited Item Transaction Fact Table

 Let's take a look at the favorites table in the business system:

        As you can see, the granularity of the favorites table is one item per row: every time an item is favorited, a row is inserted into favor_info; when a favorite is cancelled the row is not deleted. Instead, is_cancel is set to 1 and cancel_time is filled in. So the filter conditions are simple:

  1. insert
  2. update with is_cancel = 1 (the item was un-favorited and then favorited again)
public class DwdInteractionFavorAdd {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. State backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` string, " +
                "`table` string, " +
                "`type` string, " +
                "`data` map<string, string>, " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_interaction_favor_add"));

        // TODO 4. Read the favorites table data
        Table favorInfo = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['sku_id'] sku_id, " +
                "date_format(data['create_time'],'yyyy-MM-dd') date_id, " +
                "data['create_time'] create_time, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'favor_info' " +
                "and (`type` = 'insert' or (`type` = 'update' and data['is_cancel'] = '1'))");
        tableEnv.createTemporaryView("favor_info", favorInfo);

        // TODO 5. Create the Kafka-connector table dwd_interaction_favor_add
        tableEnv.executeSql("create table dwd_interaction_favor_add (" +
                "id string, " +
                "user_id string, " +
                "sku_id string, " +
                "date_id string, " +
                "create_time string, " +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_interaction_favor_add"));

        // TODO 6. Write the data into the Kafka-connector table
        tableEnv.executeSql("" +
                "insert into dwd_interaction_favor_add select * from favor_info");
    }
}

7.2.2 Interaction Domain: Review Transaction Fact Table

Task: build the MySQL lookup dictionary table, read the review table data, join the dictionary table to get the rating name (good / neutral / bad / automatic), and write the result to the Kafka review topic.

So the logic of this table is also very simple:

  • Filter the review table data out of topic_db
  • Keep the type = 'insert' records from the review table (insert is in fact the only type here)
  • Create the MySQL lookup dictionary table (to degenerate the rating dimension)
  • Join the two tables
  • Write to Kafka
public class DwdInteractionComment {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Get the configuration object
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Set an expiration time for the state kept by the table joins
        configuration.setString("table.exec.state.ttl", "5 s");

        // TODO 2. State backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` string, " +
                "`table` string, " +
                "`type` string, " +
                "`data` map<string, string>, " +
                "`proc_time` as PROCTIME(), " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_interaction_comment"));

        // TODO 4. Read the review table data
        Table commentInfo = tableEnv.sqlQuery("select " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['sku_id'] sku_id, " +
                "data['order_id'] order_id, " +
                "data['create_time'] create_time, " +
                "data['appraise'] appraise, " +
                "proc_time, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'comment_info' " +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("comment_info", commentInfo);

        // TODO 5. Create the MySQL lookup dictionary table
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

        // TODO 6. Join the two tables
        Table resultTable = tableEnv.sqlQuery("select " +
                "ci.id, " +
                "ci.user_id, " +
                "ci.sku_id, " +
                "ci.order_id, " +
                "date_format(ci.create_time,'yyyy-MM-dd') date_id, " +
                "ci.create_time, " +
                "ci.appraise, " +
                "dic.dic_name, " +
                "ts " +
                "from comment_info ci " +
                "join base_dic for system_time as of ci.proc_time as dic " +
                "on ci.appraise = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 7. Create the Kafka-connector table dwd_interaction_comment
        tableEnv.executeSql("create table dwd_interaction_comment(" +
                "id string, " +
                "user_id string, " +
                "sku_id string, " +
                "order_id string, " +
                "date_id string, " +
                "create_time string, " +
                "appraise_code string, " +
                "appraise_name string, " +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_interaction_comment"));

        // TODO 8. Write the join result into the Kafka-connector table
        tableEnv.executeSql("" +
                "insert into dwd_interaction_comment select * from result_table");
    }
}

7.2.3 User Domain: User Registration Transaction Fact Table

Main task: read the user table data, extract the registration time, and write the user registration info to the Kafka user registration topic.

Telling new users from existing ones is easy: every newly inserted row is a new user, so the approach is simply:

  • Filter the user table data out of topic_db
  • Keep the type = 'insert' records from the user table
  • Write to Kafka
public class DwdUserRegister {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));

        // TODO 2. Enable checkpointing and the state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` string, " +
                "`table` string, " +
                "`type` string, " +
                "`data` map<string, string>, " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_user_register"));

        // TODO 4. Read the user table data
        Table userInfo = tableEnv.sqlQuery("select " +
                "data['id'] user_id, " +
                "data['create_time'] create_time, " +
                "ts " +
                "from topic_db " +
                "where `table` = 'user_info' " +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("user_info", userInfo);

        // TODO 5. Create the Kafka-connector table dwd_user_register
        tableEnv.executeSql("create table `dwd_user_register`(" +
                "`user_id` string, " +
                "`date_id` string, " +
                "`create_time` string, " +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_user_register"));

        // TODO 6. Write the data into the Kafka-connector table
        tableEnv.executeSql("insert into dwd_user_register " +
                "select " +
                "user_id, " +
                "date_format(create_time, 'yyyy-MM-dd') date_id, " +
                "create_time, " +
                "ts " +
                "from user_info");
    }
}

Summary

        With this, the DWD layer is complete. The real-time warehouse's DWD layer does not have as many fact-table types as the offline warehouse (periodic snapshot, accumulating snapshot, and so on), so while it is not trivial, there is a clear pattern to follow.

        Tomorrow the DWS layer begins, and I finally get to use ClickHouse; looking forward to it.

        This real-time warehouse project is going much faster than the offline one, but not because I am skimming. After three years of watching course videos, I know the difference between ticking off tasks and genuinely wanting to understand a project. This real-time warehouse uses the same data sources as the offline warehouse I studied before, so the simulated business system is already familiar, and the offline and real-time sides share a lot, which is why progress is quick. Still, it has taken plenty of time: eight in the morning to eight at night, every day.

        Many people treat projects and new technologies the same way: they see dozens of hours of video and just want to get through them, convincing themselves that having watched something means having learned it and never needing to look at it again. If you don't build the foundations properly early on, the later moments of "sudden insight" and "connecting the dots" rarely come, simply because the knowledge reserves aren't there. It's like how I keep telling myself to read more (I ordered a copy of "乡土中国" today) but end up scrolling short videos instead, telling myself they are nourishing enough, and then find myself lost for words whenever I write or speak.
