大数据实时数仓Hologres(四):基于Flink+Hologres搭建实时数仓

文章目录

基于Flink+Hologres搭建实时数仓

一、使用示例

二、方案架构

1、架构优势 

2、Hologres核心优势

三、实践场景

四、项目准备

1、创建阿里云账号AccessKey

2、准备MySQL数据源

五、构建实时数仓​编辑

1、管理元数据

2、构建ODS层

2.1、创建CDAS同步作业ODS

2.2、查看MySQL同步到Hologres的3张表数据

3、构建DWD层

3.1、创建DWD层宽表

3.2、实现实时消费ODS层orders、orders_pay表的binlog,写入DWD层

3.3、查看宽表dwd_orders数据

4、构建DWS层

4.1、创建DWS层聚合表

4.2、数据写入DWS层表

4.3、查看DWS层数据

5、数据探查 

5.1、流模式探查

5.2、批模式探查

5.3、应用1:Key-Value服务

5.4、应用2:明细查询

5.5、应用3:实时报表


基于Flink+Hologres搭建实时数仓

一、使用示例

随着社会数字化发展,企业对数据时效性的需求越来越强烈。除传统的面向海量数据加工场景设计的离线场景外,大量业务需要解决面向实时加工、实时存储、实时分析的实时场景问题。

如何搭建实时数仓?

 

二、方案架构

实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。Hologres是一站式实时数仓,支持数据实时写入与更新,实时数据写入即可查。Hologres与Flink深度集成,能够提供一体化的实时数仓联合解决方案。

基于 Flink+Hologres 的 Streaming Warehouse 方案

1、架构优势 

  • 支持高效更新、 修正与查询

Hologres的每一层数据都支持高效更新与修正、写入即可查,解决了传统实时数仓解决方案的中间层数据不易查、不易更新、不易修正的问题。

  • 支持高效复用

Hologres的每一层数据都可单独对外提供服务,数据的高效复用,真正实现数仓分层复用的目标。

  • 架构简单

模型统一,架构简化。实时ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Hologres中,可以降低架构复杂度,提高数据处理效率。

 

2、Hologres核心优势

 

三、实践场景

四、项目准备

1、创建阿里云账号AccessKey

  • 使用阿里云账号登录控制台
  • 将鼠标悬浮在右上方的账号图标上,单击 AccessKey 管理 
  • 在安全提示对话框,阅读安全提示信息,然后单击继续使用
  • AccessKey 在 AccessKey 页面,单击创建 AccessKey
  • 根据界面提示完成安全验证
  • 在创建AccessKey对话框,查看 AccessKey ID 和 AccessKey Secret。可以单击下载 CSV 文件,下载 AccessKey 信息。单击复制,复制AccessKey 信息。 

注意:一定要保存好AccessKey ID和AccessKey Secret 

  • 选中我已保存好AccessKey Secret
  • 单击确定 

2、准备MySQL数据源

通过DMS登录RDS MySQL。 在已登录的SQLConsole窗口,输入如下命令后单击执行。创建order_dw数据库。

create database order_dw;

创建后,点击左侧实例中的order_dw数据库,在order_dw的SQL窗口输入建表和插入数据的代码,详细代码见代码文档 点击执行后生成对应的表和数据,如下:

 

用型Hologres实例。开通实例后,需要在Hologres开发平台创建order_dw数据库: 进入Hologres管理控制台,单击左侧实例列表。

在实例列表页面,单击实例名称,进入实例详情页。 

在实例详情页左侧导航栏,单击数据库管理

在DB授权页面,单击右上角新增数据库

在新增数据库对话框,选择实例名并填写数据库名称,简单权限策略选择SPM

 

五、构建实时数仓

1、管理元数据

创建 Hologres Catalog:

  • 在Flink开发平台,点击左侧SQL开发,点击作业草稿右侧的加号,新建作业草稿。

  • 模板选择空白的流作业,作业名称为test,引擎版本选择 vvr-6.0.7-flink-1.15 

创建作业后,将如下代码拷贝到test作业的SQL编辑器上,修改目标参数取值后,选中代码片段后单击左侧代码行上的运行。

