Flink(一)【WordCount 快速入门】

前言

        学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

        最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

        这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。
        就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

        我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

API 环境搭建

添加依赖

pom.xml

<properties><flink.version>1.13.0</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version>
</dependency>
</dependencies>

log4j.properties 

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 入门案例

0、数据准备

在 根目录下创建 words.txt

hello flink
hello java
hello spark
hello hadoop

1、批处理

批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个执行批式数据处理环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件中读取数据 String类型  批式数据处理环境得到的 DataSource 继承自 DataSetDataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 将每行数据转换成一个二元组类型// 输入类型: String 输出类型: Tuple2FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =// String lines: 输入数据行  Collector<Tuple2<String,Long>> out: 输出类型lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));  //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型// 4. 根据 word 分组UnsortedGrouping<Tuple2<String, Long>> wordGroup = wordAndOne.groupBy(0);   // 0 是索引位置// 5. 分组内进行聚合AggregateOperator<Tuple2<String, Long>> res = wordGroup.sum(1); // 1 也是索引位置// 6. 打印结果res.print();}
}

运行结果:

(hadoop,1)
(flink,1)
(hello,4)
(java,1)
(spark,1)Process finished with exit code 0

因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2、流处理

2.1、有界数据流处理

这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 2. 流式数据处理环境得到的 DataSource 继承自 DataStreamDataStreamSource<String> lineDS = env.readTextFile("input/words.txt");// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 根据 word 分组KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);// 5. 根据键对索引为 1 处的值进行合并SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);// 6. 输出结果res.print();// 7. 执行env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来}
}

运行结果:

3> (java,1)
13> (flink,1)
1> (spark,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
5> (hello,4)
15> (hadoop,1)

        我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

        Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

        我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

2.2、无界数据流处理

这里我们使用 netcat 来模拟产生数据流

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class UnBoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建一个流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 2. 流式数据处理环境得到的 DataSource 继承自 DataStreamParameterTool parameterTool = ParameterTool.fromArgs(args);String host = parameterTool.get("host");Integer port = parameterTool.getInt("port");DataStreamSource<String> lineDS = env.socketTextStream(host,port);// 3. flatMap 打散数据 返回元组SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 根据 word 分组KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);// 5. 根据键对索引为 1 处的值进行合并SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);// 6. 输出结果res.print();// 7. 执行env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来}
}

运行结果: 

        可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

总结

        Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

        还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/182705.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习之基于Yolov5人体姿态摔倒识别分析报警系统(GUI界面)

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 系统设计概述&#xff1a; 传感器采集&#xff1a;通过在场景中布置摄像头或红外传感器等设备&#xff0c;采集人体…

GZ035 5G组网与运维赛题第8套

2023年全国职业院校技能大赛 GZ035 5G组网与运维赛项&#xff08;高职组&#xff09; 赛题第8套 一、竞赛须知 1.竞赛内容分布 竞赛模块1--5G公共网络规划部署与开通&#xff08;35分&#xff09; 子任务1&#xff1a;5G公共网络部署与调试&#xff08;15分&#xff09; 子…

数学到底在哪里支撑着编程?

如果编程语言是血肉&#xff0c;那么数学的思想和知识就是灵魂。它可以帮助你选择合适的数据结构和算法&#xff0c;提升系统效率&#xff0c;并且赋予机器智慧。在大数据和智能化的时代更是如此。举个例子&#xff0c;我们在小学就学过的余数&#xff0c;其实在编程的世界里也…

python基础(Python高级特性(切片、列表生成式)、字符串的正则表达式、函数、模块、Python常用内置函数、错误处理)培训讲义

文章目录 1. Python高级特性&#xff08;切片、列表生成式&#xff09;a) 切片的概念、列表/元组/字符串的切片切片的概念列表切片基本索引简单切片超出有效索引范围缺省 扩展切片step为正数step为负数 b) 列表生成式以及使用列表生成式需要注意的地方概念举例说明1. 生成一个列…

Python详细教程,如何使用Python进行数据可视化?

文章目录 前言一、导入必要的库二、加载数据三、创建基本图表四、添加更多细节五、使用Seaborn库创建更复杂的图表关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③…

3D医学三维技术影像PACS系统源码

一、系统概述 3D医学影像PACS系统&#xff0c;它集影像存储服务器、影像诊断工作站及RIS报告系统于一身,主要有图像处理模块、影像数据管理模块、RIS报告模块、光盘存档模块、DICOM通讯模块、胶片打印输出等模块组成&#xff0c; 具有完善的影像数据库管理功能&#xff0c;强大…

人工智能AI 全栈体系(十二)

第二章 计算机是如何学会下棋的 下棋一直被认为是人类的高智商游戏&#xff0c;从人工智能诞生的那一天开始&#xff0c;研究者就开始研究计算机如何下棋。著名人工智能学者、图灵奖获得者约翰麦卡锡在 50 年代就开始从事计算机下棋方面的研究工作&#xff0c;并提出了著名的 …

北京陪诊小程序|陪诊系统开发|陪诊小程序未来发展不可小觑

