2.1、如何在FlinkSQL中读取写出到Kafka

目录

1、环境设置

方式1:在Maven工程中添加pom依赖

方式2:在 sql-client.sh 中添加 jar包依赖

2、读取Kafka

2.1 创建 kafka表

2.2 读取 kafka消息体(Value)

使用 'format' = 'json' 解析json格式的消息

使用 'format' = 'csv' 解析csv格式的消息

使用 'format' = 'raw' 解析kafka消息为单个字符串字段

2.3 读取 kafka消息键(Key)

2.4 读取 kafka元数据(Metadata)

2.5 如何指定起始消费位点

从指定的timestamp开始消费:

从指定的timestamp开始消费:

2.6 创建 kafka表时,添加水位线生成策略

3、写入Kafka

3.1 写入 kafka时,可以指定的元数据


1、环境设置

        Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

        官网链接:官网

方式1:在Maven工程中添加pom依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version>
</dependency>

方式2:在 sql-client.sh 中添加 jar包依赖

        将 flink-sql-connector-kafka-1.17.1.jar 上传到flink的lib目录下 (可以去官网下载jar包)

        或者 启动 sql-client.sh 时,指定jar依赖

bin/sql-client.sh -j lib/flink-sql-connector-kafka-1.17.1.jar

2、读取Kafka

2.1 创建 kafka表

CREATE TABLE SourceKafkaTable (指定物理字段,指定元数据字段,指定水位线生成策略
) WITH ('connector' = 'kafka',                             --【必选】指定 连接器类型,kafka用'kafka''properties.bootstrap.servers' = 'localhost:9092', --【必选】指定 Kafka broker列表,用逗号分隔'topic' = 'user_behavior',                         --【必选】指定 topic列表,用逗号分隔'topic-pattern' = '.*log_kafka.*',                 --【必选】指定 匹配读取 topic 名称的正则表达式, 和 topic 配置一个即可'properties.group.id' = 'testGroup',               --【可选】指定 消费者组id,不指定时会自定生成 KafkaSource-{tableIdentifier}'scan.startup.mode' = 'earliest-offset',           --【可选】指定起始消费位点,默认值 earliest-offset'format' = 'csv'                                   --【必选】指定 消息的格式类型, 和 value.format 是等价的(配置一个即可)
);

2.2 读取 kafka消息体(Value)

在FlinkSQL读取kafka时,可以根据kafka存储的消息的格式,通过 'value.format' = 'csv|raw|json...'

来指定使用哪种格式来对kafka消息进行解析,并将解析的结果映射到表中的字段中去。


使用 'format' = 'json' 解析json格式的消息

当 kafka消息为json格式,可以使用  'format' = 'json' 在创建表时对json串进行解析,并将解析后的结果映射到表中的字段中去

注意:这种方式只能解析单层级的json格式,多层级时无法解析

           如果为多层级json格式时,可以使用raw格式 + udf函数来对json进行解析

导入Maven的pom依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version>
</dependency>

创建FlinkTable

-- TODO 创建用于读取kafka消息的flink表(消息格式为json)
-- kafka消息示例:{"ID":0,"NAME":"大王0"}
CREATE TABLE kafka_table_source_json (`ID` STRING,`NAME` STRING
) WITH ('connector' = 'kafka','topic' = '20231009','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true'
);-- TODO 解析json串时,容错性设置
'json.fail-on-missing-field' = 'false' -- 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)
'json.ignore-parse-errors' = 'true'  -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。-- 触发读取kafka操作
select * from kafka_table_source_json;

运行结果:


使用 'format' = 'csv' 解析csv格式的消息

当 kafka消息为csv格式,可以使用  'format' = 'csv' 在创建表时对csv进行解析,并将解析后的结果映射到表中的字段中去

导入Maven的pom依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.17.1</version>
</dependency>

创建FlinkTable

-- TODO 创建用于读取kafka消息的flink表(消息格式为csv)
-- kafka消息示例:2,3.1
CREATE TABLE kafka_table_source_csv (`order_id` BIGINT,`price` DOUBLE
) WITH ('connector' = 'kafka','topic' = 'csv_format','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','value.format' = 'csv'
);-- 触发读取kafka操作
select * from kafka_table_source_csv;

运行结果:


使用 'format' = 'raw' 解析kafka消息为单个字符串字段

可以使用  'format' = 'raw' 将kafka消息以原始格式映射到flink表中的string类型的字段中

创建FlinkTable

-- TODO 创建用于读取kafka消息的flink表(消息格式为json)
-- kafka消息示例:{"ID":0,"NAME":"大王0"}
CREATE TABLE kafka_table_source_raw (`log` STRING
) WITH ('connector' = 'kafka','topic' = '20231009','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','format' = 'raw'
);-- 触发读取kafka操作
select * from kafka_table_source_raw;

