6.1、Flink数据写入到文件

1、前言

Flink API 提供了FileSink连接器,来帮助我们将数据写出到文件系统中去

版本说明:java1.8、flink1.17

官网链接:官网


2、Format Types - 指定文件格式

FileSink 支持 Row-encoded 、Bulk-encoded 两种格式写入文件系统

        Row-encoded:文本格式

        Bulk-encoded:Parquet、Avro、SequenceFile、Compress、Orc

Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

3、桶分配 - 文件分区策略(分目录)

桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。(好比Hive中的分区)

Flink 内置了两种同分配策略:

  • DateTimeBucketAssigner :默认的基于时间的分配器
  • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)
BasePathBucketAssigner - 不会生成子目录
DateTimeBucketAssigner - 根据时间进行分桶

代码示例:

// TODO 按照时间进行分桶,每分钟生成一个子目录,目录名称为 yyyy-MM-dd HH-mm
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))
// TODO 当个全局桶,不生成子目录
.withBucketAssigner(new BasePathBucketAssigner())

4、滚动策略 - 分文件

滚动策略定义`何时生成新的文件`,可以指定 文件创建时间和文件大小 进行配置

// TODO 文件滚动策略:  文件创建后1分钟 或 大小超过1m 时生成新的文件
.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1))     //   指定文件持续时间.withMaxPartSize(new MemorySize(1024 * 1024))    //   指定文件大小.build()
)

5、文件命名&生命周期

Part 文件可以处于以下三种状态中的任意一种:

  1. In-progress :当前正在写入的 Part 文件处于 in-progress 状态
  2. Pending :由于指定的滚动策略)关闭 in-progress 状态文件,并且等待提交
  3. Finished :流模式(STREAMING)下的成功的 Checkpoint 或者批模式(BATCH)下输入结束,文件的 Pending 状态转换为 Finished 状态

注意:在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 Finished状态的文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

文件命名策略:

  • In-progress / Pending:prefix-part-<uid>-<partFileIndex>.ext.inprogress.uid
  • Finished:prefix-part-<uid>-<partFileIndex>.ext

    prefix : 文件名称前缀(默认为空)

  ext :文件名称后缀(默认为空)

  uid :uid 是一个分配给 Subtask 的随机 ID 值

└── 2019-08-25--12├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext└── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

代码示例:

// TODO 指定输出文件的名称配置 前缀、后缀
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("flink") // 指定前缀.withPartSuffix("txt")   // 指定后缀.build()
)

6、这是一个完整的例子

