大数据Flink(八十八):Interval Join(时间区间 Join)

文章目录

Interval Join(时间区间 Join)


Interval Join(时间区间 Join)

Interval Join 定义(支持 Batch\Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。

应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以可以理解 Interval Join 就是用于消灭回撤流的。

Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

  • Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
  • Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。
  • Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反
  • Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R])

可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。

使用示例:

间隔联接需要至少一个等联接谓词和在两侧限制时间的联接条件。可以通过比较两个输入表中相同类型的时间属性(即处理时间或事件时间)的两个适当的范围谓词(<, <=, >=, >)BETWEEN谓词或单个相等谓词来定义这种条件。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

如果订单在收到订单后四个小时内发货,则上面的示例会将所有订单与其相应的发货合并在一起。

实际案例:还是刚刚的案例,曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):

下面为 Inner Interval Join:

 

Flink SQL> CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,`timestamp` bigint,row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),watermark for row_time as row_time 
) WITH ('connector' = 'socket','hostname' = 'node1',        'port' = '8888','format' = 'csv'
);Flink SQL> CREATE TABLE click_log_table (log_id BIGINT,click_params     STRING,`timestamp` bigint,row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),watermark for row_time as row_time 
)
WITH ('connector' = 'socket','hostname' = 'node1',        'port' = '9999','format' = 'csv'
);Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '10' SECOND AND click_log_table.row_time;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800 ->2022-07-20 15:06:40

开启netcat的9999端口,输入数据:

1,zhangsan,1658300805 ->2022-07-20 15:06:45

输出结果如下:

s_id                       s_params                 c_id                       c_params1                         hadoop                    1                         zhangsan

9999端口,继续输入数据:

1,zhangsan,1658300811 -> 2022-07-20 15:06:51

输出结果没有反应,因为这个时间:2022-07-20 15:06:51超过了时间区间下限。

如果是 Left Interval Join:

Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800		->2022-07-20 15:06:40

开启netcat的9999端口,输入数据:

1,zhangsan,1658300805	->2022-07-20 15:06:45

输出结果如下:

s_id                       s_params                 c_id                       c_params1                         hadoop                    1                       zhangsan

8888端口,继续输入数据:

1,hadoop,1658300801		->2022-07-20 15:06:41
1,hadoop,1658300811		->2022-07-20 15:06:51

输出结果如下:

s_id                       s_params                 c_id                       c_params1                         hadoop                    1                       zhangsan1                         hadoop                    1                       zhangsan

2022-07-20 15:06:51这条数据不会有任何的输出,因为已经超过了右表的边界。

如果是 Full Interval Join:

Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800		->2022-07-20 15:06:40
1,hadoop,1658300801		->2022-07-20 15:06:41

 开启netcat的9999端口,输入数据:

1,zhangsan,1658300805	->2022-07-20 15:06:45
1,zhangsan,1658300811	->2022-07-20 15:06:51

输出结果如下:

 s_id                       s_params                 c_id                       c_params1                         hadoop                    1                       zhangs1                         hadoop                    1                       zhangsan
  • 关于 Interval Join 的注意事项:

实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/144626.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE4 Cesium 与ultra dynamic sky插件天气融合

晴天&#xff1a; 雨天&#xff1a; 雨天湿度&#xff1a; 小雪&#xff1a; 中雪&#xff1a; 找到该路径这个材质&#xff1a; 双击点开&#xff1a; 将Wet_Weather_Effects与Snow_Weather_Effects复制下来&#xff0c;包括参数节点 找到该路径这个材质&#xff0c;双击点开&…

Docker配置镜像加速器

1.登录阿里云 阿里云-计算&#xff0c;为了无法计算的价值 (aliyun.com) 2.容器 说明&#xff1a;找到产品下的容器 3.容器镜像服务ACR 4.点击控制台 5. 点击镜像加速器 6.操作文档

35 LRU缓存

LRU缓存 题解1 双map&#xff08;差2个testcases&#xff09;题解2 哈希表双向链表&#xff08;参考&#xff09;题解3 STL:listunordered_map 请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正…

ThreeJS-3D教学四-光源

three模拟的真实3D环境&#xff0c;一个非常炫酷的功能便是对光源的操控&#xff0c;之前教学一中已经简单的描述了多种光源&#xff0c;这次咱们就详细的讲下一些最常见的光源&#xff1a; AmbientLight 该灯光在全局范围内平等地照亮场景中的所有对象。 该灯光不能用于投射阴…

k8s部署gin-vue-admin框架、gitlab-ci、jenkins pipeline 、CICD

测试环境使用的jenkins 正式环境使用的gitlab-ci 测试环境 创建yaml文件 apiVersion: v1 kind: ConfigMap metadata:name: dtk-go-tiktok-admin-configlabels:app.kubernetes.io/name: dtk-go-tiktok-adminapp.kubernetes.io/business: infrastructureapp.kubernetes.io/run…

Windows系统利用cpolar内网穿透搭建Zblog博客网站并实现公网访问内网!

文章目录 1. 前言2. Z-blog网站搭建2.1 XAMPP环境设置2.2 Z-blog安装2.3 Z-blog网页测试2.4 Cpolar安装和注册 3. 本地网页发布3.1. Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1. 前言 想要成为一个合格的技术宅或程序员&#xff0c;自己搭建网站制作网页是绕…

