Java修仙传之Flink篇

大道三千:最近我修Flink

目前个人理解:

处理有界,无界流的工具

FLINK:

FLINK定义:

Flink特点

Flink分层API

流的定义

有界数据流(批处理):

有界流:数据结束了,程序也就结束了

知道数据开始以及结束的地方

无界数据流:

特征:读一条,计算一条,输出一次结果

知道数据开始的地方,却不知道结束的地方

(好似长江大河,会一直一直一直产生数据)

流的状态

个人理解:(有状态流会基于内存保存之前的数据)

如果后续流的操作需要用到之前的数据,这个流时有状态的

如果后续流的操作不需要用到之前的数据,这个流是无状态的

DataSet API:有界流批处理( 已淘汰)

1:创建执行环境

2:读取流(数据)

3:将读取到的数据,转换为方便处理的格式

4:将收集到的数据进行(分组,求和,最大,最小等....)操作

//批处理方式(有界流,因为很明确的知道这个文件在哪里结束)
public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)DataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Override         //一行数据       // 数据收集器     out:相当于是一个按照 下面格式收集数据的收集器  格式=out.collect(Tuple2.of(word,1L));public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");  //一行数据按照" "拆分for (String word : words) {   //word = 一行中的每一个字段    如果1改成2,则统计时数目会成2Tuple2<String, Long> of = Tuple2.of(word, 1L);//每个的那次都转为这种格式out.collect(of);  // 收集器添加数据 (转换格式为 (循环到的字段,1L))}}});// 4. 按照 word 进行分组    按照第一个字段分组.(字段,1L),就是按照第一个字段分组(A,1),(b,1),(c,1),(d,1),(d,1) 就是按照abcd分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计   根据第二个字段求和,即将每个分组的第二个字段相加,得到该分组的总和AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

DataStream API:流、批一体处理

转换(flatMap)、

分组(keyBy)、

求和(sum)、

执行(execute)、

读取文本(readTextFile,有界流)

1:创建流式执行环境(基于StreamExecutionEnvironment)

2:读取文件

3:转换、分组、求和,得到统计结果

4:打印输出

5:执行

//流处理方式 (有界流,因为很明确的知道这个文件在哪里结束),如果不是本地而是网络则是无界流
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果                                                                          SingleOutputStreamOperator<Tuple2<String, Integer>> resultList = lineStream.flatM输入类型,输出类型ap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Override           //当前行数据   //要返回的类型public void flatMap(String line, Collector<Tuple2<String, Integer>> list) throws Exception {String[] fields = line.split(" ");for (String field : fields) {Tuple2<String, Integer> result = Tuple2.of(field, 1);list.collect(result);}}});//分组                                                                                    // 传入的数据类型()           要分组的数据类型KeyedStream<Tuple2<String, Integer>, String> gropbyDate = resultList.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0; //这里是类型的第一位。如(hello,1),则是根据hello进行分组}});//求和。      以上一个为例子:(hello,1)分组之后,根据1索引即第二位(hello,1)的1进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> sum = gropbyDate.sum(1);//打印输出sum.print();//执行env.execute();}}
        // 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);

结果:

读取socket(无界流)

事件监听(环境对象.socketTextStream(IP,端口号))

备注:先启动linux 输入命令nc -lk 7777

然后启动代码监听 7777

此时linux输入的数据会被代码抓取到

备注2:跟前两个的区别就是这个是调用的socketTextStream。其他无任何区别

//监听7777端口的数据流
// 这里代码监听了  IP地址192.168.200.130  端口号7777 的操作   。ip地址那里写主机名也行
public class SocketStreamWordCount {public static void main(String[] args) throws Exception {//构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//拿到数据DataStreamSource<String> lineStream = env.socketTextStream("192.168.200.130", 7777);// 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> convert = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] fields = line.split(" ");for (String field : fields) {Tuple2<String, Long> of = Tuple2.of(field, 1L);out.collect(of);}}});//分组KeyedStream<Tuple2<String, Long>, Object> gropBy = convert.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}});//求和SingleOutputStreamOperator<Tuple2<String, Long>> sum = gropBy.sum(1);//输出sum.print();//执行env.execute();}
}
 SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> data.f0).sum(1);

