Flink WordCount实践

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

	<properties><flink.version>1.17.1</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

刷新依赖

刷新依赖后,能看到相关依赖如下

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

</mirrors>上面一行添加阿里云仓库镜像

	<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><mirrorOf>central</mirrorOf>        </mirror>

2.IDEA设置maven

数据准备

在工程的根目录下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下

hello world
hello java
hello flink

新建包

右键src/main下的java,新建Package

填写包名org.example,包名与groupId的内容一致。

批处理API实现WordCount

org.exmaple下新建wc包及BatchWordCount

填写wc.BatchWordCount

效果如下

BatchWordCount.java代码如下:

package org.example.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取DataSource<String> lineDS = env.readTextFile("data/words.txt");// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

运行程序,查看结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

运行结果

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

package org.example.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

控制台输出

继续往7777端口发送数据,例如发送hello flink

控制台输出

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

启动flink集群
[hadoop@node2 ~]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
​
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
​
命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

查看结果
Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

查看结果

继续发送测试数据

在nc终端继续发送数据

Web UI刷新结果

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log
[hadoop@node2 log]$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
(hello,1)
(flink,1)
(hello,2)
(world,1)
​

取消flink作业

点击Cancel Job取消作业 

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777

Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

点击Add New

选择flink作业jar包所在路径

点击jar包名称

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

提交后,在Running Jobs里看到运行的作业

发送测试数据

往7777端口发送数据

查看结果

继续发送测试数据

刷新结果

取消作业

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

提交代码

使用IDEA提交代码

提示有警告,忽略警告,继续提交

提交成功后,IDEA显示如下

刷新浏览器查看gitee界面,看到代码已上传成功

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/302999.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

登录压力测试

目录 一、准备测试数据 1.1数据库存储过程添加数据 1.2导出为csv作为测试数据&#xff08;账号、密码&#xff09; 二、使用fiddler抓包查看接口 2.1.抓到相关接口信息 2.2添加线程组和http请求 2.3将前面接口需要的参数去json格式化 ​2.4填写相关信息 ​ 2.5添加http…

使用阿里云试用Elasticsearch学习:4. 聚合——1

在这之前&#xff0c;本书致力于搜索。 通过搜索&#xff0c;如果我们有一个查询并且希望找到匹配这个查询的文档集&#xff0c;就好比在大海捞针。 通过聚合&#xff0c;我们会得到一个数据的概览。我们需要的是分析和总结全套的数据而不是寻找单个文档&#xff1a; 在大海里…

218基于matlab的有限差分法求解泊松方程

基于matlab的有限差分法求解泊松方程&#xff0c;采用SOR超松弛迭代法。模型采用方形区域&#xff0c;划分网格数为100*100&#xff0c;网格数可以很方便的更改。程序已调通&#xff0c;可直接运行。 218有限差分法 泊松方程 SOR超松弛迭代法 - 小红书 (xiaohongshu.com)

微服务学习3

目录 1.微服务保护 1.1.服务保护方案 1.1.1.请求限流 1.1.2.线程隔离 1.1.3.服务熔断 1.2.Sentinel 1.2.1.微服务整合 1.2.2.请求限流 1.3.线程隔离 1.3.1.OpenFeign整合Sentinel 1.3.2.配置线程隔离 1.4.服务熔断 1.4.1.编写降级逻辑 1.4.2服务熔断 2.分布式事…

监控指标体系:交互延迟上的探索与最佳实践

FID 在互联网高速发展的时代,用户体验已成为企业竞争的关键所在。网页性能作为用户体验的重要组成部分,直接影响着用户的满意度和工作效率。First Input Delay(FID)作为衡量网页性能的重要指标,越来越受到业界关注。今天,让我们一起来深入了解FID,探讨如何优化FID以提升…

Win10系统VScode远程连接VirtualBox安装的Ubuntu20.04.5

1.打开虚拟机&#xff0c;在中端中输入命令: sudo apt-get install openssh-server 安装ssh 我这里已经安装完成&#xff0c;故显示是这样 2.输入命令&#xff1a;sudo systemctl start ssh 启动远程连接 注意&#xff0c;如果使用VirtualBox安装的虚拟机&#xff0c;需要启用…

鹅厂实习offer

#转眼已经银四了&#xff0c;你收到offer了吗# 本来都打算四月再投实习了&#xff0c;突然三月初被wxg捞了&#xff08;一年前找日常实习投的简历就更新了下&#xff09;&#xff0c;直接冲了&#xff0c;流程持续二十多天&#xff0c;结果是运气还不错&#xff0c;应该是部门比…

web安全学习笔记(8)

