Java版Flink使用指南——合流

大纲

  • 新建工程
  • 无界流
    • 奇数Long型无界流
    • 偶数Long型无界流
    • 奇数String型无界流
  • 合流
    • Union
    • Connect
  • 测试
  • 工程代码

在《Java版Flink使用指南——分流导出》中,我们通过addSink进行了输出分流。本文我们将介绍几种通过多个无界流输入合并成一个流来进行处理的方案。

新建工程

我们新建一个名字叫MultiSource的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

无界流

我们使用《Java版Flink使用指南——自定义无界流生成器》中的方法,我们定义3个无界流。其中两个是Long类型,一个是String类型。

奇数Long型无界流

src/main/java/org/example/generator/UnBoundedOddStreamGenerator.java
这个类每隔1秒钟产生一个Long型奇数。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedOddStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 1L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count); // Emit datacount = count + 2;}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

偶数Long型无界流

src/main/java/org/example/generator/UnBoundedEvenStreamGenerator.java
这个类每隔1秒钟产生一个Long型偶数。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedEvenStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count); // Emit datacount = count + 2;}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

奇数String型无界流

src/main/java/org/example/generator/UnBoundedOddStringStreamGenerator.java
这个类每隔1秒钟产生一个String型奇数。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedOddStringStreamGenerator extends RichSourceFunction<String> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {long count = 1L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(String.valueOf(count)); // Emit datacount = count + 2;}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

合流

Union

Union是最简单的算子。它可以把两个数据类型相同的流合并
上面奇数和偶数Long型流就可以使用Union去做合并。

DataStreamSource<Long> evenLongDataStreamSource = env.addSource(new UnBoundedEvenStreamGenerator());
DataStreamSource<Long> oddLongDataStreamSource = env.addSource(new UnBoundedOddStreamGenerator());evenLongDataStreamSource.union(oddLongDataStreamSource).addSink(new SinkFunction<Long>() {@Overridepublic void invoke(Long value, Context context) throws Exception {System.out.println("sink union value: " + value);}}
).name("union stream");

Connect

Connect可以用于连接两个不同类型的流。这就意味着它需要提供针对不同类型的处理方法。
上面这个例子,如果使用Connect实现,则如下

evenLongDataStreamSource.connect(oddLongDataStreamSource).map(new CoMapFunction<Long, Long, Long>() {@Overridepublic Long map1(Long value) throws Exception {return value;}@Overridepublic Long map2(Long value) throws Exception {return value;}}).addSink(new SinkFunction<Long>() {@Overridepublic void invoke(Long value, Context context) throws Exception {System.out.println("sink connect value: " + value);}}).name("connect stream");

map方法中的CoMapFunction接口类中的map1和map2就是将两个不同类型的流归一化处理的中间方法。
IN1是Connect方法调用者的流数据类型;IN2是Connect参数的流数据类型;R是它们归一化后的类型。

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {/*** This method is called for each element in the first of the connected streams.** @param value The stream element* @return The resulting element* @throws Exception The function may throw exceptions which cause the streaming program to fail*     and go into recovery.*/OUT map1(IN1 value) throws Exception;/*** This method is called for each element in the second of the connected streams.** @param value The stream element* @return The resulting element* @throws Exception The function may throw exceptions which cause the streaming program to fail*     and go into recovery.*/OUT map2(IN2 value) throws Exception;
}

假如我们将Long型偶数流和String型奇数流合并,并生成一个Double类型的流,则可以如下

evenLongDataStreamSource.connect(oddStringDataStreamSource).map(new CoMapFunction<Long, String, Double>() {@Overridepublic Double map1(Long value) throws Exception {return Double.valueOf(value);}@Overridepublic Double map2(String value) throws Exception {return Double.valueOf(value);}}).addSink(new SinkFunction<Double>() {@Overridepublic void invoke(Double value, Context context) throws Exception {System.out.println("sink union connect value: " + value);}}
).name("union connect stream");

map1方法将evenLongDataStreamSource中的Long型数据转成了Double;map2将oddStringDataStreamSource中的String型数据转换成了Double。

测试

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/373300.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux查看目录下的文件夹命令,find 查找某个目录,但是不包括这个目录本身?

linux查看目录下的文件夹命令&#xff0c;find 查找某个目录&#xff0c;但是不包括这个目录本身&#xff1f; Linux中查看目录下的文件夹的命令是使用ls命令。ls命令用于列出指定目录中的文件和文件夹。通过不同的选项可以实现显示详细信息、按照不同的排序方式以及使用不同的…

算法小练之 位运算基础

前言 今天正式走入&#xff0c;位运算这个章节&#xff0c;关于这一部分我会先介绍几个重要的知识点&#xff0c;然后再根据几个力扣上的题来讲解。 了解6种位操作 总所周知&#xff0c;变量在计算机中都是二进制存储的&#xff0c;比如一个变量int a 1&#xff1b; 它的存…

申请商标用什么颜色:企业和个人申请注册商标攻略!

在申请注册商标到底要用什么颜色&#xff0c;许多初次申请注册主体都不是特别清楚&#xff0c;普推知产商标老杨建议&#xff0c;在一般情况下建议尽量用黑白色&#xff0c;因为商标用黑白色在使用时可以着任何色。 在用黑色申请注册成功&#xff0c;别的主体用其它颜色要在同…

飞腾平台虚拟机组播性能调优指南

【写在前面】 飞腾开发者平台是基于飞腾自身强大的技术基础和开放能力&#xff0c;聚合行业内优秀资源而打造的。该平台覆盖了操作系统、算法、数据库、安全、平台工具、虚拟化、存储、网络、固件等多个前沿技术领域&#xff0c;包含了应用使能套件、软件仓库、软件支持、软件适…

Mattermost:一个强大的开源协作平台

Mattermost是一个强大的开源协作平台&#xff0c;基于云原生架构&#xff0c;为企业级用户提供安全、可扩展且自托管的消息传递解决方案。 一、平台特点 开源与定制性&#xff1a;Mattermost是一个开源项目&#xff0c;用户可以根据自身需求定制界面、添加功能或扩展其功能&am…

c++ 多边形 xyz 数据 获取 中心点方法

有需求需要对。多边形 获取中心点方法&#xff0c;绝大多数都是 puthon和java版本。立体几何学中的知识。 封装函数 point ##########::getCenterOfGravity(std::vector<point> polygon) {if (polygon.size() < 2)return point();auto Area [](point p0, point p1, p…

【福利】代码公开!咸鱼之王自动答题脚本

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 微信或QQ打开咸鱼之王小程序&#xff0c;进入答题界面&#xff0c;运行main.py。期间不要动鼠标。 可自行更改代码来适配自己的需求~ 可以按照示例图片…

在亚马逊云科技AWS上利用SageMaker机器学习模型平台搭建生成式AI应用(附Llama大模型部署和测试代码)

项目简介&#xff1a; 接下来&#xff0c;小李哥将会每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。本次介绍的是如何在Amazon …

大屏自适应容器组件 v-scale-screen

在vue中&#xff0c;v-scale-screen可用于大屏项目开发&#xff0c;实现屏幕自适应&#xff0c;可根据宽度自适应&#xff0c;高度自适应&#xff0c;和宽高等比例自适应&#xff0c;全屏自适应。 仓库地址&#xff1a;github国内地址&#xff1a;gitee 一、安装 npm instal…

leetcode--从中序与后序遍历序列构造二叉树

leeocode地址&#xff1a;从中序与后序遍历序列构造二叉树 给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 示例 1: 输入&#xff1a;inorder …

python脚本“文档”撰写——“诱骗”ai撰写“火火的动态”python“自动”脚本文档

“火火的动态”python“自动”脚本文档&#xff0c;又从ai学习搭子那儿“套”来&#xff0c;可谓良心质量&#x1f44d;&#x1f44d;。 (笔记模板由python脚本于2024年07月07日 15:15:33创建&#xff0c;本篇笔记适合喜欢钻研python和页面源码的coder翻阅) 【学习的细节是欢悦…

PHP智慧门店微信小程序系统源码

&#x1f50d;【引领未来零售新风尚】&#x1f50d; &#x1f680;升级启航&#xff0c;智慧零售新篇章&#x1f680; 告别传统门店的束缚&#xff0c;智慧门店v3微信小程序携带着前沿科技与人性化设计&#xff0c;正式启航&#xff01;这个版本不仅是对过往功能的全面优化&a…

Java面试八股之MySQL的redo log和undo log

MySQL的redo log和undo log 在MySQL的InnoDB存储引擎中&#xff0c;redo log和undo log是两种重要的日志&#xff0c;它们各自服务于不同的目的&#xff0c;对数据库的事务处理和恢复机制至关重要。 Redo Log&#xff08;重做日志&#xff09; 功能 redo log的主要作用是确…

伯克利、斯坦福和CMU面向具身智能端到端操作联合发布开源通用机器人Policy,可支持多种机器人执行多种任务

不同于LLM或者MLLM那样用于上百亿甚至上千亿参数量的大模型&#xff0c;具身智能端到端大模型并不追求参数规模上的大&#xff0c;而是指其能吸收大量的数据&#xff0c;执行多种任务&#xff0c;并能具备一定的泛化能力&#xff0c;如笔者前博客里的RT1。目前该领域一个前沿工…

CentOS6禁止锁屏

在电源中设置后还是会锁屏, 原因是有屏幕保护程序 电源管理都 “从不” 一些AI的回答 在CentOS 6系统中&#xff0c;如果你想要禁用锁屏功能&#xff0c;可以编辑/etc/kbd/config文件。这个文件通常包含了键盘相关的设置&#xff0c;包括密码策略和屏幕锁定选项。 首先打开终…

javascript高级部分笔记

javascript高级部分 Function方法 与 函数式编程 call 语法&#xff1a;call([thisObj[,arg1[, arg2[, [,.argN]]]]]) 定义&#xff1a;调用一个对象的一个方法&#xff0c;以另一个对象替换当前对象。 说明&#xff1a;call 方法可以用来代替另一个对象调用一个方法。cal…

C语言程序题(一)

一.三个整数从大到小输出 首先做这个题目需要知道理清排序的思路&#xff0c;通过比较三个整数的值&#xff0c;使之从大到小输出。解这道题有很多方法我就总结了两种方法&#xff1a;一是通过中间变量比较和交换&#xff0c;二是可以用冒泡排序法&#xff08;虽然三个数字排序…

微信小程序引入自定义子组件报错,在 C:/Users/***/WeChatProjects/miniprogram-1/components/路径下***

使用原生小程序开发时候&#xff0c;会报下面的错误&#xff0c; [ pages/button/button.json 文件内容错误] pages/button/button.json: [“usingComponents”][“second-component”]: “…/…/components/second-child/index”&#xff0c;在 C:/Users/***/WeChatProjects/m…

Infinitar链游新发展新机遇

区块链游戏市场在近年来经历了显著增长&#xff0c;吸引了大量的投资和关注。随着加密货币和NFT&#xff08;非同质化代币&#xff09;概念的普及&#xff0c;越来越多的投资者、游戏开发者和看到了区块链技术在游戏领域的应用潜力&#xff0c;纷纷涌入市场。区块链游戏的用户量…

电脑虚拟摄像头怎么使用?电脑摄像头可以被虚拟摄像头替代吗?8款推荐!

在数字化日益普及的今天&#xff0c;视频通话和在线会议已成为我们生活和工作中不可或缺的一部分。然而&#xff0c;当我们的电脑没有配备摄像头&#xff0c;或摄像头出现故障时&#xff0c;我们可能会面临一些不便。这时&#xff0c;电脑虚拟摄像头便成为了一个实用的解决方案…