大数据-268 实时数仓 - ODS层 将 Kafka 中的维度表写入 DIM

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)

章节内容

  • ODS
  • Lambda架构
  • Kappa架构

在这里插入图片描述

基本介绍

在 Kafka 中写入维度表(DIM)通常涉及将实时或批处理数据从 Kafka 主题(Topic)读取,并根据数据流中的信息更新维度表(DIM),这在数据仓库或数据湖的 ETL(提取、转换、加载)过程中非常常见。维度表(DIM)存储的是与业务数据相关的维度信息,例如客户、产品、地理位置等,用于支持 OLAP(联机分析处理)查询。

理解 Kafka 数据流

Kafka 是一个分布式流平台,用于高吞吐量的消息传递。在 ETL 过程中,Kafka 通常用作数据的消息队列或者流处理的来源。每当新数据生成时,它会被发布到 Kafka 中的某个主题(Topic),然后消费者(Consumer)可以从主题中获取数据进行处理。

设计维度表(DIM)

维度表通常包含业务实体的详细信息,如产品名称、客户信息、时间维度等。与事实表(Fact)不同,维度表的数据较为静态,但可能会随着时间更新(例如,客户地址变更或产品类别更新)。每个维度表通常有一个唯一的主键(如 customer_id 或 product_id)来标识记录。

Kafka 消费者(Consumer)

为了从 Kafka 中读取维度数据,需要创建一个消费者(Consumer),它会从 Kafka 的某个主题(Topic)中读取消息。这些消息通常是 JSON 格式,包含需要写入维度表的信息。消费者将从 Kafka 主题中获取数据,可能包括以下步骤:

  • 连接到 Kafka 集群。
  • 订阅一个或多个主题(Topics)。
  • 消费消息并将其传递给后续的处理逻辑。
  • 消费者的实现可以使用 Kafka 提供的客户端库,例如 Kafka 的 Java 客户端、Python 的 confluent-kafka 等。

数据处理和转换

在读取到 Kafka 消息后,消费者需要对数据进行必要的处理和转换。对于维度数据,处理逻辑可能包括:

  • 数据解析:将消息从 Kafka 中的格式(例如 JSON)解析成结构化数据。
  • 校验数据:检查数据是否符合业务规则,是否完整,是否有效。
  • 维度数据更新:如果 Kafka 中的消息包含的维度信息已经存在,则更新相关记录;如果是新维度,则插入新记录。

维度表的更新

维度表的更新通常有两种常见的方式:

  • 全量更新:每次从 Kafka 获取到新的数据时,都将其覆盖到维度表中。这种方式适用于数据变动较少或者可以接受重写的场景。
  • 增量更新:根据时间戳、有效性标志或版本号等信息,更新已有的维度记录。这种方式适用于数据会有更新(如地址或状态变更)的场
    景。

增量更新时,通常会执行以下操作:

  • 查找是否已有该维度记录(例如通过 dimension_id)。
  • 如果存在且数据发生变化,则更新该记录,同时更新 valid_to 时间,并插入一条新的记录,设置 valid_from 和 valid_to 时间。
  • 如果不存在该记录,则直接插入新的维度数据。

写入到目标存储(DIM)

在数据处理后,需要将更新后的维度数据写入目标存储。这通常是一个数据库(例如 MySQL、PostgreSQL 或 NoSQL 数据库)或数据仓库(例如 Snowflake、Google BigQuery、Redshift)中的维度表(DIM)。

数据存储更新(事务性考虑)

对于维度表的更新,通常需要确保数据的一致性。可以使用事务来确保数据在更新过程中的一致性,防止数据丢失或重复。例如,可以在事务中执行所有的更新和插入操作,确保如果操作失败,可以回滚。

TableObject

创建样例 TableObject

case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable

AreaInfo

case class AreaInfo(id: String,name: String,pid: String,sname: String,level: String,citycode: String,yzcode: String,mername: String,Lng: String,Lat: String,pinyin: String)

DataInfo

case class DataInfo(modifiedTime: String,orderNo: String,isPay: String,orderId: String,tradeSrc: String,payTime: String,productMoney: String,totalMoney: String,dataFlag: String,userId: String,areaId: String,createTime: String,payMethod: String,isRefund: String,tradeType: String,status: String
)

ConnHBase

