Flink 实时数仓(四)【DWD 层搭建(二)流量域事实表】

前言

        昨天刚搬到新校区,新校区小的可怜,好在之后出去实习交通可以方便点;待在学院太受限了,早点离开!

        今天开始完成 DWD 层剩余的需求,上一节我们把日志数据根据不同类型分流写入到了不同的主题;

1、流量域独立访客事务事实表

        独立访客指的其实就是我们 web 端日志分析指标中常说的 UV,上一节我们已经把页面日志写入到 dwd_page_traffic_log 主题当中了,所以这里我们直接对这个主题进行消费处理;

1.1、实现思路

        既然是独立访客,就必须对日志中的数据做去重(独立访客数一般用来做日活指标,因为我们的机器一般都是 24 小时全年无休的,所以我们实时数仓也可以做这种日级别的指标需求,通过状态来存储历史就可以实现),而怎么判断访客是否重复?这就又用到了 Flink 中的状态编程(状态就是历史);和上一节我们判断新老访客一样,我们这里也可以给每个 mid 维护一个名为 lastVisitDate 的 ValueState(对 mid 进行 keyby),存储上一次访问的日期(注意是日期,只精确到天),每来一条数据就判断它的 lastVisitDate:

  • 如果 lastVisitDate 为 null 或者 不是今天,则保留数据,否则丢弃

一旦进入第二天,lastVisitDate 状态就应该被清空(设置状态 TTL 为 1 天)

此外,对于 0 点的数据我们这里需要明确统计规则:

  • 独立访客数据对应的页面必然是会话起始页面,last_page_id 必为 null;所以对于跨天的访问不能计算在内(昨天到今天访问了多个页面,而今天页面的 last_page_id 必然不为 null),我们需要在消费数据后的第一步就需要进行过滤;

1.2、代码实现 

public class DwdTrafficUniqueVisitorDetail {public static void main(String[] args) throws Exception {// TODO 1. 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 生产环境中设置为kafka主题的分区数// 1.1 开启checkpointenv.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次// 1.2 设置状态后端env.setStateBackend(new HashMapStateBackend());// TODO 2. 消费 kafka dwd_traffic_page_log 主题String topic = "dwd_traffic_page_log";String groupId = "uvDetail";DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));// TODO 3. 过滤 last_page_id != Null 的数据// 使用 flatMap 而没用 filter,因为 flatMap 可以把过滤和转json 两步都一起完成SingleOutputStreamOperator<JSONObject> jsonDS = pageDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSONObject.parseObject(value);// 获取 last_page_idString last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");if (last_page_id == null) {out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();}}});// TODO 4. 按照 mid 分组KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getJSONObject("common").getString("mid"));// TODO 5. 使用状态编程实现按照 mid 的日期进行去重// 使用富函数,因为富函数提供更多的信息如上下文等SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {private ValueState<String> lastVisitDate = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("lastVisit", String.class);lastVisitDate = getRuntimeContext().getState(stateDescriptor);}@Overridepublic boolean filter(JSONObject value) throws Exception {// 获取状态数据 & 当前数据中的时间并转为日期String lastDate = lastVisitDate.value();Long ts = value.getLong("ts");String curDate = DateFormatUtil.toDate(ts);if (lastDate == null || !lastDate.equals(curDate)) {// 更新状态lastVisitDate.update(curDate);return true;}return false;}});// TODO 6. 数据写入 kafkaString targetTopic = "dwd_traffic_unique_visitor_detail";uvDS.map(data -> data.toJSONString()).addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));// TODO 7. 执行任务env.execute("DwdTrafficUniqueVisitorDetail");}}

1.3、TTL 优化

