FlinkSQL中【FULL OUTER JOIN】使用实例分析(坑)

Flink版本:flink1.14
最近有【FULL OUTER JOIN】场景的实时数据开发需求,想要的结果是,左右表来了数据都下发数据;左表存在的数据,右表进来可以关联下发(同样,右表存在的数据,左表进来也可以关联下发)。但在实际应用中遇到一些问题。
其中包括FlinkSQL知识点:

  • FlinkSQL 【FULL OUTER JOIN】
  • FlinkSQL 【Temporal Joins-Lookup Join】
  • FlinkSQL 【去重】
  • FlinkSQL 【upsert-kafka】

FlinkSQL demo


CREATE TABLE waybill_extend_kafka (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,_proc  as proctime()
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);CREATE TABLE package_state_kafka (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,_proc  as proctime()
) WITH ('connector' = 'kafka','topic' = 't2','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);CREATE TABLE es_dim(id                STRING,ts                STRING,waybill_code      STRING,pin               STRING,operater_ts       STRING,operater_type     STRING,is_enable         STRING,batch_no          STRING,activity_key      STRING,p_type            STRING,p_name            STRING,version            STRING,update_time            STRING
)
with ('connector' = 'elasticsearch-6','index' = 'es_dim','document-type' = 'es_dim','hosts' = 'http://xxx:9200','format' = 'json'
);CREATE TABLE es_sink(waybill_code      STRING,first_order       STRING -- 新客1,非新客0,extend_update_time            STRING,state       STRING -- 妥投150,package_update_time            STRING,pin              STRING,coupon_use_time      STRING,operater_type    STRING,is_enable        STRING,batch_no         STRING,update_time         STRING,PRIMARY KEY (waybill_code) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE TABLE kafka_sink (waybill_code      STRING,first_order       STRING ,extend_update_time            STRING,state       STRING -- 妥投150,package_update_time            STRING,pin              STRING,coupon_use_time      STRING,operater_type    STRING,is_enable        STRING,batch_no         STRING,update_time         STRING,PRIMARY KEY (waybill_code) NOT ENFORCED --注意 确保在 DDL 中定义主键。
) WITH ('connector' = 'upsert-kafka','topic' = 't3','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','key.format' = 'json','value.format' = 'json'
);--新客
CREATE view  waybill_extend_temp as
selectIF(cur['waybill_code'] IS NOT NULL , cur['waybill_code'], src ['waybill_code'])  AS waybill_code,IF(cur['data_key'] IS NOT NULL , cur['data_key'], src ['data_key'])  AS data_key,IF(cur['create_time'] IS NOT NULL , cur['create_time'], src ['create_time'])  AS create_time,opt,_proc
FROM waybill_extend_kafka
where UPPER(opt) = 'DELETE' OR UPPER(opt) = 'INSERT';CREATE view  waybill_extend_temp_handle as
SELECTwaybill_code,case when UPPER(opt) = 'INSERT'  then '1'when UPPER(opt) = 'DELETE'  then '0'end as first_order,create_time,_proc
from waybill_extend_temp
where data_key = 'firstOrder';--妥投
CREATE view package_state_temp as
selectIF(cur['WAYBILL_CODE'] IS NOT NULL , cur['WAYBILL_CODE'], src ['WAYBILL_CODE'])  AS waybill_code,IF(cur['STATE'] IS NOT NULL , cur['STATE'], src ['STATE'])  AS state,IF(cur['CREATE_TIME'] IS NOT NULL , cur['CREATE_TIME'], src ['CREATE_TIME'])  AS create_time,opt,_proc
FROM package_state_kafka
where UPPER(opt) = 'INSERT';CREATE view package_state_temp_handle as
SELECTwaybill_code,max(state) as state,min(create_time) as package_update_time,proctime() as _proc
from package_state_temp
where state = '150'
group by waybill_code;--full join
-- flink1.14 注意:flinksql里面的FULL OUTER JOIN 只是分别下发左右数据,中间状态不关联下发,在流处理场景下相当于union all
CREATE view waybill_extend_package_state  as
SELECTCOALESCE(a.waybill_code, b.waybill_code) as waybill_code,a.first_order,a.create_time as extend_update_time,b.state,b.package_update_time,COALESCE(a._proc, b._proc) as _proc
from waybill_extend_temp_handle as a
FULL OUTER JOIN package_state_temp_handle b
on a.waybill_code=b.waybill_code;--result
CREATE VIEW res_view AS
SELECTa.waybill_code,a.first_order,a.extend_update_time,a.state,a.package_update_time,b.pin,b.operater_ts,b.operater_type,b.is_enable,b.batch_no,CAST(CAST(a._proc AS TIMESTAMP(3)) AS STRING) as update_time,row_number() over(partition by a.waybill_code order by b.operater_ts desc) as rn 
from waybill_extend_package_state as a
JOIN es_dim FOR SYSTEM_TIME AS OF a._proc as b
on a.waybill_code=b.waybill_code;INSERT INTO es_sink
SELECTwaybill_code,first_order,extend_update_time,state,package_update_time,pin,operater_ts,operater_type,is_enable,batch_no,update_time
FROM res_view
where rn =1;INSERT INTO kafka_sink
SELECTwaybill_code,first_order,extend_update_time,state,package_update_time,pin,operater_ts ,operater_type,is_enable,batch_no,update_time
FROM res_view
where rn =1;

es_sink mapping:

POST es_sink/es_sink/_mapping
{"es_sink": {"properties": {"waybill_code": {"type": "keyword"},"pin": {"type": "keyword"},"operater_ts": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},"operater_type": {"type": "keyword"},"is_enable": {"type": "keyword"},"batch_no": {"type": "keyword"},"first_order": {"type": "keyword"},"extend_update_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},"state": {"type": "keyword"},"package_update_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},"update_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}
}

结果分析

从sink_es和sink_kafka获取数据都是同样的结果,部分结果如下:
在这里插入图片描述
但从结果中可以看出,FlinkSQL里面的【FULL OUTER JOIN】 只是分别下发左右数据,中间状态(从FlinkUI中可以看到【FULL OUTER JOIN】状态也做了保存)不关联下发,在流处理场景下相当于【UNION ALL】,不知是否是FlinkSQL的bug。
【FULL OUTER JOIN】状态数据,如下:
在这里插入图片描述
此次用例分析只是针对于Flink1.14,对于其他版本尚不清楚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/232732.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FlinkAPI开发之数据合流

案例用到的测试数据请参考文章&#xff1a; Flink自定义Source模拟数据流 原文链接&#xff1a;https://blog.csdn.net/m0_52606060/article/details/135436048 概述 在实际应用中&#xff0c;我们经常会遇到来源不同的多条流&#xff0c;需要将它们的数据进行联合处理。所以…

Apache Paimon:Streaming Lakehouse is Coming

摘要&#xff1a;本文整理自阿里云智能开源表存储负责人&#xff0c;Founder of Paimon&#xff0c;Flink PMC 成员李劲松&#xff08;花名&#xff1a;之信&#xff09;、同程旅行大数据专家&#xff0c;Apache Hudi & Paimon Contributor 吴祥平、汽车之家大数据计算平台…

Mybatis实现增删改查的两种方式-配置文件/注解

环境准备 1.数据库表tb_brand -- 删除tb_brand表 drop table if exists tb_brand; -- 创建tb_brand表 create table tb_brand(-- id 主键id int primary key auto_increment,-- 品牌名称brand_name varchar(20),-- 企业名称company_name varchar(20),-- 排序字段ordered int…

[Flutter]WebPlatform上运行遇到的问题总结

[Flutter]WebPlatform上运行遇到的问题总结 目录 [Flutter]WebPlatform上运行遇到的问题总结 写在开头 正文 Q1、不兼容判断 Q2、跨域问题 其他 写在结尾 写在开头 Flutter项目已能在移动端完美使用后&#xff0c;想看看在桌面端等使用情况 基于Flutter3.0后已支持Wi…

前端框架中的状态管理(State Management)

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

李沐-《动手学深度学习》-- 01-预备知识

一、线性代数知识 1. 矩阵计算 a. 矩阵求导 ​ 当y和x分别为标量和向量时候&#xff0c;进行求导得到的矩阵形状&#xff0c;矩阵求导就是矩阵A中的每一个元素对矩阵B中的每一个元素求导 ​ 梯度指向的是值变化最大的方向 ​ 分子布局和分母布局&#xff1a; b. 常识 ax…

知识付费平台搭建?找明理信息科技,专业且高效

明理信息科技知识付费saas租户平台 在当今数字化时代&#xff0c;知识付费已经成为一种趋势&#xff0c;越来越多的人愿意为有价值的知识付费。然而&#xff0c;公共知识付费平台虽然内容丰富&#xff0c;但难以满足个人或企业个性化的需求和品牌打造。同时&#xff0c;开发和…

群晖NAS+DMS7.0以上版本+无docker机型安装zerotier

测试机型&#xff1a;群晖synology 218play / DSM版本为7.2.1 因218play无法安装docker&#xff0c;且NAS系统已升级为7.0以上版本&#xff0c;按zerotier官网说法无法安装zerotier, 不过还是可以通过ssh终端和命令方式安装zerotier。 1、在DSM新建文件夹 用于存放zerotier脚…

2023我的编程之旅-地质人的山和水

引言 大家好&#xff0c;我是搞地质的。外行人有的说我们游山玩水&#xff0c;有的说我们灰头土脸&#xff0c;也有的说我们不是科学。 而我说&#xff0c;这是一门穷极一生青春&#xff0c;值得奉献的行业。这是一门贴近民生&#xff0c;又拥抱自然的学科。他的真理性在于探…

进阶学习——Linux系统安全及应用

目录 一、系统安全加固 1.账号安全基本措施 1.1系统账号清理 1.1.1延伸 1.2密码安全控制 1.3命令历史限制 1.4终端自动注销 二、使用su命令切换用户 1.用途及用法 2.密码验证 3.限制使用su命令的用户 4.查看su操作记录 5.sudo&#xff08;superuse do&#xff09;…

stable diffusion 人物高级提示词(一)头部篇

一、女生发型 prompt描述推荐用法Long hair长发一定不要和 high ponytail 一同使用Short hair短发-Curly hair卷发-Straight hair直发-Ponytail马尾high ponytail 高马尾&#xff0c;一定不要和 long hair一起使用&#xff0c;会冲突Pigtails2条辫子-Braid辫子只写braid也会生…

数据库开发必备神器:DataGrip 工具安装指南

DataGrip是一款强大的跨平台数据库集成开发环境&#xff0c;全能数据库工具&#xff0c;支持多种数据库系统&#xff0c;包括MySQL&#xff0c;PostgreSQL&#xff0c;Oracle等&#xff1b;提供智能代码编辑功能&#xff0c;包括语法高亮、代码补全等&#xff0c;提升开发效率。…

Qt/QML编程学习之心得:Linux下Thread线程创建(26)

GUI设计中经常为了不将界面卡死,会用到线程Thread,而作为GUI设计工具,Qt也提供了一个这样的类,即QThread。 QThread对象管理程序中的一个控制线程。线程QThread开始在run()中执行。默认情况下,run()通过调用exec()启动事件循环,并在线程内运行Qt事件循环。 也可以通过…

YTM32的低功耗PowerDown模式及唤醒管理器WKU模块

文章目录 Introduction专门的唤醒源管理器WKU外部的唤醒引脚内部的触发信号 进入PowerDown模式的操作流进入低功耗模式配合使用的其他模块 性能指标低功耗电流唤醒时间 Conclusion Introduction YTM32的低功耗系统中有设计了多种工作模式&#xff0c;功耗从高到低&#xff0c;…

Spring之 国际化:i18n

1、i18n概述 国际化也称作i18n&#xff0c;其来源是英文单词 internationalization的首末字符i和n&#xff0c;18为中间的字符数。由于软件发行可能面向多个国家&#xff0c;对于不同国家的用户&#xff0c;软件显示不同语言的过程就是国际化。通常来讲&#xff0c;软件中的国…

2.6 KERNEL LAUNCH

图2.15在vecAdd函数中显示最终主机代码。此源代码完成了图2.6.中的骨架。2.12和2.15共同说明了一个简单的CUDA程序&#xff0c;该程序由主机代码和设备内核组成。该代码是硬接的&#xff0c;每个线程块使用256个线程。然而&#xff0c;使用的线程块的数量取决于向量&#xff08…

java基于SSM的游戏商城的设计与实现论文

基于SSM的游戏商城的设计与实现 摘 要 当下&#xff0c;正处于信息化的时代&#xff0c;许多行业顺应时代的变化&#xff0c;结合使用计算机技术向数字化、信息化建设迈进。以前相关行业对于游戏信息的管理和控制&#xff0c;采用人工登记的方式保存相关数据&#xff0c;这种以…

【C++】- 类和对象(!!C++类基本概念!this指针详解)

类和对象 引入类类的定义类的访问限定操作符类的作用域类的实例化类对象模型this指针 引入类 在 C中&#xff0c;引入了一个新的定义----------类。类是一种用户自定义的数据类型&#xff0c;用于封装数据和行为。类可以看作是一个模板或蓝图&#xff0c;描述了一组相关的数据和…

Android 集成vendor下的模块

Android 集成vendor下的模块 &#xff0c;只需要在 PRODUCT_PACKAGES 加上对应的模块名&#xff0c;编译的时候就会执行对应模块的bp文件&#xff0c;集成到系统中 PRODUCT_PACKAGES \WallpaperPicker \Launcher3 \com.nxp.nfc Android11 Framework Vendor下自定义系统…

SpringBoot 如何 返回页面

背景 RestController ResponseBody Controller Controller中的方法无法返回jsp页面&#xff0c;或者html&#xff0c;配置的视图解析器 InternalResourceViewResolver不起作用&#xff0c;返回的内容就是Return 里的内容。 Mapping ResponseBody 也会出现同样的问题。 解…