class ConnHBase {def connToHbase:Connection ={val conf : Configuration = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum","h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")conf.set("hbase.zookeeper.property.clientPort","2181")conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)val connection = ConnectionFactory.createConnection(conf)connection}
}

SinkHBase

class SinkHBase extends RichSinkFunction[util.ArrayList[TableObject]] {var connection : Connection = _var hbTable : Table = _override def open(parameters: Configuration): Unit = {connection = new ConnHBase().connToHbasehbTable = connection.getTable(TableName.valueOf("wzk_area"))}override def close(): Unit = {if (hbTable != null) {hbTable.close()}if (connection != null) {connection.close()}}override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = {value.forEach(x => {println(x.toString)val database: String = x.databaseval tableName: String = x.tableNameval typeInfo: String = x.typeInfoif ((database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_trade_orders"))) {if (typeInfo.equalsIgnoreCase("insert")) {value.forEach(x => {val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])insertTradeOrders(hbTable, info)})} else if (typeInfo.equalsIgnoreCase("update")) {} else if (typeInfo.equalsIgnoreCase("delete")) {}}if (database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_area")) {if (typeInfo.equalsIgnoreCase("insert")) {value.forEach(x => {val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])insertArea(hbTable, info)})} else if (typeInfo.equalsIgnoreCase("update")) {value.forEach(x => {val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])insertArea(hbTable, info)})} else if (typeInfo.equalsIgnoreCase("delete")) {value.forEach(x => {val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])deleteArea(hbTable, info)})}}})}def insertTradeOrders(hbTable: Table, dataInfo: DataInfo): Unit = {val tableName = "wzk_trade_orders"val columnFamily = "f1"// 如果表不存在则创建createTableIfNotExists(connection, tableName, columnFamily)val put = new Put(dataInfo.orderId.getBytes)put.addColumn("f1".getBytes, "modifiedTime".getBytes, dataInfo.modifiedTime.getBytes())put.addColumn("f1".getBytes, "orderNo".getBytes, dataInfo.orderNo.getBytes())put.addColumn("f1".getBytes, "isPay".getBytes, dataInfo.isPay.getBytes())put.addColumn("f1".getBytes, "orderId".getBytes, dataInfo.orderId.getBytes())put.addColumn("f1".getBytes, "tradeSrc".getBytes, dataInfo.tradeSrc.getBytes())put.addColumn("f1".getBytes, "payTime".getBytes, dataInfo.payTime.getBytes())put.addColumn("f1".getBytes, "productMoney".getBytes, dataInfo.productMoney.getBytes())put.addColumn("f1".getBytes, "totalMoney".getBytes, dataInfo.totalMoney.getBytes())put.addColumn("f1".getBytes, "dataFlag".getBytes, dataInfo.dataFlag.getBytes())put.addColumn("f1".getBytes, "userId".getBytes, dataInfo.userId.getBytes())put.addColumn("f1".getBytes, "areaId".getBytes, dataInfo.areaId.getBytes())put.addColumn("f1".getBytes, "createTime".getBytes, dataInfo.createTime.getBytes())put.addColumn("f1".getBytes, "payMethod".getBytes, dataInfo.payMethod.getBytes())put.addColumn("f1".getBytes, "isRefund".getBytes, dataInfo.isRefund.getBytes())put.addColumn("f1".getBytes, "tradeType".getBytes, dataInfo.tradeType.getBytes())put.addColumn("f1".getBytes, "status".getBytes, dataInfo.status.getBytes())hbTable.put(put)}def insertArea(hbTable: Table, areaInfo: AreaInfo): Unit = {// val tableName = "wzk_area"// val columnFamily = "f1"// 如果表不存在则创建// createTableIfNotExists(connection, tableName, columnFamily)println(areaInfo.toString)val put = new Put(areaInfo.id.getBytes())put.addColumn("f1".getBytes(), "name".getBytes(), areaInfo.name.getBytes())put.addColumn("f1".getBytes(), "pid".getBytes(), areaInfo.pid.getBytes())put.addColumn("f1".getBytes(), "sname".getBytes(), areaInfo.sname.getBytes())put.addColumn("f1".getBytes(), "level".getBytes(), areaInfo.level.getBytes())put.addColumn("f1".getBytes(), "citycode".getBytes(), areaInfo.citycode.getBytes())put.addColumn("f1".getBytes(), "yzcode".getBytes(), areaInfo.yzcode.getBytes())put.addColumn("f1".getBytes(), "mername".getBytes(), areaInfo.mername.getBytes())put.addColumn("f1".getBytes(), "lng".getBytes(), areaInfo.Lng.getBytes())put.addColumn("f1".getBytes(), "lat".getBytes(), areaInfo.Lat.getBytes())put.addColumn("f1".getBytes(), "pinyin".getBytes(), areaInfo.pinyin.getBytes())hbTable.put(put)}def deleteArea(hbTable: Table, areaInfo: AreaInfo): Unit = {val delete = new Delete(areaInfo.id.getBytes)hbTable.delete(delete)}def createTableIfNotExists(connection: Connection, tableName: String, columnFamily: String): Unit = {val admin = connection.getAdmintry {val table = TableName.valueOf(tableName)// 检查表是否存在if (!admin.tableExists(table)) {val tableDescriptor = new HTableDescriptor(table)val columnDescriptor = new HColumnDescriptor(columnFamily.getBytes())tableDescriptor.addFamily(columnDescriptor)// 创建表admin.createTable(tableDescriptor)println(s"表 $tableName 创建成功")} else {println(s"表 $tableName 已存在")}} finally {admin.close()}}}

SourceKafka

class SourceKafka {def getKafkaSource(topicName: String) : FlinkKafkaConsumer[String] = {val props = new Properties()props.setProperty("bootstrap.servers", "h121.wzk.icu:9092")props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")props.setProperty("group.id", "hbase-test")props.setProperty("auto.offset.reset", "earliest")new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)}}

KafkaToHBase

object KafkaToHBase {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval kafkaConsumer = new SourceKafka().getKafkaSource("dwshow")kafkaConsumer.setStartFromLatest()val sourceStream = env.addSource(kafkaConsumer)val mapped: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => {val jsonObj: JSONObject = JSON.parseObject(x)val database: AnyRef = jsonObj.get("database")val table: AnyRef = jsonObj.get("table")val typeInfo: AnyRef = jsonObj.get("type")val objects = new util.ArrayList[TableObject]()jsonObj.getJSONArray("data").forEach(x => {objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))println(x.toString)})objects})mapped.addSink(new SinkHBase)env.execute()}
}