CREATE CATALOG dw WITH ('type' = 'hologres','endpoint' = '<ENDPOINT>','username' = '<USERNAME>','password' = '<PASSWORD>','dbname' = 'order_dw','binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。'cdcmode' = 'true','connectionpoolname' = 'the_conn_pool','ignoredelete' = 'true',  -- 宽表merge需要开启,防止回撤。'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。'table_property.binlog.level' = 'replica', -- 也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。'table_property.binlog.ttl' = '259200');

需要修改以下参数取值为我们实际Hologres服务信息。

其中usernamepassword是前面创建的阿里云账号的AccessKey IDAccessKey Secret;endpoint是hologres实例的指定vpc地址;在hologres实例详情中的网络信息下可以看到。

说明:

创建Catalog时可以设置默认的源表、维表和结果表的WITH参数,也可以设置创建Hologres物理表的默认属性,例如上方table_property开头的参数。

验证1:创建成功后,可以在元数据栏看到对应的catalog 及其信息,以及hologres中的数据库。

 

创建MySQL Catalog:

  • 将如下代码拷贝到test作业的SQL编辑器上,修改目标参数取值后,选中代码片段后单击左侧代码行上的运行。
CREATE CATALOG mysqlcatalog WITH('type' = 'mysql','hostname' = '<hostname>','port' = '3306','username' = '<username>','password' = '<password>','default-database' = 'order_dw'
);
  • 需要修改以下参数取值为我们实际MySQL服务信息。
  • 其中用户名和密码是自己创建的MySQL 高权限账号。MySQL ip地址可以在RDS 实例详情中数据库连接看到。

验证2:创建完成后,同样可以在元数据栏看到此catalog的信息以及mysql的数据库 

 

2、构建ODS层

2.1、创建CDAS同步作业ODS

a) 在Flink开发平台,新建名为ODS的SQL流作业(步骤与test作业相同,引擎一致),并将如下代码拷贝到SQL编辑器。

CREATE DATABASE IF NOT EXISTS dw.order_dw   -- 创建catalog 时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。
AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。
/*+ OPTIONS('server-id'='8001-8004') */ ;   -- 指定mysql-cdc源表。

b) 单击右上方的部署,进行作业部署。

基于Catalog的CREATE DATABASE AS(CDAS)语句功能,详细可见文档,可以一次性把ODS层建出来。 ODS层一般不直接做OLAP,主要作为流式作业的事件驱动,开启binlog即可满足需求。

c) 单击左侧导航栏的作业运维,单击刚刚部署的ODS作业操作列的启动,选择无状态启动启动作业

 

点击启动,稍等一会,作业状态变成运行中 

2.2、查看MySQL同步到Hologres的3张表数据
  • 点击上方SQL编辑器,数据库选择order_dw,在SQL编辑器上执行如下命令

---查orders中的数据。
SELECT * FROM orders;
---查orders_pay中的数据。
SELECT * FROM orders_pay;
---查product_catalog中的数据。
SELECT * FROM product_catalog;
  • 点击运行后,可以看到三张表中的结果。可以与mysql中数据进行比较

 

3、构建DWD层

3.1、创建DWD层宽表
  • 通过Flink Catalog功能在Hologres中建DWD层的宽表dwd_orders。  
  • 在Flink开发平台,将如下代码拷贝到test作业的SQL编辑器后,选中目标片段后单击左侧代码行上的运行。
-- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。
CREATE TABLE dw.order_dw.dwd_orders (order_id bigint not null,order_user_id string,order_shop_id bigint,order_product_id bigint,order_product_catalog_name string,order_fee numeric(20,2),order_create_time timestamp,order_update_time timestamp,order_state int,pay_id bigint,pay_platform int comment 'platform 0: phone, 1: pc',pay_create_time timestamp,PRIMARY KEY(order_id) NOT ENFORCED
);
-- 支持通过catalog修改Hologres物理表属性。
ALTER TABLE dw.order_dw.dwd_orders SET ('table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。
);

3.2、实现实时消费ODS层orders、orders_pay表的binlog,写入DWD层

在Flink开发平台,新建名为DWD的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。通过如下SQL作业,orders表会与product_catalog表进行维表关联,将最终结果写入dwd_orders表中,实现数据的实时打宽。

