工作中遇到这样一个需求:业务方提供的数据源是一个视图,而查阅文档后发现视图无法被 CDC 直接监听(视图本身不产生 binlog,变更只记录在底层表上)。起初考虑用 DataStream API 自定义实现,后来发现用 Flink SQL 也可以实现:分别监听视图的底层表,再在 Flink 中按视图的逻辑做 JOIN 即可。
在 MySQL 中创建对应的表和视图,并插入测试数据(下文 Flink 代码中使用的数据库名为 t2):
-- MySQL setup script for the demo: two base tables, a view joining them,
-- and the target table that the Flink job keeps in sync with the view.

-- Class table: one row per class.
CREATE TABLE tb_class (
    class_id   INT         PRIMARY KEY AUTO_INCREMENT, -- primary key
    class_name VARCHAR(50) NOT NULL                    -- class name
);

-- Student table: each student may reference a class.
CREATE TABLE tb_student (
    student_id   INT         PRIMARY KEY AUTO_INCREMENT,  -- primary key
    student_name VARCHAR(50) NOT NULL,                    -- student name
    class_id     INT,                                     -- class ID
    FOREIGN KEY (class_id) REFERENCES tb_class (class_id) -- foreign key to tb_class
);

-- Sample classes.
INSERT INTO tb_class (class_name) VALUES ('Class A');
INSERT INTO tb_class (class_name) VALUES ('Class B');
INSERT INTO tb_class (class_name) VALUES ('Class C');

-- Sample students. The class_id values 1-3 assume tb_class was freshly
-- created above, so AUTO_INCREMENT handed out 1, 2 and 3 in insert order.
INSERT INTO tb_student (student_name, class_id) VALUES ('Alice', 1);
INSERT INTO tb_student (student_name, class_id) VALUES ('Bob', 2);
INSERT INTO tb_student (student_name, class_id) VALUES ('Charlie', 3);
INSERT INTO tb_student (student_name, class_id) VALUES ('Diana', 1);

-- View exposing each student together with the name of their class.
-- Students without a matching class are excluded (inner join).
CREATE VIEW tb_student_view AS
SELECT
    s.student_id   AS student_id,   -- primary key
    s.student_name AS student_name, -- student name
    c.class_name   AS class_name    -- class name
FROM tb_student AS s
INNER JOIN tb_class AS c
    ON s.class_id = c.class_id;

-- Quick check of the view contents.
SELECT
    student_id,
    student_name,
    class_name
FROM tb_student_view;

-- Target table with the same columns as tb_student_view.
CREATE TABLE ads_student (
    student_id   INT         PRIMARY KEY AUTO_INCREMENT, -- primary key
    student_name VARCHAR(50) NOT NULL,                   -- student name
    class_name   VARCHAR(50) NOT NULL                    -- class name
);
编写对应的 Flink SQL 代码。由于手头没有 Flink SQL 客户端,这里改为在 IDEA 中通过 Table API 执行 SQL:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQLJoinExample {public static void main(String[] args) throws Exception {// 设置流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建 Table 环境EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);env.setParallelism(2); // 将并行度设置为2,根据需要调整// 定义 tb_student 表tableEnv.executeSql("CREATE TABLE tb_student (" +" student_id INT, " +" student_name STRING, " +" class_id INT ," +" PRIMARY KEY (student_id) NOT ENFORCED" +") WITH (" +" 'connector' = 'mysql-cdc', " + // 使用 datagen 连接器模拟数据,开发测试用" 'hostname' = 'localhost'," +" 'port' = '3307', " +" 'username' = 'root', " +" 'password' = '123456'," +" 'database-name' = 't2', " +" 'scan.incremental.snapshot.enabled' = 'false', " +" 'table-name' = 'tb_student' " +")");// 定义 tb_class 表tableEnv.executeSql("CREATE TABLE tb_class (" +" class_id INT, " +" class_name STRING, " +" PRIMARY KEY (class_id) NOT ENFORCED" +") WITH (" +" 'connector' = 'mysql-cdc', " + // 使用 datagen 连接器模拟数据,开发测试用" 'hostname' = 'localhost'," +" 'port' = '3307', " +" 'username' = 'root', " +" 'password' = '123456'," +" 'database-name' = 't2', " +" 'scan.incremental.snapshot.enabled' = 'false', " +" 'table-name' = 'tb_class' " +")");// 定义 ads_student 表,使用 jdbc 连接器作为目标表tableEnv.executeSql("CREATE TABLE ads_student (" +" student_id INT, " +" student_name STRING, " +" class_name STRING, " +" PRIMARY KEY (student_id) NOT ENFORCED" +") WITH (" +" 'connector' = 'jdbc', " +" 'url' = 'jdbc:mysql://localhost:3307/t2', " +" 'table-name' = 'ads_student', " +" 'username' = 'root', " +" 'password' = '123456' " +")");// 执行 JOIN 查询并插入到 ads_student 表tableEnv.executeSql("INSERT INTO ads_student " +"SELECT s.student_id, s.student_name, c.class_name " +"FROM tb_student AS s " +"JOIN tb_class AS c ON s.class_id = c.class_id");// 启动任务env.execute("Flink SQL Join Example");}
}
注意需要在 pom.xml 中额外添加以下依赖(sink 使用的是 JDBC 连接的方式):
<!-- JDBC connector used by the ads_student sink table -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.2.0-1.19</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<!-- NOTE(review): 1.15.0 here vs. the "-1.19" suffix of flink-connector-jdbc
     above looks like a Flink version mismatch; confirm against the project's
     Flink version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.15.0</version>
</dependency>