尚硅谷大数据项目《在线教育之实时数仓》笔记005

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第9章 数仓开发之DWD层

P031

P032

P033

P034

P035

P036

P037

P038

P039

P040


第9章 数仓开发之DWD层

P031

DWD层设计要点:

(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。

(2)DWD层表名的命名规范为dwd_数据域_表名

存放事实表,从kafka的topic_log和topic_db中读取需要用到的业务流程相关数据,将业务流程关联起来做成明细数据写回kafka当中。

尚硅谷大数据学科全套教程\3.尚硅谷大数据学科--项目实战\尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

在线教育实时业务总线矩阵.xlsx

9.1.3 图解

P032

package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @author * @create 2023-04-21 14:01*/
public class BaseLogApp {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);//TODO 2 从kafka中读取主流数据String topicName = "topic_log";String groupId = "base_log_app";DataStreamSource<String> baseLogSource = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId),WatermarkStrategy.noWatermarks(),"base_log_source");//TODO 3 对数据进行清洗转换// 3.1 定义侧输出流OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {};// 3.2 清洗转换SingleOutputStreamOperator<JSONObject> cleanedStream = baseLogSource.process(new ProcessFunction<String, JSONObject>() {@Overridepublic void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);out.collect(jsonObject);} catch (Exception e) {ctx.output(dirtyStreamTag, value);}}});// 3.3 将脏数据写出到kafka对应的主题SideOutputDataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);String dirtyTopicName = "dirty_data";dirtyStream.sinkTo(KafkaUtil.getKafkaProducer(dirtyTopicName, "dirty_trans"));//TODO 4 新老访客标记修复//TODO 5 数据分流//TODO 6 写出到kafka不同的主题//TODO 7 执行任务}
}

P033

KafkaUtil.java

P034

新老访客逻辑介绍

P035

BaseLogApp.java

//TODO 4 新老访客标记修复

[atguigu@node001 log]$ pwd
/opt/module/data_mocker/01-onlineEducation/log
[atguigu@node001 log]$ cat -n 200 app.2023-09-19.log
{"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411}
{"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411
}

P036

BaseLogApp.java

//TODO 5 数据分流

P037

//TODO 6 写出到kafka不同的主题

hadoop、zookeeper、kafka。

  1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic

  2. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic

  3. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic

  4. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic

  5. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic

  6. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
[2023-11-01 14:36:17,581] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 2 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-11-01 14:36:18,710] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 6 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-11-01 14:36:18,720] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] The following subscribed topics are not assigned to any members: [page_topic]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[atguigu@node001 ~]$ f1.sh start-------- 启动 node001 采集flume启动 -------
[atguigu@node001 ~]$ cd /opt/module/data
data/        data_mocker/ datax/       
[atguigu@node001 ~]$ cd /opt/module/data
data/        data_mocker/ datax/       
[atguigu@node001 ~]$ cd /opt/module/data_mocker/
[atguigu@node001 data_mocker]$ cd 01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ ll
总用量 30460
-rw-rw-r-- 1 atguigu atguigu     2223 9月  19 10:43 application.yml
-rw-rw-r-- 1 atguigu atguigu  4057995 7月  25 10:28 edu0222.sql
-rw-rw-r-- 1 atguigu atguigu 27112074 7月  25 10:28 edu2021-mock-2022-06-18.jar
drwxrwxr-x 2 atguigu atguigu     4096 10月 26 14:01 log
-rw-rw-r-- 1 atguigu atguigu     1156 7月  25 10:44 logback.xml
-rw-rw-r-- 1 atguigu atguigu      633 7月  25 10:45 path.json
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar 
SLF4J: Class path contains multiple SLF4J bindings.

P038

9.2 流量域独立访客事务事实表

P039

