【入门Flink】- 02Flink经典案例-WordCount

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)String filePath = Objects.requireNonNull(BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataSource<String> lineDS = env.readTextFile(filePath);// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

打印结果如下:(结果正确)

image-20231031193224024

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件String filePath = Objects.requireNonNull(StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();DataStreamSource<String> lineStream = env.readTextFile(filePath);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取文件DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

image-20231031201232801

2、再输入“hello world”,输出如下内容

image-20231031201316467

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/179802.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

超详细Linux搭建Hadoop集群

一、给计算机集群起别名——互通 总纲&#xff1a; 1、准备3台客户机&#xff08;关闭防火墙、静态IP、主机名称都设置好&#xff09; 2、安装JDK&#xff08;可点击&#xff09; 3、配置环境变量 4、安装Hadoop 5、配置hadoop的环境变量 6、配置集群 7、群起测试 1.1、环境准备…

VPN网络环境下 本地客户端能连上mysql 本地启服务连不上mysql的原因

背景 公司mysql使用的是华为云RDS&#xff0c;由于要做一些测试验证&#xff0c;需要本地通过VPN直连华为RDS节点&#xff1b;找运维配置好网络后&#xff0c;本地 telnet 内网ip 3306 以及通过navicat客户端都能正常连接数据库&#xff1b;但是本地启动的服务就是连接不上。问…

佳易王定制开发流水线商品标签自动打印软件,打印格式可定制

佳易王定制开发流水线商品标签自动打印软件&#xff0c;打印格式可以定制 软件特色&#xff1a; 定制试用商品标签打印管理V16.0&#xff0c;打印标签可以自动计算到期日期和品控日期&#xff0c;并打印品名、包装规格、生产日期、到期日期、储存条件、生产包装、品控日期等信…

ChinaSoft 论坛巡礼|开源软件供应链论坛

2023年CCF中国软件大会&#xff08;CCF ChinaSoft 2023&#xff09;由CCF主办&#xff0c;CCF系统软件专委会、形式化方法专委会、软件工程专委会以及复旦大学联合承办&#xff0c;将于2023年12月1-3日在上海国际会议中心举行。 本次大会主题是“智能化软件创新推动数字经济与社…

【Redis】Java连接Redis及Java操作Redis常用数据类型

一&#xff0c;Java连接Redis 1.1 连接前端服务器 打开RedisDesktopManager并连接Redis 不知道可看我上一篇文章&#xff1a; 【Redis】安装(Linux&window)及Redis的常用命令-CSDN博客 1.2 后端依赖 导入相关的jedis依赖 注意&#xff1a;要在dependencies标签中导入…

A股风格因子看板 (2023.11第01期)

该因子看板跟踪A股风格因子&#xff0c;该因子主要解释沪深两市的市场收益、刻画市场风格趋势的系列风格因子&#xff0c;用以分析市场风格切换、组合风格暴露等。 今日为该因子跟踪第01期&#xff0c;指数组合数据截止日2023-10-31&#xff0c;要点如下 近1年A股风格因子收益走…

Apache HttpClient库编写的Scala程序

Apache HttpClient库编写的Scala下载器程序&#xff0c;用于下载图片。代码如下&#xff1a; import org.apache.http.HttpHost import org.apache.http.client.HttpClients import org.apache.http.client.methods.HttpHead import org.apache.http.impl.client.CloseableHtt…

Python:PDF转长图像和分页图像

简介&#xff1a;随着电子化文档的普及&#xff0c;PDF文件的使用频率越来越高。有时我们需要将PDF中的内容转化为图片格式进行分享或编辑&#xff0c;那么如何才能轻松地完成此任务呢&#xff1f;本文将为你展示一个Python工具&#xff1a;如何将PDF文件转化为图片&#xff0c…

JumpServer开源堡垒机与万里安全数据库完成兼容性认证

近日&#xff0c;中国领先的开源软件提供商FIT2CLOUD飞致云宣布&#xff0c;JumpServer开源堡垒机已经与万里安全数据库软件GreatDB完成兼容性认证。针对产品的功能、性能、兼容性方面&#xff0c;经过双方共同测试&#xff0c;万里安全数据库软件&#xff08;简称&#xff1a;…

纷享销客荣获最佳制造业数字营销服务商奖

2023年10月26日&#xff0c;第二届中国制造业数智化发展大会在上海盛大召开。本次大会汇聚了制造行业的顶尖企业和专家&#xff0c;共同探讨如何通过数字化转型赋能企业自身成长&#xff0c;实现信息化向数字化的升级转型。 在本次盛会上&#xff0c;纷享销客以其卓越的基本面、…

SNAP打开影像失败No appropriate reader found

SNAP打开影像失败No appropriate reader found 问题描述 原因 这是我前几周用的&#xff0c;还有这些模块&#xff0c;但不知道何时&#xff0c;这些模块就少了 重装一下吧&#xff0c;可能是误删东西了 解决方案 重装了一下&#xff0c;就能够打开了 装完之后又有这些模…

【3D图像分割】基于 Pytorch 的 VNet 3D 图像分割3(3D UNet 模型篇)

在本文中&#xff0c;主要是对3D UNet 进行一个学习和梳理。对于3D UNet 网上的资料和GitHub直接获取的代码很多&#xff0c;不需要自己从0开始。那么本文的目的是啥呢&#xff1f; 本文就是想拆解下其中的结构&#xff0c;看看对于一个3D的UNet&#xff0c;和2D的UNet&#x…

项目实战:修改水果库存系统特定库存记录

1、在edit.html修改库存页面添加点击事件 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><link rel"stylesheet" href"style/index.css"><script s…

AI:51-基于深度学习的电影评价

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…

【计算机网络】网络层:数据平面

一.网络层概述 每台路由器的数据平面的主要功能时从其输入链路向其输出链路转发数据报&#xff0c;控制平面的主要功能是协调这些本地的每路由转发动作&#xff0c;使得数据报沿着源和目的地主机之间的路由器路径最终进行端到端传送。 网络层不运行运输层和应用层协议。 转发是…

性能优于BERT的FLAIR:一篇文章入门Flair模型

文章目录 What is FLAIR&#xff1f;FLAIR ModelContextual String Embedding for Sequence Labelingexample FLAIR Application AreaSentiment AnalysisNamed Entity RecognitionText Classification FLAIR一、什么是FLAIR&#xff1f;二、FLAIR Library的优势是什么&#xff…

基于51单片机电子秤-proteus仿真-源程序

一、系统方案 本设计采用52单片机作为主控器&#xff0c;液晶1602显示&#xff0c;HX711模块&#xff0c;按键设置单价&#xff0c;计算总价&#xff0c;超量程报警&#xff0c;蜂鸣器报警。 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 I…

界面控件DevExpress WinForms Gauge组件 - 实现更高级别数据可视化

DevExpress WinForms控件包含了超过150个随时可用的仪表盘预设&#xff0c;包括圆形&#xff0c;数字&#xff0c;线性和状态指示器等&#xff0c;来帮助用户实现更高级的数据可视化。 DevExpress WinForms有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业…

H5ke9

上次fetvh就一个参数url,,就是get请求 fetch还可以第二个参数对象,可以指定method:改为POST 请求头header :发送txt,servlet,json给客户端,,异步请求图片 1 这节客户端传到服务器端 2异步文件上传,两三行代码把文件传输 mouseover事件 .then()的使用 是Promise对象的一个方法…

HT6818 低 EMI 音频功率放大器

HT6818是一款具有低EMI、防削顶失真功能的立体声免输出滤波器D类音频功率放大器。AROC辐射和传导干扰抑Z电路使HT6818具有全带宽低辐射性能&#xff0c;在不加辅助滤波设计、输出喇叭线长20cm时的辐射水平远在FCC Part15Class B 标准之下。 HT6818的防削顶失真功能可检测并抑Z由…