Flink05 Windows 操作轻松应对复杂的场景

Flink Windows 操作

上篇文章介绍了Flink 几种类型 Windows 本文介绍窗口操作相关API,以及各自使用场景 。

本期Flink Windows 相关操作apply/union/join/collect/CoMap/CoFlatMap

Windows apply

通过实现WindowFunction或AllWindowFunction接口来完成的,这些接口允许你定义如何对窗口内的数据进行处理。通过使用这些函数,你可以实现复杂的窗口计算逻辑,满足各种业务需求。

例如对窗口内数值进行求和:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Arrays;
import java.util.List;
import java.util.Random;public class WindowDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> input = ...;DataStream<Tuple2<String, Integer>> result = input.keyBy(f->f.f0) // 按第一个字段(假设是key)进行分组.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 定义一个10秒的时间窗口.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {int sum = 0;for (Tuple2<String, Integer> value : values) {sum += value.f1;}out.collect(new Tuple2<>(key, sum)); // 输出键和窗口内值的总和}});result.print(); // 打印结果env.execute();}}

Union

两个或多个数据流的联合会创建一个包含所有流中所有元素的新流。

注意事项 :使用Union操作符合并的数据流,其数据类型必须相同

Union 操作,很方便实现对"多源数据"进行分析, 在实际应用中,数据可能来自多个不同的源,不同的系统的,不同的数据库。那么Union 就有用武之地。

当然也有在一些复杂的流处理场景中,可能会先将一个大的数据流根据某些条件分流成多个小的数据流进行独立处理,然后再将这些处理后的数据流通过Union操作符合并成一个数据流,以便进行进一步的汇总或分析。

以下是一个Union 使用示例代码

package com.codetonight.datastream.operater;import com.codetonight.Example;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Example.Person> flintstones1 = env.fromElements(new Example.Person("Fred", 35),new Example.Person("Wilma", 35),new Example.Person("Pebbles", 2));DataStream<Example.Person> flintstones2 = env.fromElements(new Example.Person("Tom", 35),new Example.Person("java", 35),new Example.Person("CPlus", 2));DataStream<Example.Person> flintstones = flintstones1.union(flintstones2);DataStream<Example.Person> adults = flintstones.filter(new FilterFunction<Example.Person>() {@Overridepublic boolean filter(Example.Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}public String toString() {return this.name.toString() + ": age " + this.age.toString();}}
}

Window Join

它允许开发者在两个或多个数据流之间基于时间和/或键值进行联接操作。Window Join作用于两个流中有相同key且处于相同窗口的元素上

使用方式

dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new JoinFunction () {...});

Interval Join

Interval Join允许数据流(我们称之为“左流”)去Join另一条数据流(我们称之为“右流”)中前后一段时间内的数据。Interval Join处理具有时间相关性的数据流时特别有用,比如在处理用户行为分析、交易日志等场景中。

下面代码展示  Interval Join 使用方式。

满足 ** key1 == key2 && leftTs - 2 < rightTs < leftTs + 2**,

Interval Join的使用场景包括但不限于:

用户行为分析:例如,分析用户浏览行为交易行为之间关系

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound.upperBoundExclusive(true) // optional.lowerBoundExclusive(true) // optional.process(new ProcessJoinFunction{...});

coGroup

Window CoGroup是将两个数据流按照指定的key和窗口进行分组,并在每个窗口结束时将两个流中相同key的数据合并在一起。这种操作通常用于需要同时对两个流中的数据进行聚合、比较或关联的场景

Window CoGroup适用于需要对两个流中的数据进行复杂关联或聚合操作的场景,如:

实时用户行为分析:将用户的浏览行为和点击行为数据合并,分析用户的兴趣偏好。

用户可以在CoGroupFunction中实现自定义的处理逻辑,如聚合、过滤、比较等操作

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new CoGroupFunction () {...});
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {/*** This method must be implemented to provide a user implementation of a coGroup. It is called* for each pair of element groups where the elements share the same key.** @param first The records from the first input.* @param second The records from the second.* @param out A collector to return elements.* @throws Exception The function may throw Exceptions, which will cause the program to cancel,*     and may trigger the recovery logic.*/void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

Connect

它允许开发者将两个DataStream(数据流)连接起来,以便后续可以对这两个流中的数据进行统一处理。与Union操作不同,Connect操作不要求两个流的数据类型必须相同

两个流之间可以共享状态,这在处理复杂的数据流逻辑时非常有用

dataStream.connect 操作会返回一个 ConnectedStreams

   ConnectedStreams collectStream  = dataStream.connect(otherStream);

CoMap, CoFlatMap

CoMap 和 CoFlatMap 将不同类型的数据流进行组合和转换以生成新的数据流

ConnectedStreams → DataStream

下面例子CoMap使用例子。

  1. Interger数据流 与 String 数据流 进行 connect 操作 得到connectedStreams

  2. 将connectedStreams  通过CoMapFunction  转换为一个String 数据流

  3. CoMapFunction map1 对原Interger数据流中元素 乘2 后再转String

  4. CoMapFunction map2 对原String数据流中元素转为大写

public class CoMapExample {public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建两个不同类型的数据流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<String> stream2 = env.fromElements("a", "b", "c");// 连接两个数据流ConnectedStreams<Integer, String> connectedStreams = stream1.connect(stream2);// 使用 CoMap 进行转换操作DataStream<String> resultStream = connectedStreams.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) {return String.valueOf(value * 2);}@Overridepublic String map2(String value) {return value.toUpperCase();}});// 打印结果resultStream.print();// 执行任务env.execute();}
}

CoFlatMap 读者参考上篇文章FlatMap 类比学习。CoFlatMap 可以完成CoMap 功能,前面例子稍作修改。读者根据输出结果自行体会CoFlatMap 与 Map 区别。

总结

本期Flink Windows 相关操作apply/union/join/collect/CoMap/CoFlatMap 。介

绍了各自的使用场景,并给出简单的例子。关于Flink 操作还有很多,期待我们一起持续学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/446913.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

考研C语言程序设计_编程题相关(持续更新)

目录 零、说明一、程序设计经典编程题(C语言实现)T1 求1~100的奇数T2 求n!T3 求1!2!3!...10!T4 在一个有序数组中查找具体的某个数字n(二分查找)T5 编写代码&#xff0c;演示多个字符从两端移动&#xff0c;向中间汇聚T6 模拟用户登录(三次机会)T7 输入三个数 并从大到小输出T8…

大一计算机课程之线性代数

《大一计算机课程之线性代数》 在大一的计算机课程中&#xff0c;线性代数是一门极为重要的基础学科&#xff0c;它就像一把神奇的钥匙&#xff0c;为计算机科学领域的诸多方面开启了智慧之门。 线性代数主要研究线性方程组、向量空间、线性变换等内容。对于计算机专业的学生…

【星汇极客】STM32 HAL库各种模块开发之DHT11模块

前言 本人是一名嵌入式学习者&#xff0c;在大学期间也参加了不少的竞赛并获奖&#xff0c;包括&#xff1a;江苏省电子设计竞赛省一、睿抗机器人国二、中国高校智能机器人国二、嵌入式设计竞赛国三、光电设计竞赛国三、节能减排竞赛国三等。 暑假的时候参加了太多的比赛&#…

从加载到对话:使用 Transformers 本地运行量化 LLM 大模型(GPTQ AWQ)

&#xff08;无需显卡&#xff09;使用 Transformers 在本地加载具有 70 亿参数的 LLM 大语言模型&#xff0c;通过这篇文章你将学会用代码创建属于自己的 GPT。 LLM 的加载、微调和应用涉及多个方面&#xff0c;今天我们先聚焦于加载&#xff0c;本文的难点仅在于正确安装和知…

护理陪护系统|护理陪护小程序|护理陪护软件定制

护理陪护系统是针对需要长期照护的患者和老年人开发的一套系统&#xff0c;旨在帮助用户更加方便地获取医疗、护理等服务。用户端功能是系统的重要组成部分&#xff0c;通过用户端功能的设计和开发&#xff0c;可以让用户更加方便快捷地使用系统。首先&#xff0c;用户端功能应…

中兴通讯举办AI“兴”视野沙龙:求真务实 推动AI健康、向善、普惠发展

近日&#xff0c;由中兴通讯主办的“AI‘兴’视野沙龙”在北京举行&#xff0c;中兴通讯首席发展官崔丽与多名业界大咖聚焦人工智能技术发展的前世今生、最新进展、应用趋势、产业融合新路径等热点话题展开深入交流。 数智经济大势所趋 机遇与挑战并存 崔丽谈到&#xff0c;当…

Java利用itextpdf实现pdf文件生成

前言 最近公司让写一个数据页面生成pdf的功能&#xff0c;找了一些市面代码感觉都太麻烦&#xff0c;就自己综合性整合了一个便捷的工具类&#xff0c;开发只需简单组装数据直接调用即可快速生成pdf文件。望大家一起学习&#xff01;&#xff01;&#xff01; 代码获取方式&am…

Graphviz是一个开源的图形可视化软件

官网没有给出代码示例&#xff0c;所以需要自己琢磨&#xff0c; 这里最底下给了一些简单的&#xff0c; 确实可以出很好看的图片 Graphviz介绍 Graphviz是一个开源的图形可视化软件&#xff0c;主要用于绘制各种类型的图表&#xff0c;如流程图、结构图、网络拓扑图等。它通…

cmake模板-支持编译动态/静态文件

代码链接&#xff1a;代码仓库 git clone https://gitee.com/etsuyou/cmake-template.git模板 模板截图 如何使用 在src和inc中写代码 此处用我默认提供的代码 ./go.sh cmake 生成Makefile ./go.sh make 生成bin文件和.a以及.so ./go.sh run app 运行 ./go.sh clean 以…

基于FPGA的ov5640摄像头图像采集(二)

之前讲过ov5640摄像头图像采集&#xff0c;但是只包了的摄像头驱动与数据对齐两部分&#xff0c;但是由于摄像头输入的像素时钟与HDMI输出的驱动时钟并不相同&#xff0c;所有需要利用DDR3来将像素数据进行缓存再将像素数据从DDR3中读出&#xff0c;对DDR3的读写参考米联客的IP…

安装TDengine数据库3.3版本和TDengine数据库可视化管理工具

安装TDengine数据库3.3版本和TDengine数据库可视化管理工具 一、下载安装包二、解压安装包三、部署四、启动服务五、进入数据库六、创建数据库、表和往表中插入数据七、测试 TDengine 性能八、使用数据库九、查询数据十、TDengine数据库可视化界面 一、下载安装包 TDengine-cl…

EXCEL怎么锁定单元格(锁定的单元格不能修改)

选中你的需要保护的单元格&#xff0c;然后点击鼠标右键&#xff0c;在弹出来的下拉菜单里面找到单元格格式 设置单元格格式&#xff0c;弹出来的对话框里找到右侧的保护&#xff0c;勾上锁定 找到审阅按钮&#xff0c;在下面找到更改下面的保护工作表按钮 具体保护的操作…

获取京东商品历史价格接口item_history_price介绍

接口开发背景 京东作为中国知名的电商平台&#xff0c;提供了丰富的商品和服务。为了更好地满足用户和商家的需求&#xff0c;京东开放平台推出了多种API接口&#xff0c;其中“item_history_price”接口用于获取指定商品的历史价格信息。这一接口的开发背景在于帮助用户判断当…

JavaSE——集合5:Set(HashSet的底层原理)(重要!!!)

目录 一、Set接口基本介绍 二、Set接口的常用方法 三、Set接口实现类——HashSet 四、HashSet(HashMap底层原理:重要!!!) (一)第一次添加元素 (二)第二次添加不同的元素 (三)添加重复的元素 1.仍旧走到了putVal(hash(key), key, value, false, true);方法 2.判断计算出…

java-02 数据结构-队列

在Java中&#xff0c;队列是一种常见的数据结构&#xff0c;用于在保持顺序的同时存储和检索数据。Java提供了java.util.Queue接口&#xff0c;它的常见实现包括ArrayDeque、LinkedList和PriorityQueue等。 如果你觉得我分享的内容或者我的努力对你有帮助&#xff0c;或者你只…

PyQt5常用功能四

⽂本涂鸦 写⼀些⽂本上下居中对齐的俄罗斯Cylliric语⾔的⽂字 import sys from PyQt5.QtWidgets import QWidget, QApplication from PyQt5.QtGui import QPainter, QColor, QFont from PyQt5.QtCore import Qtclass Example(QWidget):def __init__(self):super().__init__()…

趋势(一)利用python绘制折线图

趋势&#xff08;一&#xff09;利用python绘制折线图 折线图&#xff08; Line Chart&#xff09;简介 折线图用于在连续间隔或时间跨度上显示定量数值&#xff0c;最常用来显示趋势和关系&#xff08;与其他折线组合起来&#xff09;。折线图既能直观地显示数量随时间的变化…

如何查看GB28181流媒体平台LiveGBS中对GB28181实时视频数据统计的负载信息

目录 1、负载信息2、负载信息说明3、会话列表查看 3.1、会话列表4、停止会话5、搭建GB28181视频直播平台 1、负载信息 实时展示直播、回放、播放、录像、H265、级联等使用数目 2、负载信息说明 直播&#xff1a;当前推流到平台的实时视频数目回放&#xff1a;当前推流到平台的回…

【无标题】基于情境依赖因果影响的多智能体协作强化学习

、文章探讨了大型语言模型&#xff08;LLMs&#xff09;&#xff0c;例如GPT-4&#xff0c;是否以及在何种意义上拥有知识。作者认为&#xff0c;这些模型展现了一种称为“工具性知识”的能力&#xff0c;这种知识允许它们根据输入上下文推断任务结构&#xff0c;并在此基础上进…

废水处理(一)——MDPI特刊推荐

特刊征稿 01 期刊名称&#xff1a; Removing Challenging Pollutants from Wastewater: Effective Approaches 截止时间&#xff1a; 摘要提交截止日期&#xff1a;2024年11月30日 投稿截止日期&#xff1a;2025年5月31日 目标及范围&#xff1a; 该主题是分享去除有毒物…