Flink作业执行之 2.算子 StreamOperator

Flink作业执行之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {// 处理数据void processElement(StreamRecord<IN> element) throws Exception;
}// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {// 处理双流输入中第一个流上的元素void processElement1(StreamRecord<IN1> element) throws Exception;// 处理双流输入中第二个流上的元素void processElement2(StreamRecord<IN2> element) throws Exception;
}// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {// 封装Functionprotected final F userFunction;// 通过Function实现进行算子的实例化public AbstractUdfStreamOperator(F userFunction) {this.userFunction = requireNonNull(userFunction);checkUdfCheckpointingPreconditions();}// 算子生命周期的相关方法,实际上调用Function的方法@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();}}@Overridepublic void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);}
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {if (operator == null) {return null;} else if (operator instanceof StreamSource&& ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {// 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);} else if (operator instanceof StreamSink&& ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {// 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);} else if (operator instanceof AbstractUdfStreamOperator) {return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);} else {return new SimpleOperatorFactory<>(operator);}
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 获取StreamMap对应的StreamOperatorFactory工厂类return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {OneInputTransformation<T, R> resultTransform =new OneInputTransformation<>(this.transformation,operatorName,// 将StreamOperatorFactory工厂实例,传入到Transformation中operatorFactory,outTypeInfo,environment.getParallelism());@SuppressWarnings({"unchecked", "rawtypes"})SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {// 以下3个属性从父类继承// 函数实例protected final F userFunction;// 结果输出protected transient Output<StreamRecord<OUT>> output;// 默认算子链生成策略protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;public StreamMap(MapFunction<IN, OUT> mapper) {super(mapper);// 实例化StreamMap时,指定ALWAYS的算子链生成策略chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {// userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。output.collect(element.replace(userFunction.map(element.getValue())));}
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/*** StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 */
public enum ChainingStrategy {// 尽可能的将和上游算子链接到一起,大多数算子的默认值ALWAYS,// 当前算子不会上下游算子链接到一起NEVER,// 不会上游算子连接到一起,但是可以和下游算子链接到一起HEAD,// 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。HEAD_WITH_SOURCES;public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/351474.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows修改CMD窗口编码为UTF-8

windows下的cmd的默认编码是GBK编码&#xff0c;有时可能造成乱码问题&#xff0c;下面是我找到的两种更换编码方式为UTF-8的方法。 1、临时修改 &#xff08;1&#xff09;先进入cmd命令窗口&#xff08;快捷键win键R&#xff09; &#xff08;2&#xff09;直接输入“chcp…

复数乘法IP核的使用

一、IP核解析 这一部分参考自&#xff1a;FPGA IP之算数运算IP(1)_哔哩哔哩_bilibili IP核设置也是先僵硬复制up主的配置&#xff0c;后续再灵活变通。 在这张图片中&#xff0c;我们看到的是一个“Complex Multiplier (6.0)” IP 核的配置界面。以下是各个配置参数的详细说明…

部署LVS—DR群集

1、LVS-DR工作流向分析 &#xff08;1&#xff09;客户端发送请求到 Director Server&#xff08;负载均衡器&#xff09;&#xff0c;请求的数据报文&#xff08;源 IP 是 CIP,目标 IP 是 VIP&#xff09;到达内核空间。 &#xff08;2&#xff09;Director Server 和 Real Se…

Mongodb使用$pop删除数组中的元素

学习mongodb&#xff0c;体会mongodb的每一个使用细节&#xff0c;欢迎阅读威赞的文章。这是威赞发布的第67篇mongodb技术文章&#xff0c;欢迎浏览本专栏威赞发布的其他文章。如果您认为我的文章对您有帮助或者解决您的问题&#xff0c;欢迎在文章下面点个赞&#xff0c;或者关…

给文件夹加密的最简单方法

安当TDE透明加密针对文件夹数据加密的保护方案主要包括以下几个方面&#xff1a; 1. 透明加密机制&#xff1a; 用户无需关心数据的加密和解密过程&#xff0c;操作文件夹时就像处理普通数据一样。加密和解密操作在后台自动进行&#xff0c;对用户和应用程序透明。 2. 高性能加…

MySQL查询数据库中所有表名表结构及注释以及生成数据库文档

MySQL查询数据库中所有表名表结构及注释 生成数据库文档在后面&#xff01;&#xff01;&#xff01; select t.TABLE_COMMENT -- 数据表注释 , c.TABLE_NAME -- 表名称 , c.COLUMN_COMMENT -- 数据项 , c.COLUMN_NAME -- 英文名称 , -- 字段描述 , upper(c.DATA_TYPE) as …

SortTable.js + vxe-table 实现多条批量排序

环境: vue3+vxe-table+sorttable.js 功能: 实现表格拖动排序,支持单条排序,多条排序 实现思路: sorttable.js官网只有单条排序的例子,网上也都是简单的使用,想要实现多条排序,就要结合着表格的复选框功能,在对其勾选的行统一计算! 最终效果: 实现代码 <template>…

网络数据包抓取与分析工具wireshark的安及使用

WireShark安装和使用 WireShark是非常流行的网络封包分析工具&#xff0c;可以截取各种网络数据包&#xff0c;并显示数据包详细信息。常用于开发测试过程中各种问题定位。 1 任务目标 1.1 知识目标 了解WireShark的过滤器使用,通过过滤器可以筛选出想要分析的内容 掌握Wir…

分享一个 .NET Core 使用选项方式读取配置内容的详细例子

前言 在 .NET Core 中&#xff0c;可以使用选项模式&#xff08;Options Pattern&#xff09;来读取和管理应用程序的配置内容。 选项模式通过创建一个 POCO&#xff08;Plain Old CLR Object&#xff09;来表示配置选项&#xff0c;并将其注册到依赖注入容器中&#xff0c;方…

使用 Oracle SQL Developer 导入数据

使用 Oracle SQL Developer 导入数据 1. 导入过程 1. 导入过程 选择要导入数据的表&#xff0c; 然后单击右键&#xff0c;选择"导入数据"&#xff0c; 浏览本地文件&#xff0c;选择正确的工作表&#xff0c; 按默认&#xff0c; 按默认&#xff0c; 根据情况修改&…

解决MacOS docker 拉取镜像慢的问题

docker官网&#xff1a;https://docker.p2hp.com/get-started/index.html 下载完成之后&#xff0c;拉取镜像速度慢&#xff0c;问题如下&#xff1a; 解决方法 配置阿里源&#xff1a;https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors在docker desktop里面设置…

go的netpoll学习

go的运行时调度框架简介 Go的运行时&#xff08;runtime&#xff09;中&#xff0c;由调度器管理&#xff1a;goroutine&#xff08;G&#xff09;、操作系统线程&#xff08;M&#xff09;和逻辑处理器&#xff08;P&#xff09;之间的关系 以实现高效的并发执行 当一个gorout…

独辟蹊径:我是如何用Java自创一套工作流引擎的(上)

作者&#xff1a;后端小肥肠 创作不易&#xff0c;未经允许严谨转载。 目录 1. 前言 2. 我为什么要自创一套工作流引擎 3. 表结构设计及关系讲解 3.1. 流程类别business_approval_workflow 3.1.1. 表结构 3.1.2. 表关系说明 3.2. 流程定义business_approval_workflow_de…

LVS+Keepalived NGINX+Keepalived 高可用群集实战部署

Keepalived及其工作原理 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题。 VRRP协议&#xff08;虚拟路由冗余协议&#xff09; 是针对路由器的一种备份解决方案由多台路由器组成一个热备组&#xff0c;通过共用的…

Linux:基础IO(二.缓冲区、模拟一下缓冲区、详细讲解文件系统)

上次介绍了&#xff1a;Linux&#xff1a;基础IO&#xff08;一.C语言文件接口与系统调用、默认打开的文件流、详解文件描述符与dup2系统调用&#xff09; 文章目录 1.缓冲区1.1概念1.2作用与意义 2.语言级别的缓冲区2.1刷新策略2.2具体在哪里2.3支持格式化 3.自己来模拟一下缓…

简单谈谈云服务器私网IP的存在意义及优势

云服务器是基于虚拟化技术的计算资源&#xff0c;可以在云平台上灵活创建和管理。为了满足不同用户的需求&#xff0c;云服务提供商在云服务器上分配了两种类型的IP地址&#xff1a;公网IP和私网IP。其中&#xff0c;私网IP是指在局域网内使用的内部IP地址&#xff0c;无法通过…

计算机图形学入门11:图形管线与着色器

1.什么是图形管线 把场景中的物体经过一系列的处理&#xff0c;最后一张图像的形式在屏幕上显示出来&#xff0c;这一系列过程就是图形管线(Graphics Pipeline)&#xff0c;也叫实时渲染管线(Real-time Rendering Pipeline)。如下图所示&#xff0c;为整个渲染管线的过程。 渲染…

《幻影大师:透视缠中说禅的虚像与真相》

而且他从不犯错&#xff0c;至少在他的叙述中是这样&#xff0c;所有的文章和言论都被粉饰得完美无瑕&#xff0c;即便有误&#xff0c;他也绝不公开承认&#xff0c;更别提什么真诚的道歉和改正了。那些对他推崇备至的人&#xff0c;多是盲目追随&#xff0c;将他神化为无所不…

Win11 问题集

文章目录 一、Win11 选择其他应用打开无反应1、新建 1.reg 文件2、新建 2.reg 文件3、运行 reg 文件 二、Win11 账户怎么改名 一、Win11 选择其他应用打开无反应 Win11选择打开方式卡死怎么办? 选择打开方式没有反应的解决办法 1、新建 1.reg 文件 1.reg Windows Registry…

代理IP协议有何区别?深入了解 SOCKS5、HTTP 代理

在数字通信领域&#xff0c;数据安全和匿名性都是非常重要的指标。互联网的不断发展催生了几种协议&#xff0c;每种协议都有独特的优势和挑战。其中&#xff0c;SOCKS5 代理、HTTP代理最为广泛使用&#xff0c;下面给大家一起讨论&#xff0c;HTTP代理与 SOCKS5代理&#xff0…