【API篇】五、Flink分流合流API

文章目录

  • 1、filter算子实现分流
  • 2、分流:使用侧输出流
  • 3、合流:union
  • 4、合流:connect
  • 5、connect案例

分流,很形象的一个词,就像一条大河,遇到岸边有分叉的,而形成了主流和测流。对于数据流也一样,不过是一个个水滴替换成了一条条数据。

在这里插入图片描述

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

在这里插入图片描述

1、filter算子实现分流

Demo案例:读取一个整数数字流,将数据流划分为奇数流和偶数流。

实现思路:针对同一个流,多次条用filter算子来拆分

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> ds = env.socketTextStream("node01", 9527).map(Integer::valueOf);//将ds 分为两个流 ,一个是奇数流,一个是偶数流//使用filter 过滤两次SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);ds1.print("偶数");ds2.print("奇数");env.execute();}
}

以上实现的明显缺陷是,同一条数据,被多次处理。以上其实是将原始数据流stream复制两份,然后对每一份分别做筛选,冗余且低效。

2、分流:使用侧输出流

基本步骤为:

  • 使用process算子(Flink分层API中的最底层的处理函数)
  • 定义OutputTag对象,即输出标签对象,用于后面标记和提取侧流
  • 调用上下文ctx的.output()方法
  • 通过主流获取侧流
案例:实现将WaterSensor按照Id类型进行分流

先定义下MapFunction的转换规则,用来将输入的数据转为自定义的WaterSensor对象:

public class WaterSensorMapFunction implements MapFunction<StringWaterSensor>{@Overridepublic WaterSensor map(String value) throws Exception {String[] strArr = value.split( regex: ",");//String组装对象return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));}
}

使用侧流:

public class SplitStreamByOutputTag {    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());//定义两个输出标签对象,用于后面标记和提取侧流OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));//返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>(){@Override//形参为别为:流中的一条数据、上下文对象、收集器public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print("主流");SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);s1DS.printToErr("侧流s1");  //区别主流,让控制台输出标红s2DS.printToErr("侧流s2");env.execute();}
}

相关传参说明,首先是创建OutputTag对象时的传参:

  • 第一个参数为标签名,用于区分是哪一个侧流
  • 第二个是放入侧流中的数据的类型,且必须是Flink的类型(TypeInfomation,借助Types类)
  • OutputTag的泛型,是流到对应的侧流的数据类型

ProcessFunction接口的泛型中:

  • 第一个是输入的数据类型
  • 第二个是输出到主流上的数据类型

ctx.output方法的形参:

  • 第一个为outputTag对象
  • 第二个为数据,上面代码中传value即直接输出数据本身,也可输出处理后的数据,主流侧流数据类型不用一致

看下运行效果:

在这里插入图片描述

3、合流:union

将来源不同的多条流,合并成一条来联合处理,即合流。最简单的合流操作,就是直接将多条流合在一起,叫作流的联合(union)

在这里插入图片描述

union的条件是:

  • 每条流中要合并的数据类型必须相同(原始不同,可先借助map,在union)
  • 合并之后的新流会包括所有流中的元素,数据类型不变
stream1.union(stream2, stream3, ...)  //可变长参数
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
//输出:
1
2
3
2
2
3
2
2
3

4、合流:connect

union合并流受限于数据类型,因此还有另一种合流操作:connect

在这里插入图片描述

public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Integer流SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("node01", 9527).map(i -> Integer.parseInt(i));//String流DataStreamSource<String> source2 = env.socketTextStream("node01", 2795);/*** 总结: 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的*/ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "来源于原source1流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "来源于原source2流:" + value;}});result.print();env.execute();    }
}

使用 connect 合流的总结:

  • 一次只能连接 2条流,因为connect返回的是一个ConnectedStreams对象,不再是DataStreamSource或其子类了
  • 两条流中的数据类型可以不一样
  • 连接后可以调用 map、flatmap、process来处理,但是各处理各的

以map为例,其形参是一个CoMapFuntion接口类型,泛型则分别是流1的数据类型、流2的数据类型、合并及处理后输出的数据类型。两个map方法可以看出,虽然两个流合并成一个了,但处理数据时还是各玩各的。

  • .map1()就是对第一条流中数据的map操作
  • .map2()则是针对第二条流

在这里插入图片描述

connect 就类比被逼相亲后结婚,两个人看似成一家了,但实际上各自玩各自的。往大了举例就相当于一国两制。

5、connect案例