        上面我们的代码逻辑看起来已经没什么问题了,但是我们可以设想:假设一个用户,2024-01-01 首次登录之后,它的 lastVisitDate 状态会一直存储 2024-01-01,如果他下一次登录是在 2024-12-31,那么期间的 364 天我们依然要一直存储它的状态;而我们判断用户是否已经登录的逻辑是:lastVisitDate 是否为null 或者 lastVisitDate<今天,所以我们完全可以在一天之后把该用户的 lastVisitDate 状态清空,来减少状态的保存开销!

TTL 是给状态描述器设置的,而状态描述器是构造状态对象的必须参数!

TTL 是状态的一个属性,当我们修改状态值的时候,TTL 本身并不会更新!这里,我们需要在状态描述器中设置 TTL 的更新策略为创建或更新状态值的时候就更新 TTL ,重新开始过期倒计时;

我们只需要修改上面第 5 步,在初始化状态时,在状态描述器中给状态添加 TTL 属性:

            @Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("lastVisit", String.class);// 给状态添加 TTLstateDescriptor.enableTimeToLive(new StateTtlConfig.Builder(Time.days(1))// 设置 TTL 可更新,并且在创建或更新状态的时候更新.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());lastVisitDate = getRuntimeContext().getState(stateDescriptor);}

2、流量域用户跳出事务事实表

        跳出的概念:跳出指的是用户在一次会话中只访问了一个页面的情况(注意:粒度是会话),我们在之前做离线数仓的时候做过跳出率的指标,对于离线数仓,我们可以在 DWS 层构建一张流量域近1日会话粒度页面浏览表(dws_traffic_session_page_view_1d),通过下面的 SQL 就可以统计出该指标:

SELECT 
CAST(SUM(IF(page_count=1,1,0))/COUNT(*)) AS DECIMAL(16,2) AS bounce_rate
FROM dws_traffic_session_page_view_1d

        在这里的实时数仓中,我们不可能等到一天结束最后才去计算跳出率;但是我们这里又没有 session_id,所以我们只能换一种思路:

思路1(会话窗口)

  • 使用会话窗口,为每个 mid 开启一个会话窗口并指定间隔为 10 s;一旦到了 10s 触发窗口关闭,计算窗口内的数据条数,> 1 条则说明这次会话没有发生跳出;

        这种思路的问题很明显:① 如果我短时间(10s内)发生多个跳出,但是正好这些跳出都在一个会话,这会导致窗口结束时误以为这不是跳出,毕竟窗口内有多条数据;② 可能我的一次正常的会话,被会话窗口切分到两个不同的会话窗口,结果把一个非跳出访问计算为 2 个跳出访问;

思路2(状态编程)

        在离线数仓中,当我们没有 session_id 时,我们可以一天的数据按照 mid 进行分组,然后根据时间戳字段进行排序,这样来计算一个 session;但是这里是实时数仓,我们不知道什么时候一个 session 会结束,所以我们可以设置一个定时器,定时器时间范围内的数据如果没数据来就视作一个会话结束,触发计算;并结合状态编程,把新会话的首页存入状态

  • 遇到 last_page 为 null 的数据就试着取出状态
    • 如果状态为 null,则该页面是新的会话起始页,开启定时器将数据自身写入状态
    • 如果状态不为 null,说明刚跳出一次,并且在定时器时间范围内又进来一次;这种情况需要将第一条数据(跳出的数据,也就是写入状态中的数据)输出,然后将自身写入状态,定时器依然存在,等时间到了触发计算
  • 如果 last_page 不为 null,则状态中的数据和该条数据都丢弃

这种思路同样存在问题,当数据是乱序的时候一切都就乱套了;

思路3(Flink CEP)

Flink CEP 其实就是使用 状态编程 + within 开窗 来处理这种复杂事件

Flink CEP 定义的规则之间的连续策略

  • 严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。对应方法为 next()
  • 松散连续: 忽略匹配的事件之间的不匹配的事件。对应方法为followedBy();
  • 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。对应方法为followedByAny()

定义模式之前的代码