BEGIN STATEMENT SET;
INSERT INTO dw.order_dw.dwd_orders (order_id,order_user_id,order_shop_id,order_product_id,order_fee,order_create_time,order_update_time,order_state,order_product_catalog_name) 
SELECT o.*, dim.catalog_name 
FROM dw.order_dw.orders as o
LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dimON o.product_id = dim.product_id;INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time)SELECT * FROM dw.order_dw.orders_pay;
END;

 

3.3、查看宽表dwd_orders数据

在HoloWeb开发页面连接Hologres实例并登录目标数据库(order_dw)后,在SQL编辑器上执行如下命令

4、构建DWS层

4.1、创建DWS层聚合表

这里通过Flink Catalog功能,在Hologres中创建dws层的聚合dws_users以及dws_shops:

在Flink开发平台,将如下代码拷贝到test作业的SQL编辑器,选中目标片段后单击左侧代码行上的运行

-- 用户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_users (user_id string not null,ds string not null,paied_buy_fee_sum numeric(20,2) not null, -- '当日完成支付的总金额'primary key(user_id,ds) NOT ENFORCED
);
-- 商户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_shops (shop_id bigint not null,ds string not null,paied_buy_fee_sum numeric(20,2) not null, -- '当日完成支付总金额'primary key(shop_id,ds) NOT ENFORCED
);

4.2、数据写入DWS层表

这里实时消费DWD层的宽表dw.order_dw.dwd_orders,在Flink中做聚合计算,最终写入Hologres中的DWS表:

在Flink开发平台,新建名为DWS的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。

BEGIN STATEMENT SET;INSERT INTO dw.order_dw.dws_users
SELECTorder_user_id,DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,SUM (order_fee)     --order_fee订单费用,来自于mysql的buy_fee
FROM dw.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');INSERT INTO dw.order_dw.dws_shopsSELECTorder_shop_id,DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,SUM (order_fee)FROM dw.order_dw.dwd_orders cWHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');END;

4.3、查看DWS层数据

查看DWS层的聚合结果,其结果会根据上游数据的变更实时更新:

在HoloWeb开发页面连接Hologres实例并登录目标数据库后,在SQL编辑器上执行如下命令

  • 查询dws_users表结果
SELECT * FROM dws_users;

  • 查询dws_shops表结果
SELECT * FROM dws_shops;

5、数据探查 

5.1、流模式探查
  • 新建并启动数据探查流作业。

新建名为Data-exploration的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。

-- 流模式探查,打印到print可以看到数据的变化情况。
CREATE TEMPORARY TABLE print_sink(order_id bigint not null,order_user_id string,order_shop_id bigint,order_product_id bigint,order_product_catalog_name string,order_fee numeric(20,2),order_create_time timestamp,order_update_time timestamp,order_state int,pay_id bigint,pay_platform int,pay_create_time timestamp,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'print'
);
INSERT INTO print_sink SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --这里的startTime是binlog生成的时间
WHERE order_user_id = 'user_001';

 

  • 查看数据探查结果

在作业运维详情页面,单击目标作业名称,在作业探查页签下左侧运行日志页签,单击运行Task Managers页签下的Path, ID。在Stdout页面搜索(按ctrl+f)user_001相关的日志信息。

 

5.2、批模式探查
  • 接下来要用到调试功能,所以需要创建Session集群。

点击左侧Session管理,点击创建Session集群。

名称自定义,状态选择RUNNING,引擎版本选择vvr-6.0.7-flink-1.15,Task Managers数量为2,其余参数默认即可。 

  • 创建完成后,Session集群会自动启动。等待一会,状态变成运行中。

 

 

  • 开始批模式探查 

将如下代码拷贝到test作业中,选中这段代码,单击调试。选择刚刚创建的Session集群。

SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00’;--批量模式支持filter下推,提升批作业执行效率。

批模式探查是获取当前时刻的终态数据,在Flink作业开发界面调试结果如图所示:

5.3、应用1:Key-Value服务

根据主键查询DWS层的聚合指标表,支持百万级RPS

在HoloWeb开发页面order_dw库下查询指定用户指定日期的消费额的代码示例如下

SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';

5.4、应用2:明细查询

对DWD层宽表进行OLAP分析

