【Flink】Flink 处理函数之基本处理函数(一)

1. 处理函数介绍

流处理API,无论是基本的转换聚合、还是复杂的窗口操作,都是基于DataStream进行转换的,所以统称为DataStreamAPI,这是Flink编程的核心。

但其实Flink为了更强大的表现力和易用性,Flink本身提供了多层API,DataStreamAPI只是中间一环,如下图所示:在这里插入图片描述
在更底层,Flink可以不定义任何具体的算子(比如 mapfilter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)

在处理函数中,操作的就是数据流中最基本的元素:数据事件(event)状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

2. 处理函数的分类

DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。

Flink 提供了 8 个不同的处理函数:

  • ProcessFunction
    最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
  • KeyedProcessFunction
    对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream
  • ProcessWindowFunction
    开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入。
  • ProcessAllWindowFunction
    同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
  • CoProcessFunction
    合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。
  • ProcessJoinFunction
    间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
  • BroadcastProcessFunction
    广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream与一个广播流(BroadcastStream)连接(conncet)之后的产物。
  • KeyedBroadcastProcessFunction
    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream广播流(BroadcastStream)做连接之后的产物。

2.1 基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。在Flink 中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作 ProcessFunction

2.1.1 处理函数的功能和使用

转换算子一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如Map算子只能获取当前的数据;而想窗口聚合复杂的操作AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现)。另外还有富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度任务名称之类的运行时信息。

但无论那种算子,如果想要访问事件的时间戳,或者当前的水位线信息,都是获取不到的。但是处理函数可以获取,处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)时间戳(timestamp)水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunctionMyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

代码实例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stream.process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {out.collect(value.toString());if (value.getUser().equals("Mary")) {out.collect(value.user + "click " + value.getUrl());} else if (value.getUser().equals("Alice")) {out.collect(value.user);out.collect(value.user);}System.out.println("timestamp:" + ctx.timestamp());System.out.println("watermark:" + ctx.timerService().currentWatermark());System.out.println(getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}}).print();env.execute();}

运行结果:
在这里插入图片描述

这里第一次的水位线的值其实是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
在这里插入图片描述

然后每次下一次的水位线都是上一次的timestamp - 1

2.1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。

内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...
public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
...
}
2.1.2.1 抽象方法.processElement()

用于处理元素,定义了处理的核心逻辑。这个方法对流中的每个元素都会调用一次,参数包括三个: 输入数据值 value上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。

  • value: 当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • cts:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()

Context 抽象类定义如下:

public abstract class Context {public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> outputTag, X value);
}
  • out: “收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

ProcessFunction 可以轻松实现flatMap这样的基本转换功能(当然 mapfilter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

2.1.2.2 非抽象方法.onTimer()
@Override
public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);
}

用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp)上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。

处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用的是.processElement()方法,而处理水位线事件调用的是.onTimer()

.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/285837.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt程序可执行文件打包

目录 一、新建一个目录二、命令行2.1 添加临时变量2.2 打包命令 三、添加动态库四、普通 Qt 项目打包 Qml 项目打包 笔者写的python程序打包地址&#xff08;https://blog.csdn.net/qq_43700779/article/details/136994813&#xff09; 一、新建一个目录 新目录(例如test)用以…

Vue.js前端开发零基础教学(三)

目录 2.6 计算属性 2.7侦听器 2.8 样式绑定 2.8.1 绑定class属性 2.8.2 绑定style属性 2.9 阶段案例——学习计划表 2.6 计算属性 概念&#xff1a;Vue提供了计算属性来描述依赖响应式数据的复杂逻辑。 计算属性可以实时监听数据的变化&#xff0c;返回一个计算…

真假“长文本”,国产大模型混战

文&#xff5c;郝 鑫 Kimi有多火爆&#xff1f;凭一己之力搅乱A股和大模型圈。 Kimi概念股连日引爆资本市场&#xff0c;多个概念股随之涨停。在一片看好的态势中&#xff0c;谁都想来沾个边&#xff0c;据光锥智能不完全统计&#xff0c;目前&#xff0c;至少有包括读客…

【蓝桥杯知识点】浮点数二分(开n次方根再也不会超时啦!)

今天继续学习基础算法&#xff01;这篇文章介绍了二分的另一种应用——浮点数二分&#xff0c;可以用于开n次方根的计算&#xff0c;会使时间大大缩短&#xff01;我偷偷问过电脑编译器了&#xff0c;它说它喜欢优化过的算法哈哈哈哈~相信你也会喜欢的&#xff01; PS&#xff…

现代化应用部署工具-Docker

Docker 简介 什么是Docker Docker 是一个开源的应用容器引擎&#xff0c;可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上。 Docker部署的优势 通过使用Docker等容器技术&#xff0c;可以将应用程序及其依赖项…

构建品牌故事:Kompas.ai在叙事营销中的应用

引言 在数字化和全球化的浪潮中&#xff0c;品牌建设已经从单一的产品竞争演变为品牌故事的较量。叙事营销&#xff0c;作为一种通过讲述故事来传递品牌价值和理念的策略&#xff0c;已经成为连接品牌与消费者情感的桥梁。本文将深入探讨叙事营销的重要性&#xff0c;详细介绍K…

路由 (hash模式和history模式)