package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author yhm* @create 2023-04-21 16:24*/
public class DwdTrafficUniqueVisitorDetail {public static void main(String[] args) throws Exception {// TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);// TODO 2 读取kafka日志主题数据String topicName = "dwd_traffic_page_log";DataStreamSource<String> pageLogStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_unique_visitor_detail"), WatermarkStrategy.noWatermarks(), "unique_visitor_source");// TODO 3 转换结构,过滤last_page_id不为空的数据SingleOutputStreamOperator<JSONObject> firstPageStream = pageLogStream.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String lastPageID = jsonObject.getJSONObject("page").getString("last_page_id");if (lastPageID == null) {out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();}}});// TODO 4 安装mid分组KeyedStream<JSONObject, String> keyedStream = firstPageStream.keyBy(new KeySelector<JSONObject, String>() {@Overridepublic String getKey(JSONObject value) throws Exception {return value.getJSONObject("common").getString("mid");}});// TODO 5 判断独立访客SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(new RichFilterFunction<JSONObject>() {ValueState<String> lastVisitDtState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);ValueStateDescriptor<String> stringValueStateDescriptor = new ValueStateDescriptor<>("last_visit_dt", String.class);// 设置状态的存活时间stringValueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1L))// 设置状态的更新模式为创建及写入// 每次重新写入的时候记录时间  到1天删除状态.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());lastVisitDtState = getRuntimeContext().getState(stringValueStateDescriptor);}@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {String visitDt = DateFormatUtil.toDate(jsonObject.getLong("ts"));String lastVisitDt = lastVisitDtState.value();// 对于迟到的数据,last日期会大于visit日期,数据也不要if (lastVisitDt == null || (DateFormatUtil.toTs(lastVisitDt) < DateFormatUtil.toTs(visitDt))) {lastVisitDtState.update(visitDt);return true;}return false;}});// TODO 6 将独立访客数据写出到对应的kafka主题String targetTopic = "dwd_traffic_unique_visitor_detail";SingleOutputStreamOperator<String> sinkStream = filteredStream.map((MapFunction<JSONObject, String>) JSONAware::toJSONString);sinkStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "unique_visitor_trans"));// TODO 7 运行任务env.execute();}
}

P040

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_unique_visitor_detail
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_page_log[atguigu@node001 01-onlineEducation]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/180656.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ajax-axios发送 get请求 或者 发送post请求带有请求体参数

/* axios v0.21.1 | (c) 2020 by Matt Zabriskie */ !function(e,t){"object"typeof exports&&"object"typeof module?module.exportst():"function"typeof define&&define.amd?define([],t):"object"typeof export…

单链表经典算法

移除链表元素 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 思路&#xff1a;&#xff08;1&#xff09;创建三个结构体指针&#xff0c;分别代表一条新链表的头newhead&#xff0c;…

面试10000次依然会问的【ReentrantLock】,你还不会?

引言 在并发编程的世界中&#xff0c;ReentrantLock扮演着至关重要的角色。它是一个实现了重入特性的互斥锁&#xff0c;提供了比synchronized关键字更加灵活的锁定机制。ReentrantLock属于java.util.concurrent.locks包&#xff0c;是Java并发API的一部分。 与传统的synchro…

隐私保护多领域推荐的紧密度共聚类联邦概率偏好分布模型

论文链接 Federated Probabilistic Preference Distribution Modelling with Compactness Co-Clustering for Privacy-Preserving Multi-Domain Recommendation 引言 这篇论文提出的概率偏好分布是通过使用高斯分布来表示用户和项目的偏好。在论文中&#xff0c;作者提出了一…

10.MySQL事务(上)

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 目录 前言&#xff1a; 是什么&#xff1f; 为什么? 怎么做&#xff1f; 前言&#xff1a; 本篇文章将会说明什么是事务&#xff0c;为什么会出现事务&#xff1f;事务是怎么做的&#xff1f; 是什么&#xff1f; 我…

Nginx反向代理和负载均衡

文章目录 前言一、Nginx介绍二、Nginx的作用1.正向代理2.反向代理3.负载均衡之轮询4.负载均衡之加权轮询 三、Nginx安装1.Nginx下载2.启动Nginx3.检查Nginx是否启动成功4.配置监听5.关闭nginx 前言 比如公司项目刚刚上线的时候&#xff0c;并发量小&#xff0c;用户使用的少&a…

ElementuiPlus的table组件实现行拖动与列拖动

借助了插件sortablejs。这种方法只适合做非树状table。如果想实现树状table&#xff0c;并且可拖动。可以试一下aggridVue3这个插件 <template><div class"draggable" style"padding: 20px"><el-table row-key"id" :data"t…

计算机组成与结构-计算机体系结构

计算机体系结构 指令系统 Flynn分类法 SISD&#xff08;单指令流单数据流&#xff09; 结构 控制部分&#xff1a;一个处理器&#xff1a;一个主存模块&#xff1a;一个 代表 单处理器系统 SIMD&#xff08;单指令流多数据流&#xff09; 结构 控制部分&#xff1a;一个处理…

CleanMyMac2024破解版如何下载?

CleanMyMac作为一款专业的苹果电脑清理软件&#xff0c;它不仅仅能单纯的卸载不用、少用的应用&#xff0c;同时还支持&#xff1a;1、清理应用程序的数据文件&#xff0c;将应用重置回初始状态&#xff0c;减少空间占用&#xff1b;2、自动检查应用更新&#xff0c;保持应用的…

Python画图之动态爱心

Python画出动态爱心&#xff08;有趣小游戏&#xff09; 一、效果图二、Python代码 一、效果图 二、Python代码 import random from math import sin, cos, pi, log from tkinter import *CANVAS_WIDTH 640 # 画布的宽 CANVAS_HEIGHT 480 # 画布的高 CANVAS_CENTER_X CANV…

【PID专题】MATLAB如何实现PID?

MATLAB是一种非常强大的工具&#xff0c;用于实现和分析PID&#xff08;比例-积分-微分&#xff09;控制器。在MATLAB中&#xff0c;您可以使用控制系统工具箱来设计、模拟和调整PID控制系统。 以下是一般步骤&#xff0c;演示如何在MATLAB中实现PID控制&#xff1a; 1. 打开MA…

PHP进销存ERP系统源码

PHP进销存ERP系统源码 系统介绍&#xff1a; 扫描入库库存预警仓库管理商品管理供应商管理。 1、电脑端手机端&#xff0c;手机实时共享&#xff0c;手机端一目了然。 2、多商户Saas营销版 无限开商户&#xff0c;用户前端自行注册&#xff0c;后台管理员审核开通 3、管理…

【服务器】Java连接redis及使用Java操作redis、使用场景

一、Java连接redis-No-SQL 1、导入依赖 在你的项目里面导入redis的pom依赖 <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency> 2、连接redis 连接redis //…

Redis-使用java代码操作Redis

&#x1f3c5;我是默&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; ​ &#x1f31f;在这里&#xff0c;我要推荐给大家我的专栏《Linux》。&#x1f3af;&#x1f3af; &#x1f680;无论你是编程小白&#xff0c;还是有一定基础的程序员&#xff0c;这…

HarmonyOS UI 开发

引言 HarmonyOS 提供了强大的 UI 开发工具和组件&#xff0c;使开发者能够创建吸引人的用户界面。本章将详细介绍在 HarmonyOS 中应用 JS、CSS、HTML&#xff0c;HarmonyOS 的 UI 组件以及如何自定义 UI 组件。 目录 JS、CSS、HTML 在 HarmonyOS 中的应用HarmonyOS 的 UI 组…

【STM32】基于HAL库建立自己的低功耗模式配置库(STM32L4系列低功耗所有配置汇总)

【STM32】基于HAL库建立自己的低功耗模式配置库&#xff08;STM32L4系列低功耗所有配置汇总&#xff09; 文章目录 低功耗模式&#xff08;此章节可直接跳过&#xff09;低功耗模式简介睡眠模式停止模式待机模式 建立自己的低功耗模式配置库通过结构体的方式来进行传参RTC配置…

将Bean注入Spring容器的五种方式

将bean放入Spring容器中有哪些方式&#xff1f; 我们知道平时在开发中使用Spring的时候&#xff0c;都是将对象交由Spring去管理&#xff0c;那么将一个对象加入到Spring容器中&#xff0c;有哪些方式呢&#xff0c;下面我就来总结一下 1、Configuration Bean 这种方式其实也是…

数据结构 | 单链表专题【详解】

数据结构 | 单链表专题【详解】 文章目录 数据结构 | 单链表专题【详解】链表的概念及结构单链表的实现头文件打印尾插头插尾删头删查找在指定位置之前插入数据在指定位置之后插入数据删除pos节点删除pos之后的节点销毁链表 顺序表遗留下来的问题 中间/头部的插⼊删除&#xff…

vue3后台管理系统之实现分页功能

例子&#xff1a;用户 请求格式 返回数据类型 {"code": 200,"message": "获取所有用户成功","total": 19,"totalPages": 2,"currentPage": 1,"data": [{"id": 1,"username": &qu…

uniapp小程序九宫格抽奖

定义好奖品下标&#xff0c;计时器开始抽奖&#xff0c;请求接口&#xff0c;出现中奖奖品之后&#xff0c;获取中奖商品对应的奖品下标&#xff0c;再次计时器判断当前移动的小标是否为中奖商品的下标&#xff0c;并且是否转到3圈&#xff08;防止转1圈就停止&#xff09;&…