近几年随着互联网快速发展&#xff0c;各行业领域都比较注重线上服务系统&#xff0c;通过陪诊小程序开发可以满足更多用户使用需求&#xff0c;同时还能提高用户使用体验。现在陪诊类的软件应用得到全面推广&#xff0c;在医疗行业当中陪诊小程序更贴近用户生活&#xff0c;可…

“七人拼团模式:创新玩法助力平台快速裂变引流“

七人拼团模式是一种结合了社交电商和拼购玩法的快速裂变引流模式。这种模式通过抽取平台营业所得作为奖励补贴用户&#xff0c;以更人性化的奖励机制吸引用户&#xff0c;服务用户&#xff0c;以此加快用户向粉丝的转变&#xff0c;为平台拉取有效流量。本文将介绍七人拼团模式…

什么是防火墙?详解三种常见的防火墙及各自的优缺点

目录 防火墙的定义 防火墙的功能 防火墙的特性 防火墙的必要性 防火墙的优点 防火墙的局限性 防火墙的分类 分组过滤防火墙 优点&#xff1a; 缺点&#xff1a; 应用代理防火墙 优点 缺点 状态检测防火墙 优点 缺点 防火墙的定义 防火墙的本义原是指古代人们…

DCU集群搭建虚拟环境方法简介

1.conda安装方法&#xff1a; wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh #下载miniconda安装包chmod 750 Miniconda3-latest-Linux-x86_64.sh #添加执行权限bash ./Miniconda3-latest-Linux-x86_64.sh #安装下载的minnconda32.集群安装…

VBA根据Excel内容快速创建PPT

示例需求&#xff1a;根据Excel中选中的单元格内容&#xff08;3列&#xff09;如下图所示&#xff0c;在已打卡的PowerPoint文件中创建页面。 新增PPT Slide页面使用第二个模板页面&#xff0c;其中包含两个文本占位符&#xff0c;和一个图片占位符。将Excel选中区域中前两列写…

c++实现观察者模式

前言 我觉得这是最有意思的模式&#xff0c;其中一个动&#xff0c;另外的自动跟着动。发布-订阅&#xff0c;我觉得很巧妙。 代码 头文件 #pragma once #include<vector> #include<string> #include<iostream>// 抽象观察者 class Aobserver { public:v…

xlua源码分析(二)lua Call C#的无wrap实现

xlua源码分析&#xff08;二&#xff09;lua Call C#的无wrap实现 上一节我们主要分析了xlua中C# Call lua的实现思路&#xff0c;本节我们将根据Examples 03_UIEvent&#xff0c;分析lua Call C#的底层实现。例子场景里有一个简单的UI面板&#xff0c;面板中包含一个input fie…

使用VSCODE链接Anaconda

打代码还是在VSCODE里得劲 所以得想个办法在VSCODE里运行py文件 一开始在插件商店寻找插件 但是没有发现什么有效果的 幸运的是VSCODE支持自己选择Python的编译器 打开VSCODE 按住CtrlShiftP 输入Select Interpreter 如果电脑已经安装上了Python的环境 VSCODE会默认选择普通…

yolov5--ptq--qat量化之敏感层分析

敏感层分析&#xff0c;应该是发生在ptq量化之前进行分析的操作&#xff0c;经过该操作&#xff0c;可得出哪些层不适合进行量化&#xff0c;则在接下来ptq时可以手动关闭这些层的量化。 进入敏感层分析函数sensitive_analysis中&#xff0c; 具体流程为&#xff1a; 首先验证…

安科瑞变电站综合自动化系统在青岛海洋科技园应用

安科瑞 耿敏花 摘 要&#xff1a;变电站综合自动化系统是将变电站内的二次设备经过功能的组合和优化设计&#xff0c;利用先进的计算机技术、通信技术、信号处理技术&#xff0c;实现对全变电站的主要设备和输、配电线路的自动监视、测量、控制、保护、并与上级调度通信的综合性…

UI设计感大型数据管理仪表盘后台模板源码

大型数据管理仪表盘后台模板是一款适合数据统计管理后台网站模板下载。提示&#xff1a;本模板调用到谷歌字体库&#xff0c;可能会出现页面打开比较缓慢。 演示下载 qnziyw点cn/wysc/qdmb/20838点html

软件测试/测试开发丨利用ChatGPT自动生成架构图

点此获取更多相关资料 简介 架构图通过图形化的表达方式&#xff0c;用于呈现系统、软件的结构、组件、关系和交互方式。一个明确的架构图可以更好地辅助业务分析、技术架构分析的工作。架构图的设计是一个有难度的任务&#xff0c;设计者必须要对业务、相关技术栈都非常清晰…

【技术干货】开源库 Com.Gitusme.Net.Extensiones.Core 的使用

目录 1、项目介绍 2、为项目添加依赖 3、代码中导入命名空间 4、代码中使用 示例 1&#xff1a;string转换 示例 2&#xff1a;object转换 1、项目介绍 Com.Gitusme.Net.Extensiones.Core是一个.Net扩展库。当前最新版本1.0.4&#xff0c;提供了常见类型转换&#xff0c…