Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权

基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction


处理函数的功能和使用

对于常用的转换算子来说:

  • MapFunction只能获取到当前的数据;
  • AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
  • RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();

但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的

与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了

因此需要使用处理函数

  • 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数还可以直接将数据输出到侧输出流(side output)中

处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:

stream.process(new MyProcessFunction())

简单示例:

public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}

ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑

ProcessFunction 解析

源码解析

源码如下:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a*     {@link TimerService} for registering timers and querying the time. The context is only*     valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,*     querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}*     for registering timers and querying the time. The context is only valid during the*     invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}

可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:

I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型

其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()

  • .processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义
    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
    • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
    • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
  • .onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService 来注册的
    • 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线

利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能

处理函数分类

  1. ProcessFunction:最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入
  2. KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream )
  3. ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入
  4. ProcessAllWindowFunction:开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入
  5. CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入
  6. ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物)
  8. KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream 调用.process()时作为参数传入(这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物)

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/238916.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity游戏图形学 Shader结构

shader结构 shader语言 openGL&#xff1a;SLG跨平台 >GLSL&#xff1a;openGL shaderlauguge DX&#xff1a;微软开发&#xff0c;性能很好&#xff0c;但是不能跨平台 >HLSL&#xff1a;high level shader language CG&#xff1a;微软和Nvidia公司联合开发&#xff…

使用ChatGPT对进行论文改写与润色

一、内容改写 关键在于明确改写的具体要求。 例如:[论文内容] 可以指明需要提升该段落的流畅性和逻辑连贯性。 常用指令 细微调整文本 轻微编辑 重写以增强表述清晰度 简化句式 校正语法和拼写错误 提升文本的流畅性和条理性 优化词汇使用 调整文本风格 进行深度编辑…

python-图片文字识别(三):EasyOCR

目录 简单使用 参数 异常处理 OCR,光学文字识别&#xff0c;对文本资料进行扫描&#xff0c;然后对图像文件进行分析处理&#xff0c;获取文字及版面信息的过程。easyocr是一个比较流行的库&#xff0c;支持超过80种语言。安装的时候注意会附带安装torch库&#xff08;一个深…

SpringBoot整合Kafka

一、首先下载windows版本的Kafka 官网&#xff1a;Apache Kafka 二、启动Kafka cmd进入到kafka安装目录&#xff1a; 1&#xff1a;cmd启动zookeeer .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties 2&#xff1a;cmd启动kafka server .\bin\wind…

图解拒付平台:如何应对用户的拒付

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;5&#xff09;篇。 本章主要讲清楚支付系统中拒付涉及的基本概念&#xff0c;产品架构、系统架构&#xff0c;以及一些核心的流程和相关领域模型、状态机设计等。 1. 前言 拒付在中国比较少见&#xff0c;但…

MPP架构和分布式架构的区别

前言&#xff1a;对大数据的数据处理需求&#xff0c;当前技术方向上存在两个不同的发展路线&#xff0c;MPP和分布式处理。两者数据处理的基本思路都是一样的&#xff0c;分布式并行处理再合并结果&#xff1b;但由于二者在处理架构上的差异&#xff0c;最终产品在应用需求性能…

LabVIEW在金属铜大气腐蚀预测评价系统中的应用

为了应对电子设备和仪器中金属铜因大气腐蚀带来的挑战&#xff0c;开发一种基于LabVIEW平台的先进预测评价系统。这个系统的设计宗旨是准确预测并评估在不同室内外环境中金属铜的腐蚀状况。我们团队在LabVIEW的强大数据处理和图形化编程支持下&#xff0c;结合实际的大气腐蚀数…

2024Flutter岗位面试题总结

StatelessWidget和StatefulWidget的区别是什么&#xff1f; StatelessWidget是一个不可变的类&#xff0c;充当UI布局中某些部分的蓝图&#xff0c;当某个组件在显示期间不需要改变&#xff0c;或者说没有状态&#xff08;State&#xff09;&#xff0c;你可以使用它。 Statef…

免费的域名要不要?

前言 eu.org的免费域名相比于其他免费域名注册服务&#xff0c;eu.org的域名后缀更加独特。同时&#xff0c;eu.org的域名注册也比较简单&#xff0c;只需要填写一些基本信息&#xff0c;就可以获得自己的免费域名。 博客地址 免费的域名要不要&#xff1f;-雪饼前言 eu.org…

