java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景

之前我们在一片文章里简单介绍过Flink的多流合并算子

java Flink(三十六)Flink多流合并算子UNION、CONNECT、CoGroup、Join

今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。

Interval Join不是两个窗口做关联,更适用于处理乱序数据流之间的关联。它的作用更类似于从左流中a元素本身出发,对右流中一段时间内的数据进行关联(Inner Join:只关联相同Key的数据)。

如图所示:

 下边这条流中的2关联到上范围内的0/1

源码解析

Flink版本1.14.4

按住Ctrl+鼠标左键,点击process进入源码

 这里process方法是在KeydStream.java下IntervalJoined类下的方法

 包装返回类型的TypeInfomation(TypeInfo的介绍可以看上一篇)

 返回的outputType

SingleOutputStreamOperator使用给定的用户函数完成联接操作,该函数针对每个联接的元素对执行。这种方法允许传递输出类型的显式类型信息。

 IntervalJoinOperator初始化

左边界<=右边界检查

获取左流还有右流数据对应的序列化(从TypeInfo获取的)

继续看IntervalJoinOperator中的其余关键实现

open方法用来注册定时器

 初始化两个流的map状态

处理左侧流中的数据。每当数据到达左流时,它就会被添加到左缓冲区。将从右侧缓冲区中查找该元素可能的候选联接,如果该对位于用户定义的边界内,则将其传递给 ProcessJoinFunction

​ 同理处理右流

进入数据处理函数

获取数据,取出事件时间

 超过当前watermark的数据进行过滤

 

数据没问题的话,将数据添加到状态

 ​​

遍历另一条流的状态,遍历其中的数据,把满足时间要求的数据进行collect

​注册一个当前事件时间戳+右边界的定时器

定时器触发后,清空map状态中时间戳-左边界的那条数据