        这里需要注意:因为我们后面要保证数据有序,所以我们最好指定事件时间的提取字段,并添加水位线设置合理的超时时间(理论上可以保证数据绝对有序):

public class DwdTrafficUserJumpDetail {public static void main(String[] args) {// TODO 1. 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 生产环境中设置为kafka主题的分区数// 1.1 开启checkpointenv.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次// 1.2 设置状态后端env.setStateBackend(new HashMapStateBackend());// TODO 2. 消费 kafka dwd_traffic_page_log 主题String topic = "dwd_traffic_page_log";String groupId = "user_jump_detail";DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));// TODO 3. 将数据转为 JSONSingleOutputStreamOperator<JSONObject> jsonDS = pageDS.map(JSON::parseObject);// TODO 4. 提取事件时间 & 按照 mid 分组KeyedStream<JSONObject, String> keyedStream = jsonDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return element.getLong("ts");}})).keyBy(json -> json.getJSONObject("common").getString("mid"));

接下来是核心的定义 CEP 模式的代码:

        // TODO 5. 定义 CEP 模式序列// 泛型方法类型指的是流的类型(下面的 start 和 next 作为提取事件的 key)Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject value) throws Exception {return value.getJSONObject("page").getString("last_page_id") == null;}}).next("next").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject value) throws Exception {return value.getJSONObject("page").getString("last_page_id") == null;}}).within(Time.seconds(10L));// 等价于 循环模式 共用一个 key: start Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject value) throws Exception {return value.getJSONObject("page").getString("last_page_id") == null;}}).times(2) // 默认是宽松近邻 followedBy.consecutive() // 严格近邻 next.within(Time.seconds(10L));// TODO 6. 建模式序列作用到流上PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);// TODO 7. 提取事件(匹配上的时间 和 超时时间)OutputTag<String> timeoutTag = new OutputTag<>("timeout");SingleOutputStreamOperator<String> selectDS = patternStream.select(timeoutTag,// 超时数据new PatternTimeoutFunction<JSONObject, String>() {// 对于超时数据来说,当前的数据第一个规则匹配上了,第二个没有匹配上导致超时,那么我们要提取的就是当前数据(第一个数据,第二个数据没来)// 这里的 Map 的 v 是 List 数据类型,因为考虑到我们可能使用的是循环模式(只有一个key)@Overridepublic String timeout(Map<String, List<JSONObject>> map, long l) throws Exception {return map.get("start").get(0).toJSONString();}}, // 匹配上的数据new PatternSelectFunction<JSONObject, String>() {// 匹配上的数据,我们只要第一个数据,因为只能证明第一个数据是跳出数据@Overridepublic String select(Map<String, List<JSONObject>> map) throws Exception {return map.get("start").get(0).toJSONString();}});DataStream<String> timeoutDS = selectDS.getSideOutput(timeoutTag);// TODO 8. 合并两种事件DataStream<String> unionDS = selectDS.union(timeoutDS);// TODO 9. 合并后的数据写入 kafkaString targetTopic = "dwd_traffic_user_jump_detail";unionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));// TODO 10. 启动任务env.execute("DwdTrafficUserJumpDetail");}
}

上面我们定义了两种匹配规则:

  1. 第一条数据的 last_page_id 为 null ,且超时没有收到第二条数据,认定该条数据为跳出数据
  2. 第二条数据的 last_page_id 为 null ,则认定第一条数据是跳出数据

        超时时间内规则一被满足,未等到第二条数据则会被判定为超时数据。所以我们只要把超时数据和 满足连续两条数据的 last_page_id 均为 null 中的第一条数据 union 起来,得到的即为答案所需数据;

总结