LMD存在泛型擦除,解决方案看这里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/176024.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

正则表达式包含数字和字符匹配

至少6位。 pattern : (?.[0-9])(?.[A-Za-z])[0-9A-Za-z]{6,} 正则表达式中的“?”是一个正向预查字符&#xff0c;它的意思是匹配前一个字符出现的最少一次。具体来说&#xff0c;当一个匹配出现时&#xff0c;它会检查前一个字符是否符合要求&#xff0c;如果符合&#xf…

【Java 进阶篇】深入理解 Java Response:从基础到高级

HTTP响应&#xff08;Response&#xff09;是Web开发中的一个关键概念&#xff0c;它是服务器向客户端&#xff08;通常是浏览器&#xff09;返回数据的方式。理解如何在Java中处理和构建HTTP响应是开发Web应用程序的重要一部分。本文将从基础知识到高级技巧&#xff0c;详细介…

ardupilot开发 --- 深度相机 篇

1. ZED 相机 1.1 规格 2. RealSense 需要机载计算机作为中介&#xff01;&#xff01;

分布式锁-Redis红锁解决方案

一 分布式锁的概念 1&#xff1a;概念 分布式锁&#xff08;多服务共享锁&#xff09; 在分布式的部署环境下&#xff0c;通过锁机制来让多客户端互斥的对共享资源进行访问控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共…

使用 Authing 快速实现一套类似 OpenAI 的认证、API Key 商业权益授权机制

如果你有经常使用 OpenAI 或者 HuggingFace 这一类面向开发者的 SaaS 服务&#xff0c;对于 API Key 肯定不会陌生。我们在使用这些服务时&#xff0c;通常都会在其平台上面创建一套 API Key&#xff0c;之后我们才能在代码中通过这一串 API key 访问其服务&#xff1b;同时&am…

处理SAP资产折旧AFAB 过账报错:“科目 8019010100 要求一个成本会计分配”

会计在进行资产折旧AFAB时 报错如下所示&#xff1a; 原因分析&#xff1a; 折旧时没有把资产设置得成本中心带到过账凭证的成本中心字段中去。而资产中已经维护了成本中心了。 所以要在资产过账的科目分配中设置一下路径如下&#xff1a; 或者TCODE&#xff1a;ACSET科目设置这…

Jmeter(二十一):jmeter导入和导出接口的处理(超详细)

JMeter测试导入接口 利用Jmeter测试上传文件&#xff0c;首先可根据接口文档或者fiddler抓包分析文件上传的接口&#xff1b;如下图&#xff1a; 以下是我通过fiddler所截取的文件上传的接口 1、填写导入接口的信息 查看文件上传栏下的填写信息&#xff1a; 文件名称&#x…

RT-Thread 7. RT-Thread Studio ENV修改MCU型号

1. 修改MCU型号 2.在ENV界面输入 scons -c scons --dist3. dist下为更新后完整源代码 4.导入RT-Thread Studio 发现GD32F330已经生效了。 5. 自己编写startup_gd32f3x0.S&#xff0c;准确性待验证 ;/* ; * Copyright (c) 2006-2021, RT-Thread Development Team ; * ; * SPD…

MySQL主从复制原理

1、MySQL主从复制的三个步骤及其原理图 slave会从master读取binlog来进行数据同步 MySQL复制过程分成三步&#xff1a; 1、master将改变记录到二进制日志&#xff08;binary log&#xff09;。这些记录过程叫做二进制日志事件&#xff0c;binary log events。 2、slave将ma…

WebService与RESTful两种接口风格示例

下面我将分别用WebService&#xff08;SOAP&#xff09;和RESTful API的例子来说明它们是如何工作的。 1. WebService (SOAP) 示例&#xff1a; 假设有一个在线计算器服务&#xff0c;它提供了一个加法操作的SOAP WebService。 SOAP请求&#xff08;客户端到服务器&#xff…

