4.2、Flink任务怎样读取文件中的数据

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath));  // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter());  // 指定Flink中的数据类型    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName);     // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }

代码示例:

    public static void readTextFile() throws Exception {/** TODO 功能说明*   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}

3、readFile(已过时,不推荐使用)

语法说明:

定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath    : 指定 文件路径watchType   : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval    : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
FileInputFormat 的实现类
FileInputFormat 的实现类

代码示例:

    public static void readFile() throws Exception {/** TODO 功能说明*    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。*    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)*       按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:*      @inputFormat : 指定文件输入格式*      @filePath    : 指定文件路径*      @watchType   : 指定监控类型,提供了两种读取策略*            PROCESS_ONCE : 只读取一次*            PROCESS_CONTINUOUSLY :持续读取,监控新增数据*      @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:*      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该*           文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/82447.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】数据结构与算法:常用排序算法

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍常用排序算法。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次更新不迷路&#x1…

单片机、嵌入式的大神都平时浏览什么网站?

我平时也喜欢收藏些有关嵌入式的学习网站&#xff0c;压箱底的记录翻出来总结下 1、综合网站 哔哩哔哩 (゜-゜)つロ 干杯~-bilibili//B站是一个有很多好资料的网站 https://github.com/nhivp/Awesome-Embedded //github开源项目网站&#xff0c;这个是我找到嵌入式综合相关的…

PS的一些智能对象是怎么用的?用于包装设计该怎么使用?

大家都对一些效果图不太理解&#xff0c;我现在就献丑给大家讲一下&#xff0c;教程都是网友盛传的&#xff0c;我自己学习并且有所体会。 一般做的非常好的PS效果图都是外国人自己做的&#xff0c;所以大多数效果图都是英文&#xff0c;细心的网友会发现&#xff0c;中文的是一…

chatGPT能力培训,客户最关注的99个方向

前言&#xff1a; chatGPT的主要应用&#xff0c;包括文本生成、图像生成和图文关联三大核心方向&#xff1a; 用户的在实际的工作和学习过程中&#xff0c;最关心的内容&#xff0c;可以按照上述类别进行划分&#xff0c;我们总结了&#xff0c;相关的插头GPT能力培训的相关主…

基于OFDM通信系统的低复杂度的资源分配算法matlab性能仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 .......................................................................%子载波分配[~,po…

MySQL:表的约束和基本查询

表的约束 表的约束——为了让插入的数据符合预期。 表的约束很多&#xff0c;这里主要介绍如下几个&#xff1a; null/not null,default, comment, zerofill&#xff0c;primary key&#xff0c;auto_increment&#xff0c;unique key 。 空属性 两个值&#xff1a;null&am…

C 语言的 ctype.h 头文件

C 语言的 ctype.h 头文件包含了很多字符函数的函数原型, 可以专门用来处理一个字符, 这些函数都以一个字符作为实参. ctype.h 中的字符测试函数如表所示: 这些测试函数返回 0 或 1, 即 false 或 true. ctype.h 中的字符映射函数如表所示: 字符测试函数不会修改原始实参, 只会…

list交并补差集合

list交并补差集合 工具类依赖 <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.8.1</version> </dependency><dependency><groupId>commons-collections&…

内生安全构建数据存储

一、数据安全成为防护核心&#xff0c;存储安全防护不容有失 1、数据作为企业的核心资产亟需重点保护&#xff0c;数据安全已成网络空间防护核心 2、国家高度重视关键信息基础设施的数据安全&#xff0c;存储安全已成为审核重点 二、存储安全是数据安全的关键一环&#xff0c;应…

解决github打不开的方法

解决github打不开的方法 本文参考文章&#xff1a;解决可ping通但无法访问github网站的问题 一、确定域名github.com的ip地址 进入网址 IP/服务器github.com的信息 - 站长工具 (chinaz.com)&#xff0c;查看 ip 地址。 20.205.243.166 github.com二、确定域名github.global.…

【论文研读】MARLlib 的架构分析

【论文研读】MARLlib: A Scalable Multi-agent Reinforcement Learning Library 和尚念经 多智能体强化学习框架研究。 多智能体强化学习库。 多智能体强化学习算法实现。 多智能体强化学习环境的统一化&#xff0c;标准化。 多智能体强化学习算法解析。 多智能体强化学习 算法…

5W2H分析法模版

&#xff08;1&#xff09;WHAT——是什么&#xff0c;目的是什么&#xff0c;做什么工作。 条件是什么&#xff0c;哪一部分工作要做&#xff0c;目的是什么&#xff0c;重点是什么&#xff0c;与什么有关系&#xff0c;功能是什么&#xff0c;规范是什么&#xff0c;工作对象…

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2…

时序数据库 TDengine 与 WhaleStudio 完成相互兼容性测试认证

近年来&#xff0c;开源及其价值获得社会各界的广泛认可&#xff0c;无论是国家政策导向还是企业数字化转型&#xff0c;都在加速拥抱开源。对于如操作系统、数据库等基础软件来说&#xff0c;开源更是成为驱动技术创新的有力途径。 在此背景下&#xff0c;近日&#xff0c;涛…

Feign

一、为什么使用Feign 二、Feign和RestTemplate的区别 三、自定义配置 四、Feign日志 1.配置文件方式 2.java代码方式

TS 踩坑之路(四)之 Vue3

一、在使用定义默认值withDefaults和defineProps 组合时&#xff0c;默认值设置报错 代码案例 报错信息 不能将类型“{ isBackBtn: false; }”分配给类型“(props: PropsType) > RouteMsgType”。 对象字面量只能指定已知属性&#xff0c;并且“isBackBtn”不在类型“(pro…

【机器学习】在 MLOps构建项目 ( MLOps2)

My MLOps tutorials: Tutorial 1: A Beginner-Friendly Introduction to MLOps教程 2&#xff1a;使用 MLOps 构建机器学习项目 一、说明 如果你希望将机器学习项目提升到一个新的水平&#xff0c;MLOps 是该过程的重要组成部分。在本文中&#xff0c;我们将以经典手写数字分类…

【车道线】TwinLiteNet 复现过程全纪录

码字不易&#xff0c;喜欢的请点赞收藏&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 论文全文翻译&#xff1a;【freespace】TwinLiteNet: An Efficient and Lightweight Model for Driveable Area and Lane Segmentation_莫克_Cheney的博客-CSDN博客 目录…

Vue2:组件高级(下)

Vue2&#xff1a;组件高级&#xff08;下&#xff09; Date: May 25, 2023 Sum: 自定义指令、插槽、商品列表、动态组件 目标&#xff1a; 自定义指令 基础概念&#xff1a; 概念&#xff1a; 内置指令&#xff1a;vue 官方提供了 v-for、v-model、v-if 等常用的内置指令。…

机器学习基础08-回归算法矩阵分析(基于波士顿房价(Boston House Price)数据集)

回归算法通常涉及到使用矩阵来表示数据和模型参数。线性回归是最常见的回归算法之一&#xff0c;它可以用矩阵形式来表示。 考虑一个简单的线性回归模型&#xff1a; y m x b y mx b ymxb&#xff0c;其中 y y y 是因变量&#xff0c; x x x 是自变量&#xff0c; m m m 是…