Flink--Data Source 介绍

Data Source 简介

        Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

        Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。

        Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink Data Source分类

Flink的数据源可以根据数据的来源和特性进行分类。以下是常见的Flink数据源分类:

集合数据源

        集合数据源(Collection Data Source):集合数据源指的是将本地的集合或数组作为输入数据的数据源。在Flink中,可以使用fromCollection、fromElements等方法将Java或Scala中的集合数据转化为数据流进行处理。

1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。

3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。

5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import java.util.Arrays;
import java.util.List;public class CollectionDataSourceExample {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 创建一个包含整数的集合List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);// 将集合转化为Flink的DataSetDataSet<Integer> dataset = env.fromCollection(data);// 打印数据集中的元素dataset.print();}
}

关于使用集合数据源的注意事项:

  1. 数据规模:集合数据源适用于小规模数据集。确保你的数据集在内存中能够合理存放,不至于导致内存溢出。

  2. 内存消耗:集合数据源会将所有数据存储在内存中,因此需要谨慎处理大型数据集,避免对内存资源造成过大压力。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高作业的执行效率。

  4. 调试和测试:集合数据源非常适合用于本地调试和测试,可以快速验证处理逻辑并观察输出结果。

使用集合数据源时需要注意这些方面,以确保作业能够稳定运行并获得良好的性能表现。

文件数据源

        文件数据源(File Data Source):文件数据源用于从文件系统中读取数据,可以是本地文件系统或分布式文件系统(如HDFS)。Flink提供了readTextFile、readCsvFile等方法来支持常见文件格式的数据读取。

1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class FileDataSourceExample {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 从文件创建数据集String filePath = "path/to/your/file.txt";DataSet<String> text = env.readTextFile(filePath);// 打印文件中的内容text.print();}
}

关于使用文件数据源的注意事项:

  1. 文件路径:确保提供的文件路径是正确的,可以是本地文件系统路径,也可以是HDFS路径或其他支持的文件系统路径。

  2. 文件格式:Flink支持多种文件格式,包括文本文件、CSV文件、Parquet文件等。根据实际情况选择合适的文件格式进行读取。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高文件读取的并行处理能力。

  4. 文件分区:对于大型文件,可以考虑文件分区和并行读取,以加速数据的加载和处理过程。

  5. 文件读取性能:尽量避免频繁的小文件读取操作,因为这会增加文件系统的负担并降低整体性能。

使用文件数据源时需要注意以上方面,以确保能够有效地读取文件数据,并且提高作业的执行效率。

Socket数据源

        Socket数据源(Socket Data Source):Socket数据源允许通过网络套接字接收数据,通常用于测试和演示目的。Flink可以使用socketTextStream方法从TCP socket接收数据流。

socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocketDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从socket创建数据流String hostname = "localhost";int port = 9999;env.socketTextStream(hostname, port).print();// 执行作业env.execute("Socket Data Source Example");}
}

关于使用Socket数据源的注意事项:

  1. 主机和端口:确保指定的主机和端口是正确的,并且能够与数据源通信。

  2. 网络延迟:由于Socket数据源涉及网络通信,因此可能受到网络延迟的影响。需要考虑网络性能对作业整体性能的影响。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据格式:需要确保从Socket接收到的数据能够被正确解析和处理,例如按行读取文本数据等。

  5. 容错机制:在使用Socket数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Socket数据源时需要注意以上方面,以确保能够有效地接收数据并提高作业的执行效率。

自定义数据源

        自定义数据源(Custom Data Source):除了上述内置的数据源外,Flink还支持自定义数据源。用户可以实现自己的SourceFunction接口来定义特定的数据生成逻辑,例如从消息队列、数据库、传感器等实时数据源中读取数据。

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class CustomDataSource extends RichParallelSourceFunction<String> {private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {// 生成数据String data = generateData();// 发射数据ctx.collect(data);// 控制数据生成频率Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}private String generateData() {// 实现自定义的数据生成逻辑return "some data";}
}

        在这个示例中,我们创建了一个名为CustomDataSource的类,它继承自RichParallelSourceFunction并指定了数据类型为String。在run方法中,我们使用一个循环来生成数据并通过collect方法将数据发射出去。在cancel方法中,我们设置了一个标志位来控制数据源的运行状态。

关于使用自定义数据源的注意事项:

  1. 并行度设置:根据数据源的性质和数据量合理地设置并行度,以充分利用集群资源。

  2. 数据生成频率:确保数据生成的频率和速度能够适应作业的处理能力,避免数据源产生过快导致作业无法及时处理。

  3. 容错机制:在自定义数据源中,需要考虑作业的容错机制,例如在发生故障时如何正确处理和恢复。

  4. 数据格式:确保从自定义数据源产生的数据能够被正确解析和处理,符合作业的输入要求。

  5. 资源管理:需要确保自定义数据源的资源占用和生命周期管理,避免资源泄露或过度占用资源。

使用自定义数据源时需要考虑以上方面,并确保能够有效地产生数据并提高作业的执行效率。

Apache Kafka数据源

        Apache Kafka数据源(Kafka Data Source):作为流数据处理框架,Flink对Kafka提供了良好的集成支持。可以使用addSource方法结合Flink的Kafka Connector来从Kafka主题中读取数据。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-consumer-group");// 创建Kafka数据流FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);kafkaDataStream.print();// 执行作业env.execute("Kafka Data Source Example");}
}

