【API篇】三、Flink转换算子API

文章目录

  • 0、demo数据
  • 1、基本转换算子:映射map
  • 2、基本转换算子:过滤filter
  • 3、基本转换算子:扁平映射flatMap
  • 4、聚合算子:按键分区keyBy
  • 5、聚合算子:简单聚合sum/min/max/minBy/maxBy
  • 6、聚合算子:归约聚合reduce
  • 7、用户自定义函数:函数类
  • 8、用户自定义函数:富函数类

创建完执行环境,从数据源读入数据,就该用转换算子对数据做处理了,即使用各种转换算子,将一个或多个DataStream转换为新的DataStream

在这里插入图片描述

0、demo数据

准备一个实体类WaterSensor:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class WaterSensor{private String id;   //水位传感器类型private Long ts;     //传感器记录时间戳private Integer vc;  //水位记录
}
//注意所有属性的类型都是可序列化的,如果属性类型是自定义类,那要实现Serializable接口

1、基本转换算子:映射map

map即把数据流中的数据进行转换,形成新的数据流。一一映射,消费一个元素就产出一个元素。

在这里插入图片描述
DataStream对象调用map()方法进行转换处理。map方法形参是接口MapFunction的实现对象,返回值类型还是DataStream:

public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_2", 2, 2));// 方式一:传入匿名类,实现MapFunctionstream.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二:传入MapFunction的实现类stream.map(new MapFunctionImpl()).print();//方式三:Lambda表达式stream.map(t -> t.getId()).print();//方式四:Lambda表达式stream.map(WaterSensor::getId).print();env.execute();}}public class MapFunctionImpl implements MapFunction<WaterSensor, String> {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}
}

在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。当好几个作业都需要这个转换逻辑时,不用匿名内部类,而是实现类好点,省的重复写转换逻辑。

2、基本转换算子:过滤filter

通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤

在这里插入图片描述
filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式

public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));// 传入匿名类实现FilterFunctionstream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return "sensor_1".equals(e.getId());}}).print();// Lambda表达式// stream.filter(t -> "sensor_1".equals(t.getId())).print();env.execute();}}

3、基本转换算子:扁平映射flatMap

flatMap主要是将数据流中的整体拆分成一个一个的个体使用,消费一个元素,可以产生0到多个元素。先扁平化,再映射。
在这里插入图片描述

//实现:如果输入的数据是sensor_1,只打印vc; 
//如果输入的数据是sensor_2,既打印ts又打印vcpublic class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));stream.flatMap(new FlatMapFunctionImpl()).print();env.execute();}} public class FlatMapFunctionImpl implements FlatMapFunction<WaterSensor, String> {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if (value.id.equals("sensor_1")) {out.collect(String.valueOf(value.vc));   //一进一出} else if (value.id.equals("sensor_2")) {out.collect(String.valueOf(value.ts));   //一进多出out.collect(String.valueOf(value.vc));}//sensor_3 一进0出}
}
//value为WaterSensor类型,收集器为String类型,即WaterSensor转String

map和flatMap相比,map总是能一进一出是因为MapFunction接口的map方法是有return返回值的,一个传入,肯定对应一个返回。而flatMap下,FlatMapFunction接口的flatMap方法返回值类型为void,最终返回啥,是靠收集器往下游传,调用n次采集器的collect方法,就输出n条数据,一次也不调,那就是不处理,又是void,那就相当于被过滤了,因此有了flatMap的一进多出:

  • 一进一出
  • 一进多出
  • 一进零出

4、聚合算子:按键分区keyBy

对海量数据进行聚合计算前,分组是必要的。

在这里插入图片描述

  • 按键分区keyBy,返回的是一个KeyedStream键控流
  • keyBy不是转换算子,不能设置并行度,只是对数据做一个重分区
  • 在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要自己重写hashCode()方法

关于keyBy分组和分区的关系:

  • keyBy是对数据分组,保证相同key的数据在同一个分区
  • 分区,一个子任务可以理解为一个分区
  • 一个分区(子任务)中可以有多个分组
