Spark Structured Streaming 分流或双写多表 / 多数据源(Multi Sinks / Writes)

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

在 Spark Structured Streaming 中,我们有时候需要将最后的处理结果分流或双写到多张表或多个数据源(Multi Sinks / Writes),一个典型的例子是:在 CDC 数据入湖场景里,一个 Kafka Topic 上存放着整库或多张表的 CDC 消息,使用 Spark 从 Kafka 中摄取这些消息后,需要根据消息中提供的数据库名和数据表名对 CDC 消息分流,然后写到数据湖上对应的 ODS 表中,这就是一种典型的“数据分流”场景。在 Spark Structured Streaming 中,实现多表 / 多数据源的分流或双写主要依赖 foreachBatchforeach 这两个方法,本文就围绕它们介绍一下分流或双写多表 / 多数据源的具体实现。

首先,要明确 foreachforeachBatch 都是 action,也就意味着使用它们时已经到了流的末端,绝大数情况下,就是要将记录写入目标数据源了,这也是foreachforeachBatch 这两个方法绝大多数的应用场景。通常,在 Spark 中将数据写入一个数据源是这样做的(以写 parquet 文件为例):

writeStream.format("parquet").option("path", "path/to/destination/dir").start()

由于 Spark 内置了 parquet 格式的 data writer, 所以我们只需填写一些相应的配置,就可以直接把 DF 按对应的格式写到目标位置了,那什么情况下我们要使用 foreachforeachBatch 呢?下面展开来介绍一下。

1. foreachBatch 的应用场景


大多数情况下,一条的流处理的 pipeline 都是从一个 Source 开始,中间经历各种处理后,最终写入了一个 Sink,但是,在某些场景下,我们流的重点可能需要写入的并不是一个 Sink,而是多个,典型的情形有:

  • 数据分流:需要将数据“分流”写入不同的数据源或数据表( 简单说就是 dispatch )

  • 数据多写:需要同时向多个下游数据源相同相同数据( 简单说就是 duplicate )

虽然我们可以非常“粗暴”地通过 for 循环构建多个 writer 实现上述两种典型的写入需求,但是这种做法会让每一个 sink 变成独立的 streaming query(作业),是代价很高的应对方法,并不实用。最好的做法就是通过 foreachBatch 来实现,实际上上面两种需求正是 foreachBatch 的典型应用场景。我们看一下 foreachBatch 的接口声明:

def foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]

我们需要为 foreachBatch 传入一个函数字面量,它有两参数,第一个对应一个 micro-batch 的 DataFrame, 第二个是这个 micro-batch 的 ID,拿到 micro-batch 的 DataFrame 后,我们可以在这个 DataFrame 上作相应的转换处理,最后调用现成的 writer 写入目标端。这里涉及到 Spark Streaming 的 Micro-Batch,也就是上述参数列表中的 Dataset[T] 类型的那个 DataFrame ,关于 Micro-Batch 在流上运行方式,下图给出了非常形象的描绘:

在这里插入图片描述

简单地说,Micro-Batch 模式下需要收集齐一定量(或一小段时间范围内)的数据,整理成一个 DataFrame 去处理,它的延迟是在秒级。上图下方时间轴上的每一小撮数据就是 foreachBatch 中传入的那个 DataFrame。

这里,我们特别澄清一个容易误解的地方: foreachBatch 是没有“循环”语义的,这里的 foreach 其实是意在针对每一个 micro-batch 的,不是空间维度上迭代多个 micro-batch, 而是时间维度上针对每一个流经的 micro-batch 进行处理。这里也能提现从 source 构建出的 DF 和这个方法里的 micro-batch 的 DF 的差异,前者是一个无界的 DF,本质上是一个流,更加“实体”的 DF 其实是 foreachBatch 中的这个 DF,它是较短时间内聚齐的“一小撮”数据,边界是确定的!

下面,我们针对分流和双写两种典型场景给出详细的示例代码。

1.1. 通过 foreachBatch 实现数据“分流”


我们以向两种不同的 Hudi 表写入数据为例,先将数据过滤,得到分流后的 DF,然后向对应的 Hudi 表中写入:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>// 分流 table_1 的数据并写入filteredDF1 = batchDF.filter(...)filteredDF1.write.format("hudi").option(TABLE_NAME, "table_1").mode(SaveMode.Append).save("/path/1")// 分流 table_2 的数据并写入filteredDF2 = batchDF.filter(...)filteredDF2.write.format("hudi").option(TABLE_NAME, "table_2").mode(SaveMode.Append).save("/path/2")
}

1.2. 通过 foreachBatch 实现数据“双写”


