Flink电商实时数仓(六)

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//核心业务逻辑//1. 读取TopicDB主题数据createTopicDb(groupId,tableEnv);//2. 筛选支付成功的数据,从业务数据topic_db中filterPaymentTable(tableEnv);//3. 读取下单详情表数据, 从kafka读取数据createOrderDetailTable(tableEnv, groupId);//4. 创建base.dic字典表,从HBase维度数据中读取createBaseDic(tableEnv);//tableEnv.executeSql("select * from order_detail").print();//tableEnv.executeSql("select * from base_dic").print();//tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");//5. 使用interval join 完成支付成功流和订单详情数据关联intervalJoin(tableEnv);//6. 使用lookup join完成维度退化Table resultTable = lookupJoin(tableEnv);//7. 创建upsert kafka连接器写出createKafkaSink(tableEnv);resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();}

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//核心业务逻辑//1. 读取topic_db数据//stream.print();//2. 清洗过滤和转换, jsonObjStream是主流数据SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);//jsonObjStream.print();//3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);//tableProcessDwd.print();4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);//5. 连接主流和广播流,对主流数据进行判断是否需要保留SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);//processStream.print();//6. 筛选最后需要写出的字段SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);//7. 通过sink_table的表名来动态写出到对应kafka主题//在setRecordSerializer()设置dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());}

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/224634.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LED驱动电源

LED驱动电源 常用电子元器件 TB62726AFG LED SOP-24 文章目录 LED驱动电源前言一、LED驱动电源是什么二、TB62726AFG LED SOP-24总结 前言 LED驱动电源可以根据应用需求采用不同的输入和输出电源类型、电源转换拓扑、调光方式等。常见的LED驱动电源类型包括恒流驱动电源、恒…

c# OpenCvSharp 检测(斑点检测、边缘检测、轮廓检测)(五)

在C#中使用OpenCV进行图像处理时&#xff0c;可以使用不同的算法和函数来实现斑点检测、边缘检测和轮廓检测。 斑点检测边缘检测轮廓检测 一、斑点检测&#xff08;Blob&#xff09; 斑点检测是指在图像中找到明亮或暗的小区域&#xff08;通常表示为斑点&#xff09;&#…

数据智慧:C#中编程实现自定义计算的Excel数据透视表

前言 数据透视表&#xff08;Pivot Table&#xff09;是一种数据分析工具&#xff0c;通常用于对大量数据进行汇总、分析和展示。它可以帮助用户从原始数据中提取关键信息、发现模式和趋势&#xff0c;并以可视化的方式呈现。 在数据透视表中&#xff0c;数据分析师通常希望进…

doris数据模型,06-Aggregate(聚合模型)

聚合模型的特点 将表中的列分为Key和Value。 Key是数据的维度列&#xff0c;比如时间&#xff0c;地区等等。key相同时会发生聚合。 Value是数据的指标列&#xff0c;比如点击量&#xff0c;花费等等。 每个指标列还会有自己的聚合函数&#xff0c;如&#xff1a;sum&#xff…

【开源】基于JAVA语言的企业项目合同信息系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 合同审批模块2.3 合同签订模块2.4 合同预警模块2.5 数据可视化模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 合同审批表3.2.2 合同签订表3.2.3 合同预警表 四、系统展示五、核心代码5.1 查询合同…

geyser互通服基岩版进不去

Java版需要在服务器安全组开通TCP端口&#xff08;如果有宝塔&#xff0c;也需要开通&#xff09; geyser下载好的安装运行也需要开通端口&#xff0c;但是它是UDP的&#xff08;但是我同时也开启了TCP&#xff0c;可能不需要&#xff1f; Java 版玩家隧道 Java 版玩家使用 T…

使用教程之【SkyWant.[2304]】路由器操作系统,破解移动【Netkeeper】校园网【小白篇】

许多高校目前饱受Netkeeper认证的痛苦&#xff0c;普通路由器无法使用&#xff0c; 教你利用SkyWant的Netkeeper认证软件来使你的SkyWant路由器顺利认证上网&#xff0c;全宿舍又可以合作共赢了&#xff01; 步骤一&#xff1a;正确连接网线&#xff0c;插电开机 正确连接网…

<meta name=“Keywords“ content=““ >、<meta name=“Description“ content=““ > 等用法解释

今天在看网站代码&#xff0c;发现类似<meta name"Keywords" content"" >、<meta name"Description" content"" >这样的写法&#xff0c;不知道具体代表什么意思&#xff0c;于是上网搜了一下&#xff0c;下面是在网上找到…

计算机组成原理综合6

