使用flink编写WordCount

1. env-准备环境

2. source-加载数据

3. transformation-数据处理转换

4. sink-数据输出

5. execute-执行

流程图:

DataStream API开发

//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/

 添加依赖

<properties><flink.version>1.13.6</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@bigdata01:/opt/app</url></configuration></plugin></plugins></build>

 

编写代码

package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元素,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}

批处理结果:前面的序号代表分区

流处理结果:

也可以通过如下方式修改分区数量:

 env.setParallelism(2);

关于并行度的代码演示:

系统以及算子都可以设置并行度,或者获取并行度

package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 ,根据流的性质,决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流, 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism = env.getParallelism();System.out.println(parallelism);// 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStreamcollector.collect(word);}}});// 每一个算子也有自己的并行度,一般跟系统保持一致System.out.println("flatMap的并行度:"+flatMapStream.getParallelism());//flatMapStream.print();// Tuple2 指的是2元组DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1); // ("hello",1)}});DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组,进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();}
}
  1. 打包、上传

 文件夹需要提前准备好

提交我们自己开发打包的任务

flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

去界面中查看运行结果:

因为你这个是集群运行的,所以标准输出流中查看,假如第一台没有,去第二台查看,一直点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/479922.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SpringBoot】28 API接口防刷(Redis + 拦截器)

Gitee仓库 https://gitee.com/Lin_DH/system 介绍 常用的 API 安全措施包括&#xff1a;防火墙、验证码、鉴权、IP限制、数据加密、限流、监控、网关等&#xff0c;以确保接口的安全性。 常见措施 1&#xff09;防火墙 防火墙是网络安全中最基本的安全设备之一&#xff0c…

java全栈day10--后端Web基础(基础知识)

引言&#xff1a;只要能通过浏览器访问的网站全是B/S架构&#xff0c;其中最常用的服务器就是Tomcat 在浏览器与服务器交互的时候采用的协议是HTTP协议 一、Tomcat服务器 1.1介绍 官网地址&#xff1a;Apache Tomcat - Welcome! 1.2基本使用(网上有安装教程&#xff0c;建议…

elasticsearch的索引模版使用方法

5 索引模版⭐️⭐️⭐️⭐️⭐️ 索引模板就是创建索引时要遵循的模板规则索引模板仅对新创建的索引有效&#xff0c;已经创建的索引并不受索引模板的影响 5.1 索引模版的基本使用 1.查看所有的索引模板 GET 10.0.0.91:9200/_index_template2.创建自定义索引模板 xixi &…

英语知识网站开发:Spring Boot框架应用

3系统分析 3.1可行性分析 通过对本英语知识应用网站实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本英语知识应用网站采用SSM框架&#xff0c;JAVA作为开发语…

Linux自动化构建-make/Makefile

目录 1. 背景2. 基本使用3. 推导过程4. 好用的操作5. 拓展语法 1. 背景 会不会写makefile&#xff0c;从⼀个侧⾯说明了⼀个⼈是否具备完成⼤型⼯程的能⼒⼀个⼯程中的源⽂件不计数&#xff0c;其按类型、功能、模块分别放在若⼲个⽬录中&#xff0c;makefile定义了⼀系列的规…

Ubuntu20.04+ROS 进行机械臂抓取仿真:环境搭建(一)

目录 一、从官网上下载UR机械臂 二、给UR机械臂添加夹爪 三、报错解决 本文详细介绍如何在Ubuntu20.04ROS环境中为Universal Robots的UR机械臂添加夹爪。首先从官方和第三方源下载必要的软件包&#xff0c;包括UR机械臂驱动、夹爪插件和相关依赖。然后&#xff0c;针对gazeb…

C++11(下)

C11&#xff08;下&#xff09; 1.条件变量2.包装器&#xff08;重要&#xff09;3.bind &#x1f31f;&#x1f31f;hello&#xff0c;各位读者大大们你们好呀&#x1f31f;&#x1f31f; &#x1f680;&#x1f680;系列专栏&#xff1a;【C的学习】 &#x1f4dd;&#x1f4…

【组件封装】uniapp vue3 封装一个自定义下拉刷新组件pullRefresh,带刷新时间和加载动画教程

