Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

目录

1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。

2. File Sink

File Sink

Format Types 

Row-encoded Formats 

Bulk-encoded Formats 

桶分配

滚动策略

3. 如何输出结果

集合数据到客户端,execute_and_collect方法将收集数据到客户端内存

将结果发送到DataStream sink connector

将结果发送到Table & SQL sink connector

4. 执行 PyFlink DataStream API 作业。


1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。

本教程使用 FileSink 将结果数据写入文件中。

def split(line):yield from line.split()# compute word count
ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build()
)

sink_to函数,将DataStream数据发送到自定义sink connector,仅支持FileSink,可用于batch和streaming执行模式。

2. File Sink

Streaming File Sink是Flink1.7中推出的新特性,是为了解决如下的问题:

大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中。

Streaming File Sink就可以用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中,支持Exactly-Once语义。这种sink实现的Exactly-Once都是基于Flink checkpoint来实现的两阶段提交模式来保证的,主要应用在实时数仓、topic拆分、基于小时分析处理等场景下。

Streaming File Sink 是社区优化后添加的connector,推荐使用。

Streaming File Sink更灵活,功能更强大,可以自己实现序列化方法

Streaming File Sink有两个方法可以输出到文件:行编码格式forRowFormat 和  块编码格式forBulkFormat。

forRowFormat 比较简单,只提供了SimpleStringEncoder写文本文件,可以指定编码。

由于流数据本身是无界的,所以,流数据将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd--HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件。

Flink 提供了两个分桶策略,分桶策略实现了

org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 接口:

BasePathBucketAssigner,不分桶,所有文件写到根目录;

DateTimeBucketAssigner,基于系统时间(yyyy-MM-dd--HH)分桶。

除此之外,还可以实现BucketAssigner接口,自定义分桶策略。

Flink 提供了两个滚动策略,滚动策略实现了

org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy 接口:

DefaultRollingPolicy 当超过最大桶大小(默认为 128 MB),或超过了滚动周期(默认为 60 秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;

OnCheckpointRollingPolicy 当 checkpoint 的时候,滚动文件。

File Sink

File Sink 将传入的数据写入存储桶中。考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件。 完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。这意味着桶中将包含一个小时间隔内接收到的记录。

桶目录中的数据被拆分成多个 Part 文件。对于相应的接收数据的桶的 Sink 的每个 Subtask,每个桶将至少包含一个 Part 文件。将根据配置的滚动策略来创建其他 Part 文件。 对于 Row-encoded Formats默认的策略是根据 Part 文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。 对于 Bulk-encoded Formats 在每次创建 Checkpoint 时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。

重要:  STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

Format Types 

FileSink 不仅支持 Row-encoded 也支持 Bulk-encoded,例如 Apache Parquet 这两种格式可以通过如下的静态方法进行构造:

  • Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
  • Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

不论创建 Row-encoded Format 或者 Bulk-encoded Format Sink 时,都必须指定桶的路径以及对数据进行编码的逻辑。

Row-encoded Formats 

Row-encoded Format 需要指定一个 Encoder,在输出数据到文件过程中被用来将单个行数据序列化为 Outputstream

除了 bucket assignerRowFormatBuilder 还允许用户指定以下属性:

  • Custom RollingPolicy :自定义滚动策略覆盖 DefaultRollingPolicy
  • bucketCheckInterval (默认值 = 1 min) :基于滚动策略设置的检查时间间隔
data_stream = ...
sink = FileSink \.for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \.with_rolling_policy(RollingPolicy.default_rolling_policy(part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000)) \.build()
data_stream.sink_to(sink)

这个例子中创建了一个简单的 Sink,默认的将记录分配给小时桶。 例子中还指定了滚动策略,当满足以下三个条件的任何一个时都会将 In-progress 状态文件进行滚动:

  • 包含了至少15分钟的数据量
  • 从没接收延时5分钟之外的新纪录
  • 文件大小已经达到 1GB(写入最后一条记录之后)

Bulk-encoded Formats 

Bulk-encoded Sink 的创建和 Row-encoded 的相似,但不需要指定 Encoder,而是需要指定 BulkWriter.Factory BulkWriter 定义了如何添加和刷新新数据以及如何最终确定一批记录使用哪种编码字符集的逻辑。

Flink 内置了5 BulkWriter 工厂类:

  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory

重要 Bulk-encoded Format 仅支持一种继承了 CheckpointRollingPolicy 类的滚动策略。 在每个 Checkpoint 都会滚动。另外也可以根据大小或处理时间进行滚动。

桶分配

桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。

Row-encoded Format Bulk-encoded Format使用了 DateTimeBucketAssigner 作为默认的分配器。 默认的分配器 DateTimeBucketAssigner 会基于使用了格式为 yyyy-MM-dd--HH 的系统默认时区来创建小时桶。日期格式(  桶大小)和时区都可以手动配置。

还可以在格式化构造器中通过调用 .withBucketAssigner(assigner) 方法指定自定义的 BucketAssigner

Flink 内置了两种 BucketAssigners

  • DateTimeBucketAssigner :默认的基于时间的分配器
  • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)

