尚硅谷大数据Flink1.17实战教程-笔记04【Flink DataStream API】

  • 尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】
  • 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili
  1. 尚硅谷大数据Flink1.17实战教程-笔记01【Flink 概述、Flink 快速上手】
  2. 尚硅谷大数据Flink1.17实战教程-笔记02【Flink 部署】
  3. 尚硅谷大数据Flink1.17实战教程-笔记03【Flink 运行时架构】
  4. 尚硅谷大数据Flink1.17实战教程-笔记04【Flink DataStream API】
  5. 尚硅谷大数据Flink1.17实战教程-笔记05【】
  6. 尚硅谷大数据Flink1.17实战教程-笔记06【】
  7. 尚硅谷大数据Flink1.17实战教程-笔记07【】
  8. 尚硅谷大数据Flink1.17实战教程-笔记08【】

目录

基础篇

第05章-DataStream API

P033【033_DataStreamAPI_执行环境】24:22

P034【034_DataStreamAPI_源算子_准备工作】06:36

P035【035_DataStreamAPI_源算子_集合&文件&socket】14:40

P036【036_DataStreamAPI_源算子_从Kafka读取】19:50

P037【037_DataStreamAPI_源算子_数据生成器】14:09

P038【038_DataStreamAPI_Flink支持的数据类型】08:49

P039【039_DataStreamAPI_基本转换算子_map】11:48

P040【040_DataStreamAPI_基本转换算子_filter&flatmap】12:45

P041【041_DataStreamAPI_聚合算子_keyby】18:00

P042【042_DataStreamAPI_聚合算子_简单聚合算子】11:53

P043【043_DataStreamAPI_聚合算子_规约聚合reduce】09:34

P044【44_DataStreamAPI_用户自定义函数】24:24

P045【45_DataStreamAPI_分区算子&分区器】25:08

P046【46_DataStreamAPI_分区算子_自定义分区】06:41

P047【47_DataStreamAPI_分流_使用FIlter简单实现】08:50

P048【48_DataStreamAPI_分流_使用侧输出流】26:33

P049【49_DataStreamAPI_合流_union】06:37

P050【50_DataStreamAPI_合流_connect】15:44

P051【51_DataSrreamAPI_合流_connect案例】12:02


基础篇

第05章-DataStream API

P033【033_DataStreamAPI_执行环境】24:22

第5章 DataStream API

DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

5.1 执行环境(Execution Environment)

Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。

不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

5.1.1 创建执行环境

1)getExecutionEnvironment

最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

2)createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

3createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment

       .createRemoteEnvironment(

          "host",                   // JobManager主机名

          1234,                     // JobManager进程端口号

          "path/to/jarFile.jar"  // 提交给JobManagerJAR

      );

在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

package com.atguigu.env;import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** TODO** @author* @version 1.0*/
public class EnvDemo {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment//.getExecutionEnvironment();  // 自动识别是 远程集群 ,还是idea本地环境.getExecutionEnvironment(conf); // conf对象可以去修改一些参数//.createLocalEnvironment()//.createRemoteEnvironment("hadoop102", 8081,"/xxx")// 流批一体:代码api是同一套,可以指定为 批,也可以指定为 流// 默认 STREAMING// 一般不在代码写死,提交时 参数指定:-Dexecution.runtime-mode=BATCHenv.setRuntimeMode(RuntimeExecutionMode.BATCH);env//.socketTextStream("hadoop102", 7777).readTextFile("input/word.txt").flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1).print();env.execute();//env.executeAsync();/** TODO 关于execute总结(了解)*     1、默认 env.execute()触发一个flink job:*          一个main方法可以调用多个execute,但是没意义,指定到第一个就会阻塞住*     2、env.executeAsync(),异步触发,不阻塞*         => 一个main方法里 executeAsync()个数 = 生成的flink job数*     3、思考:*         yarn-application 集群,提交一次,集群里会有几个flink job?*         =》 取决于 调用了n个 executeAsync()*         =》 对应 application集群里,会有n个job*         =》 对应 Jobmanager当中,会有 n个 JobMaster*/}
}