和connect以后的map传CoMapFunction一样,process算子也不再传ProcessFunction,而是CoProcessFunction,实现两个方法:

  • processElement1():针对第一条流
  • processElement2():针对第二条流

connect合并后得到的ConnectedStreams也可以直接调用.keyBy()进行按键分区,分区后返回的还是一个ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2,是两条流中各自的键选择器

ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理

案例需求:连接两条流,输出能根据id匹配上的数据,即两个流里元组f0相同的数据(类似inner join效果)
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//二元组流DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));//三元组流DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);SingleOutputStreamOperator<String> result = connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 定义 HashMap,缓存来过的数据,key=id,value=list<数据>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s1数据,都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条,直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s2数据,都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条,直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}});result.print();env.execute();}
}

运行效果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/163468.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GitHub-使用 Git工具 创建密钥id_rsa.pub

快速导航 步骤1 打开Git Bash步骤2 输入指令【ssh-keygen】步骤3 打开创建的公钥文件步骤4 复制其中所有内容步骤5 打开GitHub中的Setting界面步骤6 添加SSH keys 步骤1 打开Git Bash 打开Git Bash 工具 步骤2 输入指令【ssh-keygen】 输入指令【ssh-keygen】&#xff0c;并…

【四:httpclient的使用】

目录 1、Demo案例2、请求一个带cookies的get请求3、请求一个带cookies的post请求案例一&#xff0c;案例二的properties的配置 1、Demo案例 public class MyHttpClient {Testpublic void test1() throws IOException {//用来存放我们的结果String result;HttpGet get new Htt…

安装VSCode,提升工作效率!iPad Pro生产力进阶之路

文章目录 前言1. 本地环境配置2. 内网穿透2.1 安装cpolar内网穿透(支持一键自动安装脚本)2.2 创建HTTP隧道 3. 测试远程访问4. 配置固定二级子域名4.1 保留二级子域名4.2 配置二级子域名 5. 测试使用固定二级子域名远程访问6. iPad通过软件远程vscode6.1 创建TCP隧道 7. ipad远…

模型量化笔记--KL散度量化

KL散度量化 前面介绍的非对称量化中&#xff0c;是将数据中的min值和max值直接映射到[-128, 127]。 同样的&#xff0c;前面介绍的对称量化是将数据的最大绝对值 ∣ m a x ∣ |max| ∣max∣直接映射到127。 上面两种直接映射的方法比较粗暴&#xff0c;而TensorRT中的int8量化…

openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数

文章目录 openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数102.1 背景信息102.2 操作步骤 openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数 102.1 背景信息 当用户连接数达到上限后&#…

小黑子—Maven基础

Maven基础 一 小黑子的Maven学习1. Mavn的介绍2. Maven基础概念2.1 仓库2.2 坐标2.3 仓库配置 3. 手动写一个maven项目3.1 Maven项目构建命令3.2 插件创建工程 4. IDEA下的maven项目5. 依赖管理5.1 依赖配置5.2 依赖传递5.3 可选依赖&#xff08;不透明&#xff09;5.4 排除依赖…

【一:实战开发testng的介绍】

目录 1、主要内容1.1、为啥要做接口测试1.2、接口自动化测试落地过程1.3、接口测试范围1.4、手工接口常用的工具1.5、自动化框架的设计 2、testng自动化测试框架基本测试1、基本注解2、忽略测试3、依赖测试4、超时测试5、异常测试6、通过xml文件参数测试7、通过data实现数据驱动…

UWB十个知识点

UWB是一直被基于厚望的高精度定位技术 1&#xff1a;定位技术及UWB特点 位置空间感知技术包括了GNSS、RFID、蓝牙和UWB&#xff0c;在室内和区域空间测量最具技术优势的技术是UWB。 GNSS是广域定位技术&#xff0c;室内以及建筑物旁边等场景&#xff0c;GNSS无法实现定位&am…

【微服务 SpringCloud】实用篇 · Ribbon负载均衡

微服务&#xff08;4&#xff09; 文章目录 微服务&#xff08;4&#xff09;1. 负载均衡原理2. 源码跟踪1&#xff09;LoadBalancerIntercepor2&#xff09;LoadBalancerClient3&#xff09;负载均衡策略IRule4&#xff09;总结 3. 负载均衡策略3.1 负载均衡策略3.2 自定义负载…

企业IT资产设备折旧残值如何计算