运行结果:


2.3 读取 kafka消息键(Key)

kafka消息信息:

{"key":{"ID_1":0,"NAME_1":"大王0"},"value":{"ID":0,"NAME":"大王0"},"metadata":{"offset":0,"topic":"readKey","partition":0}
}

创建FlinkTable

-- 读取kafka消息中的key部分
CREATE TABLE kafka_table_source_read_key (`ID` STRING,`NAME` STRING,`ID_1` STRING,`NAME_1` STRING
) WITH ('connector' = 'kafka','topic' = 'readKey','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','key.format' = 'json','key.json.ignore-parse-errors' = 'true','key.fields' = 'ID_1;NAME_1','value.format' = 'json'
);

2.4 读取 kafka元数据(Metadata)

创建FlinkTable

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
CREATE TABLE kafka_table_source_read_metadata (`log` STRING,`topic` STRING METADATA VIRTUAL, -- 消息所属的 topic`partition` BIGINT METADATA VIRTUAL, -- 消息所属的 partition ID`offset` BIGINT METADATA VIRTUAL, -- 消息在partition中的 offset`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH ('connector' = 'kafka','topic' = 'readKey','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','format' = 'raw'
);select * from kafka_table_source_read_metadata;


2.5 如何指定起始消费位点

scan.startup.mode 配置项决定了 Kafka consumer 的启动模式。有效值为:

  • group-offsets:从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
  • earliest-offset:从可能的最早偏移量开始。
  • latest-offset:从最末尾偏移量开始。
  • timestamp:从用户为每个 partition 指定的时间戳开始。
    • 如果使用了 timestamp,必须使用另外一个配置项              scan.startup.timestamp-millis=时间戳(毫秒值)
  • specific-offsets:从用户为每个 partition 指定的偏移量开始。
    • 如果使用了 specific-offsets,必须使用另外一个配置项 scan.startup.specific-offsets 来为每个 partition 指定起始偏移量, 例如,选项值 partition:0,offset:42;partition:1,offset:300 表示 partition 0 从偏移量 42 开始,partition 1 从偏移量 300 开始

默认值 group-offsets 表示从 Zookeeper/Kafka 中最近一次已提交的偏移量开始消费。

从指定的timestamp开始消费:

// --------------------------------------------------------------------------------------------
//  TODO 从指定的timestamp开始消费
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_startup_timestamp;
CREATE TABLE kafka_table_source_test_startup_timestamp (`log` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp',`offset` BIGINT METADATA VIRTUAL
) WITH ('connector' = 'kafka','topic' = '20231009','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'timestamp', -- 从用户为每个 partition 指定的时间戳开始'scan.startup.timestamp-millis' = '1697008386973', -- 从 指定的timestamp开始(包括)消费'value.format' = 'raw'
);select * 
,cast(UNIX_TIMESTAMP(cast(ts as string), 'yyyy-MM-dd HH:mm:ss.SSS') as string) || SPLIT_INDEX(cast(ts as string),'.',1) as timestamp_hmz
from kafka_table_source_test_startup_timestamp;

运行结果:

从指定的timestamp开始消费:

// --------------------------------------------------------------------------------------------
//  TODO 从指定的offset开始消费
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_startup_offsets;
CREATE TABLE kafka_table_source_test_startup_offsets (`log` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp',`offset` BIGINT METADATA VIRTUAL
) WITH ('connector' = 'kafka','topic' = '20231009','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'specific-offsets', -- 从用户为每个 partition 指定的偏移量开始'scan.startup.specific-offsets' = 'partition:0,offset:4', -- 为每个 partition 指定起始偏移量'value.format' = 'raw'
);select * from kafka_table_source_test_startup_offsets;

运行结果:


2.6 创建 kafka表时,添加水位线生成策略

// --------------------------------------------------------------------------------------------
//  TODO 创建 kafka表时,添加水位线生成策略
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_watermark;
CREATE TABLE kafka_table_source_test_watermark (`log` STRING,`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',`offset` BIGINT METADATA VIRTUAL,WATERMARK FOR event_time AS event_time -- 根据kafka的timestamp,生成水位线,使用 严格递增时间戳水位线生成策略
) WITH ('connector' = 'kafka','topic' = '20231009','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'specific-offsets', -- 从用户为每个 partition 指定的偏移量开始'scan.startup.specific-offsets' = 'partition:0,offset:4', -- 为每个 partition 指定起始偏移量'value.format' = 'raw'
);select * from kafka_table_source_test_watermark;

3、写入Kafka

3.1 写入 kafka时,可以指定的元数据