我们以向两种不同的数据源写入数据为例,可以调用多次 write 操作,但是,由于每次写入都会导致数据被 recomputed,流本身可能不再存在或状态发生了改变,所以,必须要在写入前使用 persist, 保证向下游多次写入的数据是完全一样,最后记得再执行一遍 unpersist 即可。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>batchDF.persist()batchDF.write.format("csv").save(...)  // location 1, 对应数据源的 data writer 是已存在的batchDF.write.format("hudi").save(...)  // location 2, 对应数据源的 data writer 是已存在的batchDF.unpersist()
}

2. foreach 的应用场景


foreachBatch 很实用,但是在如下两种场景下无法工作的:

  • 没有现成的支持目标数据源的 data writer;
  • 当前流运行于 continuous processing 模式,不支持 micro-batch

如果是上述情形,我们就得使用 foreach 了,因为 foreach 要自行实现对目标数据源的链接和读写,同时,它的自定义处理逻辑又是作用到每一行上的,所以它能解决上述两种场景的问题。某种程度上,foreach 相比 foreachBatch 是一种更底层的 API。使用 foreach 需要提供一个 ForeachWriter,实现 open, process, 和close 三个方法,不过要注意的是这三个方案的调用时机是不同的,open / close 显然是 per-partition 要调用一次的, proess 则是要针对每条记录进行处理的。以下是 一个自行实现 foreach 的代码模板:

streamingDatasetOfString.writeStream.foreach(// 没有现成的 DataStreamWriter,需要自行实现行级别的存储逻辑。new ForeachWriter[String] {// 在 partition 这个粒度上创建针对目标数据源的连接,这比较符合常规def open(partitionId: Long, version: Long): Boolean = {// Open connection}// 数据梳理逻辑会作用到记录级别,而不是 miro-batch 的 df 级别。def process(record: String): Unit = {// Write string to connection}// 关闭连接,释放资源def close(errorOrNull: Throwable): Unit = {// Close the connection}}
).start()

关于 foreach 更多信息可以参考官方文档,这里就不再深究了,大多数情况,我们更多使用的还是 foreachBatch

3. 小结


foreachforeachBatch 都能在向目标数据源写入数据时实现定制化的逻辑,它们之间的差别在于:

  • foreachBatch多应用于数据分流或双写场景,目标数据源往往是已经有线程的 data writer 了
  • foreach 则要自行实现对目标数据的连接和读写处理
  • 两者操纵数据的颗粒度不同,foreach 对数据的梳理逻辑(process 方法)作用到 DF 中的每一行上,而 foreachBatch 则直接操纵的是每一个 micro-batch 对应的 DF。

参考资料

  • Spark 关于 foreach 和 foreachBatch 的官方文档
  • Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/318908.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类预测 | MATLAB实现LSSVM最小二乘支持向量机多分类预测