首先我们了解一下资源请求&#xff1a; 1.什么是资源 在浏览器需要某一个数据或文件进行解析或者浏览器在解析某个脚本的时候需要数据进行DOM渲染等工作&#xff0c;那么这个数据或文件就是浏览器的资源 2.资源怎么获取 浏览器的资源都必须通过资源请求的方式或从缓存中调出…

【Java程序设计】【C00371】基于(JavaWeb)Springboot的社区防疫物资申报系统(有论文)

TOC 博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;已经做了六年的毕业设计程序开发&#xff0c;开发过上千套毕业设计程序&#xff0c;博客中有上百套程序可供参考&#xff0c;欢迎共同交流学习。 项目简介 项目获取 &#x1f345;文末点击卡片…

MapReduce配置和Yarn的集群部署

一、集群环境&#xff0c;还是如下三台服务器 192.168.32.101 node1192.168.32.102 node2192.168.32.103 node3 二、YARN架构 YARN&#xff0c;主从架构&#xff0c;有2个角色 主&#xff08;Master&#xff09;角色&#xff1a;ResourceManager从&#xff08;Slave&#x…

政安晨:【深度学习实践】【使用 TensorFlow 和 Keras 为结构化数据构建和训练神经网络】(三)—— 随机梯度下降

政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras实战演绎 希望政安晨的博客能够对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff01; 这篇文章中&#xff0c;咱们将使用Keras和TensorFlow…

看似简单的SQL,实则就是简单

加班遇到一个SQL问题&#xff0c;本想把别人的SQL改下成SparkSQL&#xff0c;在YARN上运行&#xff0c;然而数据一直对不上。 原SQL ⚠️说明&#xff1a;a.id&#xff0c;b.id没有空的&#xff0c;数据1:1&#xff0c;b.name可能存在空的 select a.id,b.id,b.name from tab…

JDK1.6、1.7、1.8内存区域的变化?

JDK1.6、1.7/1.8内存区域发生了变化&#xff0c;主要体现在方法区的实现&#xff1a; JDK1.6使用永久代实现方法区&#xff1a; JDK1.7时发生了一些变化&#xff0c;将字符串常量池、静态变量&#xff0c;存放在堆上 在JDK1.8时彻底干掉了永久代&#xff0c;而在直接内存中划出…

【每日八股】Java基础经典面试题4

前言&#xff1a;哈喽大家好&#xff0c;我是黑洞晓威&#xff0c;25届毕业生&#xff0c;正在为即将到来的秋招做准备。本篇将记录学习过程中经常出现的知识点以及自己学习薄弱的地方进行总结&#x1f970;。 本篇文章记录的Java基础面试题&#xff0c;如果你也在复习的话不妨…

阿里的库存秒杀是如何实现的?

一、阿里的库存秒杀的实现 阿里有很多业务&#xff0c;几十上百个业务线&#xff0c;各自都有一些需要做抢购、秒杀、热点扣将的场景。他们都用哪些方案呢? 我看了很多资料&#xff0c;也找了很多人做交流&#xff0c;最终得到的结论是啥都有&#xff0c;主要总结几个主流的&…

Linux离线部署gitLab及使用教程

一、下载gitLab的linux系统rpm包 地址&#xff1a;Index of /gitlab-ce/yum/el7/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 找到这个最新版 点击下载 二、上传到linux系统 笔者是在windows系统下的vmware虚拟机中部署安装的&#xff0c;虚拟机中安装了cent…

《C++ Primer 第五版 中文版》第12章 动态内存【阅读笔记 + 个人思考】

《C Primer 第五版 中文版》第12章 动态内存【阅读笔记 个人思考】 12.1 动态内存与智能指针12.1.1 shared_ptr类 静态内存包括&#xff1a;初始化只读数据段&#xff0c;初始化读写数据段&#xff0c;未初始化数据和常量数据段。 详细在下面博客总结&#xff1a; Linux系统下…

商家如何自己零成本免费制作点餐小程序项目完整源码

现在点餐小程序成为餐饮店的标配&#xff0c;顾客只要扫码&#xff0c;即可进入小程序点餐。顾客付款后&#xff0c;后厨自动打印出订单并开始制作。整个过程非常方便流畅&#xff0c;甚至还可以免去收银&#xff08;或服务&#xff09;人员。那么&#xff0c;这种餐饮小程序要…

STM32—控制蜂鸣器(定时器)

目录 1 、 电路构成及原理图 2 、编写实现代码 main.c tim_irq.c 3、代码讲解 4、烧录到开发板调试、验证代码 5、检验效果 此笔记基于朗峰 STM32F103 系列全集成开发板的记录。 1 、 电路构成及原理图 定时器中断是利用定时器的计数功能&#xff08;向上计数或向下计…

ChatGPTGPT4科研应用、数据分析与机器学习、论文高效写作、AI绘图技术教程

原文链接&#xff1a;ChatGPTGPT4科研应用、数据分析与机器学习、论文高效写作、AI绘图技术教程https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247598798&idx2&sn014f5ae90306a3b1e8fd87ab58561411&chksmfa820329cdf58a3f72799a43016b223057fd1bd02284…

算法系列--动态规划--子序列(1)

&#x1f495;"深思熟虑的结果往往就是说不清楚。"&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;算法系列–动态规划–子序列(2) 今天带来的是算法系列--动态规划--子序列(1),是子序列问题的开篇!带大家初识子序列问题 一.什么是子序列问题 我们…