【极数系列】Flink集成DataSource读取集合数据(07)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于集合读取数据
    • 3.1 集合创建数据流
    • 3.2 迭代器创建数据流
    • 3.3 给定对象创建数据流
    • 3.4 迭代并行器创建数据流
    • 3.5 基于时间间隔创建数据流
    • 3.6 自定义数据流
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建集合数据流作业
    • 4.3 运行结果日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkListSourceJob(集合)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于集合读取数据

3.1 集合创建数据流

fromCollection(Collection)函数
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型

3.2 迭代器创建数据流

fromCollection(Iterator, Class) 
从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

3.3 给定对象创建数据流

fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

3.4 迭代并行器创建数据流

注意!使用迭代器的时候对象必须是实现持久化的,否则报错,详情可以看我的另外一篇文章、

错误:org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable

fromParallelCollection(SplittableIterator, Class) 
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型

3.5 基于时间间隔创建数据流

generateSequence 
基于给定间隔内的数字序列并行生成数据流。

3.6 自定义数据流

addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

4.2 创建集合数据流作业

注意:Flink根据集群撇嘴可能会启动多个并行度运行,可能导致数据重复处理,可以通过.setParallelism(1)设置为一个平行度运行即可

package com.aurora.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.sql.DataSource;
import java.util.*;/*** @description flink的list集合source应用* @author 浅夏的猫* @datetime 23:03 2024/1/28
*/
public class FlinkListSourceJob {private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);public static void main(String[] args) throws Exception {//1.创建Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置Flink运行模式://STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);List<String> list = Arrays.asList("测试", "开发", "运维");// 01 从集合创建数据流DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);// 02 从迭代器创建数据流,这里直接使用list的迭代器会报错,因为没有ArrayList没有进行持久化,需要深入了解的,可以看我的另外一篇文章
//        DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(),String.class);// 03 从给定的对象序列中创建数据流DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");// 04 从迭代器并行创建数据流NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1,10);DataStreamSource dataStreamSource_04=env.fromParallelCollection(splittableIterator,Long.TYPE);// 05 基于给定间隔内的数字序列并行生成数据流DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);//自定义数据流DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {//自定义你自己的数据来源for (int i = 0; i < 10; i++) {sourceContext.collect("测试数据" + i);}}@Overridepublic void cancel() {}});//5.输出打印dataStreamSource_01.print();
//        dataStreamSource_02.print();dataStreamSource_03.print();dataStreamSource_04.print();dataStreamSource_05.print();dataStreamSource_06.print();//6.启动运行env.execute();}}

4.3 运行结果日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/248561.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink实战四_TableAPISQL

接上文&#xff1a;Flink实战三_时间语义 1、Table API和SQL是什么&#xff1f; 接下来理解下Flink的整个客户端API体系&#xff0c;Flink为流式/批量处理应用程序提供了不同级别的抽象&#xff1a; 这四层API是一个依次向上支撑的关系。 Flink API 最底层的抽象就是有状态实…

海外云手机对于亚马逊卖家的作用

近年来&#xff0c;海外云手机作为一种新型模式迅速崭露头角&#xff0c;成为专业的出海SaaS平台软件。海外云手机在云端运行和存储数据&#xff0c;通过网页端操作&#xff0c;将手机芯片放置在机房&#xff0c;通过网络连接到服务器&#xff0c;为用户提供便捷的上网功能。因…

双通道音频功率放大电路——D2822M,交越失真,静态电流,外围元件少。开机和关机无冲击噪声

D2822M 用于便携式录音机和收音机作音频功率放大器。 D2822M 采用 DIP8 和 SOP8 封装形式。 特点&#xff1a;  电源电压降到 1.8V 时仍能正常工作  交越失真小  静态电流小  可作桥式或立体声式功放应用  外围元件少  通道分离度高  开机和关机…

【中关村开源生态论坛暨大模型智能应用技术大会】—— 探索AI和开源在未来的应用

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-9ttR7rpX3BzyF2C4 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…

穷游网酒店数据采集与可视化分析与实现

摘 要 穷游网酒店数据采集与可视化分析大屏的背景是为了满足用户对酒店数据的需求以及提供数据洞察和决策支持。随着旅游业的快速发展&#xff0c;人们对酒店信息的需求日益增加&#xff0c;而穷游网作为一家专注于旅游信息的网站&#xff0c;拥有丰富的酒店数据资源。 这个大…

二维数组的学习

前言 在前面我们学习了一维数组&#xff0c;但是有的问题需要用二位数组来解决。 二维数组常称为矩阵&#xff0c;把二维数组写成行和列的排列形式&#xff0c;可以有助于形象化的理解二维数组的逻辑结构。 一、二维数组的定义 二维数组定义的一般格式&#xff1a; 数据类型 数…

百度智能小程序开发平台:SEO关键词推广优化 带完整的搭建教程

移动互联网的普及&#xff0c;小程序成为了众多企业和开发者关注的焦点。百度智能小程序开发平台为开发者提供了一站式的解决方案&#xff0c;帮助企业快速搭建并推广自己的小程序。本文将重点介绍百度智能小程序开发平台的SEO关键词推广优化功能&#xff0c;并带完整的搭建教程…

ElementUI Form:Radio 单选框

ElementUI安装与使用指南 Radio 单选框 点击下载learnelementuispringboot项目源码 效果图 el-radio.vue 页面效果图 项目里el-radio.vue代码 <script> export default {name: el_radio,data() {return {radio: 1,radio2: 2,radio3: 3,radio4: 上海,radio5: 上海,ra…

数据结构--堆排序(超详细!)

一、前言 堆排序与Top K问题是堆的两大应用&#xff0c;在我们日常也有很广泛的用处 我们已经上面已经说过了堆&#xff0c;这次来说堆的其中一个应用---堆排序。 二、堆排序 堆排序优势在哪里&#xff1f;有什么恐怖之处吗&#xff1f; 重点&#xff1a;拿一个举例&…

fiber学习

React原理&#xff1a;通俗易懂的 Fiber - 掘金

泰迪智能科技生成式人工智能(AIGC)实验室解决方案

AIGC&#xff08;Artificial Intelligence Generated Content&#xff0c;生成式人工智能&#xff09;是一种新的人工智能技术&#xff0c;指的是利用人工智能技术来生成内容。这种技术可以自动生成文本、图像、音频和视频等多种类型的内容&#xff0c;而且内容的质量较高&…

Qt Excel读写 - QXlsx的安装配置以及测试

Qt Excel读写 - QXlsx的安装配置以及测试 引言一、安装配置二、简单测试 引言 Qt无自带的库处理Excel 文件&#xff0c;但可通过QAxObject 借助COM接口进行Excel的读写1。亦可使用免费的开源第三方库&#xff1a;QXlsx&#xff0c;一个基于Qt库开发的用于读写Microsoft Excel文…

Security ❀ TCP异常报文详解

文章目录 1. TCP Out-Of-Order2. TCP Previous Segment Lost3. TCP Retransmission4. TCP Dup Ack XXX#X5. TCP Windows Update6. TCP Previous segment not captured7. 异常案例分析 TCP协议中seq和ack seq的联系&#xff1a; id4的http请求报文由客户端发向服务器&#xff0…

【Prometheus】Prometheus的PromQL语句

Prometheus promQL的语法&#xff1a; #时间序列 node_cpu_guest_seconds_total{cpu"0"} 监控&#xff08;指标数据&#xff09; {标签} node使用CPU的描述的统计&#xff0c;符合标签CPU0的时间序列的查询结果 指标标签生成时间序列 标签&#xff1a; __address…

品牌时代:应对非对称性风险的战略与实践

市场环境中&#xff0c;非对称性风险成为企业必须直面的挑战。非对称性风险指的是企业在经营过程中面临的不确定性因素&#xff0c;这些因素可能导致企业遭受重大损失或获得巨大收益。为了应对这种风险&#xff0c;企业需要从产品导向转向品牌导向&#xff0c;通过品牌建设来提…

解决:ModuleNotFoundError: No module named ‘selenium’

解决&#xff1a;ModuleNotFoundError: No module named ‘selenium’ 文章目录 解决&#xff1a;ModuleNotFoundError: No module named selenium背景报错问题报错翻译报错位置代码报错原因解决方法方法一&#xff0c;直接安装方法二&#xff0c;手动下载安装方法三&#xff0…

跟着cherno手搓游戏引擎【12】渲染context和首个三角形

渲染上下文&#xff1a; 目的&#xff1a;修改WindowsWindow的结构&#xff0c;把glad抽离出来 WindowsWindow.h:新建m_Context #pragma once #include "YOTO/Window.h" #include <YOTO/Renderer/GraphicsContext.h> #include<GLFW/glfw3.h> #include…

突破瓶颈,提升开发效率:Spring框架进阶与最佳实践-IOC

IOC相关内容 1.1 bean基础配置1.1.1 bean基础配置(id与class)1.1.2 bean的name属性步骤1&#xff1a;配置别名步骤2:根据名称容器中获取bean对象步骤3:运行程序 1.1.3 bean作用范围scope配置1.1.3.1 验证IOC容器中对象是否为单例验证思路具体实现 1.1.3.2 配置bean为非单例1.1.…

ASP .NET Core Api 使用过滤器

过滤器说明 过滤器与中间件很相似&#xff0c;过滤器&#xff08;Filters&#xff09;可在管道&#xff08;pipeline&#xff09;特定阶段&#xff08;particular stage&#xff09;前后执行操作。可以将过滤器视为拦截器&#xff08;interceptors&#xff09;。 过滤器级别范围…

Redis -- 开篇热身,常用的全局命令

目录 Redis重要文件 启动停止脚本 配置文件 持久化文件存储目录 核心命令 set get 全局命令 keys exists del expire ttl 过期策略是如何实现的 定时器 type 小结 Redis重要文件 启动停止脚本 /usr/bin/redis-benchmark &#xff1a; 用于对Redis做性能基准…