        至此,流量域的三个需求都已经完成;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/392961.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云端医疗解决方案:互联网医院系统的云计算架构与实现

随着云计算技术的成熟和普及&#xff0c;医疗行业开始探索云端解决方案&#xff0c;以应对数据存储、计算能力和系统扩展性等方面的挑战。互联网医院系统作为医疗信息化的重要组成部分&#xff0c;通过云计算架构实现了高效、灵活和可扩展的医疗服务。本文将深入探讨互联网医院…

2024 AI开发者大赛火热进行中!

“iFLYTEK AI 开发者大赛”是由科大讯飞发起&#xff0c;中国信息协会联合主办的人工智能竞赛平台&#xff0c;汇聚产学研各界力量&#xff0c;面向全球开发者发起数据算法及创新应用类挑战&#xff0c;推动人工智能前沿科学研究和创新成果转化&#xff0c;培育人工智能产业人才…

YOLOv10改进 | 主干篇 | YOLOv10引入CVPR2023 顶会论文BiFormer用于主干修改

1. 使用之前用于注意力的BiFormer在这里用于主干修改。 YOLOv10改进 | 注意力篇 | YOLOv10引入BiFormer注意力机制 2. 核心代码 from collections import OrderedDict from functools import partial from typing import Optional, Union import torch import torch.nn as n…

如何评估并选择最佳的国内项目管理软件?

国内外主流的10款国内项目管理软件对比&#xff1a;PingCode、Worktile、Jira 、Basecamp、Trello、Asana 、Wrike、Tower 、禅道、Teambition 。 在选择适合自己企业的项目管理软件时&#xff0c;很多人会感到无从下手&#xff0c;担心无法找到既符合预算又能满足团队需求的解…

上网防泄密,这些雷区不要碰!九招教你如何防泄密

李明&#xff1a;“最近看到不少关于信息泄露的新闻&#xff0c;真是让人担忧。咱们在工作中&#xff0c;稍有不慎就可能触碰到泄密的雷区啊。” 王芳&#xff1a;“确实&#xff0c;网络安全无小事。尤其是我们这种经常需要处理敏感信息的岗位&#xff0c;更得小心谨慎。那你…

一行代码教你使用Python制作炫酷二维码

二维码&#xff0c;我们日常生活中随处可见的编码方式&#xff0c;凭借其方便快捷的信息承载能力&#xff0c;已经渗透到各行各业。 MyQR 的介绍 MyQR 是一个 Python 库&#xff0c;用于生成自定义二维码&#xff0c;包括带有 Logo、彩色和动态的二维码。它基于 Python 的 qr…

书生大模型实战营第三期——入门岛——Git基础知识

第三关&#xff1a;Git基础知识 任务如下&#xff1a; 任务描述 破冰活动&#xff1a;自我介绍 每位参与者提交一份自我介绍。 提交地址&#xff1a;GitHub - InternLM/Tutorial: LLM&VLM Tutorial 的 camp3 分支&#xff5e;实践项目&#xff1a;构建个人项目 创建一个个人…

电脑硬盘坏了数据可以恢复吗?如何恢复硬盘数据?

电脑硬盘坏了数据可以恢复吗&#xff1f;对于这种问题&#xff0c;还需要具体问题具体分析的&#xff0c;一般是可以恢复。 硬盘损坏可以分为物理损坏和逻辑损坏两种情况&#xff1a; 1.逻辑损坏 这通常是由于软件问题&#xff0c;如文件系统错误、病毒攻击、误删除、格式化等…

未发先火,Smartbi AIChat频频“出圈”

近日&#xff0c;思迈特正式官宣&#xff0c;将于8月8日线上新品发布会上推出自研的全新AI应用——Smartbi AIChat&#xff0c;这款应用在还未正式推向市场前&#xff0c;已获得媒体、分析机构等多方关注&#xff0c;热度飙升&#xff0c;思迈特软件及其新品再一次成为业界内外…

社交媒体分享预览图片和内容修改

在facebook发帖分享链接时&#xff0c;设置预览图片和内容 设置预览图片和内容 <head> <meta name"description" content"我是内容" /> </head> <body><img src"./1.jpg" alt"SEO Image" style"dis…

VSCode在windows系统下使用conda虚拟环境配置

如何解决CondaError: Run ‘conda init‘ before ‘conda activate‘_condaerror: run conda init before conda activat-CSDN博客 首先检查自己的anaconda是否是添加到整个的环境变量里了 打开cmd如果conda和python都能够识别那么就是配置成功了 然后看插件是否安装&#xf…

SQL注入实例(sqli-labs/less-9)

0、初始页面 1、爆库名 使用python脚本 def inject_database1(url):name for i in range(1, 20):low 32high 128mid (low high) // 2while low < high:payload "1 and if(ascii(substr(database(),%d,1)) > %d ,sleep(2),0)-- " % (i, mid)res {"…

Linux进程概念

目录 一.冯诺依曼体系 为什么程序运行会加载到内存 二.进程概念 1.进程控制块PCB 2.进程标识符 使用ps命令 使用pgrep命令 使用系统调用 3.进程状态 孤儿进程 守护进程(精灵进程) 4.进程优先级 三.环境变量 一.冯诺依曼体系 数据在设备之间的传输实质是数据的来回拷…

【Qt】项目代码

main.cpp文件 argc&#xff1a;命令行参数个数。*argv[ ]&#xff1a;每一个命令行参数的内容。main的形参就是命令行参数。QApplication a(argc, argv) 编写一个Qt的图形化界面程序&#xff0c;一定需要QApplication对象。 widget w; 在创建项目的时候&#xff0c;勾选widg…

mysql源码编译启动debug

对于没有C语言基础的同学来说&#xff0c;想看看源码&#xff0c;在搞定编辑器做debug的时候就被劝退了&#xff0c;发生点啥了&#xff0c;完全看不懂&#xff0c;不知道从哪里入手去做debug&#xff1b;我为了看看 mysql 的 insert buffer 到底存的是索引页还是数据页&#x…

ViT和SwinTransformer详解

ViT是Google brain发表于ICLR21上的工作&#xff0c;开创性将transformer用在vision领域&#xff0c;且图像识别性能超CNN&#xff0c;至今引用3.8w&#xff1b;原文&#xff1a;https://arxiv.org/pdf/2010.11929 SwinTransformer是微软亚洲研究院发表于ICCV21上&#xff0c;…

双回路校园智能电表是什么?什么叫双回路校园智能电表?

在智慧校园的建设浪潮中&#xff0c;双回路校园智能电表作为一种创新的能源计量与管理解决方案&#xff0c;正逐渐成为校园电力系统改造与升级的关键要素。本文旨在深入探讨双回路校园智能电表的概念、工作原理、核心优势及其在校园能源管理中的应用实践。 一、定义与工作原理…

Harbor 仓库一键安装

文章目录 一、场景说明二、脚本职责三、参数说明四、操作示例五、注意事项 一、场景说明 本自动化脚本旨在为提高研发、测试、运维快速部署应用环境而编写。 脚本遵循拿来即用的原则快速完成 CentOS 系统各应用环境部署工作。 统一研发、测试、生产环境的部署模式、部署结构、…

一文理清生产管理的“4管”和“8理”!

一提到生产管理&#xff0c;很多人的第一反应可能是车间里忙碌的身影、流水线上飞速运转的机器&#xff0c;还有一张张密密麻麻的生产计划表。但实际上&#xff0c;生产管理远不止于此。 “科学管理之父”弗雷德里克温斯洛泰勒认为&#xff1a;管理就是确切地知道你要别人干什…

CompletableFuture详解

CompletableFuture详解 学习链接:https://juejin.cn/post/7124124854747398175?searchId20240806151438B643DF2AAD2FC5E6F11E 一、CompletableFuture简介 在JAVA8开始引入了全新的CompletableFuture类&#xff0c;它是Future接口的一个实现类。也就是在Future接口的基础上&a…