【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理

文章目录

  • 01 基本概念
  • 02 工作原理
  • 03 数据流实现
  • 04 项目实战
    • 4.1 项目结构
    • 4.2 maven依赖
    • 4.3 StreamFormat读取文件数据
    • 4.4 BulkFormat读取文件数据
    • 4.5 使用小结
  • 05 数据源比较
  • 06 总结

01 基本概念

Apache Flink 是一个流式处理框架,被广泛应用于大数据领域的实时数据处理和分析任务中。在 Flink 中,FileSource 是一个重要的组件,用于从文件系统中读取数据并将其转换为 Flink 的数据流。本文将深入探讨 FileSource 的工作原理、用法以及与其他数据源的比较。

02 工作原理

FileSource 是 Flink 提供的一种用于从文件系统中读取数据的源。它能够处理各种类型的文件,包括文本文件、压缩文件、序列文件等。FileSource 的工作原理可以概括为以下几个步骤:

1.文件分配(File Assignment)

在 Flink 集群中,每个任务都会负责读取文件的一个分片。FileSource 会根据文件的大小和数量将文件分配给不同的任务进行处理。

2.并行读取(Parallel Reading)

每个任务会并行地读取分配给它的文件分片。这意味着文件中的数据会被同时读取,从而提高了整体的读取速度和处理效率。

3.数据解析(Data Parsing)

读取的数据会经过解析器进行解析,将其转换为 Flink 中的数据结构,如 DataSet 或 DataStream。

4.数据分发(Data Distribution)

解析后的数据会被分发到后续的算子中进行进一步的处理和分析。

03 数据流实现

  • 有界流(Bounded Streams)

    有界流是指具有明确结束点的数据流,即数据流在某个时刻会结束,数据量是有限的。例如,从静态文件、数据库或有限数据集中读取的数据流就是有界流。有界流的特点包括:

    • 数据量是有限的,流的结束点是已知的。
    • 可以对整个数据流进行批处理式的分析和处理,因为所有数据都可用且有限。
    • 可以使用批处理算法和优化技术,例如排序、分组聚合等。
  • 无界流(Unbounded Streams)

    无界流是指没有明确结束点的数据流,即数据流会持续不断地产生,数据量可能是无限的。例如,实时传感器数据、日志流、消息队列中的数据等都是无界流。无界流的特点包括:

    • 数据源持续不断地产生数据,流没有明确的结束点。
    • 通常用于实时流式处理,要求系统能够实时处理数据并在流中进行持续的分析和计算。
    • 需要采用流式处理的技术和算法,例如窗口计算、流式聚合、事件时间处理等。
  • 不同数据流实现

    • 创建一个 File Source 时, 默认情况下,Source 为有界/批的模式;

      //创建一个FileSource数据源,并设置为批模式,读取完文件后结束
      final FileSource<String> source = FileSource.forRecordStreamFormat(...).build();
      
    • 设置参数monitorContinuously(Duration.ofMillis(5)) 可以把 Source 设置为持续的流模式

      //创建一个FileSource数据源,并设置为流模式,每隔5分钟检查路径新文件,并读取
      final FileSource<String> source = FileSource.forRecordStreamFormat(...).monitorContinuously(Duration.ofMillis(5))  .build();   
      

04 项目实战

1.FileSource支持多种数据格式数据读取与解析,本期以Text File文件为例展开。
2.jdk版本11
3.Flink版本1.18.0
4.下面是两个简单的示例代码,演示如何在 Flink 中使用 FileSource 读取文件数据

4.1 项目结构

在这里插入图片描述

4.2 maven依赖

<!-- flink读取Text File文件依赖 start-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.18.0</version>
</dependency>
<!-- flink读取Text File文件依赖 end--><!-- flink基础依赖 start -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.18.0</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.18.0</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version>
</dependency><!-- flink基础依赖 end -->

4.3 StreamFormat读取文件数据

  • StreamFormat从文件流中读取文件内容。它是最简单的格式实现, 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),但是限制了可应用的优化(例如对象重用,批处理等等)。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;/*** 描述:* flink集成FileSource & forRecordStreamFormat使用 & 流模式* StreamFormat:从文件流中读取文件内容。它是最简单的格式实现,* 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),* 但是限制了可应用的优化(例如对象重用,批处理等等)。** @author 浅夏的猫* @version 1.0.0* @date 2024-02-07 15:30:22*/
public class FileSourceRecordStreamingJob {public static void main(String[] args) throws Exception {// 创建 需要读取的文件路径PathPath path = new Path("D:\\flink\\file_source.txt");// 创建 读取文件的格式函数TextLineInputFormat textLineInputFormat = new TextLineInputFormat();// 创建 FileSourceFileSource<String> fileSource = FileSource.forRecordStreamFormat(textLineInputFormat, path)//放开注释则使用流模式,每隔5分钟检查是否有新文件否则默认使用批模式
//                .monitorContinuously(Duration.ofMillis(5)).build();// 创建 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 添加 FileSource 到数据流env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();// 执行任务env.execute("FileSourceRecordStreamingJob");}
}

4.4 BulkFormat读取文件数据