分类预测 | MATLAB实现LSSVM最小二乘支持向量机多分类预测 目录 分类预测 | MATLAB实现LSSVM最小二乘支持向量机多分类预测分类效果基本介绍程序设计参考资料分类效果 基本介绍 MATLAB实现LSSVM最小二乘支持向量机多分类预测。最小二乘支持向量机(Least Squares Support Vecto…

android 分区存储(沙盒存储)适配总结

目录 一、分区存储概念 1.外部存储分类 2.分区存储如何影响文件访问 二、分区适配方案 1. 应用分区存储的文件访问规定 (1).应用专属目录--私有目录 (2).共享目录文件--公有目录 2.MediaStore API介绍 3.Storage Access Framework介绍 三、所有文件访问权限 四、总结…

【开发技巧 | 第二篇】IDEA新增作者信息、方法参数返回值

文章目录 2.IDEA新增作者信息、方法参数返回值2.1类新增作者信息2.2方法新增参数返回信息2.3测试2.3.1新建类2.3.2新建方法 2.IDEA新增作者信息、方法参数返回值 2.1类新增作者信息 打开IDEA的Settings,Editor->Code Style->File and Code Templates->Inc…

LabVIEW鸡蛋品质智能分级系统

LabVIEW鸡蛋品质智能分级系统 随着现代农业技术的飞速发展,精确、高效的农产品质量控制已成为行业的重要需求。其中,鸡蛋作为日常膳食中不可或缺的重要组成部分,其品质直接关系到消费者的健康与满意度。本文设计并实现了一套基于LabVIEW的鸡…

工厂模式和策略模式区别

工厂模式和策略模式都是面向对象设计模式,但它们的目的和应用场景有所不同。 工厂模式是一种创建型设计模式,旨在通过使用一个工厂类来创建对象,而不是直接使用new关键字来创建对象。这样做可以使系统更容易扩展和维护,因为新的对…

Kotlin基础​​

数据类型 定义变量 var表示定义变量,可以自动推导变量类型,所以Int可以不用写。 定义常量 条件语句 if表达式可以返回值,该值一般写在if里的最后一行 类似switch的用法 区间 循环 a是标签,可以直接break到标签的位置&#xf…

超详细的Vue脚手架

文章目录 Node.js介绍安装快速入门控制台输出使用函数模块化编程 npm包管理器介绍命令初始化命令本地安装(了解)全局安装(掌握)批量下载淘宝npm镜像(建议使用) Webpack介绍安装快速入门方式一:webpack原始方式方式二:基于NPM方式 webpack-dev-server 开发…

GPU并行计算CUDA

一、CUDA 和 GPU 简介 CUDA 是显卡厂商 NVIDIA 推出的运算平台,是一种通用并行计算架构,使得 GPU 能够解决复杂的计算问题。开发人员可以使用 C 语言来为 CUDA 架构编写程序,可以在支持 CUDA 的处理器上以超高性能运行,CUDA 3.0 …

【Docker学习】docker version查看版本信息

就像很多应用一样,docker也使用version来查看版本信息。但因为docker包含有不少独立组件,version的作用范围会更广一些。 用法1: docker --version 描述: 输出安装的Docker CLI 的版本号。关于Docker CLI,请访问。 实操…

ROS2专栏(三) | 理解ROS2的动作

​ 1. 创建一个动作 目标: 在ROS 2软件包中定义一个动作。 1.1 新建包 设置一个 workspace 并创建一个名为 action_tutorials_interfaces 的包: mkdir -p ros2_ws/src #you can reuse existing workspace with this naming convention cd ros2_ws/s…

【C++】:类和对象(下)

目录 一,再谈构造函数1.初始化列表2. 隐式类型转换的过程及其优化3. 隐式类型转换的使用4. explcit关键字5. 单参数和多参数构造函数的隐式类型转换 二,static成员1.静态成员变量2.静态成员函数3. static 成员的应用 三,友元3.1 友元函数3.2 …

SQL注入漏洞扫描---sqlmap

what SQLMap是一款先进的自动执行SQL注入的审计工具。当给定一个URL时,SQLMap会执行以下操作: 判断可注入的参数。判断可以用哪种SQL注入技术来注入。识别出目标使用哪种数据库。根据用户的选择,读取哪些数据库中的数据。 更详细语法请参考…

Mac 安装 JDK21 流程

一、下载JDK21 访问Oracle官方网站或选择OpenJDK作为替代品。Oracle JDK从11版本开始是商业的,可能需要支付费用。OpenJDK是一个免费开源选项。 Oracle JDK官方网站:Oracle JDK Downloads OpenJDK官方网站:OpenJDK Downloads 这里以JDK21为…

Nginx实现端口转发与负载均衡配置

前言:当我们的软件体系结构较为庞大的时候,访问量往往是巨大的,所以我们这里可以使用nginx的均衡负载 一、配置nginx实现端口转发 本地tomcat服务端口为8082 本地nginx端口为8080 目的:将nginx的8080转发到tomcat的8082端口上…

如何从Mac电脑恢复任何删除的视频

Microsoft Office是包括Mac用户在内的人们在世界各地创建文档时使用的最佳软件之一。该软件允许您创建任何类型的文件,如演示文稿、帐户文件和书面文件。您可以使用 MS Office 来完成。所有Microsoft文档都可以在Mac上使用。大多数情况下,您处理文档&…

网络安全审计

一、什么叫网络安全审计 网络安全审计是按照一定的安全策略,利用记录、系统活动和用户活动等信息,检查、审查和检验操作时间的环境及活动,从而发现系统漏洞、入侵行为或改善系统性能的过程,它是提高系统安全性的重要手段。 系统…

springboot整合mybatis配置多数据源(mysql/oracle)

目录 前言导入依赖坐标创建mysql/oracle数据源配置类MySQLDataSourceConfigOracleDataSourceConfig application.yml配置文件配置mysql/oracle数据源编写Mapper接口编写Book实体类编写测试类 前言 springboot整合mybatis配置多数据源,可以都是mysql数据源&#xff…

2024 年 16 款最佳屏幕录制软件榜单

屏幕录制软件是一个经常被忽视的方便工具。 无论您是想要制作教程的内容创建者还是想要录制演示文稿的业务人员,屏幕录像机都是必须的!但是,在使用您能找到的第一台录音机之前,我们建议您花些时间找到符合您需求的录音机。有这么…

pytorch笔记:ModuleDict

1 介绍 在 PyTorch 中,nn.ModuleDict 是一个方便的容器,用于存储一组子模块(即 nn.Module 对象)的字典这个容器主要用于动态地管理多个模块,并通过键来访问它们,类似于 Python 的字典 2 特点 组织性 nn…

【Unity】在空物体上实现 IPointerClickHandler 不起作用

感谢Unity接口IPointerClickHandler使用说明_哔哩哔哩_bilibiliUnity接口IPointerClickHandler使用说明, 视频播放量 197、弹幕量 0、点赞数 3、投硬币枚数 2、收藏人数 2、转发人数 0, 视频作者 游戏创作大陆, 作者简介 ,相关视频:在Unity多场景同时编辑…