使用FlinkSql进行实时工作流开发
- 引言
- Flink SQL实战
- 常用的Connector
- 1. MySQL-CDC 连接器配置
- 2. Kafka 连接器配置
- 3. JDBC 连接器配置
- 4. RabbitMQ 连接器配置
- 5. REST Lookup 连接器配置
- 6. HDFS 连接器配置
- FlinkSql数据类型
- 1. 基本数据类型
- 2. 字符串数据类型
- 3. 日期和时间数据类型
- 4. 复杂数据类型
- 5. 特殊数据类型
- 数据类型的使用示例
- 1. 数学函数
- 2. 字符串函数
- 3. 日期时间函数
- 4. 聚合函数
- 5. 条件函数
- 高阶函数
- 1. 窗口函数
- 2. 集合操作函数
- 3. JSON 函数
- 4. 数据类型转换函数
- 5. 复杂类型函数
- 6. 用户自定义函数 (UDF)
引言
在大数据时代,实时数据分析和处理变得越来越重要。Apache Flink,作为流处理领域的佼佼者,提供了一套强大的工具集来处理无界和有界数据流。其中,Flink SQL是其生态系统中一个重要的组成部分,允许用户以SQL语句的形式执行复杂的数据流操作,极大地简化了实时数据处理的开发流程。
什么是Apache Flink?
Apache Flink是一个开源框架,用于处理无边界(无尽)和有边界(有限)数据流。它提供了低延迟、高吞吐量和状态一致性,使开发者能够构建复杂的实时应用和微服务。Flink的核心是流处理引擎,它支持事件时间处理、窗口操作以及精确一次的状态一致性。
为什么选择Flink SQL?
易用性:Flink SQL使得非专业程序员也能快速上手,使用熟悉的SQL语法进行实时数据查询和处理。
灵活性:可以无缝地将SQL与Java/Scala API结合使用,为用户提供多种编程模型的选择。
性能:利用Flink的高性能流处理引擎,Flink SQL能够实现实时响应和低延迟处理。
集成能力:支持多种数据源和数据接收器,如Kafka、JDBC、HDFS等,易于集成到现有的数据生态系统中。
Flink SQL实战
常用的Connector
在配置FlinkSQL实时开发时,使用mysql-cdc、Kafka、jdbc和rabbitmq作为连接器是一个很常见的场景。以下是详细的配置说明,你可以基于这些信息来撰写你的博客:
1. MySQL-CDC 连接器配置
MySQL-CDC(Change Data Capture)连接器用于捕获MySQL数据库中的变更数据。配置示例如下:
CREATE TABLE mysql_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'mysql-cdc', -- 使用mysql-cdc连接器'hostname' = 'mysql-host', -- MySQL服务器主机名'port' = '3306', -- MySQL端口号'username' = 'user', -- MySQL用户名'password' = 'password', -- MySQL密码'database-name' = 'db', -- 数据库名'table-name' = 'table' -- 表名'server-time-zone' = 'GMT+8', -- 服务器时区'debezium.snapshot.mode' = 'initial', -- 初始快照模式,initial表示从头开始读取所有数据;latest-offset表示从最近的偏移量开始读取;timestamp则可以指定一个时间戳,从该时间戳之后的数据开始读取。'scan.incremental.snapshot.enabled' = 'true' -- 可选,设置为true时,Flink会尝试维护一个数据库表的增量快照。这意味着Flink不会每次都重新读取整个表,而是只读取自上次读取以来发生变化的数据。这样可以显著提高读取效率,尤其是在处理大量数据且频繁更新的场景下。'scan.incremental.snapshot.chunk.size' = '1024' -- 可选, 增量快照块大小'debezium.snapshot.locking.mode' = 'none', -- 可选,控制在快照阶段锁定表的方式,以防止数据冲突。none表示不锁定,lock-tables表示锁定整个表,transaction表示使用事务来锁定。'debezium.properties.include-schema-changes' = 'true', -- 可选,如果设置为true,则在CDC事件中会包含模式变更信息。'debezium.properties.table.whitelist' = 'mydatabase.mytable', -- 可选,指定要监控的表的白名单。如果table-name未设置,可以通过这个属性来指定。'debezium.properties.database.history' = 'io.debezium.relational.history.FileDatabaseHistory' -- 可选,设置数据库历史记录的实现类,通常使用FileDatabaseHistory来保存历史记录,以便在重启后能恢复状态。
);
2. Kafka 连接器配置
Kafka连接器用于读写Kafka主题中的数据。配置示例如下:
CREATE TABLE kafka_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'kafka', -- 使用kafka连接器'topic' = 'topic_name', -- Kafka主题名'properties.bootstrap.servers' = 'kafka-broker:9092', -- Kafka服务器地址'format' = 'json' -- 数据格式,例如json'properties.group.id' = 'flink-consumer-group', -- 消费者组ID'scan.startup.mode' = 'earliest-offset', -- 启动模式(earliest-offset, latest-offset, specific-offset, timestamp)'format' = 'json', -- 数据格式'json.fail-on-missing-field' = 'false', -- 是否在字段缺失时失败'json.ignore-parse-errors' = 'true', -- 是否忽略解析错误'properties.security.protocol' = 'SASL_SSL', -- 安全协议(可选)'properties.sasl.mechanism' = 'PLAIN', -- SASL机制(可选)'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";' -- SASL配置(可选)
);
3. JDBC 连接器配置
JDBC连接器用于与其他关系型数据库进行交互。配置示例如下:
CREATE TABLE jdbc_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'jdbc', -- 使用jdbc连接器'url' = 'jdbc:mysql://mysql-host:3306/db', -- JDBC连接URL'table-name' = 'table_name', -- 数据库表名'username' = 'user', -- 数据库用户名'password' = 'password' -- 数据库密码'driver' = 'com.mysql.cj.jdbc.Driver', -- JDBC驱动类'lookup.cache.max-rows' = '5000', -- 可选,查找缓存的最大行数'lookup.cache.ttl' = '10min', -- 可选,查找缓存的TTL(时间到期)'lookup.max-retries' = '3', -- 可选,查找的最大重试次数'sink.buffer-flush.max-rows' = '1000', -- 可选,缓冲区刷新最大行数'sink.buffer-flush.interval' = '2s' -- 可选,缓冲区刷新间隔
);
4. RabbitMQ 连接器配置
RabbitMQ连接器用于与RabbitMQ消息队列进行交互。配置示例如下:
CREATE TABLE rabbitmq_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'rabbitmq', -- 使用rabbitmq连接器'host' = 'rabbitmq-host', -- RabbitMQ主机名'port' = '5672', -- RabbitMQ端口号'username' = 'user', -- RabbitMQ用户名'password' = 'password', -- RabbitMQ密码'queue' = 'queue_name', -- RabbitMQ队列名'exchange' = 'exchange_name' -- RabbitMQ交换机名'routing-key' = 'routing_key', -- 路由键'delivery-mode' = '2', -- 投递模式(2表示持久)'format' = 'json', -- 数据格式'json.fail-on-missing-field' = 'false', -- 是否在字段缺失时失败'json.ignore-parse-errors' = 'true' -- 是否忽略解析错误
);
5. REST Lookup 连接器配置
REST Lookup 连接器允许在 SQL 查询过程中,通过 REST API 进行查找操作。
CREATE TABLE rest_table (id INT,name STRING,price DECIMAL(10, 2),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'rest-lookup','url' = 'http://api.example.com/user/{id}', -- REST API URL,使用占位符 {product_id}'lookup-method' = 'POST' -- 'GET' 或 'POST''format' = 'json', -- 数据格式'asyncPolling' = 'false' -- 可选,指定查找操作是否使用异步轮询模式。默认值为 'false'。当设置为 'true' 时,查找操作会以异步方式执行,有助于提高性能。'gid.connector.http.source.lookup.header.Content-Type' = 'application/json' -- 可选,设置 Content-Type 请求头。用于指定请求体的媒体类型。例如,设置为 application/json 表示请求体是 JSON 格式。'gid.connector.http.source.lookup.header.Origin' = '*' -- 可选,设置 Origin 请求头。通常用于跨域请求。'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff' -- 可选,设置 X-Content-Type-Options 请求头。用于防止 MIME 类型混淆攻击。'json.fail-on-missing-field' = 'false', -- 可选,是否在字段缺失时失败'json.ignore-parse-errors' = 'true' -- 可选,是否忽略解析错误'lookup.cache.max-rows' = '5000', -- 可选,查找缓存的最大行数'lookup.cache.ttl' = '10min', -- 可选,查找缓存的TTL(时间到期)'lookup.max-retries' = '3' -- 可选,查找的最大重试次数
);
6. HDFS 连接器配置
HDFS connector用于读取或写入Hadoop分布式文件系统中的数据。
创建HDFS Source
CREATE TABLE hdfsSource (line STRING
) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/data/input', -- HDFS上的路径。'format' = 'csv' -- 文件格式。
);
创建HDFS Sink
CREATE TABLE hdfsSink (line STRING
) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/data/output','format' = 'csv'
);
FlinkSql数据类型
在FlinkSQL中,数据类型的选择和定义是非常重要的,因为它们直接影响数据的存储和处理方式。FlinkSQL提供了多种数据类型,可以满足各种业务需求。以下是FlinkSQL中的常见数据类型及其详细介绍:
1. 基本数据类型
-
BOOLEAN: 布尔类型,表示
TRUE
或FALSE
。CREATE TABLE example_table (is_active BOOLEAN );
-
TINYINT: 8位带符号整数,范围是
-128
到127
。CREATE TABLE example_table (tiny_value TINYINT );
-
SMALLINT: 16位带符号整数,范围是
-32768
到32767
。CREATE TABLE example_table (small_value SMALLINT );
-
INT: 32位带符号整数,范围是
-2147483648
到2147483647
。CREATE TABLE example_table (int_value INT );
-
BIGINT: 64位带符号整数,范围是
-9223372036854775808
到9223372036854775807
。CREATE TABLE example_table (big_value BIGINT );
-
FLOAT: 单精度浮点数。
CREATE TABLE example_table (float_value FLOAT );
-
DOUBLE: 双精度浮点数。
CREATE TABLE example_table (double_value DOUBLE );
-
DECIMAL(p, s): 精确数值类型,
p
表示总精度,s
表示小数位数。CREATE TABLE example_table (decimal_value DECIMAL(10, 2) );
2. 字符串数据类型
-
CHAR(n): 定长字符串,
n
表示字符串的长度。CREATE TABLE example_table (char_value CHAR(10) );
-
VARCHAR(n): 可变长字符串,
n
表示最大长度。CREATE TABLE example_table (varchar_value VARCHAR(255) );
-
STRING: 可变长字符串,无长度限制。
CREATE TABLE example_table (string_value STRING );
3. 日期和时间数据类型
-
DATE: 日期类型,格式为
YYYY-MM-DD
。CREATE TABLE example_table (date_value DATE );
-
TIME§: 时间类型,格式为
HH:MM:SS
,p
表示秒的小数位精度。CREATE TABLE example_table (time_value TIME(3) );
-
TIMESTAMP§: 时间戳类型,格式为
YYYY-MM-DD HH:MM:SS.sss
,p
表示秒的小数位精度。CREATE TABLE example_table (timestamp_value TIMESTAMP(3) );
-
TIMESTAMP§ WITH LOCAL TIME ZONE: 带有本地时区的时间戳类型。
CREATE TABLE example_table (local_timestamp_value TIMESTAMP(3) WITH LOCAL TIME ZONE );
4. 复杂数据类型
-
ARRAY: 数组类型,
T
表示数组中的元素类型。CREATE TABLE example_table (array_value ARRAY<INT> );
-
MAP<K, V>: 键值对映射类型,
K
表示键的类型,V
表示值的类型。CREATE TABLE example_table (map_value MAP<STRING, INT> );
-
ROW<…>: 行类型,可以包含多个字段,每个字段可以有不同的类型。
CREATE TABLE example_table (row_value ROW<name STRING, age INT> );
5. 特殊数据类型
-
BINARY(n): 定长字节数组,
n
表示长度。CREATE TABLE example_table (binary_value BINARY(10) );
-
VARBINARY(n): 可变长字节数组,
n
表示最大长度。CREATE TABLE example_table (varbinary_value VARBINARY(255) );
数据类型的使用示例
以下是一个包含各种数据类型的表的定义示例:
CREATE TABLE example_table (id INT,name STRING,is_active BOOLEAN,salary DECIMAL(10, 2),birth_date DATE,join_time TIMESTAMP(3),preferences ARRAY<STRING>,attributes MAP<STRING, STRING>,address ROW<street STRING, city STRING, zip INT>
);
为了详细介绍 Flink SQL 中所有内置函数,以下是它们的分类、功能描述以及使用案例:
1. 数学函数
-
ABS(x)
- 描述:返回 x 的绝对值。
- 示例:
SELECT ABS(-5);
返回 5。
-
CEIL(x)
- 描述:返回不小于 x 的最小整数。
- 示例:
SELECT CEIL(4.3);
返回 5。
-
FLOOR(x)
- 描述:返回不大于 x 的最大整数。
- 示例:
SELECT FLOOR(4.7);
返回 4。
-
EXP(x)
- 描述:返回 e 的 x 次方。
- 示例:
SELECT EXP(1);
返回 2.71828。
-
LOG(x)
- 描述:返回 x 的自然对数。
- 示例:
SELECT LOG(2.71828);
返回 1。
-
LOG2(x)
- 描述:返回 x 以 2 为底的对数。
- 示例:
SELECT LOG2(8);
返回 3。
-
LOG10(x)
- 描述:返回 x 以 10 为底的对数。
- 示例:
SELECT LOG10(100);
返回 2。
-
POWER(x, y)
- 描述:返回 x 的 y 次方。
- 示例:
SELECT POWER(2, 3);
返回 8。
-
SQRT(x)
- 描述:返回 x 的平方根。
- 示例:
SELECT SQRT(16);
返回 4。
2. 字符串函数
-
LENGTH(str)
- 描述:返回字符串 str 的长度。
- 示例:
SELECT LENGTH('Flink');
返回 5。
-
UPPER(str)
- 描述:将字符串转为大写。
- 示例:
SELECT UPPER('flink');
返回 ‘FLINK’。
-
LOWER(str)
- 描述:将字符串转为小写。
- 示例:
SELECT LOWER('FLINK');
返回 ‘flink’。
-
SUBSTRING(str, pos, len)
- 描述:返回从字符串 str 的 pos 位置开始长度为 len 的子字符串。
- 示例:
SELECT SUBSTRING('Flink', 1, 2);
返回 ‘Fl’。
-
CONCAT(str1, str2, …)
- 描述:将多个字符串连接成一个字符串。
- 示例:
SELECT CONCAT('Hello', ' ', 'World');
返回 ‘Hello World’。
-
TRIM(str)
- 描述:去除字符串两端的空白字符。
- 示例:
SELECT TRIM(' Flink ');
返回 ‘Flink’。
3. 日期时间函数
-
CURRENT_TIMESTAMP()
- 描述:返回当前的时间戳。
- 示例:
SELECT CURRENT_TIMESTAMP();
返回类似 ‘2024-08-07 12:34:56.789’。
-
CURRENT_DATE()
- 描述:返回当前的日期。
- 示例:
SELECT CURRENT_DATE();
返回类似 ‘2024-08-07’。
-
DATE_FORMAT(timestamp, format)
- 描述:格式化日期时间戳为指定格式的字符串。
- 示例:
SELECT DATE_FORMAT(TIMESTAMP '2024-08-07 12:34:56', 'yyyy-MM-dd HH:mm:ss');
返回 ‘2024-08-07 12:34:56’。
-
TIMESTAMPDIFF(unit, ts1, ts2)
- 描述:返回两个时间戳之间的差值,单位可以是 DAY、HOUR、MINUTE 等。
- 示例:
SELECT TIMESTAMPDIFF(DAY, TIMESTAMP '2024-08-01 00:00:00', TIMESTAMP '2024-08-07 00:00:00');
返回 6。
4. 聚合函数
-
COUNT(expr)
- 描述:计算符合条件的行数。
- 示例:
SELECT COUNT(*) FROM table;
返回表中的行数。
-
SUM(expr)
- 描述:计算 expr 的总和。
- 示例:
SELECT SUM(salary) FROM employees;
返回员工薪资的总和。
-
AVG(expr)
- 描述:计算 expr 的平均值。
- 示例:
SELECT AVG(salary) FROM employees;
返回员工薪资的平均值。
-
MIN(expr)
- 描述:返回 expr 的最小值。
- 示例:
SELECT MIN(salary) FROM employees;
返回员工薪资的最小值。
-
MAX(expr)
- 描述:返回 expr 的最大值。
- 示例:
SELECT MAX(salary) FROM employees;
返回员工薪资的最大值。
5. 条件函数
- CASE WHEN condition THEN result [WHEN …] [ELSE result] END
- 描述:类似于编程语言中的条件语句,根据条件返回不同的结果。
- 示例:
SELECT CASE WHEN salary > 5000 THEN 'High' ELSE 'Low' END FROM employees;
根据薪资返回 ‘High’ 或 ‘Low’。
除了上述基本的数学、字符串、日期时间和聚合函数外,Flink SQL 还提供了一些实际使用中的实用功能,这些功能在数据处理和分析中非常有用。以下是一些实用功能的介绍和示例:
高阶函数
1. 窗口函数
窗口函数允许用户在指定的窗口范围内进行计算,如滑动窗口、滚动窗口和会话窗口。以下是几种常见的窗口函数:
-
TUMBLE
- 描述:创建一个固定大小的滚动窗口。
- 示例:
SELECT TUMBLE_START(ts, INTERVAL '10' MINUTE) AS window_start,COUNT(*) AS cnt FROM table GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
-
HOP
- 描述:创建一个滑动窗口。
- 示例:
SELECT HOP_START(ts, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_start,COUNT(*) AS cnt FROM table GROUP BY HOP(ts, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE);
-
SESSION
- 描述:创建一个会话窗口。
- 示例:
SELECT SESSION_START(ts, INTERVAL '15' MINUTE) AS window_start,COUNT(*) AS cnt FROM table GROUP BY SESSION(ts, INTERVAL '15' MINUTE);
2. 集合操作函数
集合操作函数用于对多个表进行集合运算,如 UNION、INTERSECT 和 EXCEPT。
-
UNION
- 描述:合并两个表,去重。
- 示例:
SELECT * FROM table1 UNION SELECT * FROM table2;
-
UNION ALL
- 描述:合并两个表,不去重。
- 示例:
SELECT * FROM table1 UNION ALL SELECT * FROM table2;
-
INTERSECT
- 描述:返回两个表的交集。
- 示例:
SELECT * FROM table1 INTERSECT SELECT * FROM table2;
-
EXCEPT
- 描述:返回存在于第一个表但不存在于第二个表的记录。
- 示例:
SELECT * FROM table1 EXCEPT SELECT * FROM table2;
3. JSON 函数
Flink SQL 支持处理 JSON 数据的函数,非常适用于处理嵌套结构的数据。
-
JSON_VALUE(json_string, path)
- 描述:从 JSON 字符串中提取指定路径的值。
- 示例:
SELECT JSON_VALUE('{"name": "John", "age": 30}', '$.name') AS name;
-
JSON_QUERY(json_string, path)
- 描述:从 JSON 字符串中提取指定路径的子 JSON。
- 示例:
SELECT JSON_QUERY('{"name": "John", "info": {"age": 30, "city": "New York"}}', '$.info') AS info;
4. 数据类型转换函数
这些函数用于在不同的数据类型之间进行转换。
-
CAST(expr AS type)
- 描述:将 expr 转换为指定的数据类型。
- 示例:
SELECT CAST('2024-08-07' AS TIMESTAMP) AS ts;
-
TRY_CAST(expr AS type)
- 描述:尝试将 expr 转换为指定的数据类型,如果失败则返回 NULL。
- 示例:
SELECT TRY_CAST('abc' AS INT) AS number; -- 返回 NULL
5. 复杂类型函数
处理数组、映射等复杂数据类型的函数。
-
ARRAY
- 描述:创建一个数组。
- 示例:
SELECT ARRAY[1, 2, 3] AS numbers;
-
CARDINALITY(array)
- 描述:返回数组的元素个数。
- 示例:
SELECT CARDINALITY(ARRAY[1, 2, 3]) AS size;
-
ELEMENT(array, index)
- 描述:返回数组中指定索引的元素。
- 示例:
SELECT ELEMENT(ARRAY[1, 2, 3], 2) AS second_element; -- 返回 2
6. 用户自定义函数 (UDF)
Flink SQL 允许用户定义自己的函数以满足特定需求。
- 创建和使用 UDF
- 示例:
// Java 代码示例 public static class MyUdf extends ScalarFunction {public int eval(int x) {return x * x;} }// 注册和使用 UDF tableEnv.createTemporarySystemFunction("MyUdf", MyUdf.class); tableEnv.sqlQuery("SELECT MyUdf(age) FROM people");
- 示例: