2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)

目录

1、文件系统 SQL 连接器

2、如何指定文件系统类型

3、如何指定文件格式

4、读取文件系统

4.1 开启 目录监控 

4.2 可用的 Metadata

5、写出文件系统

5.1 创建分区表

5.2 滚动策略、文件合并、分区提交

5.3 指定 Sink Parallelism

6、示例_通过FlinkSQL读取kafka在写入hive表

6.1、创建 kafka source表用于读取kafka

6.2、创建 hdfs sink表用于写出到hdfs

6.3、insert into 写入到 hdfs_sink_table

6.4、查询 hdfs_sink_table

6.5、创建hive表,指定local


1、文件系统 SQL 连接器

文件系统连接器允许从本地分布式文件系统进行读写数据

官网链接:文件系统 SQL 连接器


2、如何指定文件系统类型

创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

参考官网:文件系统类型

CREATE TABLE filesystem_table (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem',-- 本地文件系统'path' = 'file:///URI',-- HDFS文件系统'path' = 'hdfs://URI',-- 阿里云对象存储 'path' = 'oss://URI','format' = 'json'
);

3、如何指定文件格式

FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

来对数据进行解析后映射到表中的字段中

CREATE TABLE filesystem_table_file_format (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem',-- 指定文件格式类型'format' = 'json|csv|orc|raw'
);

4、读取文件系统

FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

注意:

        1、当读取目录时,对目录中的文件进行 无序的读取

        2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

             当开启目录监控(source.monitor-interval)时,才是流处理模式

4.1 开启 目录监控 

通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

注意:

        只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

-- 目录监控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (id INT,name STRING,`file.name` STRING NOT NULL METADATA
) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016','format' = 'json','source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
);-- 持续读取
select * from filesystem_source_table;

4.2 可用的 Metadata

使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

注意: 所有 metadata 都是只读的

-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (id INT,name STRING,`file.path` STRING NOT NULL METADATA,`file.name` STRING NOT NULL METADATA,`file.size` BIGINT NOT NULL METADATA,`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012','format' = 'json'
);select * from filesystem_source_table_read_metadata;

运行结果:


5、写出文件系统

5.1 创建分区表

FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

-- 创建分区表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012','partition.default-name' = 'default_partition','format' = 'json'
);-- 动态分区写入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES(1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);-- 静态分区写入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES(1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);-- 查询分区表数据
select * from filesystem_source_table_partition where ds = '20231010';

5.2 滚动策略、文件合并、分区提交

可以看之前的博客:flink写入文件时分桶策略

官网链接:官网分桶策略


5.3 指定 Sink Parallelism

当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

CREATE TABLE hdfs_sink_table (`log` STRING,`dt` STRING,  -- 分区字段,天`hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka','sink.parallelism' = '2', -- 指定sink算子并行度'format' = 'raw'
);

6、示例_通过FlinkSQL读取kafka在写入hive表

需求:

        使用FlinkSQL将kafka数据写入到hdfs指定目录中

        根据kafka的timestamp进行分区(按小时分区)

6.1、创建 kafka source表用于读取kafka

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(`log` STRING,`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH ('connector' = 'kafka','topic' = '20231017','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','format' = 'raw'
);

6.2、创建 hdfs sink表用于写出到hdfs

drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (`log` STRING,`dt` STRING,  -- 分区字段,天`hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH ('connector' = 'filesystem','path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka','sink.parallelism' = '2', -- 指定sink算子并行度'format' = 'raw'
);

6.3、insert into 写入到 hdfs_sink_table

-- 流式 sql,插入文件系统表
insert into hdfs_sink_table
select log,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;

6.4、查询 hdfs_sink_table

-- 批式 sql,使用分区修剪进行选择
select * from hdfs_sink_table;

6.5、创建hive表,指定local

create table `kafka_to_hive` (
`log` string comment '日志数据')comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string) 
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/161816.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python从入门到进阶】39、使用Selenium自动验证滑块登录

接上篇《38、selenium关于Chrome handless的基本使用》 上一篇我们介绍了selenium中有关Chrome的无头版浏览器Chrome Handless的使用。本篇我们使用selenium做一些常见的复杂验证功能,首先我们来讲解如何进行滑块自动验证的操作。 一、测试用例介绍 我们要通过sel…

C# Winform编程(5)菜单和菜单组件

菜单和菜单组件 添加菜单编辑菜单 添加菜单 将MenuStrip控件拖拽到Form窗体顶部添加菜单 编辑菜单 添加菜单项,编辑菜单属性等功能。 右键单击已添加的菜单项可以弹出右键菜单: 可以设置菜单图标,使能菜单,显示快捷键、转换菜…

C语言初学者工具选择:vscode + MSYS2 + cmake 搭建 C环境

文章目录 前言1. MSYS2 安装1. 下载安装包2. 安装3. pacman 换清华大学源4. 安装 mingw-w64 toolchain 和 cmake ninja5. 将 toolchain 加入系统环境变量 2. 设置 vscode1. 必要的插件2. 一个简单的 vscode cmake 项目 最后C数据结构与算法CMake 前言 网上关于使用 vscode 配…

【计算机网络】网络原理

目录 1.网络的发展 2.协议 3.OSI七层网络模型 4.TCP/IP五层网络模型及作用 5.经典面试题 6.封装和分用 发送方(封装) 接收方(分用) 1.网络的发展 路由器:路由指的是最佳路径的选择。一般家用的是5个网口,1个WAN口4个LAN口(口:端口)。可…