What is a UDP Flood Attack?

用户数据报协议 &#xff08;UDP&#xff09; 是计算机网络中使用的无连接、不可靠的协议。它在互联网协议 &#xff08;IP&#xff09; 的传输层上运行&#xff0c;并提供跨网络的快速、高效的数据传输。与TCP&#xff08;其更可靠的对应物&#xff09;不同&#xff0c;UDP不提…

GitHub配置SSH key

GitHub配置SSH key Git配置信息并生成密钥 设置用户名和密码 设置用户名 git config --global user.name "用户名" 设置邮箱 git confir --global user.email "邮箱" 生成密钥 ssh-keygen -t rsa -C "邮箱" 查看密钥 到密钥所保存的位置 复…

ubuntu与win之间共享文件夹

ubuntu上设置共享文件夹 第一步&#xff1a;点击【设置】或【虚拟机弹窗下面的【设置】选项】 第二步&#xff1a;进入【虚拟机设置】页面&#xff0c;点击【选项】如下图所示 第三步&#xff1a;启用共享文件&#xff1a;点击【总是启用】第四步&#xff1a;添加共享文件&…

uni-app:canvas-绘制图形4(获取画布宽高,根据画布宽高进行图形绘制)

效果 代码 var width ; var height ; const query uni.createSelectorQuery(); //获取宽度 query.select(#firstCanvas).fields({ size: true }, (res) > { width res.width; height res.height; }).exec(); console.log(宽度width); console.log(高…

国庆day1

发送数据 #include<myhead.h>//消息结构体 typedef struct {long msgtype; //消息类型char data[1024]; //消息正文 }Msg_ds;#define SIZE sizeof(Msg_ds)-sizeof(long) //正文大小 int main(int argc, const char *argv[]) {//1、创建key值key_t ke…

五、接口测试工具:Postman

Postman是一款接口调试工具&#xff0c;是一款免费的可视化软件&#xff0c;同时支持各种操作系统平台&#xff0c;是测试接口的首选工具。 官网下载&#xff1a; https://www.postman.com/downloads/ 工作面板 简易的get请求 简易的post请求 案例&#xff1a;请求百度地图…

Unity实现设计模式——中介者模式

Unity实现设计模式——中介者模式 用一个中介者对象来封装一系列的对象交互&#xff0c;中介者使各对象不需要显示地相互引用&#xff0c;从而使其松散耦合&#xff0c;而且可以独立地改变它们之间的交互。 这里使用一个生活中的例子来介绍中介者模式&#xff0c;比如当我们在…

使用Python爬虫抓取网站资源的方法

Python爬虫是一种自动化程序&#xff0c;用于从互联网上获取数据。使用Python爬虫可以轻松地抓取网站上的各种资源&#xff0c;例如文本、图片、视频等。在本文中&#xff0c;我们将介绍如何使用Python爬虫抓取网站资源。 安装Python 在使用Python爬虫之前&#xff0c;需要先安…

【SSL】用Certbot生成免费HTTPS证书

1. 实验背景 服务器&#xff1a;CentOS7.x 示例域名&#xff1a; www.example.com 域名对应的web站点目录&#xff1a; /usr/local/openresty/nginx/html 2. 安装docker # yum -y install yum-utils# yum-config-manager --add-repo https://download.docker.com/linux/ce…

矢量图形编辑软件illustrator 2023 mac软件特点

illustrator 2023 mac是一款矢量图形编辑软件&#xff0c;用于创建和编辑排版、图标、标志、插图和其他类型的矢量图形。 illustrator mac软件特点 矢量图形&#xff1a;illustrator创建的图形是矢量图形&#xff0c;可以无限放大而不失真&#xff0c;这与像素图形编辑软件&am…

国庆周《Linux学习第三课》

国庆周《Linux学习第三课》 国庆周《Linux学习第二课》_IHOPEDREAM的博客-CSDN博客 总结 用户的管理 增加一个用户 删除一个用户 修改一个用户 查看一个用户 用户组的管理 增加一个组 删除一个组 修改一个组 查看一个组 将用户成员增加到该组中去 移除组的成员 1 用户

【深度学习实验】卷积神经网络(六):卷积神经网络模型(VGG)训练、评价

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. 构建数据集&#xff08;CIFAR10Dataset&#xff09; a. read_csv_labels&#xff08;&#xff09; b. CIFAR10Dataset 2. 构建模型&#xff08;FeedForward&…

python监控ES索引数量变化

文章目录 1, datafram根据相同的key聚合2, 数据合并&#xff1a;获取采集10,20,30分钟es索引数据脚本测试验证 1, datafram根据相同的key聚合 # 创建df1 > json {key:A, value:1 } {key:B, value:2 } data1 {key: [A, B], value: [1, 2]} df1 pd.DataFrame(data1)# 创建d…

rhel8 网络操作学习

一、查询dns服务器地址汇总 1.查询dns服务器地址&#xff1a; &#xff08;1&#xff09;方法一&#xff1a;执行命令 cat /etc/resolv.conf 执行结果如下&#xff1a; nameserver后面就是dns服务器的ip地址。 &#xff08;2&#xff09;方法2&#xff1a;查看/etc/syscon…