AtCoder Beginner Contest 336 G. 16 Integers(图计数 欧拉路径转欧拉回路 矩阵树定理 best定理)

题目 给16个非负整数&#xff0c;x[i∈(0,1)][j∈(0,1)][k∈(0,1)][l∈(0,1)] 求长为n3的01串的方案数&#xff0c;满足长度为4的ijkl&#xff08;2*2*2*2&#xff0c;16种情况&#xff09;串恰为x[i][j][k][l]个 答案对998244353取模 思路来源 https://www.cnblogs.com/tz…

深度系统QT 环境搭建

1.QT安装 不折腾最新版直接去商店搜索QT安装。 2.修改su密码&#xff0c;安装需要权限 打开一个终端&#xff0c;然后输入下面的命令&#xff1a;按照提示输入密码按回车就行。 sudo passwd 回车后会出现让你输入现在这个账户的密码&#xff1a; 3.编译环境安装。 安…

生活自来水厂污水处理设备需要哪些

生活自来水厂是确保我们日常用水质量安全的重要设施。在自来水的生产过程中&#xff0c;污水处理设备是不可或缺的环节。那么&#xff0c;生活自来水厂的污水处理设备都有哪些呢&#xff1f;本文将为您详细介绍。 首先&#xff0c;生活自来水厂的污水处理设备主要包括预处理设备…

大语言模型系列-总述

大语言模型发展史 研究人员发现&#xff0c;扩展预训练模型&#xff08;Pre-training Language Model&#xff0c;PLM&#xff09;&#xff0c;例如扩展模型大小或数据大小&#xff0c;通常会提高下游任务的模型性能&#xff0c;模型大小从几十亿&#xff08;1 B 10亿&#x…

Linux常用命令大全(三)

系统权限 用户组 1. 创建组groupadd 组名 2. 删除组groupdel 组名 3. 查找系统中的组cat /etc/group | grep -n “组名”说明&#xff1a;系统每个组信息都会被存放在/etc/group的文件中1. 创建用户useradd -g 组名 用户名 2. 设置密码passwd 用户名 3. 查找系统账户说明&am…

MySQL的多表数据记录查询笔记

关系数据操作 合并查询数据记录 在MySQL中通过关键字UNION来实现并操作&#xff0c;即可以通过其将多个SELECT语句的查询结果合并在一起组成新的关系。 两张表&#xff0c;表1 和表2 带有关键字UNION的合并操作 关键字UNION会把查询结果集直接合并在一起&#xff0c;同时将…

力扣每日一题--2088. 统计农场中肥沃金字塔的数目

看到这道题有些人很容易放弃&#xff0c;其实这道题不是很难&#xff0c;主要是题目长&#xff0c;读的容易让人放弃&#xff0c;但是 只要抓住一些性质就可以解决该问题。 本题中的定义放到图像里其实就是个金字塔&#xff0c;下层的那部分比上一层的那部分&#xff0c;长度加…

What does `ResponseEntity` do?

ResponseEntity 作为 Spring MVC controller层 的 HTTP response&#xff0c;包含 status code, headers, body 这三部分。 正常场景 RestController Slf4j public class SearchController {AutowiredUserService userService;RequestMapping(value "/getAllStudents4&…

Androidmanifest文件加固和对抗

前言 恶意软件为了不让我们很容易反编译一个apk&#xff0c;会对androidmanifest文件进行魔改加固&#xff0c;本文探索androidmanifest加固的常见手法以及对抗方法。这里提供一个恶意样本的androidmanifest.xml文件&#xff0c;我们学完之后可以动手实践。 1、Androidmanife…

文心一言 vs. ChatGPT:哪个更胜一筹?

文心一言 vs. ChatGPT&#xff1a;从简洁美到深度思考的文本生成之旅 近年来&#xff0c;文本生成工具的崛起使得人们在表达和沟通方面拥有了更多的选择。在这个领域中&#xff0c;文心一言和ChatGPT作为两个备受瞩目的工具&#xff0c;各自以独特的优势展现在用户面前。本文将…

国际版WPS Office 18.6.1

【应用名称】&#xff1a;WPS Office 【适用平台】&#xff1a;#Android 【软件标签】&#xff1a;#WPS 【应用版本】&#xff1a;18.6.1 【应用大小】&#xff1a;160MB 【软件说明】&#xff1a;软件日常更新。WPS Office是使用人数最多的移动办公软件。独有手机阅读模式…