package com.baidu.datastream.sink;import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;
import java.time.ZoneId;// TODO flink 数据输出到文件系统
public class SinkFiles {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// STREAMING 模式时,必须开启checkpoint,否则文件一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);// 2.指定数据源DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 3.初始化 FileSink 实例FileSink<String> fileSink = FileSink// TODO 指定输出方式 行式输出、文件路径、编码.<String>forRowFormat(new Path("data/output"), new SimpleStringEncoder<String>("UTF-8"))// TODO 指定输出文件的名称配置 前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("flink") // 指定前缀.withPartSuffix(".txt")   // 指定后缀.build())// TODO 按照时间进行目录分桶:每分钟生成一个目录,目录格式为 yyyy-MM-dd HH-mm.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))// TODO 文件滚动策略:  1分钟 或 1m 生成新的文件.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024 * 1024)).build()).build();streamSource.sinkTo(fileSink);// 3.触发程序执行env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/135636.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vulnhub实战-DC9

前言 本次的实验靶场是Vulnhub上面的DC-9&#xff0c;其中的渗透测试过程比较多&#xff0c;最终的目的是要找到其中的flag。 一、信息收集 对目标网络进行扫描 arp-scan -l 对目标进行端口扫描 nmap -sC -sV -oA dc-9 192.168.1.131 扫描出目标开放了22和80两个端口&a…

GFS分布式文件系统

目录 1、GFS理论 1.1、概述 1.2、GFS的基本概念和架构&#xff1a; 1.2.1. 基本概念 1.2.2. 架构&#xff1a; 1.3、GFS的基本组成部分包括&#xff1a; 1.4、GFS具有以下几个关键特点&#xff1a; 1.5、GlusterFS特点 1.6、GFS的术语 1.7、GlusterFS 采用模块化、堆…

【C++】STL简介 | string类的常用接口

目录 STL简介 学string类前的铺垫 概念 为什么要学string类 string类的底层&#xff08;了解&#xff09; 编码表的故事 string类的常用接口与应用 3个必掌握的构造 赋值 访问字符operator[] 初识迭代器&#xff08;iterator&#xff09; 反向迭代器 用范围for遍历…

【微信小程序】swiper的使用

1.swiper的基本使用 <jxz-header></jxz-header> <view class"banner"><swiperprevious-margin"30rpx"autoplayinterval"2000"indicator-dotsindicator-color"rgba(0,0,0,0.3)"indicator-active-color"#bda…

每日一题 337. 打家劫舍 III

难度&#xff1a;中等 整体思路相当于是前两天的方法倒过来&#xff0c;毕竟二叉树最常用的解法就是递归倒推 对于每一颗子树&#xff0c;他必定有一种最大的盗取方法&#xff0c;但是只有它的 root 的盗取情况才会影响到 root 的父节点&#xff0c;即如果收益最大的盗取方法…

虚拟线上发布会带来颠覆性新体验,3D虚拟场景直播迸发品牌新动能

虚拟线上发布会是近年来在数字化营销领域备受关注的形式&#xff0c;而随着虚拟现实技术的不断进步&#xff0c;3D虚拟场景直播更成为了品牌宣传、推广的新选择。可以说&#xff0c;虚拟线上发布会正在以其颠覆性的新体验&#xff0c;为品牌带来全新的活力。 1.突破时空限制&am…

竞赛选题 基于深度学习的人脸性别年龄识别 - 图像识别 opencv

文章目录 0 前言1 课题描述2 实现效果3 算法实现原理3.1 数据集3.2 深度学习识别算法3.3 特征提取主干网络3.4 总体实现流程 4 具体实现4.1 预训练数据格式4.2 部分实现代码 5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 毕业设计…

09MyBatisX插件

MyBatisX插件 在真正开发过程中对于一些复杂的SQL和多表联查就需要我们自己去编写代码和SQL语句,这个时候可以使用MyBatisX插件帮助我们简化开发 安装MyBatisX插件: File -> Settings -> Plugins -> 搜索MyBatisx插件搜索安装然后重启IDEA 跳转文件功能 由于一个项…

计算机竞赛 深度学习 opencv python 公式识别(图像识别 机器视觉)

文章目录 0 前言1 课题说明2 效果展示3 具体实现4 关键代码实现5 算法综合效果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习的数学公式识别算法实现 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学…

Unity Bolt 实现UI拖拽功能

最近在学习使用Bolt插件实现五代码对UGUI Image元素实现拖拽。先看效果 录制_2023_09_15_17_50_45_29 下面是实现方式介绍&#xff1a; 1&#xff1a;注册RectTransformUtility 在使用Bolt插件实现UI拖拽的功能&#xff0c;需要使用 RectTransformUtility.ScreenPointToLoca…

操作系统基本概念

目录 一、基本概述 二、操作系统的特点 &#xff08;一&#xff09;并发性&#xff08;实质是微观的串行、宏观的并行&#xff09; 1. 对比看&#xff1a;并行性 2. 单核CPU和多核CPU &#xff08;二&#xff09;共享性 &#xff08;三&#xff09;虚拟性 &#xff08;…

自动化和数字化在 ERP 系统中意味着什么?

毋庸置疑&#xff0c;ERP系统的作用是让工作更轻松。它可以集成流程&#xff0c;提供关键分析&#xff0c;确保你的企业高效运营。这些信息可以提高你的运营效率&#xff0c;并将有限的人力资本重新部署到更有效、更重要的需求上。事实上&#xff0c;自动化和数字化是ERP系统最…

【Unity程序技巧】Unity中的单例模式的运用

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

爬虫 — 验证码反爬

目录 一、超级鹰二、图片验证模拟登录1、页面分析1.1、模拟用户正常登录流程1.2、识别图片里面的文字 2、代码实现 三、滑块模拟登录1、页面分析2、代码实现&#xff08;通过对比像素获取缺口位置&#xff09; 四、openCV1、简介2、代码3、案例 五、selenium 反爬六、百度智能云…

【QT】day2

1.完善登录框 点击登录按钮后&#xff0c;判断账号&#xff08;admin&#xff09;和密码&#xff08;123456&#xff09;是否一致&#xff0c;如果匹配失败&#xff0c;则弹出错误对话框&#xff0c;文本内容“账号密码不匹配&#xff0c;是否重新登录”&#xff0c;给定两个按…

分布式缓冲-Redis

个人名片&#xff1a; 博主&#xff1a;酒徒ᝰ. 个人简介&#xff1a;沉醉在酒中&#xff0c;借着一股酒劲&#xff0c;去拼搏一个未来。 本篇励志&#xff1a;三人行&#xff0c;必有我师焉。 本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》&#xff0c;SpringCloud…

神经网络 02(激活函数)

一、激活函数 在神经元中引入了激活函数&#xff0c;它的本质是向神经网络中引入非线性因素的&#xff0c;通过激活函数&#xff0c;神经网络就可以拟合各种曲线。 如果不用激活函数&#xff0c;每一层输出都是上层输入的线性函数&#xff0c;无论神经网络有多少层&#xff0c…

(vue2)面经基础版-案例效果分析

配路由 先配一级&#xff0c;一级里面配二级。一级路由&#xff1a;首页&#xff08;二级&#xff1a;嵌套4个小页面&#xff09;、详情页 高亮a->router-link&#xff0c;高亮效果对自带高亮类名router-link(-exact)-active设置 注&#xff1a;通过children配置项&#…

使用vite创建vue3项目及项目的配置 | 环境准备 ESLint配置 prettier配置 husky配置

使用vite创建vue3项目及项目的配置 1.环境准备 使用vite搭建项目&#xff0c;vite需要nodejs版本14.18、16 node v18.16.1pnpm 8.7.4 pnpm:performant npm(高性能的npm)由npm/yarn衍生而来&#xff0c;解决了npm/yarn内部潜在的bug&#xff0c;极大的优化了性能&#xff0c…

能用就行——玄学问题:Compile with TORCH_USE_CUDA_DSA to enable device-side assertions

配置&#xff1a; python 3.9.0&#xff0c;torch2.0.1cu118 背景&#xff1a; 一直使用这个配置训练都没问题。搁置了一个月之后&#xff0c;再次使用就显示报错“Compile with TORCH_USE_CUDA_DSA to enable device-side assertions.” 过程&#xff1a; 尝试了网上的各种方…