文章目录 前言一、实现原理二、组件样式和功能设计三、scroll-view 自定义下拉刷新使用回顾相关属性&#xff1a;最终版完整代码&#xff1a; 前言 手把手教你封装一个移动端 自定义下拉刷新组件带更新时间和加载动画&#xff08;PullRefresh&#xff09;&#xff0c;以uniapp …

14、保存与加载PyTorch训练的模型和超参数

文章目录 1. state_dict2. 模型保存3. check_point4. 详细保存5. Docker6. 机器学习常用库 1. state_dict nn.Module 类是所有神经网络构建的基类&#xff0c;即自己构建一个深度神经网络也是需要继承自nn.Module类才行&#xff0c;并且nn.Module中的state_dict包含神经网络中…

【Threejs进阶教程-着色器篇】9.顶点着色器入门

【Threejs进阶教程-着色器篇】9.顶点着色器入门 本系列教程第一篇地址&#xff0c;建议按顺序学习认识顶点着色器varying介绍顶点着色器与片元着色器分别的作用Threejs在Shader中的内置变量各种矩阵gl_Position 尝试使用顶点着色器增加分段数增强效果 制作平面鼓包效果鼓包效果…

w058基于web的美发门店管理系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0…

leetcode 二叉树的最大深度

104. 二叉树的最大深度 已解答 简单 相关标签 相关企业 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3…

VMware ubuntu创建共享文件夹与Windows互传文件

1.如图1所示&#xff0c;点击虚拟机&#xff0c;点击设置&#xff1b; 图1 2.如图2所示&#xff0c;点击选项&#xff0c;点击共享文件夹&#xff0c;如图3所示&#xff0c;点击总是启用&#xff0c;点击添加&#xff1b; 图2 图3 3.如图4所示&#xff0c;出现命名共享文件夹…

matlab 实现混沌麻雀搜索算法的光伏MPPT控制仿真

1、内容简介 略 103-可以交流、咨询、答疑 2、内容说明 略 3、仿真分析 略 4、参考论文 略

Unity3D 截图

使用 Unity3D 自带的截图接口&#xff0c;制作截图工具。 截图 有时候我们想对 Unity 的窗口进行截图&#xff0c;如果直接使用一些截图工具&#xff0c;很难截取到一张完整分辨率的图片&#xff08;例如&#xff0c;我们想要截取一张 1920 * 1080 的图片&#xff09;。 其实…

STM32F10x 定时器

使用定时器实现&#xff1a;B5 E5的开关 添加相关的.h路径文件 添加相关的.c配置文件 led.h文件 用于声明LED函数 #ifndef __LED_H //没有定义__LED_H #define __LED_H //就定义__LED_H #define LED1_ON GPIO_ResetBits(GPIOB,GPIO_Pin_5) #defi…

PMP好考吗,有多大的价值?

非常好考&#xff01;PMP目前大陆地区的笔试是只有选择题的&#xff0c;运气好的话 蒙一个都能对&#xff0c;所以PMP的通过率高&#xff0c;这也是很多人考了吐槽PMP没用&#xff0c;是“水证”&#xff0c;但是每年考PMP 的人不减反增&#xff0c;大家可以想一下&#xff0c;…

css:项目

这是一个完整的网站制作的流程 美工会先制作一个原型图&#xff1a; 原型图写的不详细&#xff0c;就是体现一个网页大致的布局 然后美工再做一个psd样例图片 然后再交给程序员 项目 模块化开发&#xff1a;把代码的不同的样式封装起来&#xff0c;需要用到相同样式的标签就…

VsCode 插件推荐(个人常用)

VsCode 插件推荐&#xff08;个人常用&#xff09;

黑马程序员Java项目实战《苍穹外卖》Day01

苍穹外卖-day01 课程内容 软件开发整体介绍苍穹外卖项目介绍开发环境搭建导入接口文档Swagger 项目整体效果展示&#xff1a; ​ 管理端-外卖商家使用 ​ 用户端-点餐用户使用 当我们完成该项目的学习&#xff0c;可以培养以下能力&#xff1a; 1. 软件开发整体介绍 作为一…