Flink 流式读写文件、文件夹

文章目录

  • 一、flink 流式读取文件夹、文件
  • 二、flink 写入文件系统——StreamFileSink
  • 三、查看完整代码

一、flink 流式读取文件夹、文件

Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

StreamExecutionEnvironment.readFile()接收如下参数:

  • FileInputFormat参数,负责读取文件中的内容。
  • 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
  • PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
  • 30000L表示多久扫描一次监听的文件。

FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。

DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。

在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。

二、flink 写入文件系统——StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3中状态:

  • in-progress Files 当前文件正在写入中
  • Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  • Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
    下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !

数据文件格式是行式存储格式

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
在这里插入图片描述
将数据以列式存储的格式输出到文件中

三、查看完整代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId;
import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line -> JSONObject.parseObject(line)).filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0).map(line -> JSON.toJSONString(line)).addSink(fileSink);env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/99692.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java后端开发面试题——框架篇

Spring框架中的bean是单例的吗&#xff1f;Spring框架中的单例bean是线程安全的吗&#xff1f; singleton : bean在每个Spring IOC容器中只有一个实例。 prototype&#xff1a;一个bean的定义可以有多个实例。 Spring bean并没有可变的状态(比如Service类和DAO类)&#xff0c…

使用kubeadm工具升级kubernetes

一、背景&#xff1a; kubeadm部署的kubernetes集群进行升级&#xff0c;通常先升级控制节点&#xff0c;控制节点升级完成后再升级工作节点&#xff0c;本博文只升级了控制节点&#xff0c;工作节点按照相同的流程进行升级即可 环境说明&#xff1a; 主机名节点11.0.1.200k8s…

PyTorch DataLoader 报错 “DataLoader worker exited unexpectedly“ 的解决方案

注意&#xff1a;博主没有重写d2l的源代码文件&#xff0c;而是创建了一个新的python文件&#xff0c;并重写了该方法。 一、代码运行日志 C:\Users\Administrator\anaconda3\envs\limu\python.exe G:/PyCharmProjects/limu-d2l/ch03/softmax_regression.py Traceback (most r…

Ohio主题 - 创意组合和代理机构WordPress主题

Ohio主题是一个精心制作的多用途、简约、华丽、多功能的组合和创意展示主题&#xff0c;具有敏锐的用户体验&#xff0c;您需要构建一个现代且实用的网站&#xff0c;并开始销售您的产品和服务。它配备了最流行的WordPress页面构建器 WPBakery Page Builder&#xff08;以前称为…

avue多选列表根据后端返回的某个值去判断是否选中;avue-curd多选回显

效果如上&#xff1a; getSiteList().then(res > {//列表数据this.siteData res.data.datathis.$nextTick(()>{this.siteData.forEach(item>{//业务条件if(item.configid&&item.configid!0&&item.configid>0){//符合条件时调用选中的方法this.$…

OC调用Swift编写的framework

一、前言 随着swift趋向稳定&#xff0c;越来越多的公司都开始用swift来编写苹果相关的业务了&#xff0c;关于swift的利弊这里就不多说了。这里详细介绍OC调用swift编写的framework库的步骤 二、制作framework 1、新建项目&#xff0c;选择framework 2、填写framework的名称…

损失函数,基于概率分布度量的损失函数,信息量,信息熵的作用

目录 损失函数中为什么要用Log&#xff1a;概率损失函数-乘法转加法-便于求偏导 信息量&#xff0c;信息熵的作用 信息的作用是消除不确定性&#xff1a;信息量是0&#xff0c;事件确定 回答只是Y,N&#xff0c;因此对数底数为2​编辑 一句话描述的事件发生的概率越低&#…

隔断让你的办公室变得更加智能、环保、人性化

隔断可以在办公室中起到多种重要作用&#xff0c;使办公室更加智能、环保和人性化。以下是一些可能的方式&#xff1a; 1. 智能办公室控制系统&#xff1a;可以通过隔断集成智能办公室控制系统&#xff0c;实现办公室照明、温度和空调等设备的自动调节&#xff0c;提高能效和舒…

以 Java NIO 的角度理解 Netty

文章目录 前言Java NIO 工作原理Selector 的创建ServerSocketChannel 的创建ServerSocketChannel 注册 Selector对事件的处理总结 前言 上篇文章《Netty 入门指南》主要涵盖了 Netty 的入门知识&#xff0c;包括 Netty 的发展历程、核心功能与组件&#xff0c;并且通过实例演示…

用户信任建立:银行卡三要素API的隐私保护与数据安全