P034【034_DataStreamAPI_源算子_准备工作】06:36

5.2 源算子(Source)

5.2.1 准备工作

package com.atguigu.bean;import java.util.Objects;/*** TODO** @author* @version 1.0*/
public class WaterSensor {public String id;public Long ts;public Integer vc;// 一定要提供一个 空参 的构造器public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) &&Objects.equals(ts, that.ts) &&Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}

P035【035_DataStreamAPI_源算子_集合&文件&socket】14:40

5.2.2 从集合中读取数据

package com.atguigu.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;/*** TODO** @author* @version 1.0*/
public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO 从集合读取数据DataStreamSource<Integer> source = env.fromElements(1, 2, 33); // 从元素读//.fromCollection(Arrays.asList(1, 22, 3)); // 从集合读source.print();env.execute();}
}
package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** TODO** @author* @version 1.0*/
public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 从文件读: 新Source架构FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/word.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource").print();env.execute();}
}
/*** 新的Source写法:* env.fromSource(Source的实现类,Watermark,名字)*/

P036【036_DataStreamAPI_源算子_从Kafka读取】19:50

5.2.5 从Kafka读取数据

package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/*** TODO** @author* @version 1.0*/
public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 从Kafka读:新Source架构KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定kafka节点的地址和端口.setGroupId("atguigu")  // 指定消费者组的id.setTopics("topic_1")   // 指定消费的 Topic.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定 反序列化器,这个是反序列化value.setStartingOffsets(OffsetsInitializer.latest())  // flink消费kafka的策略.build();env//.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource").fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}
/***   kafka消费者的参数:*      auto.reset.offsets*          earliest: 如果有offset,从offset继续消费; 如果没有offset,从 最早 消费*          latest  : 如果有offset,从offset继续消费; 如果没有offset,从 最新 消费**   flink的kafkasource,offset消费策略:OffsetsInitializer,默认是 earliest*          earliest: 一定从 最早 消费*          latest  : 一定从 最新 消费*/

P037【037_DataStreamAPI_源算子_数据生成器】14:09

5.2.6 从数据生成器读取数据

package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** TODO** @author* @version 1.0*/
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 如果有n个并行度,最大值设为a// 将数值 均分成 n份,a/n ,比如,最大100,并行度2,每个并行度生成50个// 其中一个是 0-49,另一个50-99env.setParallelism(2);/*** 数据生成器Source,四个参数:*     第一个:GeneratorFunction接口,需要实现,重写map方法,输入类型固定是Long*     第二个:long类型,自动生成的数字序列(从0自增)的最大值(小于),达到这个值就停止了*     第三个:限速策略,比如 每秒生成几条数据*     第四个:返回的类型*/DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator").print();env.execute();}
}

P038【038_DataStreamAPI_Flink支持的数据类型】08:49

P039【039_DataStreamAPI_基本转换算子_map】11:48

P040【040_DataStreamAPI_基本转换算子_filter&flatmap】12:45

P041【041_DataStreamAPI_聚合算子_keyby】18:00

P042【042_DataStreamAPI_聚合算子_简单聚合算子】11:53

P043【043_DataStreamAPI_聚合算子_规约聚合reduce】09:34

P044【44_DataStreamAPI_用户自定义函数】24:24

P045【45_DataStreamAPI_分区算子&分区器】25:08

P046【46_DataStreamAPI_分区算子_自定义分区】06:41

P047【47_DataStreamAPI_分流_使用FIlter简单实现】08:50

P048【48_DataStreamAPI_分流_使用侧输出流】26:33

P049【49_DataStreamAPI_合流_union】06:37

P050【50_DataStreamAPI_合流_connect】15:44

P051【51_DataSrreamAPI_合流_connect案例】12:02

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/450848.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring篇】初识之Spring的入门程序及控制反转与依赖注入

