Flink Sql:四种Join方式详解(基于flink1.15官方文档)

JOINs

flink sql主要有四种连接方式,分别是Regular Joins、Interval Joins、Temporal Joins、lookup join

1、Regular Joins(常规连接 )

这种连接方式和hive sql中的join是一样的,包括inner join,left join,right join,full join

1、指定数据源建立students表
CREATE TABLE students (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置为最新生成的数据'format' = 'csv' -- 指定数据的格式
);2、kafka生产students表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班3、创建关联表scores
CREATE TABLE scores (sid STRING,   cid STRING,     --学科idscore INT     
) WITH ('connector' = 'kafka','topic' = 'scores', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置'format' = 'csv' -- 指定数据的格式
);4、kafka生产scores数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,56
1500100002,1000001,139
1500100002,1000002,102
1500100004,1000001,42
1500100004,1000002,142-- inner jion   两边数据都不为null的才会关联
select 
a.id,a.name,b.sid,b.score
from 
students as a
inner join
scores as b
on a.id=b.sid;-- left join/right join    保证左边/右边数据的完整性
select 
a.id,a.name,b.sid,b.score
from 
students as a
right join
scores as b
on a.id=b.sid;-- full join       保证两边数据的完整性
select 
a.id,a.name,b.sid,b.score
from 
students as a
full join
scores as b
on a.id=b.sid;-- 注意:
-- 常规连接,会将两个表的数据一直保存在状态中,时间长了,状态会越来越大,导致任务执行失败,通常在批处理中使用,因为批处理没有状态这个概念。为了避免状态过大可能会导致的任务失败问题,我们可以设置状态有效期
-- 状态有效期,状态在flink中保存的时间,但是如果sql中除了关联操作还有聚合这样也需要将数据保存在状态中的操作,状态有效期设置的太短可能会让聚合这样的操作失败,设置的太长延迟也会增加。所以,状态保留多久需要根据实际业务分析
SET 'table.exec.state.ttl' = '20000';
设置该参数后,那么只有在20秒内到达的数据才会被保存到状态中进行关联。

inner join结果:

left join 结果:

right join结果:

full join结果:

2、Interval Joins(间隔连接

Interval Joins:在一段时间内关联

对于流式查询,与常规连接相比,间隔连接仅支持具有时间属性的追加表。由于时间属性是拟单调递增的,因此 Flink 可以从其状态中删除旧值,而不会影响结果的正确性。

这种方式可以变相弥补Regular Joins中时间长了状态过大的问题。

CREATE TABLE students_proctime (id STRING,name STRING,age INT,sex STRING,clazz STRING,proctime AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'students', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置'format' = 'csv' -- 指定数据的格式
);kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班CREATE TABLE scores_proctime (sid STRING,cid STRING,score INT,proctime AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'scores', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置'format' = 'csv' -- 指定数据的格式
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,56
1500100002,1000001,139
1500100002,1000002,102
1500100004,1000001,42
1500100004,1000002,142select a.id,a.name,b.sid,b.score from 
students_proctime a, scores_proctime b
where a.id=b.sid
-- a表的时间需要在b表时间10秒内或b表的时间需要在a表时间10秒内
and (a.proctime BETWEEN b.proctime - INTERVAL '10' SECOND AND b.proctimeor b.proctime BETWEEN a.proctime - INTERVAL '10' SECOND AND a.proctime
);

3、Temporal Joins(时态连接)

这种关联方式是专门用来关联时态表的。

  • Temporal Joins(时态连接)是在流式计算或数据处理中,对两个或多个随时间变化的表(也称为动态表或时态表)进行连接的操作。这些表包含随时间变化的数据,并且行与一个或多个时态周期相关联。

在我们生活中最常见的时态表就是汇率表,汇率随着时间变化而变化。

 

案例:

例如,假设我们有一张订单表,每张订单的价格都采用不同的货币。为了正确地将此表标准化为单一货币(如美元),每张订单都需要与下订单时相应的货币兑换率相结合。

1、创建订单表
CREATE TABLE orders (order_id    STRING,price       DECIMAL(32,2),currency    STRING,    --币种order_time  TIMESTAMP(3),WATERMARK FOR order_time AS order_time
) WITH ('connector' = 'kafka','topic' = 'orders', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置'format' = 'csv' -- 指定数据的格式
);2、订单表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
o_001,1,EUR,2024-06-06 12:00:00
o_002,100,EUR,2024-06-06 12:00:07
o_003,200,EUR,2024-06-06 12:00:16
o_004,10,EUR,2024-06-06 12:00:21
o_005,20,EUR,2024-06-06 12:00:253、创建汇率表
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3),WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED -- 主键,区分不同的汇率
) WITH ('connector' = 'kafka','topic' = 'currency_rates1', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置'format' = 'canal-json' -- 指定数据的格式
);4、向汇率表中添加数据
insert into currency_rates
values
('EUR',0.12,TIMESTAMP'2024-06-06 12:00:00'),
('EUR',0.11,TIMESTAMP'2024-06-06 12:00:09'),
('EUR',0.15,TIMESTAMP'2024-06-06 12:00:17'),
('EUR',0.14,TIMESTAMP'2024-06-06 12:00:23');kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates-- 使用常规关联方式关联时态表只能关联到最新的数据
select 
a.price,a.order_time,b.conversion_rate,b.update_time
from 
orders as a
join
currency_rates as b
on a.currency=b.currency;-- 时态表join
-- FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据
select 
a.price,a.order_time,b.conversion_rate,b.update_time
from 
orders as a
join
currency_rates FOR SYSTEM_TIME AS OF a.order_time as b 
on a.currency=b.currency;