//演示以demo类的id字段来分类
public class TransKeyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));// 方式一:使用Lambda表达式KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(t -> t.id);// 方式二:使用匿名类实现KeySelectorKeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor e) throws Exception {return e.id;}});//分区后继续做你需要的聚合env.execute();}
}
  • keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream
  • KeyedStream泛型中第一个为流中的元素类型外,第二个是key的类型
  • KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API
  • 只有基于KeyedStream才可以做后续的聚合操作(比如sum,reduce)

5、聚合算子:简单聚合sum/min/max/minBy/maxBy

注意点:

  • 在完成keyBy分组后,可以进行简单聚合
  • sum/min/max/minBy/maxBy是KeyedStream类下的API,因此必须先完成分组
  • 而简单聚合算子返回的,又变回了一个SingleOutputStreamOperator,即先分区、后聚合,得到的依然是一个DataStream
  • 是分组内的聚合,即对同一个key的数据进行聚合,不会跨key聚合

关于这些API:

  • sum():在分组内,对指定的字段做叠加求和
  • min():在分组内,对指定的字段求最小值
  • max():在分组内,对指定的字段求最大值
  • minBy():与min类似,区别是,min只计算指定字段的最小值,其他字段会保留最初第一条数据的值,而minBy则是字段最小值所在的整条数据。也就是除了指定字段,其他字段以谁为准的区别。
  • maxBy():与max类似,区别同上
public class TransAggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));stream.keyBy(t -> t.id).max("vc");    // 指定字段名称//stream.keyBy(t -> t.id).max(2);  //报错env.execute();}
}

注意,这几个聚合算子的传参有两种:指定位置,和指定名称,对于元组类型的数据,两种都行。但如果数据流中的类型不是元组,而是一个pojo类,那就只能通过字段名来指定,而不能传一个位置,否则报错Cannot reference field by position on POJO

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。

6、聚合算子:归约聚合reduce

public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_1", 3, 3),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3));// 传入匿名类实现ReduceFunctionstream.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic boolean reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("value1===" + value1);System.out.println("value2===" + value2);return new WaterSensor(value1.getId(),value2.getTs(),value1.getVc() + value2.getVc());}}).print();env.execute();}}

运行:

在这里插入图片描述

总结:

  • reduce算子依旧是keyBy之后KeyedStream的API

  • 该算子传入一个ReduceFunction对象,要求数据的输入类型等于输出类型
    在这里插入图片描述

  • ReduceFunction接口的reduce方法,value1和value2是流中某key分组的两个数据,中途,value1是之前的计算结果(存状态,有状态计算),value2是后面新来的数据

  • 每个key的分组里第一条数据来的时候,不会执行reduce方法,只是存起来,然后就发到下游了

  • reduce算子和前面的简单算子一样,会存每一个key的状态值,且状态不会清空,因此,如果是无界流,其key值要有限个

7、用户自定义函数:函数类

自定义函数,即用户根据自己的需求,重新实现算子的逻辑。Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

DataStreamSource<WaterSensor> stream = env.fromElements(        new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_1", 2, 2),new WaterSensor("sensor_2", 2, 2),new WaterSensor("sensor_3", 3, 3)
);DataStream<String> stream = stream.filter(new FilterFunctionImpl("sensor_1"));  //new对象的时候传入str,通过构造方法赋值给了id属性
public  class FilterFunctionImpl implements FilterFunction<WaterSensor> {private String id;FilterFunctionImpl(String id) { this.id=id; }@Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);   //当前对象的id属性}
}

关于函数类,写实现类、Lambda表达式、匿名内部类等方式重写算子对应的接口,前面已经演示过,上面重点改良了一下代码,把过滤关键字做为类的属性,通过构造方法传了进去。

8、用户自定义函数:富函数类

富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能。

public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1,2,3,4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer) throws Exception {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();}
}

在这里插入图片描述

生命周期的方法即:

  • open():每个子任务,在启动时,调用一次
  • close():每个子任务,在结束时,调用一次