在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后设置Kafka的连接配置,包括bootstrap servers和consumer group id等。接下来,我们创建了一个FlinkKafkaConsumer对象,指定了要消费的topic以及数据的序列化方式,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Kafka数据源的注意事项:

  1. Kafka配置:确保指定的Kafka配置正确,并能够与Kafka集群进行通信。

  2. 序列化方式:根据实际情况选择合适的数据序列化方式,例如SimpleStringSchema、JSON、Avro等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Kafka数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Kafka数据源时需要注意以上方面,以确保能够有效地消费Kafka中的数据并提高作业的执行效率。

Apache Pulsar数据源

        Apache Pulsar数据源(Pulsar Data Source):类似于Kafka,Flink也集成了对Pulsar的支持,可以直接从Pulsar主题中读取数据。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;public class PulsarDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String serviceUrl = "pulsar://localhost:6650";String topic = "my-topic";FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(serviceUrl,topic,Schema.STRING);DataStream<String> pulsarDataStream = env.addSource(pulsarSource);pulsarDataStream.print();env.execute("Pulsar Data Source Example");}
}

        在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后指定了Pulsar的连接信息和要消费的topic。接下来,我们创建了一个FlinkPulsarSource对象,并指定了Pulsar的serviceUrl、topic以及数据的Schema,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Pulsar数据源的注意事项:

  1. Pulsar连接配置:确保指定的Pulsar连接信息正确,并能够与Pulsar集群进行通信。

  2. Schema设置:根据实际情况选择合适的数据Schema,例如STRING、JSON、AVRO等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Pulsar数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

        使用Pulsar数据源时需要注意以上方面,以确保能够有效地消费Pulsar中的数据并提高作业的执行效率。

        这些不同类型的数据源为Flink应用程序提供了灵活的数据接入方式,使得Flink可以轻松地处理不同来源和格式的数据。根据具体的业务需求和场景特点,可以选择合适的数据源类型来构建流处理和批处理应用程序。

更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/184214.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring的缓存机制-循环依赖

群公告 Java每日大厂面试题&#xff1a; 1、Spring 是如何解决循环依赖&#xff1f; 答案&#xff1a;三级缓存&#xff0c;简单来说&#xff0c;A创建过程中需要B&#xff0c;于是A将自己放到三级缓存里面&#xff0c;去实例化B&#xff0c;B实例化的时候发现需要…

【AICFD案例教程】进气歧管分析

AICFD是由天洑软件自主研发的通用智能热流体仿真软件&#xff0c;用于高效解决能源动力、船舶海洋、电子设备和车辆运载等领域复杂的流动和传热问题。软件涵盖了从建模、仿真到结果处理完整仿真分析流程&#xff0c;帮助工业企业建立设计、仿真和优化相结合的一体化流程&#x…

CSS时间线样式

css实现时间线样式&#xff0c;效果如下图&#xff1a; 一、CSS代码 .timeline {padding-left: 5px} .timeline-item { position: relative;padding-bottom: 20px;} .timeline-axis {position: absolute;left: -5px;top: 0;z-index: 10;width: 20px;height: 20px;line-he…

Android Studio报错:connect refused

参考链接&#xff1a; https://blog.csdn.net/qq_43213783/article/details/113936012 参考文章中说报错主要是由于代理导致的&#xff0c;在文件->设置->外观与行为->系统设置->HTTP代理。 方法一&#xff1a; 查看打开代理&#xff08;前提是代理可以通网&#x…

数据可视化PCA与t-SNE

PCA&#xff08;主成分分析&#xff09;和t-SNE&#xff08;t分布随机近邻嵌入&#xff09;都是降维技术&#xff0c;可以用于数据的可视化和特征提取。 降维&#xff1a;把数据或特征的维数降低&#xff0c;其基本作用包括&#xff1a; 提高样本密度&#xff0c;以及使基于欧…

实验(一):运算器实验

一、实验内容与目的 实验要求&#xff1a; 利用 CP226 实验仪的 K16..K23开关做为DBUS数据&#xff0c;其它开关做为控制信号&#xff0c;将数据写累加器A和工作寄存器W&#xff0c;并用开关控制ALU的运算方式&#xff0c;实现运算器的功能&#xff0c;将结果送入OUT寄存器。 实…

C# TCP Server服务端多线程监听RFID读卡器客户端上传的读卡数据

