Flink Lookup Join(维表 Join)

Lookup Join 定义(支持 Batch\Streaming)

Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

应用场景:

Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来;

实际案例

kafka流表和mysql维表的关联:
使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

mysql端处理:

[root@spop007~]# mysql -uroot -p123456mysql> create database test;
mysql> CREATE TABLE `user_profile` (`user_id` varchar(100) NOT NULL,`age` varchar(100) DEFAULT NULL,`sex` varchar(100) DEFAULT NULL,PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO test.user_profile (user_id,age,sex) VALUES('a','12-18','男'),('b','18-24','女'),('c','18-24','男');mysql>select * from test.user_profile; 

kafka端处理:

# 1.创建Kafka主题 test_k,指定分区数量为1,副本数量为1
kafka-topics.sh \
--create \
--topic test_k \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1# 2.向 test_k 中写入JSON格式的样例数据
./kafka-console-producer.sh \
--topic test_k \
--bootstrap-server localhost:9092!!!!!这个错误是因为你使用的 Kafka 版本较旧,不支持 --bootstrap-server 参数。旧版本的 Kafka 使用
参数代替 --broker-list 
----------------------------------
./kafka-console-producer.sh \
--topic test_k \
--broker-list localhost:9092
-----------------------------------
#输入完上面脚本,直接粘贴复制json
{"log_id": "1", "timestamp": "1635696063","user_id":"a"}
{"log_id": "2", "timestamp": "1635696180","user_id":"b"}
{"log_id": "3", "timestamp": "1635696300","user_id":"c"}
{"log_id": "4", "timestamp": "1635696360","user_id":"b"}
{"log_id": "5", "timestamp": "1635696420","user_id":"c"}
{"log_id": "6", "timestamp": "1635696420","user_id":"d"}# 3.创建一个消费者组 group_k1 来消费 test_k 数据
kafka-console-consumer.sh \
--topic test_k \
--bootstrap-server localhost:9092 \
--group group_k1 \
--from-beginning

Flinksql代码:

前提:jdbc的jar包和mysql的驱动包,都需要事先放入$FLINK_HOME/lib目录下。flink-connector-jdbc-1.15.2.jarmysql-connector-java-8.0.29.jar
cd $FLINK_HOME/bin
./sql-client.sh    CREATE TABLE click_log_table (log_id BIGINT, `timestamp` bigint,user_id string,proctime AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'test_k','properties.bootstrap.servers' = '192.168.77.88:9092','properties.group.id' = 'group_k1','scan.startup.mode' = 'earliest-offset','format' = 'json'
);CREATE TABLE user_profile (`user_id` string, `age` string,`sex` string
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.77.88:3306/test','table-name' = 'user_profile','username'='root','password'='root'
);SELECT s.log_id as log_id, s.`timestamp` as `timestamp`, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age
FROM click_log_table AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id;查看flinksql输出窗口显示:log_id            timestamp                        user_id                proctime                            sex                            age1           1635696063                              a 2024-11-19 00:28:14.40412-182           1635696180                              b 2024-11-19 00:28:14.40718-243           1635696300                              c 2024-11-19 00:28:14.40918-244           1635696360                              b 2024-11-19 00:28:14.41218-245           1635696420                              c 2024-11-19 00:28:14.42218-246           1635696420                              d 2024-11-19 00:28:14.424                         (NULL)                         (NULL)

在这里插入图片描述

修改mysql的数据 查看动态表的变化
UPDATE user_profile
SET age = '99-99', sex = 0
WHERE user_id = "a";kafka端输入:
{"log_id": "11111111111", "timestamp": "1635696063","user_id":"a"}
结果对应下图一kafka端再输入:
{"log_id": "222222", "timestamp": "1635696063","user_id":"a"}
结果对应下图二

在这里插入图片描述
在这里插入图片描述


删除和新增有空再写总结: Lookup Join 使用left join关联 ,左表全部输出,右表能匹配上的输出,匹配不上的用null填充。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/476981.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 101题集(随时更新)

题集来源:GitHub - changgyhub/leetcode_101: LeetCode 101:力扣刷题指南 使用C完成相关题目,以训练笔试 贪心 采用贪心的策略,保证每次操作都是局部最优的,从而使最后得到的结果是全局最优的。 分配问题 455. 分发饼…

百度主动推送可以提升抓取,它能提升索引量吗?

站长在建站SEO的时候,需要用到百度站长平台(资源平台)的工具,在站长工具中【普通收录】-【资源提交】-【API提交】这个功能,对网站的抓取进行一个提交。 这里估计很多站长就有疑问,如果我主动推送&#xf…

DevOps-Jenkins-新手入门级

1. Jenkins概述 1. Jenkins是一个开源持续集成的工具,是由JAVA开发而成 2. Jenkins是一个调度平台,本身不处理任何事情,调用插件来完成所有的工作 1.1 什么是代码部署 代码发布/部署>开发书写的程序代码---->部署测试/生产环境 web服务…

速通前端篇 —— CSS

找往期文章包括但不限于本期文章中不懂的知识点: 个人主页:我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏:速通前端 目录 CSS的介绍 基本语法规范 CSS选择器 标签选择器 class选择器 id选择器 复合选择器 通配符选择器 CSS常见样式 颜…

51c大模型~合集76

我自己的原文哦~ https://blog.51cto.com/whaosoft/12617524 #诺奖得主哈萨比斯新作登Nature,AlphaQubit解码出更可靠量子计算机 谷歌「Alpha」家族又壮大了,这次瞄准了量子计算领域。 今天凌晨,新晋诺贝尔化学奖得主、DeepMind 创始人哈萨…

怎么只提取视频中的声音?从视频中提取纯音频技巧

在数字媒体的广泛应用中,提取视频中的声音已成为一项常见且重要的操作。无论是为了学习、娱乐、创作还是法律用途,提取声音都能为我们带来诸多便利。怎么只提取视频中的声音?本文将详细介绍提取声音的原因、工具、方法以及注意事项。 一、为什…

IDEA如何设置编码格式,字符编码,全局编码和项目编码格式

前言 大家好,我是小徐啊。我们在开发Java项目(Springboot)的时候,一般都是会设置好对应的编码格式的。如果设置的不恰当,容易造成乱码的问题,这是要避免的。今天,小徐就来介绍下我们如何在IDEA…

Unable to find image ‘hello-world:latest‘ locally

网上对于这个问题的解答有很多了,我尝试了后并有解决,最后发现重启指令并没有使用sudo导致的。这里写一下总的解决方法: 1 查看是否已经有了hello-world sudo docker info如果有hello-world,就先删除 sudo docker rmi hello-w…

Web3.0安全开发实践:Clarity最佳实践总结

在过去的一段时间里,CertiK团队对比特币生态系统及其发展进行了深入研究。同时,团队还审计了多个比特币项目以及基于不同编程语言的智能合约,包括OKX的BRC-20钱包和MVC DAO的sCrypt智能合约实现。 现在,我们的研究重点转向了Clar…

Chrome离线安装包下载

1、问Chrome的官网:https://www.google.cn/chrome/ 直接下载的是在线安装包,安装需要联网。 2、如果需要在无法联网的设备上安装Chrome,需要在上面的地址后面加上?standalone1。 Chrome离线安装包下载地址:https://www.google.c…

【从零开始的LeetCode-算法】3232. 判断是否可以赢得数字游戏

给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中,Alice 可以从 nums 中选择所有个位数 或 所有两位数,剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和,则 Alice 获胜。 如果 Alice 能赢得这场游…

前端速通(JavaScript)

1 初识JavaScript 1 JavaScript是什么 JavaScript 是一种高层的、轻量级的、解释型的编程语言,最初由 Netscape 公司于 1995 年开发。它的特点包括: 动态性:JavaScript是动态类型语言,允许开发者灵活地操作数据。跨平台&#xf…

分层架构 IM 系统之架构演进

在电商业务日活几百万的情况下,IM 系统采用分层架构方式,如下图。 分层架构的 IM 系统,整体上包含了【终端层】、【入口层】、【业务逻辑层】、【路由层】、【数据访问层】和【存储层】,我们在上篇文章(分层架构 IM 系…

基于ToLua的C#和Lua内存共享方案保姆级教程

C#和Lua内存共享方案保姆级教程 前言 在介绍C#和Lua内存共享方案之前,先介绍下面两个点来支撑这个方案的必要性 跨语言交互很费 Lua和C#交互最早是基于反射的方式实现的,后来为了提升性能发展成Luajit+C#静态方法导出注入到lua虚拟机的方式至此Lua+Unity的性能才达到了实…

SpringSecurity创建一个简单的自定义表单的认证应用

1、SpringSecurity 自定义表单 在 Spring Security 中创建自定义表单认证应用是一个常见的需求,特别是在需要自定义登录页面、认证逻辑或添加额外的表单字段时。以下是一个详细的步骤指南,帮助你创建一个自定义表单认证应用。 2、基于 SpringSecurity 的…

创客匠人老蒋:个人IP如何获取有效流量?

大家好,我是老蒋。 为什么我反复强调说,如果你想把个人IP、创始人IP做起来,想把自己直播间的流量变大变活,一定要去参加这场将在2024年底举办的《全球创始人IP领袖高峰论坛》?一定要走出去看看更高的世界?…

华三(H3C)T1020 IPS服务器硬件监控指标解读

在日益复杂的网络环境中,服务器的稳定运行对于保障业务的连续性和安全性至关重要。华三(H3C)T1020 IPS作为一款高性能的入侵防御系统,其运行状态和性能监控显得尤为重要。监控易作为一款专业的监控软件,为华三T1020 IP…

【Unity3D插件】Unity3D HDRP Outline高亮发光轮廓描边插件教程

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享QQ群:398291828小红书小破站 大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 最近用Unity3D的HDRP(高清渲染管…

数据结构-7.Java. 对象的比较

本篇博客给大家带来的是java对象的比较的知识点, 其中包括 用户自定义类型比较, PriorityQueue的比较方式, 三种比较方法...... 文章专栏: Java-数据结构 若有问题 评论区见 欢迎大家点赞 评论 收藏 分享 如果你不知道分享给谁,那就分享给薯条. 你们的支持是我不断创作的动力 .…

OpenCV相机标定与3D重建(3)校正鱼眼镜头畸变的函数calibrate()的使用

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::fisheye::calibrate 函数是 OpenCV 中用于校正鱼眼镜头畸变的一个重要函数。该函数通过一系列棋盘格标定板的图像来计算相机的内参矩阵和畸变…