启动项目

我们对表进行修改:
在这里插入图片描述
可以看到控制台对饮输出了内容:
在这里插入图片描述
别的表也尝试修改一下:
在这里插入图片描述
查看 HBase 可以看到数据已经有了:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/503699.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法5--位运算

目录 基础经典例题[面试题 01.01. 判定字符是否唯一](https://leetcode.cn/problems/is-unique-lcci/description/)[268. 丢失的数字](https://leetcode.cn/problems/missing-number/description/)[371. 两整数之和](https://leetcode.cn/problems/sum-of-two-integers/descrip…

基于STM32设计的仓库环境监测与预警系统

目录 项目开发背景设计实现的功能项目硬件模块组成设计思路系统功能总结使用的模块的技术详情介绍总结 1. 项目开发背景 随着工业化和现代化的进程,尤其是在制造业、食品业、医药业等行业,仓库环境的监控和管理成为了至关重要的一环。尤其是在存储易腐…

代码随想录day38 动态规划6

题目:322.零钱兑换 279.完全平方数 139.单词拆分 多重背包 背包总结 需要重做:322,139 322. 零钱兑换 思路:零钱,可取多次-》完全背包。 注意: 五部: 1.dp[j]:价值为j的时候,最…

HackMyVM-Again靶机的测试报告

目录 一、测试环境 1、系统环境 2、使用工具/软件 二、测试目的 三、操作过程 1、信息搜集 2、Getshell 3、提权 四、结论 一、测试环境 1、系统环境 渗透机:kali2021.1(192.168.101.127) 靶 机:Linux(192.168.101.204) 物理机:wi…

UDP_TCP

目录 1. 回顾端口号2. UDP协议2.1 理解报头2.2 UDP的特点2.3 UDP的缓冲区及注意事项 3. TCP协议3.1 报头3.2 流量控制2.3 数据发送模式3.4 捎带应答3.5 URG && 紧急指针3.6 PSH3.7 RES 1. 回顾端口号 在 TCP/IP 协议中,用 “源IP”, “源端口号”…

Android存储方案对比(SharedPreferences 、 MMKV 、 DataStore)

简介:本文介绍了Android开发中常用的键值对存储方案,包括SharedPreferences、MMKV和DataStore,并且对比了它们在性能、并发处理、易用性和稳定性上的特点。通过实际代码示例,帮助开发者根据项目需求选择最适合的存储方案&#xff…

Unity-Mirror网络框架-从入门到精通 总目录

前言 在现代游戏开发中,网络功能日益成为提升游戏体验的关键组成部分。本系列文章将为读者提供对Mirror网络框架的深入了解,涵盖从基础到高级的多个主题。Mirror是一个用于Unity的开源网络框架,专为多人游戏开发设计,它使得开发者…

element输入框及表单元素自定义前缀

如图所示&#xff1a; <el-input class"custom-input" placeholder"请输入" prefix-icon"prefix" v-model"form.name" clearable></el-input> :deep(.custom-input) {.el-input__icon {display: inline-block;width: 40…

现代谱估计的原理及MATLAB仿真(二)(AR模型法、MVDR法、MUSIC法)

现代谱估计的原理及MATLAB仿真AR参数模型法&#xff08;参数模型功率谱估计&#xff09;、MVDR法&#xff08;最小方差无失真响应法&#xff09;、MUSIC法&#xff08;多重信号分类法&#xff09; 文章目录 前言一、AR参数模型1 原理2 MATLAB仿真 二、MVDR法1 原理2 MATLAB仿真…

对话|全年HUD前装将超330万台,疆程技术瞄准人机交互“第一屏”

2024年&#xff0c;在高阶智驾进入快速上车的同时&#xff0c;座舱人机交互也在迎来新的增长点。Chat GPT、AR-HUD、车载投影等新配置都在带来新增量机会。 高工智能汽车研究院监测数据显示&#xff0c;2024年1-10月&#xff0c;中国市场&#xff08;不含进出口&#xff09;乘用…

LabVIEW之树形控件

一、树形控件基本构成 树形控件这个名称非常形象&#xff0c;其如同树一样&#xff0c;是典型的分层结构。树形控件的属性和方法使用非常灵活&#xff0c;树形控件的内容既可以静态编辑&#xff0c;也可以通过编程来动态填充。静态编辑树形控件适用于内容不变的应用场景&#…

springboot 集成 etcd

springboot 集成 etcd 往期内容 ETCD 简介docker部署ETCD 前言 好久不见各位小伙伴们&#xff0c;上两期内容中&#xff0c;我们对于分布式kv存储中间件有了简单的认识&#xff0c;完成了docker-compose 部署etcd集群以及可视化工具 etcd Keeper&#xff0c;既然有了认识&a…

gateway的路径匹配介绍

gateway是一个单独服务。通过网关端口和predicates进行匹配服务 1先看配置。看我注解你就明白了。其实就是/order/**配置机制直接匹配到orderservice服务。 2我试着请求一个路径&#xff0c;请求成功。下面第三步是请求的接口。 3接口。

嵌入式中QT实现文本与线程控制方法

第一:利用QT进行文件读写实现 利用QT进行读写文本的时候进行读写,读取MP3歌词的文本,对这个文件进行读写操作。 实例代码,利用Qfile,对文件进行读写。 //读取对应文件文件,头文件的实现。 #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #incl…

书籍推荐:Kubernetes 修炼手册

这本书是 2020 年出版的&#xff0c;比较新&#xff0c;从 0 到 1 介绍了 k8s 中的相关技术和概念&#xff0c;翻译质量也可以&#xff0c;适合作为初学 k8s 的课外书。 书中比较关键的就是中间那几个章节&#xff0c;基本掌握 k8s 中 Pod、svc、StatefulSet、ConfigMap、Volum…

计算机网络 (29)网络地址转换NAT

前言 网络地址转换&#xff08;Network Address Translation&#xff0c;NAT&#xff09;是计算机网络中的一种重要协议&#xff0c;它主要用于将私有IP地址转换为公共IP地址&#xff0c;以实现内部网络与外部网络之间的通信。 一、基本概念 NAT是一种在局域网&#xff08;LAN&…

三极管工作状态分析

NPN三极管 下面是NPN三极管&#xff08;也称N管&#xff09;的标识和内部结构图&#xff1a; NPN三极管由两个PN结构成&#xff0c;靠近C&#xff08;集电极&#xff09;一侧的PN结称为集电结&#xff1b;靠近E&#xff08;发射极&#xff09;一侧的PN结称为发射结&#xff1…

基于RedHat9部署WordPress+WooCommerce架设购物网站

系统版本信息&#xff1a;Red Hat Enterprise Linux release 9.2 (Plow) WordPress版本信息&#xff1a;wordpress-6.6.2-zh_CN WooCommerce版本信息&#xff1a;woocommerce.9.5.1 环境架构&#xff1a;LNMP&#xff08;RedHat9nginx1.20.1PHP 8.0.27MySQL8.0.30&#xff09; …

【雷达】雷达的分类

文章目录 前言类别性质主要雷达分系统及其现代技术发展国外发展 前言 前言 类别 性质 按作用分类 军用雷达&#xff1a;&#xff08;按载体&#xff09;地面雷达、舰载雷达、机载雷达、星载雷达、 艇载雷达、弹载雷达 民用雷达&#xff1a;交通管制雷达、港口管制雷达、气象雷…

基于RK3568/RK3588大车360度环视影像主动安全行车辅助系统解决方案,支持ADAS/DMS

产品设计初衷 HS-P2-2D是一款针对大车盲区开发的360度全景影像 安全行车辅助系统&#xff0c;通过车身四周安装的超广角像机&#xff0c;经算法合成全景鸟瞰图&#xff0c;通过鸟瞰图&#xff0c;司机非常清楚的看清楚车辆四周情况&#xff0c;大大降低盲区引发的交通事故。 产…