// --------------------------------------------------------------------------------------------
//  TODO 通过flinksql向kafka写入数据(写入时指定 timestamp)
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_startup_mode;
CREATE TABLE kafka_table_source_test_startup_mode (`order_id` BIGINT,`price` DOUBLE,`ts` TIMESTAMP(3) METADATA FROM 'timestamp',`offset` BIGINT METADATA VIRTUAL
) WITH ('connector' = 'kafka','topic' = '20231011','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','value.format' = 'csv'
);insert into kafka_table_source_test_startup_mode(order_id, price,ts)
SELECT * FROM (VALUES(1, 2.0,TO_TIMESTAMP_LTZ(1000, 3))
, (2, 4.0,TO_TIMESTAMP_LTZ(2000, 3))
, (3, 6.0,TO_TIMESTAMP_LTZ(3000, 3))
, (4, 7.0,TO_TIMESTAMP_LTZ(4000, 3))
, (5, 8.0,TO_TIMESTAMP_LTZ(5000, 3))
, (6, 10.0,TO_TIMESTAMP_LTZ(6000, 3))
, (7, 12.0,TO_TIMESTAMP_LTZ(7000, 3))
) AS book (order_id, price,ts);-- 触发读取kafka操作
select * from kafka_table_source_test_startup_mode;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/158163.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【从0开发】百度BML全功能AI开发平台【实操:以部署情感分析模型为例】

目录 一、全功能AI开发平台介绍二、AI项目落地应用流程&#xff08;以文本分类为例&#xff09;2-0、项目开始2-1、项目背景2-2、数据准备介绍2-3、项目数据2-4、建模调参介绍2-5、项目的建模调参2-6、开发部署2-7、项目在公有云的部署 附录&#xff1a;调用api代码总结 一、全…

百度车牌识别AI Linux使用方法-armV7交叉编译

1、获取百度ai的sdk 百度智能云-登录 (baidu.com) 里面有两个版本的armV7和armV8架构。v7架构的性能比较低往往需要交叉编译&#xff0c;v8的板子性能往往比较好&#xff0c;可以直接在板子上编译。 解压到ubuntu里面。这里介绍v7架构的。 2、ubuntu环境配置 ubuntu下安装软件…

Bootstrap-媒体类型

加上媒体查询之后&#xff0c;只有在特定的设备之下才能起作用&#xff01;&#xff01;&#xff01;

Win10找不到hosts文件的解决方案

正常情况下&#xff0c;Windows10系统的C:\Windows\System32\drivers\etc目录下应该有hosts文件&#xff0c;但偏偏有些电脑没有&#xff0c;哪怕你打开了查看“隐藏的项目”也没见到hosts文件&#xff0c;如下&#xff1a; 解决方案 1、先点击查看&#xff0c;再点击选项&…

Ant Design Form.List基础用法

使用 Form.List 使用 项目中需要在新增可以多个如图 代码如下 // An highlighted block <Card title"产品信息" bordered{false}><Form.List name"productList" >{(fields, {add, remove}) > (<>{fields.map((field) > (<Ro…

苹果放出快捷指令专题介绍页面,大大提高了 Mac 使用效率

近日&#xff0c;苹果发布 macOS Sonoma 更新的同时&#xff0c;还上线了“《快捷指令》助你效率倍增”专题页面&#xff0c;其目标是在 Mac 上让好用的 App 更强大。 快捷指令功能可以让设备自动完成常用或繁琐的操作&#xff0c;大大提升 Mac 的效率。 快捷指令能帮你在《邮…

JVM上篇之类加载子系统

目录 类加载子系统 内存结构 类的生命周期 类的加载过程 加载 加载class文件方式 连接 验证 验证阶段 准备 解析 初始化 类加载器 介绍 作用 分类 引导类加载器 自定义类加载器 ClassLoader 获取ClassLoader途径 双亲委派机制 介绍 执行流程 好处 打破…

软件UI自动化测试应该怎么做?对软件产品起到什么作用?

在软件开发过程中&#xff0c;开发人员需要编写大量的代码来实现软件产品的功能。而这些功能往往需要在用户界面上进行展示和操作&#xff0c;称为UI(User Interface)。UI自动化测试是为了检测软件界面是否符合预期的设计和用户操作&#xff0c;通过自动化测试工具和脚本&#…

Idea执行Pom.xml导入jar包提示sun.misc.BASE64Encoder jar找不到---SpringCloud工作笔记197

奇怪之前都是好好的,这个是因为,jdk的版本不对,重新打开以后自动被选择成jdk11了...记录一下 原因是从jdk9的时候,这个jar包已经被删除了,所以会报错,如果你用的是jdk自带的这个jar包就会报错,那么还可以,修改,不让他用jdk的,让他用 用org.apache.commons.codec.binary.Base64…