引言 在数字支付和金融领域&#xff0c;用户信任是构建稳健金融生态的关键。随着金融科技的迅速发展&#xff0c;银行卡三要素API作为一种用于身份验证和支付安全的工具&#xff0c;不仅带来了便利&#xff0c;也引发了对隐私保护和数据安全的关切。本文将探讨银行卡三要素API…

C语言编写图形界面

文章目录 环境使用库基础概念句柄 程序的入口创建窗口定义窗口类注册窗口类创建窗口 完整代码运行效果 环境 使用的是VSCode MinGW&#xff1b; 使用库 我们使用windows.h库来实现图形化界面。 头文件如下&#xff1a; #include <windows.h>windows.h是 Windows 操作…

Redis中的分布式锁及其延生的问题

前言 本文将着重介绍Redis中的分布式锁及其与出现的死锁和锁误删问题 什么是分布式锁 首先问题就是什么是分布式锁&#xff0c;分布式锁就是分布式系统中实现并发控制的一种锁机制&#xff0c;它可以保证多个节点在同一个时间只有有一个能成功竞争到系统资源&#xff08;共享…

微软Win11 Dev预览版Build23526发布

近日&#xff0c;微软Win11 Dev预览版Build23526发布&#xff0c;修复了不少问题。牛比如斯Microsoft&#xff0c;也有这么多bug&#xff0c;所以你写再多bug也不作为奇啊。 主要更新问题 [开始菜单&#xff3d; 修复了在高对比度主题下&#xff0c;打开开始菜单中的“所有应…

【LeetCode-中等题】128. 最长连续序列

题目 题解一&#xff1a;HeshSet枚举 思路&#xff1a;先对数组进行set去重&#xff0c;核心就是&#xff0c;先找出临界值&#xff08;假设以最小临界为例&#xff0c;那么这个临界值自己就是最小值&#xff0c;&#xff09;&#xff0c;以临界值不断做加1操作&#xff0c;看…

使用拦截器+Redis实现接口幂等

文章目录 使用拦截器Redis实现接口幂等1.思路分析2.具体实现2.1 创建redis工具类2.2 自定义幂等注解2.2 自定义幂等拦截器2.3 注入拦截器到容器 3.测试 使用拦截器Redis实现接口幂等 1.思路分析 接口幂等有很多种实现方式&#xff0c;拦截器/AOPRedis&#xff0c;拦截器/AOP本…

Linux —— 进程间通信(管道)

目录 一&#xff0c;进程间通信 二&#xff0c;管道 匿名管道 命名管道 一&#xff0c;进程间通信 进程间通信&#xff08;IPC&#xff0c;InterProcess Communication&#xff09;&#xff0c;即在不同进程之间进行信息的传播或交换&#xff1b;由于一般进程用户地址空间是…

JavaScript 中常用简写技巧总结

平时我们写代码时最高级的境界是自己写的东西别人看不懂&#xff01;哈哈哈&#xff01;分享一些自己常用的js简写技巧&#xff0c;长期更新&#xff0c;会着重挑选一些实用的简写技巧&#xff0c;使自己的代码更简洁优雅~ 这里只会收集一些大多数人不知道的用法&#xff0c;但…

PHP实现轻量级WEB服务器接收HTTP提交的RFID刷卡信息并回应驱动读卡器显示播报语音

本示例使用的读卡器&#xff1a;RFID网络WIFI无线TCP/UDP/HTTP可编程二次开发读卡器POE供电语音-淘宝网 (taobao.com) <?php mb_http_output(utf-8); $port88; $socket socket_create(AF_INET, SOCK_STREAM, SOL_TCP); $bool socket_bind($socket, "0.0.0.0",…

宇宙原理:黑洞基础。

宇宙原理&#xff1a;黑洞基础TOC 黑洞的数理基础&#xff1a;一个由满数组成的数盘&#xff0c;经过自然演进&#xff0c;将会逐步稀疏化、最终会向纯数方案发展&#xff1b;纯数方案虽然只有{2}、无数&#xff08;虚拟&#xff09;、{0,1,2,3}&#xff08;虚拟&#xff09;、…

如何在 Ubuntu 中安装最新的 Python 版本

动动发财的小手&#xff0c;点个赞吧&#xff01; Python 是增长最快的主要通用编程语言。其原因有很多&#xff0c;例如其可读性和灵活性、易于学习和使用、可靠性和效率。 目前使用的 Python 有两个主要版本 – 2 和 3&#xff08;Python 的现在和未来&#xff09;&#xff1…