PyFlink 只支持 DateTimeBucketAssigner  BasePathBucketAssigner 

滚动策略

RollingPolicy 定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。  STREAMING 模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在 BATCH 模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。

Flink 内置了两种 RollingPolicies

  • DefaultRollingPolicy
  • OnCheckpointRollingPolicy

PyFlink 只支持 DefaultRollingPolicy  OnCheckpointRollingPolicy 

3. 如何输出结果

Print

ds.print()

Collect results to client

集合数据到客户端,execute_and_collect方法将收集数据到客户端内存

with ds.execute_and_collect() as results:

    for result in results:

        print(result)

将结果发送到DataStream sink connector

add_sink函数,将DataStream数据发送到sink connector,此函数仅支持FlinkKafkaProducer, JdbcSink和StreamingFileSink,仅在streaming执行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer
from pyflink.common.serialization import JsonRowSerializationSchemaserialization_schema = JsonRowSerializationSchema.builder().with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_producer = FlinkKafkaProducer(topic='test_sink_topic',serialization_schema=serialization_schema,producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds.add_sink(kafka_producer)

sink_to函数,将DataStream数据发送到自定义sink connector,仅支持FileSink,可用于batch和streaming执行模式

from pyflink.datastream.connectors import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoderoutput_path = '/opt/output/'
file_sink = FileSink \.for_row_format(output_path, Encoder.simple_string_encoder()) \  .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \.build()
ds.sink_to(file_sink)

将结果发送到Table & SQL sink connector

Table & SQL connectors也被用于写入DataStream. 首先将DataStream转为Table,然后写入到 Table & SQL sink connector.

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# option 1:the result type of ds is Types.ROW
def split(s):splits = s[1].split("|")for sp in splits:yield Row(s[0], sp)ds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: Row(i[0] + j[0], i[1]))# option 1:the result type of ds is Types.TUPLE
def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))# emit ds to print sink
t_env.execute_sql("""CREATE TABLE my_sink (a INT,b VARCHAR) WITH ('connector' = 'print')""")table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")

4. 执行 PyFlink DataStream API 作业。

PyFlink applications 是懒加载的,并且只有在完全构建之后才会提交给集群上执行。

要执行一个应用程序,你只需简单地调用 env.execute()。

env.execute()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/109355.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

水库大坝安全监测的主要内容包括哪些?

在水库大坝的实时监测中,主要任务是通过无线传感网络监测各个监测点的水位、水压、渗流、流量、扬压力等数据,并在计算机上用数据模式或图形模式进行实时反映,以掌握整个水库大坝的各项变化情况。大坝安全监测系统能实现全天候远程自动监测&a…

ruoyi-vue-plus 配置邮箱

ruoyi-vue-plus 配置邮箱 📔 千寻简笔记介绍 千寻简笔记已开源,Gitee与GitHub搜索chihiro-notes,包含笔记源文件.md,以及PDF版本方便阅读,且是用了精美主题,阅读体验更佳,如果文章对你有帮助请…

当我焦虑时,我从CSDN的博主身上学到了什么?

文章目录 前言一、思考为什么会产生差距1.1 懒惰1.2 没有合理的规划学习时间 二、我该如何做?2.1 认真生活规律作息2.2 做事就是0和1 结语 前言 我们在学习的过程当中总会遇到一些比我们自己优秀的人,不论你是在更好的985或211院校学习,还是…

Netty-01-快速掌握Java NIO

文章目录 一、从传统I/O到Java NIO二、NIO 三大组件1. Channel(通道)1.1. FileChannel1.1.1. 获取 FileChannel1.1.2. FileChannel 读取 文件1.1.3. FileChannel写⽂件1.1.4. 通道之前传输数据-transferFrom1.1.5. 通道之前传输数据-transferTo 1.2. Soc…

学习Linux基础知识与命令行操作

开始学习Linux系统前,首先要掌握计算机基础知识,了解硬件、操作系统、文件系统、网络和安全等概念。对这些基础知识的了解能够帮助理解Linux系统的概念和功能。 在Linux系统中,文件和目录是数据管理的基本单位。每个文件和目录都有一个称为&…

Unity实现倒计时和获取系统时间

一:创建UGUI 1.创建Canvas画布组件,调节Canvas画布的分辨率等其他设置。我们可以把视图设置为2D模式下。 2.创建Text文本组件,取名为Timer计时器,我们调整Text文本组件的大小,用锚点设置Text文本组件的位置,并且设置好Text文本组件的颜色。 3.我们再创建一个Text文…