但需要注意:

  • 当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用

  • 处理有界流,处理完以后程序运行结束,调用close

  • 处理无界流,程序中止时调用close

  • 如果Flink是异常中止,则不会调用close

  • 如果是正常调用cancle命令(控制台去cancle),则会正常调用close方法

关于富函数:

  • 相比普通的自定义函数类,富函数多了一个运行时上下文对象,可通过这个对象获取到运行时环境的信息,比如子任务编号、子任务名称
  • 有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等
  • 处理数据需求有时机要求时,可使用富函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/162943.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wsl使用vscode连接,远程安装C/C++ 拓展时,报错

报错内容&#xff1a; EACCES: permission denied, rename /home/wen/.vscode-server/extensions/.b61b1c7c-f703-4dfd-bdc5-d9a00681c4b7 -> /home/wen/.vscode-server/extensions/ms-vscode.cpptools-1.17.5-linux-x64 解决办法&#xff1a; 升级wsl到wsl2就好了。 &a…

GitLab使用webhook触发Jenkins自动构建

1、jenkins安装gitlab插件 在插件管理中&#xff0c;搜索gitlab安装这个插件。 2、job中配置webhook地址和密钥 进入job设置&#xff0c;构建触发器中就可以看到gitlab的webhook配置&#xff0c;复制URL地址和随机令牌至gitlab中 勾选后&#xff0c;就可以展开设置&#xff…

G.711语音编解码器详解

语音编解码利用人听觉上的冗余对语音信息进行压缩从而达到节省带宽的目的。值得注意的是,本文说的是语音编解码器,也就Speech codec,而常用的还有另一种编解码器称作音频编解码器,英文是Audio codec,它们的区别如下。 以前在学校的时候研究了很多VoIP的编解码器从G.723到A…

神经网络硬件加速器-DPU分析

一 DPU概述 DPU是专为卷积神经网络优化的可编程引擎&#xff0c;其使用专用指令集&#xff0c;支持诸多卷积神经网络的有效实现。 1、关键模块 卷积引擎&#xff1a;常规CONV等ALU&#xff1a;DepthwiseConvScheduler&#xff1a;指令调度分发Buffer Group&#xff1a;片上数据…

Kafka三种认证模式,Kafka 安全认证及权限控制详细配置与搭建

Kafka三种认证模式,Kafka 安全认证及权限控制详细配置与搭建。 Kafka三种认证模式 使用kerberos认证 bootstrap.servers=hadoop01.com:9092,hadoop02.com:9092,hadoop03.com:9092,hadoop04.com:9092 security.

信创办公–基于WPS的Word最佳实践系列 (图文环绕方式)

信创办公–基于WPS的Word最佳实践系列 &#xff08;图文环绕方式&#xff09; 目录 应用背景操作步骤1、 打开布局选项中图文环绕方式的方法2、 图文环绕三大类型 应用背景 在Word中&#xff0c;对文字和图片进行排版时&#xff0c;采用各种不同的图片与文字组合效果能够使页面…

php 遍历PHP数组的7种方式

在PHP中&#xff0c;遍历数组有多种方式可以选择。以下是最常用的几种方式&#xff1a; 使用foreach循环 $array array("apple", "banana", "orange"); foreach($array as $value){echo $value . "<br>"; } 输出结果&#xff…

【数组的使用续篇】

文章目录 以数组的形式打印数组打印方法&#xff1a;Arrays.toString(数组名) 数组排序大小排序方法是 Arrays.sort(数组名) 创建一个自己的打印数组的方法自己创建一个冒泡排序两数之间交换方法 逆置数组打印核心思路还是 i 和 j 交换 总结 以数组的形式打印数组 打印方法&am…

浅析“代码可视化” | 京东云技术团队

1.什么是代码可视化&#xff1f; Code visualization is the process of creating graphical representations of source code to help understand and analyze it. 代码可视化是创建源代码的图形表示以帮助理解和分析它的过程。 个人理解&#xff1a;通过使用图形化手段&#…

计算机保研推免面试复习大纲(数学+408)