环境&#xff1a; 企业/公司 IT资产 问题描述&#xff1a; 企业IT设备折旧残值如何计算&#xff1f; 解决方案&#xff1a; 1.按三年折旧 净值原值-月折旧额折旧月份 &#xff0c; 月折旧额原值(1-3%)/36 折旧月份ROUND(E2*(1-3%)/36,2) 2.净值E2-F2*G2

vue使用pdf 导出当前页面,(jspdf, html2canvas )

需要安装两个插件 npm install html2canvas jspdfyarn add html2canvas jspdf<div class"app-container" id"pdfPage">我是内容 </div><el-button size"mini" click"onExportPdf">导出数据</el-button>onexp…

代码随想录算法训练营第23期day24|回溯算法理论基础、77. 组合

目录 一、回溯算法基础 回溯法模板 二、&#xff08;leetcode 77&#xff09;组合 剪枝 一、回溯算法基础 1.回溯的本质是穷举&#xff0c;穷举所有可能&#xff0c;然后选出想要的答案&#xff08;为了提升效率&#xff0c;最多再加一个剪枝&#xff09; 2.回溯法解决的…

凝聚技术力量 共建测试生态 ——集成电路测试技术交流日成功举办

10月18日下午&#xff0c;凝聚技术力量&#xff0c;共建测试生态 ——集成电路测试技术交流会在上海成功举办。来自全国各地知名专家学者、技术大咖及企业代表齐聚一堂&#xff0c;共同探讨封装测试技术的发展方向&#xff0c;共话产业未来&#xff0c;共促产业发展。 本次活动…

Stable Diffusion WebUI扩展a1111-sd-webui-tagcomplete之Booru风格Tag自动补全功能详细介绍

安装地址 直接附上地址先: Ranting8323 / A1111 Sd Webui Tagcomplete GitCodeGitCode——开源代码托管平台,独立第三方开源社区,Git/Github/Gitlabhttps://gitcode.net/ranting8323/a1111-sd-webui-tagcomplete.git上面是GitCode的地址,下面是GitHub的地址,根据自身情…

CUDA编程入门系列(二) GPU硬件架构综述

一、Fermi GPU Fermi GPU如下图所示&#xff0c;由16个SM&#xff08;stream multiprocessor&#xff09;组成&#xff0c;不同的SM之间通过L2 Cache和全局内存进行相连。整个架构大致分为两个层次&#xff0c;①总体架构由多个SM组成 ②每个SM由多个SP core&#xff08;stream…

数据结构中的七大排序(Java实现)

目录 一、直接插入排序 二、希尔排序 三、直接选择排序 四、堆排序 五、冒泡排序 六、快速排序 七、归并排序 一、直接插入排序 思想&#xff1a; 定义i下标之前的元素全部已经有序&#xff0c;遍历一遍要排序的数组&#xff0c;把i下标前的元素全部进行排序&#xff0…

elementui select组件下拉框底部增加自定义按钮

elementui select组件下拉框底部增加自定义按钮 el-select组件的visible-change 事件&#xff08;下拉框出现/隐藏时触发&#xff09; <el-selectref"select":value"value"placeholder"请选择"visible-change"visibleChange">&…

一天吃透Java集合面试八股文

内容摘自我的学习网站&#xff1a;topjavaer.cn 常见的集合有哪些&#xff1f; Java集合类主要由两个接口Collection和Map派生出来的&#xff0c;Collection有三个子接口&#xff1a;List、Set、Queue。 Java集合框架图如下&#xff1a; List代表了有序可重复集合&#xff0c…

软考-访问控制技术原理与应用

本文为作者学习文章&#xff0c;按作者习惯写成&#xff0c;如有错误或需要追加内容请留言&#xff08;不喜勿喷&#xff09; 本文为追加文章&#xff0c;后期慢慢追加 by 2023年10月 访问控制概念 访问控制是计算机安全的一个重要组成部分&#xff0c;用于控制用户或程序如…

LiveGBS流媒体平台GB/T28181常见问题-安全控制HTTP接口鉴权勾选流地址鉴权后401Unauthorized如何播放调用接口

LiveGBS流媒体平台GB/T28181常见问题-安全控制HTTP接口鉴权勾选流地址鉴权后401 Unauthorized如何播放调用接口&#xff1f; 1、安全控制1.1、HTTP接口鉴权1.2、流地址鉴权 2、401 Unauthorized2.1、携带token调用接口2.1.1、获取鉴权token2.1.2、调用其它接口2.1.2.1、携带 Co…