如何修改运行中的docker容器的端口映射

一、必须先关闭docker服务 systemctl stop docker二、修改容器的hostconfig.json 文件 路径:vim /var/lib/docker/containers/容器id/hostconfig.json 修改 PortBindings 属性,如下图: 然后保存 三、修改config.v2.json 文件 路径&#…

[机缘参悟-110] :一个IT人对面具的理解:职业面具戴久了,就会忘记原本真实的自己,一个人是忠于职位,还是忠于内心?

目录 一、职业面具戴久了,就会忘记原本真实的自己 二、霸王别姬 三、没有对错,各走各路 3.1 程蝶衣:戏里戏外,忠于角色 3.2 段小楼:戏里戏外,角色分明 3.3 没有对错,各走各路 四、职场中…

Linux文件管理与用户管理

一、查看文件内容 1、回顾之前的命令 cat命令、tac命令、head命令、tail命令、扩展:tail -f动态查看一个文件的内容 2、more分屏显示文件内容(了解) 基本语法: # more 文件名称 特别注意:more命令在加载文件时并不…

本地安装telepresence,访问K8S集群 Mac(m1) 非管理員

kubeconfig 一.安装telepresence 1.安装 Telepresence Quickstart | Telepresence (1)brew install datawire/blackbird/telepresence 2.配置 目录kubectl 将使用默认的 kubeconfig 文件:$HOME/.kube/config 创建文件夹&…

分享一个比对图片是否一致的小工具(来源: github)

运行效果图: 官网: GitHub - codingfishman/image-diff: 一个方便的图片对比工具一个方便的图片对比工具. Contribute to codingfishman/image-diff development by creating an account on GitHub.https://github.com/codingfishman/image-diff 优缺点: 1.采用比对各色块是…

idea 里 没有svn选项的处理办法

总结一下没有svn选项的几种情况: 情况1:IntelliJ IDEA打开带SVN信息的项目不显示SVN信息,项目右键SVN以及图标还有Changes都不显示解决方法 在VCS菜单中有个开关,叫Enabled Version Control Integration,在打开的窗口…

ZY Player:影视爱好者的万能播放器

如果你是一位影视爱好者,一定有过为寻找一款支持各种影视资源、能解析VIP权限的播放器而头疼的经历。今天,我要为大家介绍一款被称为万能影视资源播放器的ZY Player,它由网友Hiram-Wong二次开发,并且是开源免费的 导航 强大的影视…

基于Java的旅游网站系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

数据模型设计必读方法论!很实用

数据架构的重要构件之一是数据模型,当然从数据架构的视角来说的数据模型是指企业级数据模型。本篇文章更多是讨论如何设计和管理数据模型,此处的数据模型是泛指在组织中通过数据建模的过程,来发现、分析和确定数据需求范围,并用于…

分享Java NET Python三大技术下AutojsPro7云控代码

引言 有图有真相,那短视频就更是真相了。下面是三大语言的短视频。 Java源码版云控示例: Java源码版云控示例在线视频 Net源码版云控示例: Net源码版云控示例在线视频亚丁号-知识付费平台 支付后可见 扫码付费可见 Python源码版云控示例&…

彻底理解操作系统与内核的区别!

通用底盘技术 Canoo公司有一项核心技术专利,这就是它们的通用电动底盘技术,长得是这个样子,非常像一个滑板: 这个带轮子、有电池、能动的滑板已经包含了一辆车最核心的组件,差的就是一个外壳。这个看起来像滑板的东西…

基于SpringBoot的大学生体质测试管理系统

基于SpringBoot的大学生体质测试管理系统的设计与实现~ 开发语言:Java数据库:MySQL技术:SpringBootMyBatisVue工具:IDEA/Ecilpse、Navicat、Maven 系统展示 主页 管理员界面 教师界面 学生界面 摘要 大学生体质测试管理系统是一…

Flink的ResourceManager详解(一)

ResourceManager 总结 一、概述 1、ResourceManager 管理 Flink 集群中的计算资源,计算资源主要来自 TaskManager 组件。 2、如果集群采用 Native【本地模式】部署,则 ResourceManager 会动态地向集群资源管理器申请 Container 并启动TaskManager&…

python+django医患档案电子病历管理系统7ld2o

本课题使用Python语言进行开发。代码层面的操作主要在PyCharm中进行,将系统所使用到的表以及数据存储到MySQL数据库中,方便对数据进行操作本课题基于WEB的开发平台 1.运行环境:python3.7/python3.8。 2.IDE环境:pycharmmysql5.7; …

ArcGIS笔记5_生成栅格文件时保存报错怎么办

本文目录 前言Step 1 直接保存到指定文件夹会报错Step 2 先保存到默认位置再数据导出到指定文件夹 前言 有时生成栅格文件时,保存在自定义指定的文件夹内会提示出错,而保存到默认位置则没有问题。因此可以通过先保存到默认位置,再数据导出到…

linux进程间通讯--信号量

1.认识信号量 方便理解:信号量就是一个计数器。当它大于0能用,小于等于0,用不了,这个值自己给。 2.特点: 信号量用于进程间同步,若要在进程间传递数据需要结合共享内存。信号量基于操作系统的 PV 操作&am…