Flinksql 模拟 视图 监听

工作中遇到这样的一个业务,业务方给的是一个视图,查了一下文档视图不能监听,这个时候想着要不要用datastream去自定义,然后发现flinksql也是可以实现
创建对应数据库和表

-- 创建班级表 tb_class
CREATE TABLE tb_class (class_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键class_name VARCHAR(50) NOT NULL           -- 班级名称
);-- 创建学生表 tb_student
CREATE TABLE tb_student (student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键student_name VARCHAR(50) NOT NULL,          -- 姓名class_id INT,                               -- 班级IDFOREIGN KEY (class_id) REFERENCES tb_class(class_id)  -- 外键关联到 tb_class
);
-- 向 tb_class 表中插入数据
INSERT INTO tb_class (class_name) VALUES ('Class A');
INSERT INTO tb_class (class_name) VALUES ('Class B');
INSERT INTO tb_class (class_name) VALUES ('Class C');-- 向 tb_student 表中插入数据
INSERT INTO tb_student (student_name, class_id) VALUES ('Alice', 1);
INSERT INTO tb_student (student_name, class_id) VALUES ('Bob', 2);
INSERT INTO tb_student (student_name, class_id) VALUES ('Charlie', 3);
INSERT INTO tb_student (student_name, class_id) VALUES ('Diana', 1);-- 创建视图 tb_student_view
CREATE VIEW tb_student_view AS
SELECT s.student_id AS student_id,       -- 主键s.student_name AS student_name,   -- 学生姓名c.class_name AS class_name        -- 班级名称
FROM tb_student s
JOIN tb_class c ON s.class_id = c.class_id;select * from tb_student_view;-- 创建 ads_student 表
CREATE TABLE ads_student (student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键student_name VARCHAR(50) NOT NULL,          -- 学生姓名class_name VARCHAR(50) NOT NULL             -- 班级名称
);