&#x1f9f8;安清h&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;【计算机网络】&#xff0c;【Mybatis篇】 &#x1f6a6;作者简介&#xff1a;一个有趣爱睡觉的intp&#xff0c;期待和更多人分享自己所学知识的真诚大学生。 文章目录 &#x1f3af;初始Spring …

【K8S系列】Kubernetes pod节点NotReady问题及解决方案详解【已解决】

Kubernetes 集群中的每个节点都是运行容器化应用的基础。当节点状态显示为 NotReady 时&#xff0c;意味着该节点无法正常工作&#xff0c;这可能会导致 Pod 无法调度&#xff0c;从而影响整个应用的可用性。本文将深入分析节点不健康的各种原因、详细的排查步骤以及有效的解决…

查看SQL执行计划 explain

查看SQL执行计划 explain explain使用方式 alter session set current_schematest; explain plan for sql语句; --并不会实际执行&#xff0c;因此生成的执行计划也是预估的 select * from table(dbms_xplan.display); explain使用场景 1.内存中没有谓词信息了&#xff0…

MySQL从入门到跑路

SQL语言 SQL&#xff08;Structured Query Language&#xff0c;结构化查询语言&#xff09;是用于管理和操作关系数据库的一种标准编程语言。 SQL分类&#xff1a; DDL(Data Definition Language)&#xff1a;数据定义语言&#xff0c;用于操作数据库、表、字段&#xff0c…

前端文件流导出