补码表示&#xff1a; X&#xff1a;1111 1111 1111 1101 Y&#xff1a;1111 1111 1101 1111 Z&#xff1a;0111 1111 1111 1100 转原码表示&#xff1a;从右往左找第一个“1”&#xff0c;左边的所有数值位按位取反 X&#xff1a;1111 1111 1111 1101 1000 0000 00…

学Java的第二天

一、常量 1.值不可以变化的量。 2. 分类&#xff1a; 字符串常量 用双引号括起来的多个字符&#xff0c;可以包含 0、1 或多个&#xff0c;例如 "a" 、 "abc" 、 " 中国 " 整数常量&#xff0c;例如&#xff1a; -10 、 0 、 88 小数常量&…

LeetCode-回文链表(234)

题目描述&#xff1a; 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 因为这一题是受到876题求链表中间节点的启发&#xff0c;所以在这里也加一下。 876.链表的中间结点…

泛微OA xmlrpcServlet接口任意文件读取漏洞(CNVD-2022-43245)

CNVD-2022-43245 泛微e-cology XmlRpcServlet接口处存在任意文件读取漏洞&#xff0c;攻击者可利用漏洞获取敏感信息。 1.漏洞级别 中危 2.影响范围 e-office < 9.5 202201133.漏洞搜索 fofa 搜索 app"泛微-OA&#xff08;e-cology&#xff09;"4.漏洞复现 …

快速安装方式安装开源OpenSIPS和CP控制界面

OpenSIPS是目前世界上主流的两个SIP软交换引擎(其中另外一个是kamailio)或者SIP信令服务器&#xff08;个人认为是比较正确的称谓&#xff09;。关于Opensips的基础和一些参数配置和安装方式笔者在很久以前的历史文档中有非常多的介绍。最近&#xff0c;很多用户使用OpenSIPS软…

Settings中电池选项-Android13

Settings中电池选项-Android13 1、设置中界面2、电池计算2.1 充电时间计算2.1.1 BatteryUsageStats获取2.1.2 BatteryStatsImpl计算 2.2 电池剩余使用时间2.2.1 Estimate获取2.2.2 BatteryStatsImpl计算 3、电池信息来源4、命令模拟* 日志 [电池]Android 9.0 电池未充电与充电字…

[内功修炼]函数栈帧的创建与销毁

文章目录 1:什么是函数栈帧2:理解函数栈帧能解决什么问题呢3:函数栈帧的创建与销毁的解析3.1:什么是栈3.2:认识相关寄存器与汇编指令相关寄存器相关汇编指令 3.3 解析函数栈帧的创建和销毁3.3.1 预备知识3.3.2 详细解析一:调用main函数,为main函数开辟函数栈帧First:push前push…

华为云Windows Server服务器下,Node使用pm2-logrotate分割pm2日志,解决pm2日志内存占用过高的问题。

一、简介 PM2 是一个守护进程管理器&#xff0c;它将帮助您管理和保持您的应用程序在线。PM2 入门很简单&#xff0c;它以简单直观的 CLI 形式提供&#xff0c;可通过 NPM 安装。官网地址&#xff1a;https://pm2.keymetrics.io/ 二、问题&#xff1a;pm2日志内存占用过高&am…

12.25

led.c #include "led.h" void all_led_init() {RCC_GPIO | (0X3<<4);//时钟使能GPIOE_MODER &(~(0X3<<20));//设置PE10输出GPIOE_MODER | (0X1<<20);//设置PE10为推挽输出GPIOE_OTYPER &(~(0x1<<10));//PE10为低速输出GPIOE_OSPEED…

ElasticSearch之RestClient笔记

1. ElasticSearch 1.1 倒排索引 1.2 ElasticSearch和Mysql对比 1.3 RestClient操作 导入依赖 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.15.…

linux:下载、网络请求、端口

一&#xff1a;ping命令 可以通过ping命令,检查指定的网络服务器是否是可联通状态 语法: ping [-c num] ip或主机名 1、选项&#xff1a;-c,检查的次数&#xff0c;不使用-c选项&#xff0c;将无限次数持续检查 2、参数&#xff1a;ip或主机名&#xff0c;被检查的服务器的…

时序预测 | Matlab实现SSA-CNN-BiLSTM麻雀算法优化卷积双向长短期记忆神经网络时间序列预测

时序预测 | Matlab实现SSA-CNN-BiLSTM麻雀算法优化卷积双向长短期记忆神经网络时间序列预测 目录 时序预测 | Matlab实现SSA-CNN-BiLSTM麻雀算法优化卷积双向长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现SSA-CNN-BiLSTM麻雀算…