在HoloWeb开发页面查询某个客户23年2月特定支付平台支付的订单明细的代码示例如下

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  
AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;

5.5、应用3:实时报表

基于DWD层宽表数据展示实时报表,支持秒级响应

在HoloWeb开发页面order_dw数据库下查询23年2月内每个品类的订单总量和订单总金额的代码示例如下

SELECTTO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,  --订单创建时间order_product_catalog_name, --订单类别名称COUNT(*),   --订单总量SUM(order_fee) --订单总金额
FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'and order_create_time < '2023-03-01 00:00:00'
GROUP BYorder_create_date,order_product_catalog_name
ORDER BYorder_create_date,order_product_catalog_name
;


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/436509.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GS-SLAM论文阅读笔记--GEVO

前言 这篇文章看着就让人好奇。众所周知&#xff0c;高斯是一个很不错的建图方法&#xff0c;但是本文的题目居然是只用高斯进行单目VO&#xff0c;咱也不知道这是怎么个流程&#xff0c;看了一下作者来自于MIT&#xff0c;说不定是个不错的工作&#xff0c;那就具体看看吧&am…

算法-汉诺塔问题(Hanoi tower)

介绍 汉诺塔是源于印度的一个古老传说的小游戏&#xff0c;简单来说就是有三根柱子&#xff0c;开始的时候&#xff0c;第一根柱子上圆盘由大到小&#xff0c;自下往上排列。这个小游戏要实现的目的呢&#xff0c;就是要把第一根柱子上的圆盘移到第三根的柱子上去&#xff1b;…

部标主动安全(ADAS+DMS)对接说明

1.前言 上一篇介绍了部标&#xff08;JT/T1078&#xff09;流媒体对接说明&#xff0c;这里说一下如何对接主动安全附件服务器。 流媒体的对接主要牵扯到4个方面&#xff1a; &#xff08;1&#xff09;平台端&#xff1a;业务端系统&#xff0c;包含前端呈现界面。 &#x…

企业数字化转型的深层次问题与战略解读——基于TOGAF框架的深入分析与解决方案

数字化转型的必然性与复杂性 随着全球化和技术进步的推动&#xff0c;数字化转型成为企业保持竞争力、提升效率、满足客户需求的重要战略选择。然而&#xff0c;数字化转型并不仅仅是技术的简单引入&#xff0c;它涉及到业务模式、运营流程、组织架构以及企业文化的深刻变革。…

对比学习训练是如何进行的

对比学习&#xff08;Contrastive Learning&#xff09;是一种自监督学习的方法&#xff0c;旨在通过拉近相似样本的表示、拉远不相似样本的表示来学习特征表示。在训练过程中&#xff0c;模型并不依赖标签&#xff0c;而是通过样本之间的相似性进行学习。以下是对比学习的基本…

Another redis desktop manager使用说明

Another redis desktop manager使用说明 概述界面介绍图示说明连接界面设置界面查看操作日志主界面信息进入redis-cli控制台更多 概述 Another Redis Desktop Manager是一个开源的跨平台 Redis 客户端&#xff0c;提供了简洁易用的图形用户界面&#xff08;GUI&#xff09;&am…

C++ 数据结构算法细节相关

细节 队列 这段代码实现的是二叉树的层序遍历&#xff0c;也就是按照树的层次&#xff0c;一层一层地遍历节点。下面我会为你详细解释这段代码。 queue <TreeNode*> q; 这是一个队列&#xff0c;队列中存放的是指向TreeNode的指针。队列&#xff08;queue&#xff09;是…

云原生数据库 PolarDB

简介&#xff1a;云原生数据库 PolarDB 是阿里云自研产品&#xff0c;在存储计算分离架构下&#xff0c;利用了软硬件结合的优势&#xff0c;为用户提供秒级弹性、高性能、海量存储、安全可靠的数据库服务。100%兼容MySQL和PostgreSQL生态&#xff0c;支持分布式扩展&#xff0…

Mybatis总结

Mybatis 概述及搭建 原是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation 迁移到了 Google Code&#xff0c;随着开发团队转投GoogleCode 旗下&#xff0c; iBatis3.x正式更名为MyBatis。 MyBatis 是一款优秀的持久层框架。 MyBatis 避免了几乎所有…