1、前端代码 ​ /** 导出 */ const handleExport async () > {let config {responseType: blob,headers: {Content-Type: application/json,},};const res await getTargetExport(config);const blob new Blob([res]);const fileName PK目标跟进导出列表.xls;const li…

SpringBoot整合Freemarker(一)

Freemarker和jsp一样是一个视图的引擎模板&#xff0c;其实所有的模板引擎的工作原理都是类似的&#xff0c;如下图&#xff1a; 接下来就具体讲解一下Freemarker的用法&#xff0c;参考手册&#xff1a;模板 数据模型 输出 - FreeMarker 中文官方参考手册 SpringBoot默认就…

【浏览器】如何正确使用Microsoft Edge

1、清理主页广告 如今的Microsoft Edge 浏览器 主页太乱了&#xff0c;各种广告推送&#xff0c;点右上角⚙️设置&#xff0c;把快速链接、网站导航、信息提要、背景等全部关闭。这样你就能得到一个超级清爽的主页。 网站导航       关闭 …

HarmonyOS NEXT和认证(在校生的大福利)

今天重点关注了一下HarmonyOS NEXT&#xff0c;也就是我们所说的纯血鸿蒙&#xff01; 根据官方的说法&#xff1a; 欢迎开发者进入HarmonyOS NEXT。暌违一年&#xff0c;HarmonyOS NEXT终于在万千开发者的期待下从幕后走向台前。 HarmonyOS NEXT采用全新升级的系统架构&#…

【Python】NumPy(一):数据类型、创建数组及基本操作

目录 ​NumPy初识 1.什么是NumPy&#xff1f; NumPy的应用 NumPy数据类型 Python基本数据类型 NumPy数据类型 NumPy数组 创建数组 1.使用numpy.array() 2.使用arange()方法创建 3.使用linspace()创建等差数列 4使用zeros()创建数组 5.使用ones()创建数组 6.利用…

Linux基本使用和程序部署

文章目录 一. Linux背景Linux发行版 二. Linux环境搭建Linux常见命令lspwdcdtouchcatmkdirrmcpmvtailvimgreppsnetstat管道 三. 搭建java部署环境安装jdk安装mysql部署Web项目到Linux 一. Linux背景 1969−1970年,⻉尔实验室的DennisRitchie和KenTompson开发了Unix操作系统. 他…

在Linux操作系统上安装NVM教程——CentOS 7/VMware 17版

目录 一、测试网络是否能上网 二、下载阿里云镜像 三、解决执行yum命令出现报错&#xff08;没有就跳过&#xff09; 四、下载NVM安装包 五、解压NVM安装包 六、安装Node 七、连接新的动态库 八、升级GLIBC版本 九、安装GCC 十、查看当前服务器CentOS版本 一、测试网…

[AWS云]kafka调用和创建

背景:因为因为公司的项目需要使用AWS的kafka&#xff0c;但是在创建和使用过程中都遇到了一些报错和麻烦&#xff0c;毕竟老外的东西&#xff0c;和阿里云、华为使用起来还是不一样。 一、创建&#xff08;创建的配置过程就略了&#xff0c;就是配置一下可用区、型号&#xff0…

闯关leetcode——110. Balanced Binary Tree

大纲 题目地址内容 解题代码地址 题目 地址 https://leetcode.com/problems/balanced-binary-tree/description/ 内容 Given a binary tree, determine if it is height-balanced. A height-balanced binary tree is a binary tree in which the depth of the two subtrees…

深入理解售后派单管理系统,功能优势一览

售后派单管理系统优化售后服务流程&#xff0c;提升响应速度、运营效率和服务质量。ZohoDesk等系统通过自动化派单、实时调度监控等功能&#xff0c;助力企业赢得竞争优势。适用于电子产品、汽车、IT及房地产等行业。 一、什么是售后派单管理系统 售后派单管理系统是一种专门用…

第七届机械、控制与计算机工程国际学术会议(ICMCCE2024)

重要信息 大会官网&#xff1a;www.icmcce.com 大会地点&#xff1a;中国杭州 大会时间&#xff1a;2024年10月25-27日 大会简介 第七届机械、控制与计算机工程国际学术会议定于2024年10月25日至27日在中国杭州召开。本届会议由巢湖学院主办&#xff0c;主要围绕“机械”、…

AGI|浅尝多Agent协作框架CrewAI,打造一个智能旅行助手

目录 一、介绍 二、特性 三、使用案例 四、 结语 一、介绍 Crew AI是一个多智能体协作智能框架&#xff0c;可以编排角色扮演的AI智能体。旨在协调角色扮演的自主AI代理&#xff0c;通过促进协作智能体&#xff0c;Crew AI使代理能够无缝协作&#xff0c;共同应对复杂任务。…

【JavaScript】LeetCode:61-65

文章目录 61 课程表62 实现Trie&#xff08;前缀树&#xff09;63 全排列64 子集65 电话号码的字母组合 61 课程表 Map BFS拓扑排序&#xff1a;将有向无环图转为线性顺序。遍历prerequisites&#xff1a;1. 数组记录每个节点的入度&#xff0c;2. 哈希表记录依赖关系。n 6&a…

(十九)、使用 minikube 运行k8s 集群

文章目录 1、机器信息2、官方文档3、启动本机 docker4、安装 minikube5、启动 minikube5.1、报错重试应该做什么&#xff1f; 6、启动后7、安装 Vs Code & k8s extensions8、在 VS Code 查看运行起来的 k8s 集群9、基本命令10、虚拟化不支持 Mac Os 14.3.1 1、机器信息 Ma…

c++算法第3天

本篇文章包含三道算法题&#xff0c;难度由浅入深&#xff0c;适合新手练习哟 目录 第一题 题目链接 题目解析 代码原理 代码编写 本题总结 第二题 题目链接 题目解析 代码原理 代码编写 第三题 题目链接 题目解析 代码原理 代码编写 第一题 题目链接 [NOIP2…

Iceberg 基本操作和快速入门二-Spark DDL操作

Iceberg 基本操作和快速入门一-CSDN博客 启动spark会话 docker exec -it spark-iceberg spark-sql 创建表 CREATE TABLE prod.db.sample ( id bigint NOT NULL COMMENT unique id, data string) USING iceberg; 创建分区表 CREATE TABLE prod.db.sample_par ( id bigint, …