Matlab怎么引入外部的latex包?Matlab怎么使用特殊字符?

Matlab怎么引入外部的latex包?Matlab怎么使用特殊字符? Matlab怎么使用特殊字符?一种是使用latex方式,Matlab支持基本的Latex字符【这里】,但一些字符需要依赖外部的包,例如“𝔼”,需…

android2022配置opencv4android480

1,安装android studio2022。 2,下载OPENCV4ANDROID,解压到任意盘中。 3,File->New->New Project,选择Empty Views Activity。再选择语言,本文选择JAVA。 4,File->New->Import Modu…

Kotlin数据结构

数据结构基础 什么是数据结构 在计算机科学中,数据结构(Data Structure)是计算机中存储、组织数据的方式。数据结构是各种编程语言的基础。 一些使用场景 不同的数据结构适用于不同的应用场景。比如HashMap与ConcurrentHashMap&#xff0…

(四)k8s实战-服务发现

一、Service 1、配置文件 apiVersion: v1 kind: Service metadata:name: nginx-svclabels:app: nginx-svc spec:ports:- name: http # service 端口配置的名称protocol: TCP # 端口绑定的协议,支持 TCP、UDP、SCTP,默认为 TCPport: 80 # service 自己的…

Apache StreamPark系列教程第二篇——项目打包和开发

一、项目打包 项目依赖maven、jdk8.0、前端(node、npm) //下载代码 git clone//maven打包相关内容 mvn -N io.takari:maven:wrapper //前端打包相关内容 curl -sL https://rpm.nodesource.com/setup_16.x | bash - yum -y install nodejs npm -v npm install -g pnpm默认是h2…

河湖长制综合管理信息平台建设项目总体设计方案[507页Word]

导读:原文《河湖长制综合管理信息平台建设项目总体设计方案[507页Word]》(获取来源见文尾),本文精选其中精华及架构部分,逻辑清晰、内容完整,为快速形成售前方案提供参考。 部分内容: 1.1.1.3…

【Docker】云原生利用Docker确保环境安全、部署的安全性、安全问题的主要表现和新兴技术产生

前言 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。 云原生利用Docker确保环境安全、部署的…

软件开发bug问题跟踪与管理

一、Redmine 项目管理和缺陷跟踪工具 官网:https://www.redmine.org/ Redmine 是一个开源的、基于 Web 的项目管理和缺陷跟踪工具。它用日历和甘特图辅助项目及进度可视化显示,同时它又支持多项目管理。Redmine 是一个自由开源软件解决方案,…

OLED透明屏是什么?什么叫做OLED透明屏的原屏?

OLED透明屏是一种新型的显示技术,具有高对比度、高亮度和能耗低等优势,正被越来越广泛地应用于各个领域中。 在OLED透明屏中,原屏是至关重要的元件之一。本文将深入探讨OLED透明屏原屏的意义、制造过程、品质要求、应用案例和发展趋势&#…

异地访问Oracle数据库的解决方案:利用内网穿透实现PL/SQL远程连接的建议与步骤

文章目录 前言1. 数据库搭建2. 内网穿透2.1 安装cpolar内网穿透2.2 创建隧道映射 3. 公网远程访问4. 配置固定TCP端口地址4.1 保留一个固定的公网TCP端口地址4.2 配置固定公网TCP端口地址4.3 测试使用固定TCP端口地址远程Oracle ​ 小月糖糖主页 在强者的眼中,没有最…

Nexus私有仓库+IDEA配置远程推送

目录 一、docker安装nexus本地私服,Idea通过maven配置deploy本地jar包(简单) 二、docker push镜像到第三方nexus远程私服(shell命令操作) 三、springboot通过maven插件自动生成docker镜像并push到nexus私服&#xf…

STM32--SPI通信与W25Q64(2)

STM32–SPI通信与W25Q64(1) 文章目录 SPI外设特征 SPI框图传输模式主模式全双工连续传输 非连续传输硬件SPI读写W25Q64 SPI外设 STM32内部集成了硬件SPI收发电路,可以由硬件自动执行时钟生成、数据收发等功能,减轻CPU的负担。 特…

[Linux]进程地址空间

[Linux]进程地址空间 文章目录 [Linux]进程地址空间进程地址空间的概念进程地址空间的实现理解写时拷贝为什么要有进程地址空间 进程地址空间的概念 操作系统作为计算机软硬件资源管理者,当然也要管理各个进程的内存分配,因此要有描述各个进程的内存分配…

【Docker 】Docker 客户端,容器使用,启动容器,启动已停止运行的容器,停止一个容器,进入容器

作者简介: 辭七七,目前大一,正在学习C/C,Java,Python等 作者主页: 七七的个人主页 文章收录专栏: 七七的闲谈 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖&#x1f…