【Flink入门修炼】1-3 Flink WordCount 入门实现

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目
image.png

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.2</flink.version> <!-- 1.14 之后部分类和函数有变化,可自行探索 --><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency></dependencies>

二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {//参数检查if (args.length != 2) {// System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");// return;args = new String[]{"127.0.0.1", "9000"};}String hostname = args[0];Integer port = Integer.parseInt(args[1]);// 创建 streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据DataStreamSource<String> stream = env.socketTextStream(hostname, port);// 计数SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);sum.print();env.execute("Java WordCount from SocketTextStream Example");}public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {String[] tokens = s.toLowerCase().split("\\W+");for (String token: tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));}}}}
}

二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:

nc -l 9000

然后启动程序,能看到控制台一些输出:
image.png

接下来,在 nc 中输入:

$ nc -l 9000
hello world
flink flink flink

回到我们的程序,能看到统计的输出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有报错

如果出现执行报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormatat com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormatat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:419)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at java.lang.ClassLoader.loadClass(ClassLoader.java:352)... 1 more

在 IDE 中把 「Add dependencies with “Provided” scope to classpath」勾选上:
image.png

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。

其中,WordCount 表数据如下:

wordfrequency
hello1
world1
flink1
flink1
flink1

那么接下来我们看,如何写一个 FlinkSQL 的程序。

二)环境和程序

首先,添加 FlinkSQL 需要的依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version></dependency>

程序如下:

public class SQLWordCount {public static void main(String[] args) throws Exception {// 创建上下文环境ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);// 读取一行模拟数据作为输入String words = "hello world flink flink flink";String[] split = words.split("\\W+");ArrayList<WC> list = new ArrayList<>();for (String word : split) {WC wc = new WC(word, 1);list.add(wc);}DataSource<WC> input = fbEnv.fromCollection(list);// DataSet 转 SQL,指定字段名Table table = fbTableEnv.fromDataSet(input, "word,frequency");table.printSchema();// 注册为一个表fbTableEnv.createTemporaryView("WordCount", table);Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);ds1.printToErr();}public static class WC {public String word;public long frequency;public WC() {}public WC(String word, long frequency) {this.word = word;this.frequency = frequency;}@Overridepublic String toString() {return  word + ", " + frequency;}}
}

执行,结果输出:

(`word` STRING,`frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/253784.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

春运也要“信号升格”:中兴通讯助运营商打造高铁精品网

一年一度的春运&#xff0c;承载了游子的思乡情。据官方预计&#xff0c;今年春运跨区域人员流动量将达到90亿人次&#xff0c;创下历史新高&#xff0c;铁路、公路、水路、民航等营业性客运量全面回升&#xff0c;其中铁路预计发送旅客4.8亿人次&#xff0c;日均1200万人次&am…

使用yolo训练自己的模型

YOLO&#xff08;You Only Look Once&#xff09;是一种用于目标检测的深度学习模型&#xff0c;旨在实时检测图像或视频中的多个对象。与传统的目标检测方法不同&#xff0c;YOLO一次性处理整个图像&#xff0c;而不是通过滑动窗口或区域提议进行多次检测。这种方法使得YOLO在…

使用虚拟主机部署多站点

网站目录权限的管理和虚拟主机的配置。 目录权限控制

基于hadoop+spark的大规模日志的一种处理方案

概述: CDN服务平台上有为客户提供访问日志下载的功能,主要是为了满足在给CDN客户提供服务的过程中,要对所有的记录访问日志,按照客户定制的格式化需求以小时为粒度(或者其他任意时间粒度)进行排序、压缩、打包,供客户进行下载,以便进行后续的核对和分析的诉求。而且CDN…

C++实现鼠标点击和获取鼠标位置(编译环境visual studio 2022)

1环境说明 2获取鼠标位置的接口 void GetMouseCurPoint() {POINT mypoint;for (int i 0; i < 100; i){GetCursorPos(&mypoint);//获取鼠标当前所在位置printf("% ld, % ld \n", mypoint.x, mypoint.y);Sleep(1000);} } 3操作鼠标左键和右键的接口 void Mo…

BVH动画绑骨蒙皮并在Unity上展示

文章目录 Blender绑定骨骼Blender蒙皮Blender中导入bvh文件将FBX导入Unity Blender绑定骨骼 先左上角红框进入model模式&#xff0c;选中要绑定的模型&#xff0c;然后进入Edit模式把骨骼和关节对齐。 &#xff08;选中骨骼&#xff0c;G移动&#xff0c;R旋转&#xff09; 为…

跟着pink老师前端入门教程-day21+22

5.4 常见flex布局思路 5.5 背景线性渐变 语法&#xff1a; background: linear-gradient( 起始方向 , 颜色 1, 颜色 2, ...); background: -webkit-linear-gradient(left, red , blue); background: -webkit-linear-gradient(left top, red , blue); 背景渐变必须添加浏览…

asp.net core 依赖注入 实例化对象实例

在面向对象编程中&#xff0c;推荐使用面向接口编程&#xff0c;这样我们的代码就依赖于服务接口&#xff0c;而不是依赖于实现类&#xff0c;可以实现代码解耦。 名称解释&#xff1a; 我们把负责提供对象的注册和 获取功能的框架叫作“容器”&#xff0c; 注册到容器中的对象…

【RT-DETR进阶实战】利用RT-DETR进行视频划定区域目标统计计数

👑欢迎大家订阅本专栏,一起学习RT-DETR👑 一、本文介绍 Hello,各位读者,最近会给大家发一些进阶实战的讲解,如何利用RT-DETR现有的一些功能进行一些实战, 让我们不仅会改进RT-DETR,也能够利用RT-DETR去做一些简单的小工作,后面我也会将这些功能利用PyQt或者是…

政安晨:示例演绎Python的函数与获取帮助的方法

调用函数和定义我们自己的函数&#xff0c;并使用Python内置的文档&#xff0c;是成为一位Pythoner的开始。 通过我的上篇文章&#xff0c;相信您已经看过并使用了print和abs等函数。但是Python还有许多其他函数&#xff0c;并且定义自己的函数是Python编程的重要部分。 在本…

由亚马逊云科技 Graviton4 驱动的全新内存优化型实例 Amazon EC2 实例(R8g),现已开放预览

下一代 Amazon Elastic Compute CloudAmazon EC2) 实例的预览版现已公开 提供。全新的 R8g 实例 搭载新式 Graviton4 处理器&#xff0c;其性价比远超任何现有的内存优化实例。对于要求较高的内存密集型工作负载&#xff0c;R8g 实例是不二之选&#xff1a;大数据分析、高性能数…

Linux操作系统基础(一):操作系统概述

文章目录 操作系统概述 一、计算机分类 二、计算机组成 三、操作系统概述 四、操作系统分类 操作系统概述 一、计算机分类 计算机一般分为个人计算机&#xff08;笔记、台式机&#xff09;与 企业级服务器&#xff08;1U、2U、机柜、塔式、刀片&#xff09;两种形式。 二…

PyTorch深度学习实战(23)——从零开始实现SSD目标检测

PyTorch深度学习实战&#xff08;23&#xff09;——从零开始实现SSD目标检测 0. 前言1. SSD 目标检测模型1.1 SSD 网络架构1.2 利用不同网络层执行边界框和类别预测1.3 不同网络层中默认框的尺寸和宽高比1.4 数据准备1.5 模型训练 2. 实现 SSD 目标检测2.1 SSD300 架构2.2 Mul…

前端JavaScript篇之对JSON的理解

目录 对JSON的理解JSON.stringify 和 JSON.parse 方法JSON.stringify 和 JSON.parse 区别JSON.stringify 和 JSON.parse 注意事项总结 对JSON的理解 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;它以易读易写的文本形式表示…

Vue前端框架--Vue工程项目问题总结{脚手架 Vue-cli}

Vue脚手架部署问题总结 我所遇到的一共两大问题 只有先执行npm install之后 才能run serve 否则会报错 vue-cli-serve不是内部或者外部的命令&#xff0c;也不是可运行的程序或者批处理文件的错误 1. 运行npm install会报错 2. 运行npm run serve报错 nodejs官网为 https://no…

ERP 系统架构的设计与实践总结

企业资源计划&#xff08;ERP&#xff09;系统是一种集成多个业务功能的综合性软件解决方案。在设计和实践 ERP 系统架构时&#xff0c;需要考虑诸多因素&#xff0c;以确保系统能够满足企业的需求&#xff0c;并提供高效、可靠、安全的服务。本文将介绍一些关键的设计原则和实…

03-抓包_封包_协议_APP_小程序_PC应用_WEB应用

抓包_封包_协议_APP_小程序_PC应用_WEB应用 一、参考工具二、演示案例&#xff1a;2.1、WEB应用站点操作数据抓包-浏览器审查查看元素网络监听2.2、APP&小程序&PC抓包HTTP/S数据-Charles&Fiddler&Burpsuite2.3、程序进程&网络接口&其他协议抓包-WireSh…

LabVIEW双光子荧光显微成像系统开发

双光子显微成像是一种高级荧光显微技术&#xff0c;广泛用于生物学和医学研究&#xff0c;尤其是用于活体组织的深层成像。在双光子成像过程中&#xff0c;振镜&#xff08;Galvo镜&#xff09;扮演了非常关键的角色&#xff0c;它负责精确控制激光束在样本上的扫描路径。以下是…

贪心算法篇

“靠漫步&#xff0c;将生趣填饱~” 贪心算法简介&#xff1f; 贪心算法&#xff08;Greedy Algorithm&#xff09;&#xff0c;也称为贪婪算法&#xff0c;是一种在解决问题时采取贪心策略的方法。其基本原理是很简单的&#xff1a; “在每个决策点上都选择当下看似最好的选项…

専攻春节钜惠

専攻春节钜惠 大家好&#xff0c;新春佳节到来之际&#xff0c;为了答谢大家多年来的支持厚爱&#xff0c;也为了更广泛的推广VBA应用&#xff0c;“VBA语言専攻”在春节期间再次推出钜惠活动&#xff0c;时间2月9日到2月17日&#xff08;大年三十到正月初八&#xff09; 1 &…