本示例使用设备介绍&#xff1a;液显WIFI无线网络HTTP协议RFID云读卡器可编程实时可控开关TTS语-淘宝网 (taobao.com) using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using Sy…

EM@解三角形@正弦定理@余弦定理

文章目录 abstract解三角形基本原理不唯一性 正弦定理直角三角形中的情形推广锐角三角形钝角情形 小结:正弦定理 余弦定理直角三角形中的情形非直角情形小结:余弦定理公式的角余弦形式 abstract 解直角三角形问题正弦定理和余弦定理的推导 对于非直角情形,都是直角情形的推广同…

机器视觉的试卷批改系统 - opencv python 视觉识别 计算机竞赛

文章目录 0 简介1 项目背景2 项目目的3 系统设计3.1 目标对象3.2 系统架构3.3 软件设计方案 4 图像预处理4.1 灰度二值化4.2 形态学处理4.3 算式提取4.4 倾斜校正4.5 字符分割 5 字符识别5.1 支持向量机原理5.2 基于SVM的字符识别5.3 SVM算法实现 6 算法测试7 系统实现8 最后 0…

微服务架构深入理解 | 技术栈

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 微服务架构深入理解 | 技术栈 服务网关 服务网关是在微服务架构中扮演重要角色的组件&#xff0c;它是系统对外的入口&#xff0c;负责接收和处理客户端的请求&#x…

【算法 | 模拟No.3】leetcode 38. 外观数列

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【Leetcode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

鸿蒙原生应用开发-DevEco Studio超级终端模拟器的使用

一、了解超级终端模拟器支持的设备情况 该特性在DevEco Studio V2.1 Release及更高版本中支持。 目前超级终端模拟器支持“PhonePhone”、“PhoneTablet”和“PhoneTV”的设备组网方式&#xff0c;开发者可以使用该超级终端模拟器来调测具备跨设备特性的应用/服务&#xff0c;如…

中远麒麟堡垒机SQL注入漏洞复现

简介 中远麒麟堡垒机用于运维管理的认证、授权、审计等监控管理&#xff0c;在该产品admin.php处存在SQL 注入漏洞。 漏洞复现 FOFA语法&#xff1a; body"url\"admin.php?controlleradmin_index&actionget_user_login_fristauth&username" 或者 c…

redis: 记录一次线上redis内存占用过大问题解决过程

引言 记录一次线上redis占用过大的排查过程&#xff0c;供后续参考 问题背景 测试同事突然反馈测试环境的web系统无法登陆&#xff0c;同时发现其他子系统也存在各类使用问题 排查过程 1、因为首先反馈的是测试环境系统无法登陆&#xff0c;于是首先去查看了登陆功能的报错…

【STM32】HAL库UART含校验位的串口通信配置BUG避坑

【STM32】HAL库UART含校验位的串口通信配置BUG避坑 文章目录 UART协议校验位HAL库配置含校验位的串口配置BUG避坑附录&#xff1a;Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏定义总线函…

Vuex状态管理(简单易懂、全网最全)

目录 Vuex是什么&#xff1f; 如何部署 如何使用 state 基础使用 在计算属性属性中使用 使用展开运算符 mutations 基础使用 使用辅助函数&#xff08;mapMutations&#xff09;简化 使用常量替代 Mutation 事件类型 getters actions 使用辅助函数&#xff08;…

启动Hbase出现报错

报错信息&#xff1a;slave1:head: cannot open/usr/local/hbase-2.3.1/bin/../logs/hbasewanggiqi-regionserver-slavel.out’ for reading: No such file or direslave2: head: cannot open/usr/local/hbase-2.3.1/bin/../logs/hbasewangqiqi-regionserver-slave2.out’ for …

计算机毕业论文内容参考|基于spingboot的金融投资顾问推荐系统

文章目录 导文文章重点摘要前言绪论课题背景:国内外现状与趋势:课题内容:相关技术与方法介绍系统分析系统设计系统实现总结与展望1本文总结2后续工作展望导文 计算机毕业论文内容参考|基于spingboot的金融投资顾问推荐系统 文章重点 摘要 基于SpingBoot的金融投资顾问推荐…

汽车生产RFID智能制造设计解决方案与思路

汽车行业需求 汽车行业正面临着快速变革&#xff0c;传统的汽车制造方式正在向柔性化、数字化、自动化和数据化的智能制造体系转变&#xff0c;在这个变革的背景下&#xff0c;汽车制造企业面临着物流、生产、配送和资产管理等方面的挑战&#xff0c;为了应对这些挑战&#xf…

ch579串口编程笔记

“CH579SFR.h”库文件&#xff0c;关于串口中断部分 /* UART interrupt identification values for IIR bits 3:0 */ #define UART_II_SLV_ADDR 0x0E // RO, UART0 slave address match #define UART_II_LINE_STAT 0x06 // R…