常规join结果:

时态join结果:

4、lookup join(查找连接

Lookup Join,也称为维表 Join,通常用于从外部系统查询的数据表。连接要求一个表具有处理时间属性,另一个表由查找源连接器支持。

具体来说:

lookup join用于流表(动态表)关联维度表

流表:动态表

维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql等外部数据源


扩展:流表、事实表、维度表

-- 流表(动态表)
1、流表的数据来源通常是实时数据流,这些数据流可以来自各种数据源,如 Kafka、RabbitMQ、Kinesis 等。Flink可以通过数据源连接器(Source Connectors)将这些实时数据流接入到 Flink 系统中
2、与传统数据库中的表不同,流表的行是动态生成的,随着数据流的持续产生而不断增加-- 维度表
1、主要提供数据的分析角度,包含了描述业务环境的属性信息,如时间、地理、产品等。
2、维度表:通常比较宽(包含多个属性列),但行数相对较少,因为维度表中的每一行通常代表一个具体的业务实体或类别,如一个商品、一个客户、一个日期等。
3、维度表与事实表之间通过外键相关联,共同构成了星型模型或雪花模型。事实表中的外键用于与维度表中的主键相匹配,从而提供数据的上下文和分类信息。
4、维度表存储的是对数据的描述性信息,这些信息通常不随时间变化,或者变化不频繁。例如,商品的品牌、型号、颜色等属性一旦确定后很少会发生变化。但在某些情况下,如新产品上市或促销活动,可能需要更新维度表以添加新的维度成员。-- 事实表
1、存储了实际的数据度量值,如销售额、订单数量等。事实表是数据分析的核心,包含了所有用于分析的数据指标。
2、通常比较窄(包含较少的列),但行数非常多,因为事实表中的每一行通常代表一个具体的事件或交易,如一个订单、一次点击等。
3、事实表存储的是度量数据(即指标),这些数据会随时间变化,并且经常需要被汇总和分析。例如,销售额、订单数量、点击量等指标会随着业务活动的进行而不断更新。
4、事实表的数据更新频率通常较高,因为事实数据会随着业务活动的进行而不断产生。例如,每当有新的订单产生时,都需要在事实表中插入一条新的记录。

 

1、创建分数表
CREATE TABLE scores (sid INT,cid STRING,score INT,proctime AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'scores', -- 指定topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表'properties.group.id' = 'testGroup', -- 指定消费者组'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置'format' = 'csv' -- 指定数据的格式
);2、生产分数表数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000003,1373、建立学生表,我们将学生表当作维度表放在mysql中
CREATE TABLE students_test (id INT,name STRING,age INT,gender STRING,clazz STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/bigdata29','table-name' = 'students_test','username' ='root','password' = '123456','lookup.cache.max-rows' = '1000', -- 最大缓存行数'lookup.cache.ttl' ='10000' -- 缓存过期时间
);学生表数据
1500100001,施笑槐,22,女,文科六班-- 使用常规关联方式
-- 维表的数据只在任务启动的时候读取一次,后面不再实时读取,
-- 只能关联到任务启动时读取的数据
-- 一旦mysql中的学生表更新数据,但是关联的学生表数据却是任务启动时从mysql读取的,这就有错误了,lookup join可以解决该问题。
select a.sid,a.score,b.id,b.name from
scores as a
left join
students_test  as b
on a.sid=b.id;-- lookup join
-- 当流表每来一条数据时,使用关联字段到维表的数据源中查询
-- 优点:实时更新数据源,准确性高
-- 缺点:每一次都需要查询数据库,性能会降低
select a.sid,a.score,b.id,b.name from
scores as a
left join
students_test FOR SYSTEM_TIME AS OF a.proctime as b
on a.sid=b.id;

此时我们修改更新mysql中的学生表数据

修改之前

修改后:

常规关联结果:

look up关联结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/351179.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java Springboot网上音乐商城(源码+sql+论文)

1.1 研究目的和意义 随着市场经济发展,尤其是我国加入WTO ,融入经济全球化潮流,已进入国内外市场经济发展新时期,音乐与市场联系越来越紧密,我国音乐和网上业务也进入新历史发展阶段。为了更好地服务于市场&#xff0…

Studio One 6.6.2 for Mac怎么激活,有Studio One 6激活码吗?

如果您是一名音乐制作人,您是否曾经为了寻找一个合适的音频工作站而苦恼过?Studio One 6 for Mac是一款非常适合您的MacBook的音频工作站。它可以帮助您轻松地录制、编辑、混音和发布您的音乐作品。 Studio One 6.6.2 for Mac具有直观的界面和强大的功能…

全网爆火《pvz植物大战僵尸杂交版》最新安装包,Android、Windows、ios安装包+教程!

今天阿星想和大家分享一个最近在B站上引起轰动的老游戏——《植物大战僵尸》! 是的,你没听错,就是那个曾经让我们熬夜到天亮,一关接一关挑战的游戏。 让我们来聊聊,这款游戏怎么就突然又火了起来呢? 原来…

AI玩具来了,它怎么样?

90后的我们,是AI时代的见证者。20后的小孩,才是AI时代的原著民。当ChatGPT们改变着大人的工作方式,我觉得,是时候让孩子们的玩具也更聪明些了吧。于是,在六一前夕,我用市面上的AI语音对话套件给娃DIY了一套…

简单的基于Transformer的滚动轴承故障诊断(Pytorch)

递归神经网络在很长一段时间内是序列转换任务的主导模型,其固有的序列本质阻碍了并行计算。因此,在2017年,谷歌的研究人员提出了一种新的用于序列转换任务的模型架构Transformer,它完全基于注意力机制建立输入与输出之间的全局依赖…

Docker-Portainer可视化管理工具

Docker-Portainer可视化管理工具 文章目录 Docker-Portainer可视化管理工具介绍资源列表基础环境一、安装Docker二、配置Docker加速器三、拉取Portainer汉化版本镜像四、运行容器五、访问可视化界面 介绍 Portainer是一款开源的容器管理平台,它提供了一个直观易用的…

Jmeter多个请求按照比例并发压测的几种方式

🍅 视频学习:文末有免费的配套视频可观看 🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 一、需求 在压测的过程中,为了能够压测整个链路,通常需要多个接口进行并…

基于Matlab停车场车牌识别计时计费管理系统 【W2】

简介 停车场车牌识别计时计费管理系统在现代城市管理中具有重要意义。随着城市化进程的加快和车辆数量的增加,传统的人工管理停车场的方式已经难以满足效率和精确度的要求。因此引入车牌识别技术的自动化管理系统成为一种趋势和解决方案。 背景意义 提升管理效率&a…

linux 部署瑞数6实战(维普,药监局)sign第二部分

声明 本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!wx …

诊断解决方案——CANdesc和MICROSAR

文章目录 一、CANdesc二、MICROSAR一、CANdesc canbeded是Vector汽车电子开发软件Nun Autosar标准的工具链之一。 canbeded是以源代码的形式提供的可重用的组件,包括CAN Driver,交互层(IL),网络管理(NM),传输层(TP),诊断层(CANdesc) , 通信测量和标定协议(CCP,XCP) 和 通信控…

【C++】C++入门的杂碎知识点

思维导图大纲: namespac命名空间 什么是namespace命名空间namespace命名空间有什么用 什么是命名空间 namespace命名空间是一种域,它可以将内部的成员隔绝起来。举个例子,我们都知道有全局变量和局部变量,全局变量存在于全局域…

联想电脑电池只能充到80%,就不在充电了,猛一看以为坏了,只是设置了养护模式。

现在电池管理模式有三种: 1)常规 2)养护 3)快充 好久没有用联想的电脑了,猛一看,咱充到了80%不充了,难道电池是坏的?我们要如何设置才可以让其充电到100%呢? 右下角…