编写对应的flinksql代码,这里没有flinksql客户端,只能在idea上完成了

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQLJoinExample {public static void main(String[] args) throws Exception {// 设置流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建 Table 环境EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);env.setParallelism(2);  // 将并行度设置为2,根据需要调整// 定义 tb_student 表tableEnv.executeSql("CREATE TABLE tb_student (" +"   student_id INT, " +"   student_name STRING, " +"   class_id INT ," +"   PRIMARY KEY (student_id) NOT ENFORCED" +") WITH (" +"   'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用"    'hostname' = 'localhost'," +"  'port' = '3307', " +"   'username' = 'root', " +"   'password' = '123456'," +"    'database-name' = 't2', " +"    'scan.incremental.snapshot.enabled' = 'false', " +"   'table-name' = 'tb_student' " +")");// 定义 tb_class 表tableEnv.executeSql("CREATE TABLE tb_class (" +"   class_id INT, " +"   class_name STRING, " +"   PRIMARY KEY (class_id) NOT ENFORCED" +") WITH (" +"  'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用"    'hostname' = 'localhost'," +"  'port' = '3307', " +"   'username' = 'root', " +"   'password' = '123456'," +"    'database-name' = 't2', " +"    'scan.incremental.snapshot.enabled' = 'false', " +"   'table-name' = 'tb_class' " +")");// 定义 ads_student 表,使用 jdbc 连接器作为目标表tableEnv.executeSql("CREATE TABLE ads_student (" +"   student_id INT, " +"   student_name STRING, " +"   class_name STRING, " +"   PRIMARY KEY (student_id) NOT ENFORCED" +") WITH (" +"   'connector' = 'jdbc', " +"   'url' = 'jdbc:mysql://localhost:3307/t2', " +"   'table-name' = 'ads_student', " +"   'username' = 'root', " +"   'password' = '123456' " +")");// 执行 JOIN 查询并插入到 ads_student 表tableEnv.executeSql("INSERT INTO ads_student " +"SELECT s.student_id, s.student_name, c.class_name " +"FROM tb_student AS s " +"JOIN tb_class AS c ON s.class_id = c.class_id");// 启动任务env.execute("Flink SQL Join Example");}
}

注意要加上额外的pom依赖

# sink 使用的是jdbc连接的方式
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.2.0-1.19</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.15.0</version>
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/462088.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中仕公考:25年上海省考时间

打算参加2025年上海省考的考生&#xff0c;中仕公考为大家整理了24年的考试时间点做参考。 报名时间:11月4日-11月10日 缴费时间:11月4日-11月11日 准考证打印时间:12月6日-12月9日 笔试时间:12月10日 9:00-11:30申论 13:30-15:30行政职业能力测验 16:15-17:15专业科目 …

智能指针(内存泄漏问题)

&#x1f33b;个人主页&#xff1a;路飞雪吖~ &#x1f320;专栏&#xff1a;C/C 目录 一、为什么需要智能指针&#xff1f; 二、内存泄露 三、智能指针的使用及原理 ⭐RAII ⭐智能指针的原理 &#x1f320;小贴士&#xff1a; ⭐std::auto_ptr ​编辑 ✨auto_ptr模拟实…

CSS例子: 横向排列的格子

效果 HTML <view class"content"><view class"item" v-for"item of 5">{{item}}</view></view> CSS .content {height: 100vh;display: flex;flex-direction: row; flex-wrap: wrap;align-content: flex-start;backgro…

面试题分享1

2024.11.1 1、过滤器和拦截器的区别 过滤器是基于spring的 拦截器是基于Java Web的 2、session 和 cookie 的区别、关系 cookie session 存储位置 保存在浏览器 &#xff08;客户端&#xff09; 保存在服务器 存储数据大小 限制大小&#xff0c;存储数据约为4KB 不限…

Python酷库之旅-第三方库Pandas(186)

目录 一、用法精讲 861、pandas.Index.names属性 861-1、语法 861-2、参数 861-3、功能 861-4、返回值 861-5、说明 861-6、用法 861-6-1、数据准备 861-6-2、代码示例 861-6-3、结果输出 862、pandas.Index.nbytes属性 862-1、语法 862-2、参数 862-3、功能 8…

Ansible 部署应用

Ansible Ansible 是基于 Python 开发&#xff0c;集合了众多优秀运维工具的优点&#xff0c;实现了批量运行命令、部署程序、配置系统等功能的自动化运维管理工具。默认通过 SSH 协议进行远程命令执行或下发配置&#xff0c;无需部署任何客户端代理软件&#xff0c;从而使得自动…

Python的全局锁GIL解析

Python的全局锁&#xff08;GIL&#xff09;是 CPython 解释器实现中的一个机制&#xff0c;用来确保任何时候只有一个线程执行 Python 字节码。这一机制存在于 CPython 中&#xff0c;主要是为了确保线程操作中的数据一致性&#xff0c;但也因此限制了多线程的并行执行效率。尤…

基于vue框架的的考研信息共享平台v0eyp(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;国家政策,用户,院校政策,院校信息,考研资料,资料分类,考研论坛 开题报告内容 基于Vue框架的考研信息共享平台开题报告 一、研究背景与意义 随着考研人数的逐年增长&#xff0c;考研学生对高效、便捷、个性化的信息获取需求愈发强烈。…

抽丝剥茧 分布式服务框架设计 理论设计篇

1、概述 前面几篇文章给大家详细的介绍了Zookeeper的基础概念以及应用的领域&#xff0c;今天我们讨论的话题是如何自研一套分布式服务框架。早些年有很多基于Dubbo和Zookeeper的分布式系统&#xff0c;这篇文章我们就来聊下如何设计一个分布式服务框架。 2、系统间交互 2.1、…

C++STL——list

C教学总目录 list 1、list简介2、构造函数3、迭代器4、访问和容量函数5、修改类函数6、操作类函数 1、list简介 list是带头双向循环链表&#xff0c;也是模板类&#xff0c;使用时要指明类型&#xff0c;包含于头文件<list> 由于list是双向循环链表&#xff0c;在任意位置…

YoloV8改进策略:Block改进|RFE模块,提高小物体的识别精度|即插即用|代码+修改过程

摘要 论文介绍 本文介绍了一种基于YOLOv5的人脸检测方法,命名为YOLO-FaceV2。该方法旨在解决人脸检测中的尺度变化、简单与困难样本不平衡以及人脸遮挡等问题。通过引入一系列创新模块和损失函数,YOLO-FaceV2在WiderFace数据集上取得了优异的表现,特别是在小物体、遮挡和困…

leaflet矢量瓦片vetorgrid显示聚合和图标裁剪显示不全的问题

1、问题现象 使用leaflet显示矢量瓦片会出现图片挤压的问题和图片裁剪显示不全的问题 2、解决办法和思路 1&#xff09;数据抽稀 方法一&#xff1a;在createTile方法通过控制feature在单张瓦片里面显示的数量&#xff0c;在小层级的时候进行筛选过滤&#xff0c;对点数据类…

Gitee push 文件

1、背景 想将自己的plecs仿真放到git中管理&#xff0c;以防丢失&#xff0c;以防乱改之后丢失之前版本仿真。此操作说明默认用户已下载git。 2、操作步骤 2.1 开启Git Bash 在文件夹中右键&#xff0c;开启Git Bash。 2.2 克隆文件 在Git Bash中打git clone git地址&#…

gitee 使用 webhoot 触发 Jenkins 自动构建

一、插件下载和配置 Manage Jenkins>Plugin Manager 搜索 gitee 进行安装 插件配置 1、前往Jenkins -> Manage Jenkins -> System -> Gitee Configuration -> Gitee connections 2、在 Connection name 中输入 Gitee 或者你想要的名字 3、Gitee host URL 中…

【JavaEE初阶 — 多线程】Thread类的属性

目录 Thread类的属性 1.Thread 的常见构造方法 2.Thread 的几个常见属性 2.1 前台线程与后台线程 2.2 setDaemon() 2.3 isAlive() Thread类的属性 Thread 类是JVM 用来管理线程的一个类&#xff0c;换句话说&#xff0c;每个线程都有一个唯一的Thread 对象与之关联&…

yocto是如何收集recipes,如何加入现有的bb文件

yocto通常是如何收集recipes: 在Yocto中&#xff0c;通过以下方式收集recipes&#xff1a; 层&#xff08;Layers&#xff09; Yocto项目使用层来组织recipes。层是包含配置文件、recipes和其他相关文件的目录结构。每个层有自己的目录&#xff0c;其中 recipes-* 目录用于存…

原生鸿蒙的竞争力到底如何?

目录 1. 崛起与挑战2. 安全机制3. 自动化检测前移4. 深入探讨开发者服务优势 1. 崛起与挑战 长期以来&#xff0c;移动操作系统市场被IOS和安卓所垄断&#xff0c;一直都难以推出完整的自主系统&#xff0c;面临诸多挑战&#xff0c;如推广困难、应用适配难度大&#xff0c;以及…

sublime Text中设置编码为GBK

要在sublime Text中设置编码为GBK&#xff0c;请按照以下步骤操作 1.打开Sublime Text编辑器, 2.点击菜单栏中的“Preferences”(首选项)选项&#xff0c;找打Package Control选项。 3.点击Package Control&#xff0c;随后搜索Install Package并点击&#xff0c;如下图 4.再…

KPRCB结构之ReadySummary和DispatcherReadyListHead

ReadySummary: Uint4B DispatcherReadyListHead : [32] _LIST_ENTRY 请参考 _KTHREAD *__fastcall KiSelectReadyThread(ULONG LowPriority, _KPRCB *Prcb)

Python爬虫:揭开淘宝商品描述的神秘面纱

在这个信息爆炸的时代&#xff0c;我们每天都在和时间赛跑。作为一名Python开发者&#xff0c;你是否曾梦想拥有超能力&#xff0c;能够瞬间揭开淘宝商品描述的神秘面纱&#xff1f;今天&#xff0c;就让我们一起化身为代码界的“福尔摩斯”&#xff0c;使用Python爬虫技术&…