尚硅谷大数据项目《在线教育之实时数仓》笔记007

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第9章 数仓开发之DWD层

P053

P054

P055

P056

P057

P058

P059

P060

P061

P062

P063

P064

P065


第9章 数仓开发之DWD层

P053

9.6 用户域用户注册事务事实表
9.6.1 主要任务

读取用户表数据,读取页面日志数据,关联两张表补全用户注册操作的维度信息,写入 Kafka 用户注册主题。

P054

9.6.4 代码

Kafka | Apache Flink

 

P055

//TODO 4 读取page主题数据dwd_traffic_page_log
//TODO 5 过滤用户表数据
//TODO 6 过滤注册日志的维度信息

P056

package com.atguigu.edu.realtime.app.dwd.db;import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author yhm* @create 2023-04-23 17:36*/
public class DwdUserUserRegister {public static void main(String[] args) {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 设置表的TTL
//        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10L));EnvUtil.setTableEnvStateTtl(tableEnv, "10s");//TODO 3 读取topic_db的数据String groupId = "dwd_user_user_register2";KafkaUtil.createTopicDb(tableEnv, groupId);
//        tableEnv.executeSql("select * from topic_db").print();//TODO 4 读取page主题数据dwd_traffic_page_logtableEnv.executeSql("CREATE TABLE page_log (\n" +"  `common` map<string,string>,\n" +"  `page` string,\n" +"  `ts` string\n" +")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));//TODO 5 过滤用户表数据Table userRegister = tableEnv.sqlQuery("select \n" +"    data['id'] id,\n" +"    data['create_time'] create_time,\n" +"    date_format(data['create_time'],'yyyy-MM-dd') create_date,\n" +"    ts\n" +"from topic_db\n" +"where `table`='user_info'\n" +"and `type`='insert'" +"");tableEnv.createTemporaryView("user_register", userRegister);//TODO 6 过滤注册日志的维度信息Table dimLog = tableEnv.sqlQuery("select \n" +"    common['uid'] user_id,\n" +"    common['ch'] channel, \n" +"    common['ar'] province_id, \n" +"    common['vc'] version_code, \n" +"    common['sc'] source_id, \n" +"    common['mid'] mid_id, \n" +"    common['ba'] brand, \n" +"    common['md'] model, \n" +"    common['os'] operate_system \n" +"from page_log\n" +"where common['uid'] is not null \n"//"and page['page_id'] = 'register'");tableEnv.createTemporaryView("dim_log", dimLog);//TODO 7 join两张表格Table resultTable = tableEnv.sqlQuery("select \n" +"    ur.id user_id,\n" +"    create_time register_time,\n" +"    create_date register_date,\n" +"    channel,\n" +"    province_id,\n" +"    version_code,\n" +"    source_id,\n" +"    mid_id,\n" +"    brand,\n" +"    model,\n" +"    operate_system,\n" +"    ts, \n" +"    current_row_timestamp() row_op_ts \n" +"from user_register ur \n" +"left join dim_log pl \n" +"on ur.id=pl.user_id");tableEnv.createTemporaryView("result_table", resultTable);//TODO 8 写出数据到kafkatableEnv.executeSql(" create table dwd_user_user_register(\n" +"    user_id string,\n" +"    register_time string,\n" +"    register_date string,\n" +"    channel string,\n" +"    province_id string,\n" +"    version_code string,\n" +"    source_id string,\n" +"    mid_id string,\n" +"    brand string,\n" +"    model string,\n" +"    operate_system string,\n" +"    ts string,\n" +"    row_op_ts TIMESTAMP_LTZ(3) ,\n" +"    PRIMARY KEY (user_id) NOT ENFORCED\n" +")" + KafkaUtil.getUpsertKafkaDDL("dwd_user_user_register"));tableEnv.executeSql("insert into dwd_user_user_register " +"select * from result_table");}
}

P057

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_register

P058

9.7 交易域下单事务事实表
9.7.1 主要任务

从 Kafka 读取 topic_db 主题数据,筛选订单明细表和订单表数据,读取 dwd_traffic_page_log 主题数据,筛选订单页日志,关联三张表获得交易域下单事务事实表,写入 Kafka 对应主题。

P059

DwdTradeOrderDetail,TODO1 ~ TODO7

P060

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_order_detail
package com.atguigu.edu.realtime.app.dwd.db;import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author yhm* @create 2023-04-24 15:18*/
public class DwdTradeOrderDetail {public static void main(String[] args) {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 设置表格TTLEnvUtil.setTableEnvStateTtl(tableEnv, "10s");//TODO 3 从kafka读取业务数据topic_dbString groupId = "dwd_trade_order_detail";KafkaUtil.createTopicDb(tableEnv, groupId);//TODO 4 从kafka读取日志数据dwd_traffic_page_logtableEnv.executeSql("create table page_log(\n" +"    common map<String,String>,\n" +"    page map<String,String>,\n" +"    ts string\n" +")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));//TODO 5 过滤订单详情表Table orderDetail = tableEnv.sqlQuery("select \n" +"    data['id'] id,\n" +"    data['course_id'] course_id,\n" +"    data['course_name'] course_name,\n" +"    data['order_id'] order_id,\n" +"    data['user_id'] user_id,\n" +"    data['origin_amount'] origin_amount,\n" +"    data['coupon_reduce'] coupon_reduce,\n" +"    data['final_amount'] final_amount,\n" +"    data['create_time'] create_time,\n" +"    date_format(data['create_time'], 'yyyy-MM-dd') create_date,\n" +"    ts\n" +"from topic_db\n" +"where `table`='order_detail'\n" +"and type='insert'");tableEnv.createTemporaryView("order_detail", orderDetail);//TODO 6 过滤订单表Table orderInfo = tableEnv.sqlQuery("select \n" +"    data['id'] id, \n" +"    data['out_trade_no'] out_trade_no, \n" +"    data['trade_body'] trade_body, \n" +"    data['session_id'] session_id, \n" +"    data['province_id'] province_id\n" +"from topic_db\n" +"where `table`='order_info'\n" +"and type='insert'");tableEnv.createTemporaryView("order_info", orderInfo);//TODO 7 获取下单日志Table orderLog = tableEnv.sqlQuery("select \n" +"    common['sid'] session_id,\n" +"    common['sc'] source_id\n" +"from page_log\n" +"where page['page_id']='order'");tableEnv.createTemporaryView("order_log", orderLog);//TODO 8 关联3张表格Table resultTable = tableEnv.sqlQuery("select \n" +"    od.id,\n" +"    od.course_id,\n" +"    od.course_name,\n" +"    od.order_id,\n" +"    od.user_id,\n" +"    od.origin_amount,\n" +"    od.coupon_reduce,\n" +"    od.final_amount,\n" +"    od.create_time,\n" +"    oi.out_trade_no,\n" +"    oi.trade_body,\n" +"    oi.session_id,\n" +"    oi.province_id,\n" +"    ol.source_id,\n" +"    ts,\n" +"    current_row_timestamp() row_op_ts \n" +"from order_detail od\n" +"join order_info oi\n" +"on od.order_id=oi.id\n" +"left join order_log ol\n" +"on oi.session_id=ol.session_id");tableEnv.createTemporaryView("result_table", resultTable);//TODO 9 创建upsert kafkatableEnv.executeSql("create table dwd_trade_order_detail( \n" +"    id string,\n" +"    course_id string,\n" +"    course_name string,\n" +"    order_id string,\n" +"    user_id string,\n" +"    origin_amount string,\n" +"    coupon_reduce string,\n" +"    final_amount string,\n" +"    create_time string,\n" +"    out_trade_no string,\n" +"    trade_body string,\n" +"    session_id string,\n" +"    province_id string,\n" +"    source_id string,\n" +"    ts string,\n" +"    row_op_ts TIMESTAMP_LTZ(3) ,\n" +"    primary key(id) not enforced \n" +")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_detail"));//TODO 10 写出数据到kafkatableEnv.executeSql("insert into dwd_trade_order_detail " +"select * from result_table");}
}

P061

9.8 交易域支付成功事务事实表
9.8.1 主要任务

从 Kafka topic_db主题筛选支付成功数据、从dwd_trade_order_detail主题中读取订单事实数据,关联两张表形成支付成功宽表,写入 Kafka 支付成功主题。

P062

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_pay_suc_detail
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

P063

9.9 事实表动态分流

9.9.1 主要任务

DWD层余下的事实表都是从topic_db中取业务数据库一张表的变更数据,按照某些条件过滤后写入Kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。

读取优惠券领用数据,写入 Kafka 优惠券领用主题。

P064

BaseDBApp

//TODO 1 创建环境设置状态后端

//TODO 2 读取业务topic_db主流数据

//TODO 3 清洗转换topic_db数据

//TODO 4 使用flinkCDC读取dwd配置表数据

//TODO 5 创建广播流

//TODO 6 连接两个流

//TODO 7 过滤出需要的dwd表格数据

P065

package com.atguigu.edu.realtime.app.dwd.db;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DwdBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.bean.DwdTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;/*** @author yhm* @create 2023-04-24 18:05*/
public class BaseDBApp {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);//TODO 2 读取业务topic_db主流数据String groupId = "base_DB_app";DataStreamSource<String> dbStream = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", groupId), WatermarkStrategy.noWatermarks(), "base_db");//TODO 3 清洗转换topic_db数据SingleOutputStreamOperator<JSONObject> jsonObjStream = dbStream.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!("bootstrap-start".equals(type) || "bootstrap-insert".equals(type) || "bootstrap-complete".equals(type))) {out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();}}});jsonObjStream.print();//TODO 4 使用flinkCDC读取dwd配置表数据MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).username("root").password("123456").databaseList("edu_config").tableList("edu_config.dwd_table_process")// 定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema())// 设置读取数据的模式.startupOptions(StartupOptions.initial()).build();//TODO 5 创建广播流DataStreamSource<String> tableProcessStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "dwd_table_process");MapStateDescriptor<String, DwdTableProcess> dwdTableProcessState = new MapStateDescriptor<>("dwd_table_process_state", String.class, DwdTableProcess.class);BroadcastStream<String> broadcastDS = tableProcessStream.broadcast(dwdTableProcessState);//TODO 6 连接两个流BroadcastConnectedStream<JSONObject, String> connectStream = jsonObjStream.connect(broadcastDS);//TODO 7 过滤出需要的dwd表格数据SingleOutputStreamOperator<JSONObject> processStream = connectStream.process(new DwdBroadcastProcessFunction(dwdTableProcessState));//TODO 8 将数据写出到kafkaprocessStream.sinkTo(KafkaUtil.getKafkaProducerBySchema(new KafkaRecordSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {String topic = element.getString("sink_table");element.remove("sink_table");return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());}}, "base_db_app_trans"));//TODO 9 执行任务env.execute();}
}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_cart_add
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar 

启动maxwell。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/188016.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

lv11 嵌入式开发 ARM指令集上 5

1 导学 1.1 指令集 指令 能够指示处理器执行某种运算的命令称为指令&#xff08;如加、减、乘 ...&#xff09; 指令在内存中以机器码&#xff08;二进制&#xff09;的方式存在 每一条指令都对应一条汇编 程序是指令的有序集合 指令集 处理器能识别的指令…

红黑树的模拟实现

一、介绍 1. 概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路径会比其他路径长出俩倍&#xff0c…

【Redis】list常用命令内部编码使用场景

文章目录 前置知识列表类型的特点 命令LPUSHLPUSHXRPUSHRPUSHXLRANGELPOPRPOPLINDEXLREMLINSERTLTRIMLSETLLEN 阻塞版本命令BLPOPBRPOP 命令总结内部编码测试内部编码 使用场景消息队列分频道的消息队列 模拟栈和队列 前置知识 列表类型是⽤来存储多个有序的字符串&#xff0c…

吴恩达《机器学习》7-1->7-4:过拟合问题、代价函数、线性回归的正则化、正则化的逻辑回归模型

一、过拟合的本质 过拟合是指模型在训练集上表现良好&#xff0c;但在新数据上的泛化能力较差。考虑到多项式回归的例子&#xff0c;我们可以通过几个模型的比较来理解过拟合的本质。 线性模型&#xff08;欠拟合&#xff09;&#xff1a; 第一个模型是一个线性模型&#xff0…

量子计算和量子通信技术:引领潜力无限的未来

近年来&#xff0c;随着量子计算和量子通信技术的迅速发展&#xff0c;它们在各个领域的广泛应用前景引起了人们的极大兴趣。本文将深入探讨量子计算和量子通信技术的普遍应用&#xff0c;以及它们预示的未来&#xff0c;同时提出业内人士需要注意的事项。 介绍&#xff1a;量子…

OushuDB 专家认证第四期报名开始啦!

OushuDB 专家认证培训第四期今日正式启动&#xff01;本次培训为偶数科技面向生态合作伙伴与客户公开举办的线上培训&#xff0c;旨在共同发展 OushuDB 生态。 报名时间&#xff1a;2023年11月9日9:00—11月30日12:00 报名方式&#xff1a;偶数科技官网&#xff08;点击下方阅…

C/C++数据结构之链表题目答案与解析

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 数据结构初阶 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.前言 2.题目…

灵活运用Vue指令:探究v-if和v-for的使用技巧和注意事项

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! 目录 ⭐ 专栏简介 &#x1f4d8; 文章引言 一、作…

2023年Q3乳品行业数据分析(乳品市场未来发展趋势)

随着人们生活水平的不断提高以及对健康生活的追求不断增强&#xff0c;牛奶作为优质蛋白和钙的补充品&#xff0c;市场需求逐年增加。 今年Q3&#xff0c;牛奶乳品市场仍呈增长趋势。根据鲸参谋电商数据分析平台的相关数据显示&#xff0c;2023年7月-9月&#xff0c;牛奶乳品市…

计算机毕设 大数据工作岗位数据分析与可视化 - python flask

文章目录 0 前言1 课题背景2 实现效果3 项目实现3.1 概括 3.2 Flask实现3.3 HTML页面交互及Jinja2 4 **完整代码**5 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往往达不到毕业答辩的要…

【Excel】补全单元格值变成固定长度

我们知道股票代码都为6位数字&#xff0c;但深圳中小板代码前面以0开头&#xff0c;数字格式时前面的0会自动省略&#xff0c;现在需要在Excel表格补全它。如下图&#xff1a; 这时我们需要用到特殊的函数&#xff1a;TEXT或者RIGHT TEXT函数是Excel中一个非常有用的函数。TEX…

c: struct sort descending and ascending in windows and Ubuntu

/*** file StudentStructSort.h* author geovindu,Geovin Du,涂聚文 (geovindu163.com)* ide: vscode c11,c17 Ubuntu 22.4* brief 结构体排序示例* date 2023-11-05* version 0.1* copyright geovindu 站在巨人的肩膀上 Standing on the Shoulders of Giants**/#ifnd…

海康工业相机如何提高相机帧率

影响帧率的因素 相机参数 帧率限制使能 像素格式 曝光时间 数据包大小&#xff08;网口&#xff09; 相机默认参数 ADC位深 系统环境设置

opencv创建图片,绘制图片,画框,划线,改变像素点颜色

文章目录 创建空白图片创建一张渐变色彩色绘制多边形绘制多线改变像素点颜色 创建空白图片 bool tool_class::creatEmpty(int width, int height, std::string image_p) {// 创建一个空白图像cv::Mat blankImage(height, width, CV_8UC3, cv::Scalar(255, 255, 255));// 保存图…

CSS3 分页、框大小、弹性盒子

一、CSS3分页&#xff1a; 网站有很多个页面&#xff0c;需要使用分页来为每个页面做导航。示例&#xff1a; <style> ul.pagination { display: inline-block; padding: 0; margin: 0; } ul.pagination li {display: inline;} ul.pagination li a { color: black; f…

给CAD中添加自定义菜单CUIX

本文以AutoCAD2020为例&#xff0c;介绍如何添加自定义菜单。 打开AutoCAD2020&#xff0c;在命令行执行CUI并回车&#xff0c;出现菜单 进入菜单编辑界面 点击传输&#xff0c;然后新建 在菜单上右键&#xff0c;添加自定义菜单 点击保存&#xff0c;即可存为cuix文件。之后…

arduino 简易智能花盆

编辑器&#xff1a;arduino IDE 主板&#xff1a;arduino uno 传感器&#xff1a; 0.96寸的OLED屏&#xff08;四脚&#xff09; 声音模块 土壤温湿度模块 DS18B20温度模块&#xff08;这里用到防水的&#xff09; 光敏电阻模块&#xff08;买成三脚的了只能显示高低&#x…

【uniapp】文件授权验真系统(含代码)

文章目录 前言一、框架选用二、数据库设计三、设计上传列表四、上传操作1.前端2.后端 五、修改操作六、访问操作七、二维码生成八、二维码访问九、删除操作总结 前言 吐槽&#xff1a;终于开通了【资源绑定】的功能了&#xff0c;之前还要一个一个的去贴链接 之前的同学联系…

家居美学:将水离子壁炉融入你的现代装饰

当谈及家居装饰和壁炉选择时&#xff0c;水离子雾化壁炉是一个备受瞩目的话题。水离子雾化壁炉的美学价值&#xff0c;还为室内装饰带来全新的维度。它甚至能够激发室内装饰的灵感。 水离子雾化壁炉是现代美学的标志&#xff0c;融合了简洁、线条清晰的设计。这种壁炉常常采用不…

地区 IP 库

地区 & IP 库 yudao-spring-boot-starter-biz-ip (opens new window)业务组件&#xff0c;提供地区 & IP 库的封装。 #1. 地区 AreaUtils (opens new window)是地区工具类&#xff0c;可以查询中国的省、市、区县&#xff0c;也可以查询国外的国家。 它的数据来自 …