记一下第十二节课的内容。 一、PHP文件包含的四种方式 Include和Include_once 操作系统会读取包含的文件的内容&#xff0c;并将它插入主文件中&#xff0c;include方式的文件包含会在包含失败的情况下输出警告信息&#xff0c;而include_once方式会检查包含的文件是否已经被…

翻译笔实现文字识别功能的原理

翻译笔作为一种便携式设备&#xff0c;近年来在语言学习、旅游、商务交流等领域中逐渐受到人们的青睐。其核心功能之一便是文字识别&#xff0c;即将纸质或电子文档中的文字快速、准确地转化为机器可读的文本格式。那么&#xff0c;翻译笔是如何实现这一神奇功能的呢&#xff1…

Redis中的集群(三)

集群 槽指派 记录节点的槽指派信息。 clusterNode结构的slots属性和numslot属性记录了节点负责处理哪些槽: struct clusterNode { // ... unsigned char slots[16384/8];int numslots; // ... }slots属性是一个二进制位数组(bit array)&#xff0c;这个数组的长度位16384/8…

由近期 RAGFlow 的火爆看 RAG 的现状与未来

4 月 1 日&#xff0c;InfiniFlow &#xff08;英飞流&#xff09;的端到端 RAG 解决方案 RAGFlow 正式开源&#xff0c;首日即获得了 github 千星&#xff0c;目前已接近 3000 star。在这之前&#xff0c;InfiniFlow 还开源了专门用于 RAG 场景的 AI 原生数据库 Infinity&…

gpt科普1 GPT与搜索引擎的对比

GPT&#xff08;Generative Pre-trained Transformer&#xff09;是一种基于Transformer架构的自然语言处理模型。它通过大规模的无监督学习来预训练模型&#xff0c;在完成这个阶段后&#xff0c;可以用于各种NLP任务&#xff0c;如文本生成、机器翻译、文本分类等。 以下是关…

Flutter之TabBar篇

总结了一下项目中用到的几种TabBar&#xff0c;针对不同的样式&#xff0c;有采用系统提供的&#xff0c;也有三方插件提供的&#xff0c;也有自定义的&#xff0c;效果如下&#xff08;后续如果遇到新的样式&#xff0c;会不间断地记录更新&#xff0c;避免重复造轮子…&#…

Day:004(4) | Python爬虫:高效数据抓取的编程技术(数据解析)

XPath工具 浏览器-元素-CtrlF 浏览器-控制台- $x(表达式) Xpath helper (安装包需要科学上网) 问题 使用离线安装包 出现 程序包无效 解决方案 使用修改安装包的后缀名为 rar&#xff0c;解压文件到一个文件夹&#xff0c;再用 加载文件夹的方式安装即可 安装 python若使用…

上门服务小程序|上门服务系统|上门服务软件开发流程

在如今快节奏的生活中&#xff0c;上门服务小程序的需求越来越多。它们向用户提供了方便、高效的服务方式&#xff0c;解决了传统服务行业中的很多痛点。如果你也想开发一个上门服务小程序&#xff0c;以下是开发流程和需要注意的事项。 1、确定需求&#xff1a;在开始开发之前…

ChromeDriver / Selenium-server

一、简介 ChromeDriver 是一个 WebDriver 的实现&#xff0c;专门用于自动化控制 Google Chrome 浏览器。以下是关于 ChromeDriver 的详细说明&#xff1a; 定义与作用&#xff1a; ChromeDriver 是一个独立的服务器程序&#xff0c;作为客户端库与 Google Chrome 浏览…

云安全在金融领域的作用是什么?

云安全在金融领域发挥着至关重要的作用&#xff0c;使金融机构能够保护敏感数据、遵守监管要求并推动创新。通过实施强有力的安全措施、利用先进技术并对新出现的威胁保持警惕&#xff0c;金融机构可以保护其数字资产并维持客户的信任。 金融机构面临的挑战 1.缺乏全网数据支撑…

uniapp小程序下载并导出excel

<button click"confirmExport">导出excel</button>confirmExport() {let header {"X-Access-Token": uni.getStorageSync(ACCESS_TOKEN), //自定义请求头信息} let url "http"/......"; // 后端API地址uni.request({url: ur…

20240309web前端_第三周作业_教务系统页面

作业&#xff1a;教务系统页面 成果展示&#xff1a; 完整代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1…

实战要求下,如何做好资产安全信息管理

文章目录 一、资产安全信息管理的重要性二、资产安全信息管理的痛点三、如何做好资产安全信息管理1、提升资产安全信息自动化、集约化管理能力&#xff0c;做到资产全过程管理2、做好资产的安全风险识别3、做好互联网暴露面的测绘与管空4、做好资产安全信息的动态稽核管理 “摸…