UG\NX二次开发 球坐标到直角坐标的转换

文章作者:里海 来源网站:《里海NX二次开发3000例专栏》 感谢粉丝订阅 感谢 JiaLiHuiNaoGui 订阅本专栏,非常感谢。 简介 已知角度θ和ϕ,距离d,求P点坐标。 "> 代码 #include "me.hpp" using namespace NXOpen; using namespace std;void GetP<

MBox肯定会因为回购和销毁将起飞

近期&#xff0c;币安链上的GameFi和元宇宙平台MOBOX宣布推出代币回购和销毁计划&#xff0c;在加密货币市场还处于熊市的现在&#xff0c;消息引起了不少链游打金爱好者和玩家的关注&#xff0c;MOBOX的讨论量也在快速上升。 在近一年多的熊市之中&#xff0c;很多GameFi项目从…

LeetCode:2003. 每棵子树内缺失的最小基因值(C++)

目录 2003. 每棵子树内缺失的最小基因值 题目描述&#xff1a; 实现代码与解析&#xff1a; dfs 启发式合并 原理思路&#xff1a; 2003. 每棵子树内缺失的最小基因值 题目描述&#xff1a; 有一棵根节点为 0 的 家族树 &#xff0c;总共包含 n 个节点&#xff0c;节点编…

【Linux】虚拟机部署与发布J2EE项目(Linux版本)

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《微信小程序开发实战》。&#x1f3af;&#x1f3a…

【Vue3-Flask-BS架构Web应用】实践笔记1-使用一个bat脚本自动化完整部署环境

前言 近年来&#xff0c;Web开发已经成为计算机科学领域中最热门和多产的领域之一。Python和Vue.js是两个备受欢迎的工具&#xff0c;用于构建现代Web应用程序。在本教程中&#xff0c;我们将探索如何使用这两个工具来创建一个完整的Web项目。我们将完成从安装Python和Vue.js到…

从0到1之微信小程序快速入门(02)

目录 页面导航 - 声明式导航 1. 导航到 tabBar 页面 2. 导航到非 tabBar 页面 3. 后退导航 ​编辑 页面导航 - 编程式导航 页面导航 - 导航传参 页面事件 - 下拉刷新事件 监听下拉刷新事件 停止下拉刷新的效果 页面事件 - 上拉触底事件 监听页面的上拉触底事件 配置…

项目部署之OpenResty

项目部署之OpenResty 1. OpenResty介绍 OpenResty 是一个基于Nginx的高性能Web平台&#xff0c;用于方便地搭建能够处理超高并发、扩展性极高的动态Web应用、Web服务和动态网关。具备下列特点&#xff1a; 具备Nginx的完整功能基于Lua语言进行扩展&#xff0c;集成了大量精良…

cmake多目录构建初步成功

目录和代码和 首次cmake 多目录构建失败 此文一样&#xff1b; 只有一个CMakeLists.txt&#xff1b; cmake_minimum_required(VERSION 3.10) project(mytest3 VERSION 1.0) include_directories("${PROJECT_SOURCE_DIR}/include") add_executable(mytest3 src/main…

vue2:路由前置守卫无法获取到this.$store.state.xxx

在获取到vuex的数据时候&#xff0c;想在router目录下的index.js文件去获取到vuex仓库中声明的全局变量&#xff0c;但是通过this.$store.stote.xxx去获取的时候&#xff0c;报错提示&#xff1a;$store未定义 一、store/index.js const store new Vuex.Store({state: {// 属…

WPS中图的自动编号及引用

WPS中图的自动编号及引用 图的自动编号图编号的引用图编号及引用的更新 图的自动编号 将光标放置在需要插入编号的位置点击“引用”→“题注”&#xff1a; 点击“引用”→“题注”&#xff1a; 点击“编号”&#xff0c;设置图的编号格式&#xff0c;可勾选“包含章节编号”&…