贪心算法学习五

例题一 解法(贪⼼): 贪⼼策略: 我们的任何选择,应该让这个数尽可能快的变成 1 。 对于偶数:只能执⾏除 2 操作,没有什么分析的; 对于奇数: i. 当 n 1 的时候…

如何使用ios自带语音转文字工具?

ios自带语音转文字是iOS系统中自带的语音转文字功能主要应用于以下几个方面: 1. 语音输入:在iOS的任何文本输入框中,通常都有一个麦克风图标,点击后可以进行语音输入,系统会将你的语音实时转换成文字。 2. Siri&…

1. NAS和SAN存储

NAS和SAN存储 一、存储设备1、根据工作方式2、DAS 直接附加存储3、NAS存储4、SAN存储 二、模拟配置SAN存储1、创建虚拟机、安装openfiler2、访问openfiler webUI3、创建RAID设备4、开启iSCSI服务5、配置SAN存储设备共享空间5.1 设置IQN 6、业务服务器连接使用存储6.1 安装客户端…

JDK17 你的下一个白月光

JDK版本升级的非常快,现在已经到JDK20了。JDK版本虽多,但应用最广泛的还得是JDK8,正所谓“他发任他发,我用Java8”。 但实际情况却不是这样,越来越多的java工程师拥抱 JDK17,于是了解了一下 JDK17新语法&a…

