【入门Flink】- 10基于时间的双流联合(join)

统计固定时间内两条流数据的匹配情况,需要自定义来实现——可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

窗口联结(Window Join)

一段时间的双流合并

定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

stream1.join(stream2).where(<KeySelector>) // stream1 的 keyBy.equalTo(<KeySelector>) // stream2 的 keyBy.window(<WindowAssigner>).apply(<JoinFunction>)
public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String,Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));DataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1 的keyby.equalTo(r2 -> r2.f0) // ds2 的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用 join 方法* @param first ds1 的数据* @param second ds2 的数据*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}

输出:

image-20231112153403293

window join:

  1. 两条流落在同一个时间窗口范围内才能匹配
  2. 根据 keyBy 的 key,来进行匹配关联
  3. 只能拿到匹配上的数据,类似有固定时间范围的inner join

间隔联结(Interval Join)

存在如下场景:两条流匹配的两个数据有可能刚好“卡在”窗口边缘两侧,窗口内就都没有匹配了,可以使用“间隔联结”(interval join)来解决。

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)“下界”(lowerBound);可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp +upperBound], 即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:这段时间作为可以匹配另一条流数据的“窗口”范围。

匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

image-20231112154002415

stream1
.keyBy(<KeySelector>)// KeyedStream 调用   
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right,Context ctx, Collector<String> out){out.collect(left + "," + right);}
});

处理迟到数据,可以使用左右侧输出流

完整代码:

public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.socketTextStream("hadoop102", 7777).map((MapFunction<String, Tuple2<String, Integer>>) value -> {String[] datas = value.split(",");return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.socketTextStream("hadoop102", 8888).map((MapFunction<String, Tuple3<String, Integer, Integer>>) value -> {String[] datas = value.split(",");return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));/*** 【Interval join】* 1、只支持事件时间* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后* 3、process 中,只能处理 join 上的数据* 4、两条流关联后的 watermark,以两条流中最小的为准* 5、如果 当前数据的事件时间 < 当前的 watermark,就是迟到数据,主流的 process 不处理* => between 后,可以指定将 左流 或 右流的迟到数据放入侧输出流* *///1. 分别做 keyby,key 其实就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//2. 调用 interval join// 左右测输出流迟到标签OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)) // 指定上下界.sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1 的数据* @param right ks2 的数据* @param ctx 上下文* @param out 采集器*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189326.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript_动态表格_添加功能

1、动态表格_添加功能.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>动态表格_添加功能</title><style>table{border: 1px solid;margin: auto;width: 100%;}td,th{text-align: ce…

SOME/IP 协议介绍(四)RPC协议规范

RPC协议规范 本章描述了SOME/IP的RPC协议。 传输协议绑定 为了传输不同传输协议的SOME/IP消息&#xff0c;可以使用多种传输协议。SOME/IP目前支持UDP和TCP。它们的绑定在以下章节中进行了解释&#xff0c;而第[SIP_RPC_450页&#xff0c;第36页]节讨论了选择哪种传输协议。…

【Go入门】面向对象

【Go入门】面向对象 前面两章我们介绍了函数和struct&#xff0c;那你是否想过函数当作struct的字段一样来处理呢&#xff1f;今天我们就讲解一下函数的另一种形态&#xff0c;带有接收者的函数&#xff0c;我们称为method method 现在假设有这么一个场景&#xff0c;你定义…

Linux驱动开发——PCI设备驱动

目录 一、 PCI协议简介 二、PCI和PCI-e 三、Linux PCI驱动 四、 PCI设备驱动实例 五、 总线类设备驱动开发习题 一、 PCI协议简介 PCI (Peripheral Component Interconnect&#xff0c;外设部件互联) 局部总线是由Intel 公司联合其他几家公司一起开发的一种总线标准&#…

前端开发引入element plus与windi css

背景 前端开发有很多流行框架&#xff0c;像React 、angular、vue等等&#xff0c;本文主要讲vue 给新手用的教程&#xff0c;其实官网已经写的很清楚&#xff0c;这里再啰嗦只是为了给新手提供一个更加简单明了的参考手册。 一、打开element plus官网选则如图所示模块安装命令…

Nginx缓存基础

1 nginx缓存的流程 客户端需要访问服务器的数据时&#xff0c;如果都直接向服务器发送请求&#xff0c;服务器接收过多的请求&#xff0c;压力会比较大&#xff0c;也比较耗时&#xff1b;而如果在nginx缓存一定的数据&#xff0c;使客户端向基于nginx的代理服务器发送请求&…

华为L410上制作内网镜像模板02

原文链接&#xff1a;华为L410上制作离线安装软件模板02 hello&#xff0c;大家好啊&#xff0c;今天给大家带来第二篇在内网搭建Apache服务器&#xff0c;用于安装完内网操作系统后&#xff0c;在第一次开机时候&#xff0c;为系统安装软件的文章&#xff0c;今天给大家介绍在…

Linux之基础开发工具gdb调试器的使用(三)

文章目录 一、Linux调试器-gdb使用1、安装gdb2、背景3、Debug和release4、区分Debug和release 二、Linux调试器-gdb命令演示1、显示指定行之后的代码&#xff08;自动记录最后一条指令&#xff09;2、断点1、打印断点2、查看断点3、删除断点4、使能&#xff08;禁用/开启&#…

StartUML的基本使用

文章目录 简介和安装创建包创建类视图时序图 简介和安装 最近在学习一个项目的时候用到了StartUML来构造项目的类图和时序图 虽然vs2019有类视图&#xff0c;但是也不是很清晰&#xff0c;并没有生成uml图&#xff0c;但是宇宙最智能的IDE IDEA有生成uml图的功能 下面就简单介…

Flowable 外部表单

内置表单需要在每个节点中去配置&#xff0c;当如果多个节点使用同一套表单属性就要配置多次比较麻烦&#xff0c;修改的时候也要修改多次&#xff0c;外部表单可以定义一次&#xff0c;然后其它节点都去引用同一个表单属性。 外部表单需要定义一个.form后缀的文件。 外部表单…

关于值传递和引用传递的问题记录

目录 1. 问题概述 1.1 测试 1.2 结果 2. ArrayList和Arrays.ArrayList 1. 问题概述 最近忙着写论文很久没更新了&#xff0c;趁现在有时间简单记录一下最近遇到的一个坑。 对于Java中的List<>类型的对象&#xff0c;按我以前理解是引用传递&#xff0c;但有一点要注…

Excel中使用数据验证、OFFSET实现自动更新式下拉选项

在excel工作簿中&#xff0c;有两个Sheet工作表。 Sheet1&#xff1a; Sheet2&#xff08;数据源表&#xff09;&#xff1a; 要实现Sheet1中的“班级”内容&#xff0c;从数据源Sheet2中获取并形成下拉选项&#xff0c;且Sheet2中“班级”内容更新后&#xff0c;Sheet1中“班…

SMART PLC MODBUSTCP速度测试

SMART PLC MODBUSTCP通信详细介绍请参看下面文章链接: S7-200SMART PLC ModbusTCP通信(多服务器多从站轮询)_matlab sumilink 多个modbustcp读写_RXXW_Dor的博客-CSDN博客文章浏览阅读6.4k次,点赞5次,收藏10次。MBUS_CLIENT作为MODBUS TCP客户端通过S7-200 SMART CPU上的…

ARM寄存器及功能介绍/R0-R15寄存器

1、ARM 寄存器组介绍 ARM 处理器一般共有 37 个寄存器&#xff0c;其中包括&#xff1a; &#xff08;1&#xff09; 31 个通用寄存器&#xff0c;包括 PC&#xff08;程序计数器&#xff09;在内&#xff0c;都是 32 位的寄存器。 &#xff08;2&#xff09; 6 个状态寄存器…

【中间件篇-Redis缓存数据库02】Redis高级特性和应用(慢查询、Pipeline、事务、Lua)

Redis高级特性和应用(慢查询、Pipeline、事务、Lua) Redis的慢查询 许多存储系统&#xff08;例如 MySQL)提供慢查询日志帮助开发和运维人员定位系统存在的慢操作。所谓慢查询日志就是系统在命令执行前后计算每条命令的执行时间&#xff0c;当超过预设阀值,就将这条命令的相关…

spring boot 中@Value读取中文配置时乱码

1.spring boot 读取application.properties 该文件是iso8859编码 如果是直接写中文 读取时会乱码 显示成?? 必须得转ascii码才能正常显示 其他方法测试也不行 Value("${apig.order.tiaokong.qianzi}") private String apigOrderTiaokongQianzi;

LabVIEW中NIGPIB设备与驱动程序不相关的MAX报错

LabVIEW中NIGPIB设备与驱动程序不相关的MAX报错 当插入GPIB-USB设备时&#xff0c;看到了NI MAX中列出该设备&#xff0c;但却显示了黄色警告指示&#xff0c;并且指出Windows没有与您的设备相关的驱动程序。 解决方案 需要安装能兼容的NI-488.2驱动程序。 通过交叉参考以下有…

链表的实现(文末附完整代码)

链表的概念及结构 链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的 我们在上一篇文章所学习的顺序表是连续存储的 例如&#xff1a; 顺序表就好比火车上的一排座位&#xff0c;是连续的 而链表就好比是火车…

xlua游戏热更新(lua访问C#)

CS.UnityEngine静态方法访问unity虚拟机 创建游戏物体 CS.UnityEngine.GameObject(new by lua);静态属性 CS.UnityEngine.GameObject(new by lua); -- 创建 local camera CS.UnityEngine.GameObject.Find(Main Camera); --查找 camera.name Renamed by Lua;访问组件 loca…

Swift--量值与基本数据类型

系列文章目录 第一章: Swift–量值与基本数据类型 文章目录 系列文章目录前言对学习过程做一个记录 变量和常量命名规范注释 元祖类型可选类型拆包 typealias 前言 对学习过程做一个记录 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 变量和常量 …