简单实例 

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkCode</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><jdk.version>1.8</jdk.version><jar.name>ubs-data-converter</jar.name><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.2</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${jdk.version}</source><target>${jdk.version}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>${jar.name}</finalName><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.glassfish.jersey.core:jersey-common</exclude></excludes></artifactSet><relocations><relocation><pattern>com.google.common</pattern><shadedPattern>com.shade.google.common</shadedPattern></relocation><relocation><pattern>org.apache.kafka</pattern><shadedPattern>org.shade.apache.kafka</shadedPattern></relocation></relocations><filters><filter><artifact>*</artifact><includes><include>org/apache/htrace/**</include><include>org/apache/avro/**</include><include>org/apache/flink/streaming/**</include><include>org/apache/flink/connector/**</include><include>org/apache/kafka/**</include><include>org/apache/hive/**</include><include>org/apache/hadoop/hive/**</include><include>org/apache/curator/**</include><include>org/apache/zookeeper/**</include><include>org/apache/jute/**</include><include>org/apache/thrift/**</include><include>org/apache/http/**</include><include>org/I0Itec/**</include><include>jline/**</include><include>com/yammer/**</include><include>kafka/**</include><include>org/apache/hadoop/hbase/**</include><include>com/alibaba/fastjson/**</include><include>org/elasticsearch/action/**</include><include>io/confluent/**</include><include>com/fasterxml/**</include><include>org/elasticsearch/**</include><include>hbase-default.xml</include><include>hbase-site.xml</include></includes></filter><filter><artifact>org.apache.hadoop.hive.*:*</artifact><excludes><exclude></exclude><exclude></exclude><exclude></exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

user bean

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class User{Integer id;Long t;}

 order bean 

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Order {Integer id;Long price;Long time;}

main

package ubs.app.intervaljoin;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import ubs.app.intervaljoin.bean.Order;
import ubs.app.intervaljoin.bean.User;
import ubs.app.intervaljoin.source.OrderSource;
import ubs.app.intervaljoin.source.UserSource;import java.time.Duration;public class IntervalJoinApp  {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置watermarkWatermarkStrategy<User> userWatermarkStrategy = WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<User>() {@Overridepublic long extractTimestamp(User element, long recordTimestamp) {return element.getT();}});DataStream<User> userDataStreamSource = env.addSource(new UserSource()).assignTimestampsAndWatermarks(userWatermarkStrategy);//设置watermarkWatermarkStrategy<Order> orderWatermarkStrategy = WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Order>() {@Overridepublic long extractTimestamp(Order element, long recordTimestamp) {return element.getTime();}});DataStream<Order> orderDataStreamSource = env.addSource(new OrderSource()).assignTimestampsAndWatermarks(orderWatermarkStrategy);env.setParallelism(1);SingleOutputStreamOperator<String> process = userDataStreamSource.keyBy(o -> o.getId()).intervalJoin(orderDataStreamSource.keyBy(o -> o.getId())).between(Time.seconds(-5), Time.seconds(0)).process(new ProcessJoinFunction<User, Order, String>() {@Overridepublic void processElement(User left, Order right, ProcessJoinFunction<User, Order, String>.Context ctx, Collector<String> out) throws Exception {Integer lid = left.getId();Long lt = left.getT();Integer rid = right.getId();long rt = right.getTime();out.collect(String.format("左%s 左时间%s 右%s 右时间%s 关联到了 %s", lid, lt/1000, rid, rt/1000, rt/1000-lt/1000));}});process.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/282687.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全流程ArcGIS Pro技术应用

GIS是利用电子计算机及其外部设备&#xff0c;采集、存储、分析和描述整个或部分地球表面与空间信息系统。简单地讲&#xff0c;它是在一定的地域内&#xff0c;将地理空间信息和 一些与该地域地理信息相关的属性信息结合起来&#xff0c;达到对地理和属性信息的综合管理。GIS的…

HQYJ 2024-3-19 作业

TCP通信三次握手和四次挥手&#xff1a; 并行和并发的区别&#xff1a;并发是单核处理器处理多个线程任务&#xff0c;并行是多核处理器同时处理多个线程任务。并发过程中会抢占CPU资源&#xff0c;轮流使用&#xff1b;并行过程不会抢占CPU资源。 阻塞IO和非阻塞IO&#xff…

【系统架构师】-计算机网络

1、网络的划分 网络性能指标&#xff1a;速率、带宽(频带宽度或传送线路速率)、吞吐量、时延、往返时间、利用率。 网络非性能指标&#xff1a;费用、质量、标准化、可靠性、可扩展性、可升级性、易管理性和可维护性。 总线型(利用率低、干扰大、价格低)、 星型(交换机转发形…

Python PyQt5

实现界面开发&#xff0c;与tkinter功能一致&#xff0c;网上已有详细资料&#xff0c;此处仅记录自己的代码&#xff1a; 文章目录 1. 实操1.1 main.py1.2. 窗体模块代码1.3. 页面效果 2. 参考资料2.1. PyQt5 参考资料2.2. tkinter 参考资料 3. 安装注意事项3.1. 下载3.2 Pyc…

解决jenkins运行磁盘满的问题

参考&#xff1a;https://blog.csdn.net/ouyang_peng/article/details/79225993 分配磁盘空间相关操作&#xff1a; https://cloud.tencent.com/developer/article/2230624 登录jenkins相对应的服务或容器中查看磁盘情况&#xff1a; df -h在102挂载服务器上看到是这两个文件…

数据结构:详解【栈和队列】的实现

目录 1. 栈1.1 栈的概念及结构1.2 栈的实现1.3 栈的功能1.4 栈的功能的实现1.5 完整代码 2. 队列2.1 队列的概念及结构2.2 队列的实现2.3 队列的功能2.4 队列的功能的实现2.5 完整代码 1. 栈 1.1 栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的…

leecode1793 | 好子数组的最大分数 | 求给高度矩阵最大值

题目我就不念了&#xff0c;就一个字难理解&#xff0c;给的题总是这么难懂&#xff0c;总感觉出题人的语文是体育老师教的&#xff1f; 还有就是思维转变&#xff0c;才能能好的理解&#xff1f;一味的钻牛角尖死理解&#xff0c;效果不好 思维的转变 >悟性&#xff1f;&am…

使用远程工具连接Mysql

&#xff08;若想要远程连接Mysql需要下面解决四个问题&#xff09; 1、目标地址 直接查询 2、端口号 3306 3、防火墙关闭 [rootlocalhost date]# systemctl stop firewalld.service 4、授权mysql数据库root用户权限&#xff08;因为mysql开始不允许其他IP访问&#xff0…

Docker 学习笔记

Play With Docker一个免费使用的基于web界面的Docker环境 常用docker命令 可使用docker COMMAND --help查看命令的用法 Docker镜像相关 1、docker image pull&#xff1a;用于下载镜像&#xff0c;镜像从远程镜像仓库服务的仓库中下载&#xff0c;默认从Docker Hub的仓库中拉…

ssm基于Vue.js的在线购物系统的设计与实现论文

摘 要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于在线购物系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了在线购物系统&#xff0c;它彻底改变了过去传统的…

OKR与敏捷开发、精益创业等方法如何协同工作?

在快速变化的市场环境中&#xff0c;企业需要更加灵活和高效地应对各种挑战。目标与关键成果法&#xff08;OKR&#xff09;、敏捷开发以及精益创业等方法&#xff0c;作为现代企业管理的重要工具&#xff0c;各自在推动企业发展、提高团队效率、优化产品迭代等方面发挥着不可或…

社科赛斯考研:二十二载岁月铸辉煌,穿越周期的生命力之源

在考研培训行业的浩瀚海洋中&#xff0c;社科赛斯考研犹如一艘稳健的巨轮&#xff0c;历经二十二载风礼&#xff0c;依然破浪前行。在考研市场竞争白热化与学生对于考研机构要求越来越高的双重影响下&#xff0c;社科赛斯考研却以一种分蘖成长的姿态&#xff0c;扎根、壮大&…

基于springboot的mysql实现读写分离

前言: 首先思考一个问题:在高并发的场景中,关于数据库都有哪些优化的手段&#xff1f;常用的有以下的实现方法:读写分离、加缓存、主从架构集群、分库分表等&#xff0c;在互联网应用中,大部分都是读多写少的场景,设置两个库,主库和读库,主库的职能是负责写,从库主要是负责读…

2核4G服务器优惠价格和性能测试,2024年

阿里云2核4G服务器租用优惠价格&#xff0c;轻量2核4G服务器165元一年、u1服务器2核4G5M带宽199元一年、云服务器e实例30元3个月&#xff0c;活动链接 aliyunfuwuqi.com/go/aliyun 活动链接如下图&#xff1a; 阿里云2核4G服务器优惠价格 轻量应用服务器2核2G4M带宽、60GB高效…

【原创】基于分位数回归的卷积长短期神经网络(CNN-QRLSTM)回归预测的MATLAB实现

基于分位数回归的卷积长短期神经网络&#xff08;CNN-QRLSTM&#xff09;是一种用于时间序列预测的深度学习模型&#xff0c;结合了卷积神经网络&#xff08;CNN&#xff09;和长短期记忆网络&#xff08;LSTM&#xff09;&#xff0c;并采用分位数回归技术进行预测。 这个模型…

YOLOV9训练自己的数据集

1.代码下载地址GitHub - WongKinYiu/yolov9: Implementation of paper - YOLOv9: Learning What You Want to Learn Using Programmable Gradient Information 2.准备自己的数据集 这里数据集我以SAR数据集为例 具体的下载链接如下所示&#xff1a; 链接&#xff1a;https:/…

面试题小结

一、什么是虚拟dom 描述真实dom的js对象。 二、DOM操作——怎样添加、移除、移动、复制、创建和查找节点 &#xff08;1&#xff09;创建新节点 createDocumentFragment() //创建一个DOM片段 createElement() //创建一个具体的元素 createTextNode() //创建一个文本节…

全栈的自我修养 ———— uniapp中加密方法

直接按部就班一步一步来 一、首先创建一个js文件填入AES二、创建加密解密方法三、测试 一、首先创建一个js文件填入AES 直接复制以下内容 /* CryptoJS v3.1.2 code.google.com/p/crypto-js (c) 2009-2013 by Jeff Mott. All rights reserved. code.google.com/p/crypto-js/wi…

SpringCloud入门(1) Eureka Ribbon Nacos

这里写目录标题 认识微服务SpringCloud 服务拆分和远程调用服务拆分案例实现远程调用 RestTemplate Eureka注册中心Eureka的结构和作用搭建eureka-server服务注册服务发现 Ribbon负载均衡 LoadBalancedLoadBalancerIntercepor源码解析负载均衡策略饥饿加载 Nacos注册中心安装与…

【NLP】从变形金刚到Transfomer 01

Transformer是一种非常强大的模型&#xff0c;在自然语言处理&#xff08;NLP&#xff09;领域里引起了一场革命。 "从变形金刚到技术革命家&#xff0c;Transformer不再仅是儿时屏幕上的英雄。&#x1f916;✨ 在今天的AI领域&#xff0c;它变身成为自然语言处理的超级英…