  • BulkFormat从文件中一次读取一批记录,虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/*** 描述:flink集成FileSource & forBulkFileFormat使用 & 流模式* BulkFormat:从文件中一次读取一批记录。 它虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。** @author 浅夏的猫* @version 1.0.0* @date 2024-02-07 15:30:22*/
public class FileSourceBulkStreamingJob {public static void main(String[] args) throws Exception {//创建 批量读取文件的格式函数,其实底层还是通过对单行文件读取BulkFormat<String, FileSourceSplit> bulkFormat = new StreamFormatAdapter<>(new TextLineInputFormat());// 创建 FileSourceFileSource<String> fileSource = FileSource.forBulkFileFormat(bulkFormat, new Path("D:\\flink\\file_source.txt"))//放开注释则使用流模式,每隔5分钟检查是否有新文件,否则默认使用批模式
//                .monitorContinuously(Duration.ofMillis(5)).build();// 创建 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 添加 FileSource 到数据流env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();// 执行任务env.execute("FileSourceBulkStreamingJob");}
}

4.5 使用小结

在上面的示例中,我们使用FileSource方法从指定路径读取文本文件,并将其转换为一个数据流,选择不同的输入格式和解析方式,然后我们调用 print 方法将数据流中的数据打印出来。

05 数据源比较

FileSource 是 Flink 中常用的数据源之一,与其他数据源相比,它具有一些优势和劣势,根据实际情况和需求,可以选择不同的数据源来满足任务的要求。

  • 优势

    • 支持读取大规模的文件数据,适用于大数据处理场景。
    • 支持并行读取和处理,能够充分利用集群资源,提高处理效率。
    • 支持多种文件格式和压缩方式,灵活性强。
  • 劣势