NSDT编辑器实现数字孪生

数字孪生的强大功能来自于将真实世界的资产与真实世界的数据联系起来&#xff0c;因此您可以更好地可视化它们。数字孪生使跨职能团队能够以交互式和沉浸式方式协作设计、构建、测试、部署和操作复杂系统。 如何创建数字孪生&#xff1f; 数字孪生是通过导入概念模型&#xf…

PBA.客户需求分析 需求管理

一、客户需求分析 1 需求的三个层次: Requirement/Wants/Pains 大部分人认为&#xff0c;产品满足不了客户需要&#xff0c;是因为客户告知的需求是错误的&#xff0c;这听起来有一些道理&#xff0c;却没有任何意义。不同角色对于需求的理解是不一样的。在客户的需求和厂家的…

四、RIP动态路由实验

拓扑图&#xff1a; 基本ip的配置已经配置好了&#xff0c;接下来对两台路由器配置rip协议&#xff0c;两台PC进行跨网段通讯 RIPv1版本只能识别ABC的大类网段&#xff0c;不能区分子网掩码&#xff0c;v2版本可以识别子网掩码 首先进入R1&#xff0c;进入rip&#xff0c;宣告…

栈实现深度优先搜索

引言 之前刚学DFS的时候并不完全理解为什么递归可以一直往下做&#xff0c;后来直到了递归的本质是栈&#xff0c;就想着能不能手写栈来代替递归呢。当时刚学&#xff0c;自己觉得水平不够就搁置了这个想法&#xff0c;今天上数据结构老师正好讲了栈的应用&#xff0c;其中就有…

华为云云耀云服务器L实例评测 | 实例评测使用之硬件参数评测:华为云云耀云服务器下的 Linux 网络监控神器 bmon

华为云云耀云服务器L实例评测 &#xff5c; 实例评测使用之硬件参数评测&#xff1a;华为云云耀云服务器下的 Linux 网络监控神器 bmon 介绍华为云云耀云服务器 华为云云耀云服务器 &#xff08;目前已经全新升级为 华为云云耀云服务器L实例&#xff09; 华为云云耀云服务器是什…

Python一步到位实现图像转PDF自动化处理详解

什么是 img2pdf 库&#xff1f; img2pdf 是一个 Python 库&#xff0c;它可以让你轻松地把多张图像转换为 PDF 文件。它支持多种图像格式&#xff0c;如 JPG, PNG, GIF, BMP 等&#xff0c;并且可以自动调整图像的大小和方向&#xff0c;以适应 PDF 的页面大小和方向。它还可以…

数据仓库DW-理论知识储备

数据仓库DW 数据仓库具备 采集数据、分析数据、存储数据的功能&#xff0c;最后得出一些有用的数据&#xff0c;一些目标数据来使用。 采集来自不同源的数据&#xff0c;然后对这些数据进行分析和计算得出一些有用的指标&#xff0c;提供数据决策支持。 数据的来源有&#xff…

供应链 | 零售商-供应商柔性承诺契约:一种鲁棒优化方法 (一)

论文解读&#xff1a;毕鑫宇 作者&#xff1a;Aharon Ben-Tal, Boaz Golany, Arkadi Nemirovski, Jean-Philippe Vial 引用&#xff1a;Ben-Tal, A., Golany, B. , Nemirovski, A., & Vial, J. P… (2005). Retailer-supplier flexible commitments contracts: a robust op…

SpringBoot篇之集成Jedis、Lettuce、Redisson

目录 前言一、详解Jedis、Lettuce 和 Redisson的区别二、SpringBoot集成2.1 集成Jedis2.2 集成Lettuce2.3 集成Redisson 总结 前言 大家好&#xff0c;我是AK&#xff0c;最近在做新项目&#xff0c;基于旧项目框架修改&#xff0c;正好最近也在整理springboot相关知识&#x…

游戏中的随机——“动态平衡概率”算法

前言 众所周知计算机模拟的随机是伪随机&#xff0c;但在结果看来依然和现实中的随机差别不大。 例如掷硬币&#xff0c;连续掷很多很多次之后&#xff0c;总有连续七八十来次同一个面朝上的情况出现&#xff0c;计算机中一般的随机函数也能很好模拟这一点。 但在游戏中&…

使用ChatGPT和MindShow一分钟生成PPT模板

对于最近学校组织的实习答辩&#xff0c;由于时间太短了&#xff0c;而且小编也特别的忙&#xff0c;于是就用ChatGPT结合MindShow一分钟快速生成PPT&#xff0c;确实很实用。只要你跟着小编后面&#xff0c;你也可以快速制作出这个PPT&#xff0c;下面小编就来详细介绍一下&am…