C#开发-集合使用和技巧(二)Lambda 表达式介绍和应用

C#开发-集合使用和技巧 Lambda 表达式介绍和应用 C#开发-集合使用和技巧介绍简单的示例:集合查询示例: 1. 基本语法从主体语句上区分:1. 主体为单一表达式2. 主体是代码块(多个表达式语句) 从参数上区分1. 带输入参数的…

69. UE5 RPG 使用Gameplay Cue 实现技能表现效果

在上一章中,我们实现了敌人的攻击技能的特效和音效。如果我们在多人模式下打开,发现,其它客户端看不到对应的效果。 造成这种问题的原因是因为敌人的技能是运行在服务器端的,它只复制到拥有它的客户端,而敌人的效果对于…

仿FC数学金刚游戏介绍

简介 Math Monkey是Simple2l工作室开发的第二款小游戏,灵感来源于FC游戏平台的数学金刚游戏。小学时玩FC游戏是业余时间最期待的事情,还记得有一次和玩伴玩游戏时已经晚上了,于是约定再玩一把就各回各家,没想到又连玩了N把每一把…

Postman下发流表至Opendaylight

目录 任务目的 任务内容 实验原理 实验环境 实验过程 1、打开ODL控制器 2、网页端打开ODL控制页面 3、创建拓扑 4、Postman中查看交换机的信息 5、L2层流表下发 6、L3层流表下发 7、L4层流表下发 任务目的 1、掌握OpenFlow流表相关知识,理解SDN网络中L…