系列二、案例实操

一、创建表空间 1.1、概述 在Oracle数据库中&#xff0c;表空间是一个逻辑存储单位&#xff0c;它是Oracle数据库中存储数据的地方。 1.2、超级管理员登录 sqlplus / as sysdba 1.3、创建表空间 create tablespace water_boss datafile C:\Programs\oracle11g\oradata\orcl\…

Spring Cloud Alibaba-(6)Spring Cloud Gateway【网关】

Spring Cloud Alibaba-&#xff08;1&#xff09;搭建项目环境 Spring Cloud Alibaba-&#xff08;2&#xff09;Nacos【服务注册与发现、配置管理】 Spring Cloud Alibaba-&#xff08;3&#xff09;OpenFeign【服务调用】 Spring Cloud Alibaba-&#xff08;4&#xff09;Sen…

华为-IPv6与IPv4网络互通的6to4自动隧道配置实验

IPv4向IPv6的过渡不是一次性的,而是逐步地分层次地。在过渡时期,为了保证IPv4和IPv6能够共存、互通,人们发明了一些IPv4/IPv6的互通技术。 本实验以6to4技术为例,阐述如何配置IPv6过渡技术。 配置参考 R1 # sysname R1 # ipv6# interface GigabitEthernet0/0/1ip address 200…

【C语言指南】数据类型详解(下)——自定义类型

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;《C语言指南》 期待您的关注 目录 引言 1. 结构体&#xff08;Struct&#xff09; 2. 联合体&#xff08;Union&#xff09; 3…

【网络安全 | 渗透工具】自动化 .env/.git文件检测

原创文章,禁止转载。 文章目录 1. 安装 DotGit2. 配置 DotGit3. 使用 DotGit 检测 .env / .git 文件1. 安装 DotGit 在谷歌应用商店中搜索 DotGit 并进行安装: 2. 配置 DotGit 安装完成后,可以在设置中开启或关闭相关功能: 3. 使用 DotGit 检测 .env / .git 文件 接下来…

centos7安装Redis单机版

一、检查是否有GCC环境 gcc --version # 提示-bash: gcc: 未找到命令 说明没有gcc环境# 安装gcc环境 yum install gcc# 如果yum源报错 # 1.检查网络是否正常 ping www.baidu.com # 2.备份当前的yum源 mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo…

Redis篇(Java操作Redis)

目录 讲解一&#xff1a;简介 讲解二&#xff1a;Jedis Github 一、创建项目、 二、添加依赖 三、配置文件 四、Java连接Redis 五、通过Redis连接池获取连接对象并操作服务器 六、封装JedisUtil对外提供连接对象获取方法 七、Java操作Redis五种数据类型 1. 连接与释放…

避免glibc版本而报错,CentOS等Linux安装node.js完美方法

概述 对于Node.js v18.x或更高&#xff0c;Node.js官方默认是在Ubuntu 20.04, Debian 10, RHEL 8,CentOS 8等高版操作系统上编译得到的&#xff0c;高版本操作系统的glibc版本≥2.28。所以&#xff0c;下载Node.js后&#xff0c;也需要glibc版本≥2.28才能使用。 而CentOS 7.x等…

《安富莱嵌入式周报》第343期:雷电USB4开源示波器正式发布,卓越的模拟前端低噪便携示波器,自带100W电源的便携智能烙铁,NASA航空航天锂电池设计

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 更新一期视频教程 【授人以渔】CMSIS-RTOS V2封装层专题视频&#xff0c;一期视频将常用配置和用法梳理清楚&#xff0…

JMeter对jdbc request以及foreach和loop controller的使用

Jmeter中jdbc request和foreach控制器 1. 使用variable name实现对数据库查询结果的遍历 在foreach controller中&#xff0c;注意要做variable name的关联(correlation), 否则没法取回这里的jdbc request返回的结果。这里的input variable prefix一定要和jdbc request中的var…

【React】react项目中的redux使用

1. store目录结构设计 2. react组件中使用store中的数据——useSelector 3. react组件中修改store中的数据——useDispatch 4. 示例 react-basic\src\store\moduels\counterStore.js import { createSlice } from reduxjs/toolkitconst counterStore createSlice({name: cou…