Flink Flink中的合流

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkUnionStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222).map(a -> Integer.parseInt(a));DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));unionResult.print();streamExecutionEnvironment.execute();}
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述

package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class FlinkConnectStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO 定义数字流SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<String> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222);/**TODO 连接两个流一次只能连接 2 条流两条流的数据类型可以不一致所以得到的结果不再是一个DataStream,而是一个"连接流"ConnectedStreams连接后可以调用 map、flatmap、process 来处理,但是各处理各的*/ConnectedStreams<Integer, String> connectedStreams = source1.connect(source2);SingleOutputStreamOperator<Object> map = connectedStreams.map(new CoMapFunction<Integer, String, Object>() {@Overridepublic Object map1(Integer integer) throws Exception {return "来源于数字流" + integer.toString();}@Overridepublic Object map2(String s) throws Exception {return "来源于字符流" + s;}});map.print();streamExecutionEnvironment.execute();}
}

在这里插入图片描述
上面的代码中,ConnectedStreams 有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个 CoMapFunction,表示分别对两条流中的数据执行 map 操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的 map 操作,.map2()则是针对第二条流。

四、CoProcessFunction

与 CoMapFunction 类似,如果是调用.map()就需要传入一个 CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个 CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams:

connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数 keySelector1 和 keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/208005.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入Android S (12.0) 探索Framework之输入系统IMS的构成与启动

文章目录 前言一、输入系统的基本组成部分二、输入系统相关源码分析1、IMS 构建1.1、SystemServer # startOtherServices()1.2、InputManagerService1.3、NativeInputManager # nativeInit()1.4、NativeInputManager1.5、InputManager1.6、InputDispatcher1.7、InputReader1.8、…

io基础入门

压缩的封装 参考&#xff1a;https://blog.csdn.net/qq_29897369/article/details/120407125?utm_mediumdistribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-0-120407125-blog-120163063.235v38pc_relevant_sort_base3&spm1001.2101.3001.…

6 新建工程——寄存器

文章目录 6.1 本地新建工程文件夹6.2 新建工程6.2.1 选择CPU型号6.2.2 在线添加库文件6.2.3 添加文件6.2.4 复制存储器分配文件6.2.5 配置选项卡6.2.5.1 Linker6.2.5.2 Target6.2.5.3 Output 选项卡6.2.5.4 Listing 选项卡6.2.6 下载器配置 版本说明&#xff1a;MDK5.24 6.1 本…

数据结构(三)——算法和算法分析

&#x1f600;前言 数据结构和算法是计算机科学领域中至关重要的概念。它们为解决实际问题提供了有效的方法和步骤。算法作为解决问题的方法和步骤&#xff0c;在计算机中以指令的有限序列的形式表达。本文将介绍算法的定义、描述和程序设计等方面的内容&#xff0c;帮助您深入…

【Redisson】基于自定义注解的Redisson分布式锁实现

前言 在项目中&#xff0c;经常需要使用Redisson分布式锁来保证并发操作的安全性。在未引入基于注解的分布式锁之前&#xff0c;我们需要手动编写获取锁、判断锁、释放锁的逻辑&#xff0c;导致代码重复且冗长。为了简化这一过程&#xff0c;我们引入了基于注解的分布式锁&…

目标检测——Faster R-CNN算法解读

论文&#xff1a;Faster R-CNN: Towards Real-Time Object Detection with Region Proposal Networks 作者&#xff1a;Shaoqing Ren, Kaiming He, Ross Girshick, and Jian Sun 链接&#xff1a;https://arxiv.org/abs/1506.01497 代码&#xff1a;https://github.com/rbgirsh…

sqli-labs靶场详解(less17-less22)

目录 less-17 less-18 less-19 less-20 less-21 less-22 less-17 修改密码关卡 服务器后端 账号密码都存在数据库中 使用UPDATE进行修改密码 尝试username处 尝试好久尝试不出来应该是对用户名进行了过滤 于是对password进行注入 判断注入点 passwdadmin 报错&#xff1a…

CentOS 7 部署 MariaDB 的 2 种方法

有两种安装 MariaDB 服务器的方法。您可以安装 CentOS 7 存储库中可用的默认版本&#xff0c;也可以通过手动添加 MariaDB 存储库来安装最新版本。 如果安装过MariaDB或MySQL&#xff0c;使用以下命令彻底删除它们: yum remove mariadb* yum remove mysql* 方法一: 使用 Yum…

安卓开发学习---kotlin版---笔记(一)