目录 线性代数概率论高等数学信号与系统离散数学操作系统计算机网络计算机组成数据结构算法编译原理C杂项 线性代数 怎么求逆矩阵 逆矩阵&#xff1a; A A − 1 E AA^{-1}E AA−1E&#xff0c;伴随矩阵&#xff1a; A A ∗ A ∗ A ∣ A ∣ E AA^{*}A^{*}A|A|E AA∗A∗A∣A∣…

Top 10 数据恢复工具,可从iPhone 和 iPad 恢复数据

您是否正在寻找最好的 iPad 恢复软件&#xff0c;但不知道哪个选项最好&#xff1f;没有什么可担心的。本文将为您提供有关根据文件类型、设备兼容性和数据丢失原因等因素选择合适的 iPad 恢复软件的提示。此外&#xff0c;前 10 名提到的恢复软件是安全可靠的。 第 1 部分、iP…

上海亚商投顾:沪指震荡调整 转基因概念股逆势大涨

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日低开低走&#xff0c;深成指、创业板指均跌超1%&#xff0c;双双创出年内新低。转基因概念股逆势大涨…

开源Vue盲盒商城4.0源码/网页盲盒源码/前端uniapp后端thinkphp+安装教程/亲测

源码简介&#xff1a; 开源Vue盲盒商城4.0源码&#xff0c;它是打包小程序app的网页盲盒源码,亲测可用&#xff0c;它是采用vueTP5框架开发开源盲盒网站源码&#xff0c;附带了安装教程。 简单测试过了&#xff0c;可以使用&#xff0c;大家可以自测下。 前端uniapp后端think…

如何使用Python给图片添加水印

目录 一、安装Pillow库 二、导入Pillow库和需要用到的模块 三、添加水印 四、调用函数并设置参数 五、需要注意的方面 总结 在Python中&#xff0c;我们可以使用Pillow库来处理图像&#xff0c;包括添加水印。Pillow是Python中最流行的图像处理库之一&#xff0c;它支持多…

搭建Pytorch的GPU环境超详细

效果 1、下载和安装VS2019 https://visualstudio.microsoft.com/zh-hans/vs/older-downloads/ 登录需要用户名和密码 安装后需要联网下载组件的,安装的时候要勾选使用C++的桌面开发 2、下载和安装显卡驱动 查看自己的显卡型号 从英伟达下载和安装最新驱动

【面试经典150 | 区间】插入区间

文章目录 Tag题目解读题目来源解题思路方法一&#xff1a;合并区间方法二&#xff1a;模拟 其他语言python3 写在最后 Tag 【模拟】【数组】 题目解读 给定一个含有多个无重叠区间的数组&#xff0c;并且数组已经按照区间开始值升序排序。在列表中插入一个新的区间&#xff0…

【nginx学习笔记】

1.正向代理&#xff1a;代理的是客户端&#xff0c;一般有明确的访问对象 比如&#xff1a;我现在通过v-p-n去访问YouTube&#xff0c;那么就是正向代理。 2.反向代理&#xff1a;代理的是服务器 最常见的就是web中&#xff0c;nginx去代理一群后端的服务器。 3.负载均衡&…

solidworks 2024新功能之-打造更加智能的工作 硕迪科技

SOLIDWORKS 2024 的新增功能 SOLIDWORKS 的每个版本都致力于改进您的工作流程&#xff0c;使您常用的工具尽可能快速高效地运作。此外&#xff0c;SOLIDWORKS 2024 可以通过量身定制的解决方案扩展您的工具集&#xff0c;并使您能够通过 Cloud Services 轻松将您的设计数据连接…

【Linux进行时】进程控制

1.进程创建&#xff1a; 1.1fork函数 在linux中fork函数时非常重要的函数&#xff0c;它从已存在进程中创建一个新进程。新进程为子进程&#xff0c;而原进程为父进程。 \#include <unistd.h> pid_t fork(void); 返回值&#xff1a;子进程中返回0&#xff0c;父进程返…

APP开发成本的影响因素

在温州或中国任何地方开发APP的成本取决于多个因素&#xff0c;包括应用的规模、功能、设计、复杂性以及所需的技术和人力资源。以下是一些可能影响APP开发成本的主要因素&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xf…