    • 对于小文件的处理效率较低,可能会导致资源浪费和性能下降。
    • 无法实时监控文件的变化,需要手动触发重新读取。

06 总结

FileSource 是 Apache Flink 中用于读取文件数据的重要组件,它能够高效地处理大规模的文件数据,并提供丰富的功能和灵活的用法。通过深入了解 FileSource 的工作原理和用法,可以更好地利用 Flink 来实现大规模数据文件的处理和分析任务。

通过以上详细介绍,可以对 Apache Flink 中的 FileSource 有一个全面的了解,从而更好地应用于实际的数据处理项目中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/260062.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

虚拟机+麒麟海光+达梦数据库linux 安装教程

一 下载 虚拟机下载地址下载 VMware Workstation Pro | CN 达梦数据库下载地址 产品下载 | 达梦数据库 (dameng.com) 银河麒麟下载地址 国产操作系统、银河麒麟、中标麒麟、开放麒麟、星光麒麟——麒麟软件官方网站 (kylinos.cn) 二 安装 虚拟机安装 https://www.cnblogs…

嵌入式学习C++ Day7

嵌入式学习C Day7 一、思维导图 二、作业

设计模式之委派模式

文章目录 前言正文一、生活中的例子二、Java代码实现2.1 类设计2.2 代码实现2.2.1 Employee2.2.2 ArchitectureDesignEmployer2.2.3 BackEmployer2.2.4 FrontEmployer2.2.5 Leader2.2.6 EmployeeStrongPointEnum2.2.7 Boss 2.3 测试2.3.1 Client2.3.2 测试结果 三、委派模式的优…

Docker Desktop 链接windos 安装的redis和mysql

1.1.先在容器安装项目 2.链接redis和mysql配置 redis和mysql是在windos安装的&#xff0c;使用的是小p管理器安装的 项目链接 DB_DRIVERmysql DB_HOSThost.docker.internal DB_PORT3306 DB_DATABASEyunxc_test DB_USERNAMEyunxc_test DB_PASSWORDtest123456... DB_CHARSETutf…

【医学大模型】Text2MDT :从医学指南中,构建医学决策树

Text2MDT &#xff1a;从医学指南中&#xff0c;构建医学决策树 提出背景Text2MDT 逻辑Text2MDT 实现框架管道化框架端到端框架 效果 提出背景 论文&#xff1a;https://arxiv.org/pdf/2401.02034.pdf 代码&#xff1a;https://github.com/michael-wzhu/text2dt 假设我们有一…

Vue-route核心知识整理

目录 1 相关理解 1.1 对 vue-router 的理解 1.2 对 SPA 应用的理解 1.3 对路由的理解 1.3.1 什么是路由&#xff1f; 1.3.2 路由的分类 2 几个注意点 3 路由的基本使用 4 嵌套 (多级) 路由 5 路由传参 5.1 query 方式传参 5.1.1 跳转路由并携带query参数&#xff0…

微信小程序-绑定数据并在后台获取它

如图 遍历列表的过程中需要绑定数据&#xff0c;点击时候需要绑定数据 这里是源代码 <block wx:for"{{productList}}" wx:key"productId"><view class"product-item" bindtap"handleProductClick" data-product-id"{{i…

Web3区块链游戏:创造虚拟世界的全新体验

随着区块链技术的不断发展&#xff0c;Web3区块链游戏正逐渐崭露头角&#xff0c;为玩家带来了全新的虚拟世界体验。传统游戏中的中心化结构和封闭经济体系已经被打破&#xff0c;取而代之的是去中心化的游戏环境和真实所有权的数字资产。本文将深入探讨Web3区块链游戏的特点、…

代码随想录算法训练营29期|day55 任务以及具体安排

第九章 动态规划part12 309.最佳买卖股票时机含冷冻期 class Solution {public int maxProfit(int[] prices) {//0代表持股票&#xff0c;1代表保持卖出状态&#xff0c;2代表卖出股票。3代表冷冻int[][] dp new int[prices.length][4];dp[0][0] -prices[0];for(int i 1 ; …

MySQL数据库基础(十):DQL数据查询语言

文章目录 DQL数据查询语言 一、数据集准备 二、select查询 三、简单查询 四、条件查询 1、比较查询 2、范围查询 3、逻辑查询 4、模糊查询 5、非空查询 五、排序查询 六、聚合查询 七、分组查询与having子句 1、分组查询介绍 2、group by的使用 3、group by 聚…

web基础及http协议 (二) apache

一、httpd 安装组成 http 服务基于 C/S 结构 1 .常见http 服务器程序 httpd apache&#xff0c;存在C10K&#xff08;10K connections&#xff09;问题 nginx 解决C10K问题lighttpd IIS .asp 应用程序服务器 tomcat .jsp 应用程序服务器 jetty 开源的servlet容器&#xf…

vm centos7 docker 安装 mysql 5.7.28(2024-02-18)

centos系统版本 [rootlocalhost mysql5.7]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) docker版本 拉取指定版本镜像 docker pull mysql:5.7.28 docker images 创建挂载目录&#xff08;数据存储在centos的磁盘上&#xff09; mkdir -p /app/softwa…

ElscticSearch基础操作

Es数据格式和Mysql对比 ElasticSearch index(索引) Type(类型) Documents(文档) Fields(字段) ​ MySQL Databases(数据库) Table(表) Row(行) Column(列) 倒排索引 正向索引,在Mysql中使用的索引就是正排索引,索引对应的就是直接的数据 例子: id content 1 my name is …

【JVM篇】什么是类加载器,有哪些常见的类加载器

文章目录 &#x1f354;什么是类加载器&#x1f6f8;有哪些常见的类加载器 &#x1f354;什么是类加载器 负责在类加载过程中&#xff0c;将字节码信息以流的方式获取并加载到内存当中 &#x1f6f8;有哪些常见的类加载器 启动类加载器 启动类加载器是有Hotspot虚拟机通过的类…

每日一题 力扣107 二叉树的层序遍历Ⅱ

107. 二叉树的层序遍历 II 题目描述&#xff1a; 给你二叉树的根节点 root &#xff0c;返回其节点值 自底向上的层序遍历 。 &#xff08;即按从叶子节点所在层到根节点所在的层&#xff0c;逐层从左向右遍历&#xff09; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20…

Github 2024-02-18 开源项目日报 Top10

根据Github Trendings的统计&#xff0c;今日(2024-02-18统计)共有10个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目5PowerShell项目1Rust项目1PHP项目1Jupyter Notebook项目1TypeScript项目1 Black&#xff1a;不妥…

把Llama2封装为API服务并做一个互动网页

最近按照官方例子&#xff0c;把Llama2跑起来了测试通了&#xff0c;但是想封装成api服务&#xff0c;耗费了一些些力气 参考&#xff1a;https://github.com/facebookresearch/llama/pull/147/files 1. 准备的前提如下 按照官方如下命令&#xff0c;可以运行成功 torchrun -…

程序员必看的几部电影

目录 《我是谁&#xff1a;没有绝对安全的系统》 《模仿游戏》 《硅谷传奇》 《代码 The Code》 作为程序员&#xff0c;除了在工作中不断学习和提升技术外&#xff0c;适当地放松也是必不可少的 看电影可以是一个很好的放松方式&#xff0c;而对于程序员来说&#xff0c;…

DNS服务正反解析

1.正向解析 1.配置基本 1.1防火墙配置 二者都要关闭 setenforce 0 systemctl stop firewalld #关闭防火墙 yum install bind -y #下载bind软件 客户端可以不用下 1.2服务端配置静态ip&#xff0c; ip a 查看网卡 nmcli c modify ens33 ipv4.method manual ipv4.addresses …

HTTP特性

大家好我是苏麟 , 今天说说HTTP特性. 资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) 到目前为止&#xff0c;HTTP 常见到版本有 HTTP/1.1&#xff0c;HTTP/2.0,HTTP/3.0&#xff0c;不同版本的 HTTP 特性是不一样的。 这里先用 HTTP/1.1 版本给大家介…