Hello word 前言&#xff1a;上次学习安卓&#xff0c;学了Java开发&#xff0c;简单的搭了几个安卓界面。这次要学习Kotlin语言&#xff0c;然后开发安卓&#xff0c;趁着还年轻&#xff0c;学点新东西&#xff0c;坚持~ 未来的你会感谢现在努力的你~ 主要学习资料&#xff1a…

15、 深度学习之正向传播和反向传播

上一节介绍了训练和推理的概念,这一节接着训练和推理的概念讲一下,神经网络的正向传播和反向传播。 其实单看正向传播和反向传播这两个概念,很好理解。 正向传播(Forward Propagation)是指从输入层到输出层的数据流动过程,而反向传播(Backpropagation)是指数据从输出…

30秒搞定一个属于你的问答机器人,快速抓取网站内容

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 文章目录 简介运行效果GitHub地址 简介 爬取一个网站的内容&#xff0c;然后让这个内容变成你自己的私有知识库&#xff0c;并且还可以搭建一个基于私有知识库的问…

IDEA下载和安装

IDEA的下载和安装 一、概述 IDEA全称IntelliJ IDEA&#xff0c;是用于Java语言开发的集成环境&#xff0c;它是业界公认的目前用于Java程序开发最好的工具。 集成环境&#xff1a;把代码编写&#xff0c;编译&#xff0c;执行&#xff0c;调试等多种功能综合到一起的开发工具…

光伏测算工具能测量哪些数据?

光伏测算工具在光伏电站的设计和规划过程中起着至关重要的作用。它们可以测量并分析一系列关键数据&#xff0c;以确保光伏电站的顺利建设和高效运营。本文将详细介绍光伏测算工具能测量的主要数据。 一、太阳能资源评估 光伏测算工具可以对场地的太阳能资源进行评估。这包括测…

ubuntu改window任务栏

经常在ubuntu和win之间切换&#xff0c;任务栏的布局不统一会让人很别扭&#xff0c;个人很喜欢win任务栏的不折叠图标功能&#xff0c;而ubuntu没有&#xff0c;又很喜欢的ubuntu的多工作空间&#xff0c;效率比副屏还高&#xff0c;还可以自定义切换工作空间的快捷键。鱼和熊…

判断一个字符序列是否为回文————利用使用双指针法

#include <stdio.h> #include <string.h> int is_palindrome(char s[]) { int left 0; int right strlen(s) - 1; // 循环判断左右指针字符是否相等 while (left < right) { // 如果左右指针所指字符不相等&#xff0c;则返回0表示不是回…

qt pdf 模块简介

文章目录 1. 技术平台2. Qt pdf 模块3. cmake 使用模块4. 许可证5. 简单示例5.1 CMakeLists.txt5.2 main.cpp 6. 总结 1. 技术平台 项目说明OSwin10 x64Qt6.6compilermsvc2022构建工具cmake 2. Qt pdf 模块 Qt PDF模块包含用于呈现PDF文档的类和函数。 QPdfDocument 类加载P…

常用sql记录

备份一张表 PostgreSQL CREATE TABLE new_table AS SELECT * FROM old_table;-- 下面这个比上面好&#xff0c;这个复制表结构时&#xff0c;会把默认值、约束、注释都复制 CREATE TABLE new_table (LIKE old_table INCLUDING ALL) WITHOUT OIDS; INSERT INTO new_table SELE…

10.30 作业 C++

设计一个Per类&#xff0c;类中包含私有成员:姓名、年龄、指针成员身高、体重&#xff0c;再设计一个Stu类&#xff0c;类中包含私有成员:成绩、Per类对象p1&#xff0c;设计这两个类的构造函数、析构函数和拷贝构造函数。 #include <iostream>using namespace std;clas…

【SpringCloud】注册中心和Ribbon负载均衡

SpringCloud 1.Eureka注册中心 1.1 Eureka的作用 注册中心拉取服务负载均衡远程调用 order-service得知user-service实例地址流程&#xff1a; user-service服务实例启动后&#xff0c;将自己的信息注册到eureka-server&#xff08;Eureka服务端&#xff09;&#xff0c;称…

pytorch中Conv1d、Conv2d与Conv3d详解

1 卷积介绍 1.1 什么是卷积 卷积&#xff08;convolution&#xff09;&#xff0c;是一种运算&#xff0c;你可以类比于加&#xff0c;减&#xff0c;乘&#xff0c;除&#xff0c;矩阵的点乘与叉乘等等&#xff0c;它有自己的运算规则&#xff0c;卷积的符号是星号*。表达式…