目录
一、芒果TV实时数仓建设历程
1.1 阶段一:Storm/Flink Java+Spark SQL
1.2 阶段二:Flink SQL+Spark SQL
1.3 阶段三:Flink SQL+StarRocks
二、自研Flink实时计算调度平台介绍
2.1 现有痛点
2.2 平台架构设计
三、Flink SQL实时数仓分层实践
四、Flink SQL实时数仓生产过程遇到的问题
4.1 多表关联
4.2 复杂的表处理
4.3 State过大
4.4 Checkpoint 不能顺利完成
五、StarRocks选型背景及问题
六、基于Flink SQL+StarRocks实时分析数仓
6.1 明细模型
6.2 主键模型
6.3 聚合模型
6.4 物化视图
七、未来展望
7.1 湖仓一体
7.2 低代码
原文大佬的这篇实时数仓建设案例有借鉴意义,这里摘抄下来用作学习和知识沉淀。
一、芒果TV实时数仓建设历程
芒果TV实时数仓的建设分为三个阶段,14-19 年为第一阶段,技术选型采用 Storm/Flink Java+Spark SQL。20-22 年上半年为第二阶段,技术选型采用 Flink SQL+Spark SQL 。22 年下半年-至今为第三阶段,技术选型采用 Flink SQL+ StarRocks。每一次升级都是在原有基础上进行迭代,以求更全面的功能,更快的速度,能更好的满足业务方的需求。
1.1 阶段一:Storm/Flink Java+Spark SQL
芒果 TV 的实时数据处理很早就开始了,最开始用的是 Storm,到了 18 年时,Flink 横空出世。Flink 的 State状态与流处理的优势让人眼前一亮,所以改用了 Flink 来搭建实时数仓,但当时主要以满足业务方需求为主,进行烟囱式的开发,基本流程是接上游kafka的数据,使用flink java进行相关业务逻辑处理后,将数据输出至对象存储中。然后使用spark sql对数据进行统计等二次加工处理后,再交付客户使用。此阶段优点是利用了Flink的长处,让数据从源头到终端更实时化了,满足了业务方对数据的时效性与业务需求,缺点是一个需求就开发一个功能,没有进行实时数仓的建设和沉淀。
1.2 阶段二:Flink SQL+Spark SQL
基于上一阶段的技术积累与发现的问题,提出了建设实时数仓的新方案。此时Flink sql功能已经初步完善,能满足搭建数仓各方面的需求,SQL 化相较 Flink Java 也能降低开发、维护等各方面成本,于是选择 Flink SQL 来搭建实时数仓。此阶段对实时数仓进行了分层架构设计,这个后面有详细讲解。
基本流程是接上游 Kafka 数据进行格式化后输出至 Kafka,下层接到 Kafka 数据进行字段处理、垃圾数据过滤等操作后输出至 Kafka,最后一层接 Kafka 数据进行维度扩展,然后将数据写至对象存储中。再由 Spark SQL 读取对象存储中的数据进行统计等处理后,交付客户使用。
此阶段的优点是实现了数仓的分层架构设计,对各层数据定义了标准化,实现了各层数据解耦,避免了烟囱式的开发,解决了重复开发等问题,实时数仓逐步走向成熟。缺点是使用Spark SQL进行后续的统计与汇总时,不够灵活。需要提前设计好指标,面对客户多变的需求时,往往不能很及时的响应。
1.3 阶段三:Flink SQL+StarRocks
随着实时数仓的建设逐步加深,Spark SQL不够灵活,处理速度不够快的弊端越发突出。此时StarRocks进入了我们的视线,其MPP的架构,向量化引擎,多表Join等特性所展现出来在性能、易用性等方面的优势,都很好的弥补了 Spark SQL 在这块的不足。于是经调研后决定,在实时数仓中用 StarRocks 替换掉 Spark SQL 。在此阶段,前面用 Flink SQL 搭建的实时数仓分层构架并未改变,而下游用 Spark SQL 进行统计分析的相关功能,逐步替换成了用 StarRocks 来做。
之前使用Spark SQL先将数据进行统计与汇总后,将最终结果写入对象存储中,而现在是直接用 StarRocks 对明细数据进行汇总,展示到前端页面中。这么做的好处是能更快、更灵活的满足业务方的需求,减少了开发工作量,减少了测试、上线等时间。StarRocks 优秀的性能让即席查询速度并未变慢,功能更强大,更灵活,交付速度变更快了。
二、自研Flink实时计算调度平台介绍
2.1 现有痛点
- 原生任务命令复杂,调试麻烦,开发成本比较高;
- 连接器,UDF,Jar任务包等无法管理,调试复杂,经常遇到依赖冲突问题
- 无法做到统一的监控报警以及对资源上的权限管理
- sql开发任务复杂,没有一个好用的编辑器和代码管理及保存平台
- 基础表、维表、catalog没有记录和可视化的平台
- 多版本和跨云任务无法很好的管理
2.2 平台架构设计
实时Flink调度平台架构图:
平台主要分为三个部分:
(1) Phoenix Web 模块主要负责面向用户。
-
集群部署与任务提交。
- 公司各内部业务权限管理
-
UDF,连接器等三方依赖 Jar 包管理。
-
多类型监控报警以及日志管理。
-
SQL 可视化编辑和校验以及多版本存储。
-
公司各内部业务权限管理。
(2) Flink SQL Gateway 和 Flink Jar Gateway 都是基于开源版本修改定制后的服务,支持SQL符合业务场景的解析和校验,以及Jar任务的提交,支持本地模式。Yarn-per-job 模式和 Application 模式,也支持自动的保存点Savepoint。
-
进行 SQL 的解析和校验。
-
加载SQL和Jar任务所需要的三方依赖。
-
SQL 任务连接 Catalog 存储进行关联和映射。
-
Checkpoint 和 Savepoint 的自动管理和恢复。
-
Jar 类型任务启动参数的注入。
-
运行时配置的自适应。
-
多类型的提交方式适配。
(3) 混合多云模块主要负责启动任务的分发和云之间的信息管理。
三、Flink SQL实时数仓分层实践
使用Flink SQL 搭建实时数仓时,首要问题是数仓分层架构如何解决,业界内有许多优秀的经验可以参考,同时也基于我们的情况,最终采用了如下数仓架构:
ODS层:原始日志层,在该层将上游 Binlog 日志、用户行为日志、外部数据等数据源同步至数仓,对多种数据源,多种格式的数据通过统一UDF函数解析,格式化,最终输出格式化JSON数据
DW层:数据明细层,在该层主要进行错误数据过滤,字段转义,统一字段名等处理,输出的数据已能满足日常基础分析的使用。
DM层:数据模型层,在该层进行扩维,补充相关的公共信息。再按业务进行分域,输出的数据
具有更丰富的维度,可以满足高级分析的数据使用需求。
ST 层:数据应用层,按业务,功能等维度进行汇总,交由给前端页面进行展现,输出的数据可交付 Web、App、小程序等功能使用。
四、Flink SQL实时数仓生产过程遇到的问题
在搭建实时数仓时,遇到了不少的问题,下面挑几个典型的问题讲解一下解决思路:
4.1 多表关联
在使用 Flink SQL 搭建实时数仓初期,涉及多表关联时,有些维表的数据在 Hive 里,有些维表又在 MySQL 中,甚至还有些维表数据在其它 OLAP 中,该选择何种关联方式,需要综合考虑性能 ,功能等方面,总结出如下规则:
- 流表关联维表(小数据量),使用Lookup Join, 维表数据量在十万以下时,可使用hive表做维表,因为离线数仓中的维表数据大部分都在 Hive 中,这样的话就可以直接复用,省去数据导入导出的额外工作,并且性能方面没有瓶颈,维表小时更新后,Flink SQL 也能读到最新数据。
- 流表关联维表(大数据量),使用Lookup Join,维表数据量在十万-千万以下时,可用Mysql做维表,此时用 Hive 维表已不能满足性能需求。可将数据导出至 MySQL 中,利用缓存机制,也能很好的满足要求。
- 流表关联流表,使用 Interval Join,通过两个流表的时间字段来控制关联范围,这种关联方式是目前用的比较多的,使用方式要跟离线比较接近。
4.2 复杂的表处理
4.3 State过大
在两个流表进行关联或进行汇总统计时,Flink的机制是会将数据缓存在State中,这就会导致State过大,导致GC频繁,进而任务失败。针对这种情况,在研究了 Flink 的内存机制后,得出的解决方案如下:
- 缩短时间范围,根据业务需求,适当减少关联时两条流的时间范围。
- 调整 Managed Memory 大小,可以调整 Managed Memory 占比,适当的缩小其它内存的使用。
- 设置State的TTL来避免缓存过多的数据
4.4 Checkpoint 不能顺利完成
任务中频繁出现 Checkpoint expired before completing异常,在实际生产环境中,发现有任务频繁的报这个错误,这个错误指Checkpoint不能顺序完成,因为Flink的Checkpoint有Barrier机制来保证数据的Exactly-once 精确一次性语义。如果一批数据处理不完,Checkpoint就完成不了。导致这个错误原因有多种,不同的问题也有不同的解答,接下来列举一下各场景与解决方案:
- Checkpoint 的超时时长设置的太短,导致 Checkpoint 还没完成就被报了超时,这个问题比较常见。解决方案就是设置长一点,我们一般根据任务类型,会设置 6 秒-2 分钟不等。
- 任务有被压,因为一个任务内有多个操作,其中一个操作耗时长影响了整个任务的执行,这个问题比较常见。解决方案是可以从WebUI上找到执行缓慢的Task
-
内存不足,我们在生产环境中一般使用 rocksdb statebackend,默认会保留全量 Checkpoint。而这种情况下,在遇到有关联、分组统计等使用了 heap statebackend 的任务中,计算的中间结果会缓存到 State中,State的内存默认是总内存的 40%,在这种计算中会不太够,从而导致频率的 GC,也影响了 Checkpoint 的执行。解决方案如下:
调大 TaskManager的内存,TaskManager 的内存调大后,其它内存区域也会相应调大。
调大 Managed Memory 的内存占比,就是设置 taskmanager.memory.managed.fraction 这个参数,可根据实际情况来,实际生产中最高可调到 90%。这种方法只调大了 ManagedMemory 一块,如果内存资源并不是很充裕时,可以用这种方式。
- 改用增量Checkpoint,根据实际情况调整State的TTL时间,并开启增量Checkpoint,甚至都不用调内存大小,也能解决问题。
五、StarRocks选型背景及问题
在之前的框架中我们是以Flink流式处理引擎完成原始日志的清洗,数据的打宽与轻度聚合,再落地到分布式文件系统或对象存储,通过离线Spark SQL五分钟级别的调度批处理,结果会通过Presto等引擎去查询,这样的架构在生产环境中渐渐显露出很多问题:
- 存在重复计算的问题,原始数据会在不同的任务中反复清洗,有的需要多个原始数据的关联也会反复的清洗,大量浪费了计算资源,代码和数据流可重用性很差。
-
为了满足离线批处理历史累计值和当前 5 分钟窗口的计算指标,在流量高峰期和当日指标累计到晚上时很可能在 5 分钟之内无法完成指标的计算,有很大的超时风险,业务会反馈实时指标的延迟。
-
由于离线Spark 批处理在多维组合分析并且又要求实时性情况下,略显乏力。业务的在线化,催生出很多实时的场景,另一方面运营的精细化和分析的平民化也催生出多维的分析需求,这些场景下需要粒度特别细,维度特别丰富的地层数据,这两部分的叠加起来就催生出了实时多维分析的场景。这时候我们需要不断的增加维度组合,增加结果字段,增加计算资源来满足以上场景,但是还是略显乏力。
-
在数据时效性日益增加的今天,很多场景下数据的时效性提出了秒级毫秒级的要求,之前5分钟级别的方式不能满足业务需求。
-
在之前的实时任务中经常需要在Flink内存中做流和流的Join,由于上游多个数据流的数据到达时间不一致,很难设计合适的window去在计算引擎里面打宽数据,采用Flink Interval Join时多个流的时间间隔太久,状态数据会非常庞大,启用mapState之类的状态计算又过于定制
-
在线上有大型活动或者大型节目时,实时数据量暴增,实时的大批量写入的情况下,写入延迟大,写入效率不高,数据积压。
-
对于 Flink 清洗或者计算的结果可能需要多个存储介质中,对于明细数据我们可能会存储在分布式文件系统或者对象存储,这时候是 Flink+HDFS,对于业务更新流数据,可能是 Flink CDC+hbase(cassandra或者其他 key-value 数据库),对于 Flink 产生回撤流数据可能是 Flink+MySQL(redis),对于风控类数据或者传统的精细化的看版可能是 Flink+ elasticsearch,对于大批量日志数据指标分析可能是Flink+clickhouse,难以统一,资源大量损耗,维护成本同样高。
总体分析,早期架构以下问题:
- 数据源多样,维护成本比较高
- 性能不足,写入延迟大,大促的场景会有数据积压,交互式查询体验较差
- 各个数据源割裂,无法关联查询,形成众多的数据孤岛,从开发的角度,每个引擎都需要投入相应的学习开发成本,程序复杂度比较高。
- 实时性要求高,并且开发效率快,代码或者数据可重复利用性强。
- 实时任务开发没有同一套标准,各自为战。
六、基于Flink SQL+StarRocks实时分析数仓
基于已经搭建完毕的 Flink SQL 的数仓分层体系,且由 StarRocks2.5X 版本升级到 StarRocks3.0X 存算分离版本并已大规模投入在生产环境中。
实时和离线湖仓一体的架构图:
6.1 明细模型
在大数据生产环境中最常见的日志数据,特点是数据量大,多维度的灵活复杂计算,计算指标多,实时性强,秒级别的高性能查询,简单稳定实时流写入,大表的Join,高基数字符列去重
使用Flink SQL+StarRocks 都能满足,首先实时平台上使用Flink SQL快速对实时流日志数据进行清洗,打宽,同时StarRocks提供 Flink-Connector-StarRocks连接器开箱即用,并且支持Exactly-once精准一次性语义和事务支持,底层通过Stream Load低延迟快速导入。
通过高效简单地Flink SQL建表模式,批量百万级写入,速度快,同时针对生产环境中单表十亿级别以上的数据,在计算多维度用户访问次数,和用户去重数据,能达到秒级别。
6.2 主键模型
对于数仓中的数据变更方式:
- 方式一:某些OLAP数据仓库数据仓库提供 Merge on Read模型的更新功能,完成数据变更,例如(clickhouse)。
Merge on Read 模式在写入时简单高效,但读取时会消耗大量的资源在版本合并上,同时由于 merge 算子的存在,使得谓词无法下推、索引无法使用,严重的影响了查询的性能。 StarRocks 提供了基于 Delete and Insert 模式的主键模型,避免了因为版本合并导致的算子无法下推的问题。主键模型适合需要对数据进行实时更新的场景,可以更好的解决行级别的更新操作,支撑百万级别的 TPS,适合MySQL 或其他业务库同步到StarRocks 的场景。
-
方式二:简单来说就是创建新分区表,删除旧的分区表数据,然后批量刷写过去。
在新的分区中插入修改后的数据,通过分区交换完成数据变更。通过批量刷写的方式会要重新建表,删除分区数据,刷写数据过程繁杂,还可能导致出错。
而且通过Flink CDC和StarRocks完美结合可以实现业务库到OLAP数据仓库端到端的全量+增量的实时同步,一个任务可以搞定批量和实时的全部问题,并且高效稳定,同时主键模型也可以解决Flink中回撤流输出的问题,支持按条件更新,支持按列更新,这些都是传统OLAP数据库很多不兼具的优点。
6.3 聚合模型
在实时数仓中还有一种场景,我们不太关心原始的明细数据,多为汇总类查询,比如 SUM、MAX、MIN 等类型的查询,旧数据更新不频繁,只会追加新的数据,这个时候可以考虑使用聚合模型。建表时,支持定义排序键和指标列,并为指标列指定聚合函数。当多条数据具有相同的排序键时,指标列会进行聚合。在分析统计和汇总数据时,聚合模型能够减少查询时所需要处理的数据,提升查询效率。
针对聚合指标,之前是放在Flink中统计,状态数据会存在内存中,会导致状态数据持续增长,斌并且消耗大量资源,将Flink的单纯统计修改为Flink SQL + StarRocks聚合模型,Flink这里只需要明细数据进行清洗并导入到 StarRocks,效率非常高且稳定。
实际生产环境中,聚合模型主要用来统计用户观看时长,点击量,订单统计等。
6.4 物化视图
数据仓库环境中的应用程序经常基于多个大表执行复杂查询,通常涉及多表之间数十亿行数据的关联和聚合。要实现这种实时多表关联并查询结果的方式,在之前我们可能会把此项内容放在 Flink 实时数仓中去处理,分层处理关联,合并,统计等任务,最后输出结果层数据,处理此类查询通常会大量消耗系统资源和时间,造成极高的查询成本。
现在可以考虑使用Flink SQL+StarRocks 的新思路去处理这种大规模的分层计算问题,使得 Flink SQL 这里只需要处理一些简单清洗任务,把大量重复计算的逻辑下推到StarRocks去执行,多个实时流实时落地 ,在StarRocks可以建立多级物化视图的建模方式,StarRocks 的物化视图不仅支持内表和内表关联,也支持内表和外表关联。例如:数据分布在MySQL,Hudi,Hive 等都可以通过StarRocks 物化视图的方式查询加速,并设定定期刷新规则,从而避免手动调度关联任务。其中最大的一个特点时,当有新的查询对已构建了物化视图的基表进行查询时,系统自动判断是否可以复用物化视图中的预计算结果处理查询。如果可以复用,系统会直接从相关的物化视图读取预计算结果,以避免重复计算消耗系统资源和时间。查询的频率越高或查询语句越复杂,性能增益就会越很明显。
实时即未来,StarRocks 在逐渐实现这样的能力,StarRocks 和 Flink 结合去构建实时数据分析体系的联合解决方案,将在一定程度上颠覆既有的一些禁锢,形成实时数据分析新范式。
七、未来展望
7.1 湖仓一体
当前芒果 TV 已经实现了流批一体的数仓建设,而未来的重点是湖仓一体的建设。数据湖的特点在于可以存储各种类型和格式的原始数据,包括结构化数据、半结构化数据和非结构化数据。而数据仓库则是对数据进行结构化和整理,以满足特定的业务需求。
湖仓一体将数据仓库和数据湖的特点融合在一起,打造一个统一的数据中心,实现对数据的集中管理。湖仓一体的架构能够提供更好的安全性、成本效益和开放性,既能够存储和管理大量原始数据,又能够将数据整理成结构化的形式,为分析和查询提供便利。
通过建立湖仓一体,芒果 TV 能够向公司内部提供更丰富的数据服务,支持业务决策和创新,实现对数据的全面掌控和管理,包括数据的采集、存储、处理和分析。同时,湖仓一体还能够支持多种计算引擎和工具的使用,如 Flink、Spark、Hive 等,使得数据处理和分析更加灵活和高效。
7.2 低代码
现在的开发方式是在自研的平台上写 SQL 提交任务,这种方式在面对一些清洗场景时,大部分是重复工作,有较大的提升空间。低代码是时下比较热门的概念,其在降本增效方面的优势很大。我们的下一步的计划是逐步实现低代码,第一阶段是将实时平台与数据上报平台进行打通,通过读取上报平台里相关元数据,能够自动生成对应的数据清洗任务,解放生产力,提升工作效率与交付速度。
低代码的优势在于它能够将开发过程中的重复工作进行自动化和简化,减少了开发人员的编码工作量。通过可视化的方式,开发人员可以通过拖拽和配置来完成任务,而无需编写大量的代码。这不仅提高了开发效率,还降低了出错的风险。
总结而言,基于 Flink技术的特点,芒果 TV 在未来的数仓建设中将注重实现湖仓一体的架构,以实现对数据的全面管理和利用。同时,芒果 TV 计划逐步实现低代码的开发方式,以提高开发效率和交付速度。
参考文章:
芒果 TV 基于 Flink 的实时数仓建设实践