KDP数据分析实战:从0到1完成数据实时采集处理到可视化

智领云自主研发的开源轻量级Kubernetes数据平台,即Kubernetes Data Platform (简称KDP),能够为用户提供在Kubernetes上的一站式云原生数据集成与开发平台。在最新的v1.1.0版本中,用户可借助 KDP 平台上开箱即用的 Airflow、AirByte、Flink、Kafka、MySQL、ClickHouse、Superset 等开源组件快速搭建实时、半实时或批量采集、处理、分析的数据流水线以及可视化报表展示,可视化展示效果如下:

247984561aa6a7370c0c0741a330aded.png

以下我们将介绍一个实时订单数据流水线从数据采集到数据处理,最后到可视化展示的详细建设流程。

 1.流水线设计

借助 KDP 平台的开源组件 Airflow、MySQL、Flink、Kafka、ClickHouse、Superset 完成数据实时采集处理及可视化分析,架构如下: 

8ea91c86309be540823ed486fe2b0dce.jpeg

1.1 数据流

  • 直接使用Flink构建实时数仓,由Flink进行清洗加工转换和聚合汇总,将各层结果集写入Kafka中;

  • ClickHouse从Kafka分别订阅各层数据,将各层数据持久化到ClickHouse中,用于之后的查询分析。

1.2 数据表

本次分析数据基于mock数据,包含数据实时采集处理及可视化分析:

  • 消费者表:customers

字段

字段说明

id

用户ID

name

姓名

age

年龄

gender

性别

  • 订单表:orders

字段

字段说明

order_id

订单ID

order_revenue

订单金额

order_region

下单地区

customer_id

用户ID

create_time

下单时间

1.3 环境说明

在 KDP 页面安装如下组件并完成组件的 QuickStart:

  • MySQL: 实时数据数据源及 Superset/Airflow 元数据库,安装时需要开启binlog

  • Kafka: 数据采集sink

  • Flink: 数据采集及数据处理

  • ClickHouse: 数据存储

  • Superset: 数据可视化

  • Airflow: 作业调度

2. 数据集成与处理

文中使用的账号密码信息请根据实际集群配置进行修改。

2.1 创建MySQL表

2.2 创建 Kafka Topic

进入Kafka broker pod,执行命令创建 Topic,也可以通过Kafka manager 页面创建,以下为进入pod并通过命令行创建的示例:

export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092" bin/kafka-topics.sh --create \--topic ods-order \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP bin/kafka-topics.sh --create \--topic ods-customers \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dwd-order-customer-valid \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dws-agg-by-region \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP

2.3 创建 ClickHouse 表

进入clickhouse pod,使用`clickhouse-client`执行命令创建表,以下为建表语句:

CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dwd-order-customer-valid',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECTorder_id,order_revenue,order_region,create_time,customer_id,customer_age,customer_name,customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;CREATE TABLE kdp_demo.kafka_dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dws-agg-by-region',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECTorder_region,order_cnt,order_total_revenue
FROM kdp_demo.kafka_dws_agg_by_region;

2.4 创建 Flink SQL 作业

2.4.1 SQL部分

CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(`order_id` INT NOT NULL,`order_revenue` FLOAT NOT NULL,`order_region` STRING NOT NULL,`customer_id` INT NOT NULL,`create_time` TIMESTAMP,PRIMARY KEY(`order_id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'orders'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (`id` INT NOT NULL,`age` FLOAT NOT NULL,`name` STRING NOT NULL,`gender` STRING NOT NULL,PRIMARY KEY(`id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'customers'
);-- create ods dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (`order_id` INT,`order_revenue` FLOAT,`order_region` VARCHAR(40),`customer_id` INT,`create_time` TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-order','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`gender` STRING,PRIMARY KEY (customer_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-customers','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (`order_id` INT,`order_revenue` FLOAT,`order_region` STRING,`create_time` TIMESTAMP,`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`customer_gender` STRING,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dwd-order-customer-valid','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (`order_region` VARCHAR(40),`order_cnt` BIGINT,`order_total_revenue` FLOAT,PRIMARY KEY (order_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dws-agg-by-region','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);USE kdp_demo;
-- EXECUTE STATEMENT SET
-- BEGIN
INSERT INTO ods_order_table SELECT * FROM orders_src;
INSERT INTO ods_customers_table SELECT * FROM customers_src;
INSERT INTOdwd_order_customer_valid
SELECTo.order_id,o.order_revenue,o.order_region,o.create_time,c.id as customer_id,c.age as customer_age,c.name as customer_name,c.gender as customer_gender
FROMcustomers_src cJOIN orders_src o ON c.id = o.customer_id
WHEREc.id <> -1;
INSERT INTOdws_agg_by_region
SELECTorder_region,count(*) as order_cnt,sum(order_revenue) as order_total_revenue
FROMdwd_order_customer_valid
GROUP BYorder_region;
-- END;

2.4.2 使用 StreamPark 创建 Flink SQL 作业

具体使用参考 StreamPark 文档。

maven 依赖:

<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version>
</dependency>

2.5 创建 Airflow DAG

2.5.1 DAG 文件部分

import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_agodefault_args = {'owner': 'admin','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,
}dag = DAG('kdp_demo_order_data_insert',description='Insert into orders by using random data',schedule_interval=timedelta(minutes=1),start_date=days_ago(1),catchup=False,tags=['kdp-example'],
)# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'
cities = ["北京", "上海", "广州", "深圳", "成都", "杭州", "重庆", "武汉", "西安", "苏州", "天津", "南京", "郑州","长沙", "东莞", "青岛", "宁波", "沈阳", "昆明", "合肥", "大连", "厦门", "哈尔滨", "福州", "济南", "温州","佛山", "南昌", "长春", "贵阳", "南宁", "金华", "石家庄", "常州", "泉州", "南通", "太原", "徐州", "嘉兴","乌鲁木齐", "惠州", "珠海", "扬州", "兰州", "烟台", "汕头", "潍坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)
# 插入数据的 BashOperator
insert_data_orders = BashOperator(task_id='insert_data_orders',bash_command=f'''mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"''',dag=dag,
)
insert_data_orders

2.5.2 DAG 说明及执行

当前Airflow安装时,需要指定可访问的git 仓库地址,因此需要将 Airflow DAG 提交到 Git 仓库中。每分钟向orders表插入一条数据。

2.6 数据验证

使用ClickHouse验证数据:

(1)进入ClickHouse客户端

clickhouse-client 
# default pass: ckdba.123

(2)执行查询

SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT count(*) FROM kdp_demo.dwd_order_customer_valid;

(3)对比验证MySQL中数据是否一致

select count(*) from kdp_demo.orders;

3. 数据可视化

在2.6中数据验证通过后,可以通过Superset进行数据可视化展示。使用账号`admin/admin`登录Superset页面(注意添加本地 Host 解析):http://superset-kdp-data.kdp-e2e.io

3.1 创建图表

导入我们制作好的图表:

  1. 下载面板:https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip

  2. 导入面板

(1)选择下载的文件导入

eed49ffb69952693ad5a46da6be81f08.png

(2)输入 ClickHouse 的用户`default`的默认密码`ckdba.123`:

4c07d54c7ad0f2f66085481d5bfe77ab.png

3.2 效果展示

最终的实时订单数据图表展示如下,随着订单数据的更新,图表中的数据也会实时更新:

57bd6fa097e66e1da7dc8aa103a599b4.png

快速体验

🚀GitHub项目:

https://github.com/linktimecloud/kubernetes-data-platform

欢迎您参与开源社区的建设🤝

 - FIN -       

1ad0c68fe5ea3d59a392d306012eef0b.png

更多精彩推

  • 我们开源啦!一键部署免费使用!Kubernetes上直接运行大数据平台!

  • 开源 KDP  v1.1.0 版本正式发布,新增数据集成开发应用场景

  • 在 KubeSphere 上快速安装和使用 KDP 云原生数据平台

  • 在 Rancher 上快速安装和使用 KDP 云原生数据平台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/370874.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLMs之gptpdf:gptpdf的简介、安装和使用方法、案例应用之详细攻略

LLMs之gptpdf&#xff1a;gptpdf的简介、安装和使用方法、案例应用之详细攻略 目录 gptpdf的简介 1、处理流程 第一步&#xff0c;使用 PyMuPDF 库&#xff0c;对 PDF 进行解析出所有非文本区域&#xff0c;并做好标记&#xff0c;比如: 第二步&#xff0c;使用视觉大模型&…

RUST 编程语言 绘制随机颜色图片 画圆形 画矩形 画直线

什么是Rust Rust是一种系统编程语言&#xff0c;旨在提供高性能和安全性。它是由Mozilla和其开发社区创建的开源语言&#xff0c;设计目标是在C的应用场景中提供一种现代、可靠和高效的选择。Rust的目标是成为一种通用编程语言&#xff0c;能够处理各种计算任务&#xff0c;包…

基于若依的文件上传、下载

基于若依实现文件上传、下载 文章目录 基于若依实现文件上传、下载1、前端实现-文件上传1.1 通用上传分析1.2 修改实现上传接口 2、后端实现-文件上传3、后端实现-文件下载4、前端实现-文件下载 官网其实也写了&#xff0c;但是我是自己改造封装了一下&#xff0c;再次迈向全栈…

3.js - 模板渲染 - 简单

3.js 真tm枯燥啊&#xff0c;狗都不学 效果图 源码 // ts-nocheck// 引入three.js import * as THREE from three// 导入轨道控制器 import { OrbitControls } from three/examples/jsm/controls/OrbitControls// 导入lil.gui import { GUI } from three/examples/jsm/libs/li…

elementui中日期/时间的禁用处理,使用传值的方式

项目中,经常会用到 在一个学年或者一个学期或者某一个时间段需要做的某件事情,则我们需要在创建这个事件的时候,需要设置一定的时间周期,那这个时间周期就需要给一定的限制处理,避免用户的误操作,优化用户体验 如下:需求为,在选择学年后,学期的设置需要在学年中,且结束时间大…

使用京东云主机搭建幻兽帕鲁游戏联机服务器全流程,0基础教程

使用京东云服务器搭建幻兽帕鲁Palworld游戏联机服务器教程&#xff0c;非常简单&#xff0c;京东云推出幻兽帕鲁镜像系统&#xff0c;镜像直接选择幻兽帕鲁镜像即可一键自动部署&#xff0c;不需要手动操作&#xff0c;真正的新手0基础部署幻兽帕鲁&#xff0c;阿腾云整理基于京…

【LabVIEW学习篇 - 2】:LabVIEW的编程特点

文章目录 LabVIEW的编程特点图形编程天然并行运行基于数据流运行 LabVIEW的编程特点 图形编程 LabVIEW使用图形化的图形化编程语言&#xff08;G语言&#xff09;&#xff0c;用户通过在程序框图中拖放和连接各种节点&#xff08;Nodes&#xff09;来编写程序。每个节点代表一…

图像增强 目标检测 仿射变换 图像处理 扭曲图像

1.背景 在目标检测中&#xff0c;需要进行图像增强。这里的代码模拟了旋转、扭曲图像的功能&#xff0c;并且在扭曲的时候&#xff0c;能够同时把标注的结果也进行扭曲。 这里忽略了读取xml的过程&#xff0c;假设图像IMG存在对应的标注框&#xff0c;且坐标为左上、右下两个…

vue学习笔记(购物车小案例)

用一个简单的购物车demo来回顾一下其中需要注意的细节。 先看一下最终效果 功能&#xff1a; &#xff08;1&#xff09;全选按钮和下面的商品项的选中状态同步&#xff0c;当下面的商品全部选中时&#xff0c;全选勾选&#xff0c;反之&#xff0c;则不勾选。 &#xff08…

SpringBoot新手快速入门系列教程四:创建第一个SringBoot的API

首先我们用IDEA新建一个项目&#xff0c;请将这些关键位置按照我的设置设置一下 接下来我将要带着你一步一步创建一个Get请求和Post请求&#xff0c;通过客户端请求的参数&#xff0c;以json格式返回该参数{“message”:"Hello"} 1,先在IDE左上角把这里改为文件模式…

华为云OBS 通过S3客户端访问

华为云好像没有对S3协议的支持说明其实底层是支持S3协议的。 使用S3的时候我们会需要endpoint&#xff0c;桶名字&#xff0c;region&#xff0c;AWS_ACCESS_KEY,AWS_SECRET_KEY 其中endpoint 就是图片中的&#xff0c;桶名字也很容易找到&#xff0c;region 就是你的endpoint…

【开源项目】LocalSend 局域网文件传输工具

【开源项目】LocalSend 局域网文件传输工具 一个免费、开源、跨平台的局域网传输工具 LocalSend 简介 LocalSend 是一个免费的开源跨平台的应用程序&#xff0c;允许用户在不需要互联网连接的情况下&#xff0c;通过本地网络安全地与附近设备共享文件和消息。 项目地址&…

liunx文件系统,日志分析

文章目录 1.inode与block1.1 inode与block概述1.2 inode的内容1.3 文件存储1.4 inode的大小1.5 inode的特殊作用 2.硬链接与软链接2.1链接文件分类 3.恢复误删除的文件3.1 案例:恢复EXT类型的文件3.2 案例:恢复XFS类型的文件3.2.1 xfsdump使用限制 4.分析日志文件4.1日志文件4.…

docker部署redis/mongodb/

一、redis 创建/root/redis/conf/redis.conf 全部执行命令如下 docker run -it -d --name redis -p 6379:6379 --net mynet --ip 172.18.0.9 -m 400m -v /root/redis/conf:/usr/local/etc/redis -e TXAsia/Shangehai redis redis-server /usr/local/etc/redis/redis.conf 部署…

Java 基础--File - IO流(2)

I/O流 定义 数据从硬盘流向内存为输入流&#xff0c;数据从内存流向硬盘为输出流。输入也叫读取数据&#xff0c;输出也叫写出数据。 IO分类 1.按照数据的流向分为&#xff1a;输入流和输出流 ①输入流&#xff1a;把数据从其他设备上读取到内存中的流 ②输出流&#xff1…

python小练习04

三国演义词频统计与词云图绘制 import jieba import wordcloud def analysis():txt open("三国演义.txt",r,encodingutf-8).read()words jieba.lcut(txt)#精确模式counts {}for word in words:if len(word) 1:continueelif word "诸葛亮" or word &q…

软件系统架构的一些常见专业术语

分层架构是逻辑上的&#xff0c;在物理部署上&#xff0c;三层结构可以部署在同一个物理机器上&#xff0c;但是随着网站业务的发展&#xff0c;必然需要对已经分层的模块分离部署&#xff0c;即三层结构分别部署在不同的服务器上&#xff0c;使网站拥有更多的计算资源以应对越…

前端Web开发HTML5+CSS3+移动web视频教程 Day4 CSS 第2天

P44 - P 四个知识点&#xff1a; 复合选择器 CSS特性 背景属性 显示模式 复合选择器 复合选择器仍然是选择器&#xff0c;只要是选择器&#xff0c;作用就是找标签。复合选择器就是把基础选择器进行组合使用。组合了之后就可以在大量的标签里面更快更精准地找标签了。找…

[附源码]基于Flask的演唱会购票系统

摘要 随着互联网技术的普及和发展&#xff0c;传统购票方式因其效率低下、流程繁琐等问题已难以满足现代社会的需求。本文设计并实现了一个基于Flask框架的演唱会购票系统&#xff0c;该系统集成了用户管理、演唱会信息管理、票务管理以及数据统计与分析等功能模块&#xff0c…

项目实战--Spring Boot + Minio文件切片上传下载

1.搭建环境 引入项目依赖 <!-- 操作minio的java客户端--> <dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.5.2</version> </dependency> <!-- jwt鉴权相应依赖--> &…