尚硅谷Flink(一)

目录

☄️前置工作

fenfa脚本

🌋概述

☄️Flink是什么

☄️特点(多nb)

☄️应用场景(不用看)

☄️分层API

🌋配环境

☄️wordcount

☄️WcDemoUnboundStreaming 

🌋集群部署

☄️集群角色

☄️集群规划

webUI提交作业

命令行提交作业

​编辑

☄️部署模式

会话模式(Session Mode)

单作业模式(Per-Job Mode)

应用模式(Application Mode)

☄️standalone运行模式 

会话模式部署 

单作业模式部署

应用模式部署 

☄️YARN 运行模式(重点) 

会话模式部署 

单作业模式部署 

应用模式部署 

☄️历史服务器

🌋深入运行流程

☄️总体

☄️核心概念

并行度设置

算子链

任务槽

​编辑

任务槽和并行度的关系 

☄️作业提交流程

Standalone 会话模式作业提交流程

☄️ Yarn 应用模式作业提交流程 


☄️前置工作

fenfa脚本

使用方法:配置环境变量路径~/bin,fenfa 文件相对路径、绝对路径都行

#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi
#2. 遍历集群所有机器
for host in hadoop2 hadoop3
doecho ==================== $host ====================#3. 遍历所有目录,挨个发送for file in $@do#4. 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirecho $pdir/$fname have trans !!!!!!!!!!!!!elseecho !!!!!!$file does not exists!!!!!!fidone
done

🌋概述

☄️Flink是什么

Apache Flink® — Stateful Computations over Data Streams | Apache Flink

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。Flink 被设计为在所有常见的集群环境中运行,以内存速度任何规模执行计算。

把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。这就是所谓的“有状态的流处理”。

无界数据流:

  • 有定义流的开始,但没有定义流的结束
  • 会无休止的产生数据
  • 无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的

有界数据流:

  • 有定义流的开始,也有定义流的结束
  • 有界流可以在摄取所有数据后再进行计算;
  • 有界流所有数据可以被排序,所以并不需要有序摄取
  • 有界流处理通常被称为批处理

☄️特点(多nb)

低延迟、高吞吐、结果的准确性和良好的容错性。

Flink主要特点如下:
⚫ 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。

⚫ 结果的准确性。Flink提供了事件时间(event-time)和处理时间(processing-time)语义。
对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。

⚫ 精确一次(exactly-once)的状态一致性保证。

⚫ 可以连接到最常用的外部系统,如Kafka、Hive、JDBC、HDFS、Redis等。

⚫ 高可用。本身高可用的设置,加上与K8s,YARN和Mesos的紧密集成,再加上从故障中
快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7×24全天候运行。

Flink vs SparkStreaming 

Spark以批处理为根本

• Spark数据模型:Spark 采用 RDD 模型,Spark Streaming 的 DStream 实际上也就是一组组小批数据 RDD 的集合
• Spark运行时架构:Spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个

Flink以流处理为根本

• Flink数据模型:Flink 基本数据模型是数据流,以及事件(Event)序列

• Flink运行时架构:Flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

☄️应用场景(不用看)

1)电商和市场营销
举例:实时数据报表、广告投放、实时推荐

2)物联网(IOT)
举例:传感器实时数据采集和显示、实时报警,交通运输业

3)物流配送和服务业
举例:订单状态实时更新、通知信息推送

4)银行和金融业
举例:实时结算和通知推送,实时检测异常行为

☄️分层API

⚫ 越顶层越抽象,表达含义越简明,使用越方便

⚫ 越底层越具体,表达能力越丰富,使用越灵活

DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map、flatmap等),连接(joins),聚合(aggregations),窗口(windows)操作等。

注意:Flink1.12以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时

🌋配环境

IDEA创建Maven环境

导入配置项

<properties> <flink.version>1.17.0</flink.version> 
</properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> 
</dependencies> 

☄️wordcount

dataSet

    public static void main(String[] args) throws Exception {// TODO 准备环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// TODO read fileString path = "data/goodnight.txt";DataSource<String> dataSource = env.readTextFile(path);// TODO mapFlatMapOperator<String, Tuple2<String, Integer>> wordOne = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//                                   指定输出元组String words = value.replace(",", "").replace(".", "");String[] words2 = words.split(" ");for (String word : words2) {Tuple2<String, Integer> stringIntegerTuple2 = Tuple2.of(word, 1);//    指定输出元组out.collect(stringIntegerTuple2);}}});// TODO group shuffleUnsortedGrouping<Tuple2<String, Integer>> wordOneGroup= wordOne.groupBy(0);  // 表示元祖0AggregateOperator<Tuple2<String, Integer>> sum = wordOneGroup.sum(1);  // 表示元祖1sum.print();}

dataStreaming

    public static void main(String[] args) throws Exception {// TODO 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO read fileString path = "data/goodnight.txt";DataStreamSource<String> lines = env.readTextFile(path);// TODO mapSingleOutputStreamOperator<Tuple2<String, Integer>> wordOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String words1 = value.replace(",", "").replace(".", "");String[] words2 = words1.split(" ");for (String word : words2) {Tuple2<String, Integer> wordOne = Tuple2.of(word, 1);out.collect(wordOne);}}});// TODO group shuffleKeyedStream<Tuple2<String, Integer>, String> group = wordOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});SingleOutputStreamOperator<Tuple2<String, Integer>> sum = group.sum(1);sum.print();env.execute();  //类似spark行动算子}

体现出流处理,来一条处理一条,前面的编号是并行度,cpu核数 

☄️WcDemoUnboundStreaming 

无界流socket 文本流 

   public static void main(String[] args) throws Exception {// TODO 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO read file
//        String path = "data/goodnight.txt";DataStreamSource<String> hadoop1 = env.socketTextStream("centos7", 7777);// TODO mapSingleOutputStreamOperator<Tuple2<String, Integer>> wordOne = hadoop1.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1);out.collect(wordAndOne);}}).returns(Types.TUPLE(Types.STRING, Types.INT));// Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,// 从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情// 况下(比如 Lambda 表达式中),自动提取的信息是不够精细的——只告诉 Flink 当前的元素// 由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类// 型信息,才能使应用程序正常工作或提高其性能。// 因为对于flatMap 里传入的Lambda 表达式,系统只能推断出返回的是Tuple2 类型,而无// 法得到 Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整// 数据。// TODO group shuffle//                                                               只有一个参数不写类型也行KeyedStream<Tuple2<String, Integer>, String> group = wordOne.keyBy(value -> value.f0);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = group.sum(1);sum.print();// TODO execenv.execute();}

🌌:在 Flink 中,.returns() 方法用于指定数据流的返回类型。这对于 Flink 的类型推断和优化非常重要。在你的代码中,你创建了一个 SingleOutputStreamOperator<Tuple2<String, Integer>>,它表示输出流中的每个元素都是一个包含字符串和整数的元组。

.returns() 方法的目的是帮助 Flink 确定流的返回类型,以便进行类型检查和优化。在你的代码中,如果没有使用 .returns() 方法,Flink 将尝试根据上下文来推断返回类型,但有时推断可能不准确或不完整。

通过使用 .returns(Types.TUPLE(Types.STRING, Types.INT)),你明确告诉 Flink,你的数据流将返回一个元组,其中第一个字段是字符串类型,第二个字段是整数类型。这有助于 Flink 更准确地理解和优化你的代码,并在运行时执行必要的类型检查。

🌋集群部署

☄️集群角色

Flink提交作业和执行任务,需要几个关键组件:
• 客户端(Client):代码由客户端获取并做转换,之后提交给JobManger
• JobManager就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业
后,会进一步处理转换,然后分发任务给众多的TaskManager。
• TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的。

☄️集群规划

自己建三个虚拟机,hostname可能不一样

1. 下载安装包flink-1.17.0-bin-scala_2.12.tgz,将该jar 包上传到hadoop102 节点服务器的/opt/software(随意,记住就行) 路径上。

夸克网盘分享-flink-1.17.0-bin-scala_2.12

2. 在/opt/software 路径上解压flink-1.17.0-bin-scala_2.12.tgz 到/opt/module 路径上

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/ 

3. 进入 conf 路径,修改 flink-conf.yaml 文件,指定 hadoop1 节点服务器为JobManager ,修改flink-conf.yaml 

# JobManager节点地址. 
jobmanager.rpc.address: hadoop102 
jobmanager.bind-host: 0.0.0.0 
rest.address: hadoop102 
rest.bind-address: 0.0.0.0 
# TaskManager节点地址.需要配置为当前机器名 
taskmanager.bind-host: 0.0.0.0 
taskmanager.host: hadoop102 

🌌:在 Apache Flink 的配置文件中,jobmanager.rpc.address 参数指定 JobManager 的 RPC 通信地址。在你的配置中,jobmanager.rpc.address 被设置为 hadoop1,这意味着 Flink JobManager 将绑定到主机名为 hadoop1 的地址。这是为了告诉 Flink 在启动 JobManager 时使用 hadoop1 作为其通信地址。

🌌:在 Apache Flink 的配置文件中,taskmanager.bind-host 参数指定 TaskManager 绑定的网络接口地址。将其设置为 0.0.0.0 表示 TaskManager 将绑定到所有可用的网络接口,使其可以接受来自任何网络接口的连接请求。这通常用于允许来自任何主机或网络的连接,使 Flink 任务能够在广泛的网络范围内访问。 

4. 修改workers 文件,指定hadoop102、hadoop103 和hadoop104 为 TaskManager 

hadoop102 
hadoop103 
hadoop104 

5. 修改masters 文件 

hadoop102:8081 

另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和TaskManager 组件
进行优化配置,主要配置项如下: 
⚫ jobmanager.memory.process.size:对JobManager 进程可使用到的全部内存进行配置,
包括 JVM 元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。 
⚫ taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,
包括 JVM 元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。 
⚫ taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置,
默认为1,可根据TaskManager 所在的机器能够提供给Flink 的CPU 数量决定。所谓
Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。 
⚫ parallelism.default:Flink 任务执行的并行度,默认为1。优先级低于代码中进行的并
行度配置和任务提交时使用参数指定的并行度数量。

6. fenfa flink-1.17.0,分别修改另外两个flink-conf.yaml中的

taskmanager.host: hadoop103 
taskmanager.host: hadoop104 

7. 原神启动!

(base) [root@hadoop1 bin]# bash start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

webUI提交作业

IDEA中pom.xml导入打包配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.yuange</groupId><artifactId>Flink</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies><properties><flink.version>1.17.0</flink.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><!-- 打包插件 --><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

出现

hadoop1启动nc -lk 7777           提交

stop

命令行提交作业

bin/flink run -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./Flink-1.0-SNAPSHOT.jar 

在浏览器中打开 Web UI,http://hadoop102:8081 查看应用执行情况。 
用 netcat 输入数据,可以在 TaskManager 的标准输出(Stdout)看到对应的统计结果 

☄️部署模式

主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。 
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的 main 方法到底
在哪里执行——客户端(Client)还是 JobManager。

会话模式(Session Mode)

需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

会话模式比较适合于单个规模小、执行时间短的大量作业。

单作业模式(Per-Job Mode)

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的
作业启动一个集群,这就是所谓的单作业(Per-Job)模式

一个作业提交 现启动一个集群

作业完成后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes(K8S)

应用模式(Application Mode)

前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗

所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。

应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的1

☄️standalone运行模式 

会话模式部署 

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。 

之前webUI提交作业就是这种,提前启动集群,并通过 Web 页面客户端提交任务(可以多个任务,但是集群资源固定)。 

单作业模式部署

Flink 的 Standalone 集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管
理平台。 

应用模式部署 


应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在bin 目录下的standalone-job.sh 来创建一个 JobManager。

1. 启动 nc -lk 7777

2. Flinkxxx-jar 包放到 lib/目录下

3. 启动 JobManager。

bin/standalone-job.sh start --job-classname com.yuange.wc.WcDemoUnboundStreaming

直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。 

4. 同样是使用 bin 目录下的脚本,启动 TaskManager。 

bin/taskmanager.sh start 

5.stop

bin/taskmanager.sh stop 
bin/standalone-job.sh stop 

☄️YARN 运行模式(重点) 

YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的Slot 数量动态分配TaskManager 资源。 

🌌:在 YARN 上,JobManager 和 TaskManager 运行在 YARN 容器中,而这些容器由 YARN 的 ResourceManager 分配给 NodeManager 执行。NodeManager 负责监控和管理容器,确保它们按照 ResourceManager 的指示执行。NodeManager 提供了容器的隔离和资源管理。

配置环境变量,增加环境变量配置如下: 

HADOOP_HOME=/opt/module/hadoop-3.3.4 (看自己的在哪)
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin 
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop 
export HADOOP_CLASSPATH=`hadoop classpath`(这是反引号,esc键下边)

启动Hadoop 集群,包括HDFS 和YARN。

执行以下命令启动 netcat。 nc -lk 7777 

会话模式部署 

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN Session)来启动Flink 集群。具体步骤如下: 

执行脚本命令向 YARN 集群申请资源,开启一个YARN 会话,启动Flink 集群。

[atguigu@hadoop102 flink-1.17.0]$ bin/yarn-session.sh -nm [-d]后台执行模式,不占用shell) test 

要关闭 Flink YARN 会话(YARN session),你可以使用 yarn-session.sh 脚本,具体步骤如下:

  1. 打开终端窗口并登录到运行 Flink 会话的机器上。

  2. 进入 Flink 的 bin 目录,通常是 Flink 安装目录下的 bin 子目录。

  3. 运行 yarn-session.sh 脚本并提供 -id 参数,后接正在运行的 YARN 会话的 ID(可以在启动时找到)以关闭指定的会话。例如:

  4. ./yarn-session.sh -id <session_id> stop
  5. 其中 <session_id> 是你要关闭的 YARN 会话的 ID。通过提供会话的 ID,你可以确保关闭正确的会话。

  6. 等待脚本执行完成。脚本将向 ResourceManager 发送关闭请求,并等待会话成功终止。一旦会话关闭,你将在终端上看到相应的消息。

请确保提供正确的会话 ID,以免关闭错误的会话。如果你不确定会话 ID,可以使用 yarn application -list 命令来列出当前运行的 YARN 应用程序,并找到你要关闭的会话的 ID。

在关闭 Flink YARN 会话之后,你可以通过 YARN ResourceManager 或 Flink Dashboard 来验证会话是否已成功终止。

单作业模式部署 

在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一
个单独的作业,从而启动一个 Flink 集群。

启动nc -lk 7777

bin/flink run -t yarn-per-job \(固定 

-c com.yuange.wc.WcDemoUnboundStreaming Flink-1.0-SNAPSHOT.jar 

-d 表示以"detached"模式运行 Flink 作业。在这种模式下,作业会在后台运行,不会阻塞当前终端,并且你可以关闭终端而不影响作业的运行。这对于长时间运行的作业或需要后台运行的作业非常有用。

-t 参数表示要运行的作业类型,其中 yarn-per-job 是一种作业运行模式。在 yarn-per-job 模式下,每个 Flink 作业都会启动一个独立的 YARN 会话,该会话运行作业并在作业完成后终止。这与 Flink 的 "YARN session cluster" 模式不同,YARN session cluster:在 YARN 集群上维护一个长时间运行的 Flink 集群。

客户端(xshell)停止不影响任务运行,在webUI点cancel job

应用模式部署 

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。 
1)命令行提交 

bin/flink run-application -t yarn-application -c com.yuange.wc.WcDemoUnboundStreaming Flink-1.0-SNAPSHOT.jar 

2)上传HDFS 提交 
可以通过yarn.provided.lib.dirs 配置选项指定位置,将 flink 的依赖上传到远程。 
(1)上传flink 的lib 和 plugins 到HDFS 上

yarn每次都要上传Flink自身的依赖到HDFS,不如自己先上传

http://hadoop1:9870/explorer.html#/

bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop1:8020/flink-dist" -c com.yuange.wc.WcDemoUnboundStreaming hdfs://hadoop1:8020/flink-jars/Flink-1.0-SNAPSHOT.jar

☄️历史服务器

运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。 
Flink 提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我都知道只有当作业处于运行中的状态,才能够查看到相关的 WebUI 统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。 此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。 

hadoop fs -mkdir -p /logs/flink-job 

修改conf/flink-conf.yaml, 拉到下边


#==============================================================================
# HistoryServer
#==============================================================================# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
jobmanager.archive.fs.dir: hdfs:///hadoop1:8020/logs/flink-his# The address under which the web-based HistoryServer listens.
historyserver.web.address: hadoop1# The port under which the web-based HistoryServer listens.
historyserver.web.port: 8082# Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs:///hadoop1:8020/logs/flink-his# Interval in milliseconds for refreshing the monitored directories.
historyserver.archive.fs.refresh-interval: 5000

3)启动历史服务器 
bin/historyserver.sh start 
4)停止历史服务器 
bin/historyserver.sh stop 

🌋深入运行流程

☄️总体

        (1)JobMaster 

        JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job)。所以 JobMaster和具体的Job 是一一对应的,多个 Job 可以同时运行在一个Flink 集群中, 每个 Job 都有一个自己的JobMaster。需要注意在早期版本的 Flink 中,没有 JobMaster 的概念;而 JobManager 的概念范围较小,实际指的就是现在所说的 JobMaster。 
        在作业提交时,JobMaster 会先接收到要执行的应用。JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager上。

        (2)资源管理器(ResourceManager) 
        ResourceManager 主要负责资源的分配和管理,在Flink 集群中只有一个。所谓“资源”,
主要是指TaskManager 的任务槽(task slots)。任务槽就是Flink 集群中的资源调配单元,包含
了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上
执行。 
        这里注意要把 Flink 内置的 ResourceManager 和其他资源管理平台(比如 YARN)的
ResourceManager 区分开。 
        (3)分发器(Dispatcher) 
        Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher 也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。 
       4)任务管理器(TaskManager)

        TaskManager 是Flink 中的工作进程,数据流的具体计算就是它来做的。Flink 集群中必须至少有一个TaskManager;每一个TaskManager 都包含了一定数量的任务槽(task slots)。Slot是资源调度的最小单位,slot 的数量限制了TaskManager 能够并行处理的任务数量。 

        启动之后,TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后,TaskManager 就会将一个或者多个槽位提供给JobMaster 调用,JobMaster 就可以分配任务来执行了。 
        在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager交换数据。 

☄️核心概念

并行度设置


        当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。 
        在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行

        这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。 
        例如:如上图所示,当前数据流中有source、map、window、sink 四个算子,其中sink 算
子的并行度为1,其他算子的并行度都为 2。所以这段流处理程序的并行度就是 2。 

(1)代码中设置 
我们在代码中,可以很简单地在算子后跟着调用setParallelism()方法,来设置当前算子的
并行度: 

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); 

这种方式设置的并行度,只针对当前算子有效。 

另外,我们也可以直接调用执行环境的 setParallelism()方法,全局设定并行度: 

env.setParallelism(2); 

这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行
度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。 

🌌:这里要注意的是,由于 keyBy 不是算子,所以无法对keyBy 设置并行度,并行度通常是针对可以并行执行的算子的属性,用于确定同时执行多个任务的数量。对于 keyBy 来说,它并不直接执行任何计算,而是仅仅重新组织数据流以便后续的操作。实际的并行度设置通常是在后续的算子上

(2)提交应用时设置 
在使用 flink run 命令提交应用时,可以增加-p 参数来指定当前应用程序执行的并行度,
它的作用类似于执行环境的全局设置: 

bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount  
./FlinkTutorial-1.0-SNAPSHOT.jar 

3) 配置文件中设置 
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度: 
parallelism.default: 2 
这个设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是提交时的-p 参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU 核心数

算子链

1)一对一(One-to-one,forwarding) 
        这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map算子,source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-to-one 的对应关系。这种关系类似于Spark 中的窄依赖。 

2)重分区(Redistributing) 
        在这种模式下,数据流的分区会发生改变。比如图中的map 和后面的keyBy/window 算子之间,以及keyBy/window 算子和Sink 算子之间,都是这样的关系。 
        每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于 Spark 中的 shuffle


        在Flink 中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分,如下图所示。每个task 会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。

Flink 默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:

// 禁用算子链 
.map(word -> Tuple2.of(word, 1L)).disableChaining(); // 从当前算子开始新链 
.map(word -> Tuple2.of(word, 1L)).startNewChain()

任务槽

很显然,TaskManager 的计算资源是有限的,并行的任务越多,每个线程的资源就会越少。(一锅饭吃的人越多,每个人吃的越少)

那一个 TaskManager 到底能并行处理多少个任务呢?为了控制并发量,我们需要在TaskManager 上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽(task slots)。 

每个任务槽(task slot)其实表示了 TaskManager 拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。

假如一个TaskManager有三个slot,那么它会将管理的内存平均分成三份,每个slot独自占据一份。这样一来,我们在slot上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。
所以现在我们只要2个TaskManager,就可以并行处理分配好的5个任务了。

在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中,可以设置TaskManager 的 slot 数量,默认是 1 个 slot。 

taskmanager.numberOfTaskSlots: 8 

需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将slot 数量配置为机器的CPU 核心数,尽量避免不同任务之间对CPU 的竞争。这也是开发环境默认并行度设为机器CPU 数量的原因

当我们将资源密集型和非密集型的任务同时放到一个 slot 中,它们就可以自行分配对资源占用的比例,从而保证最重的活平均分配给所有的 TaskManager。 

当然,Flink 默认是允许 slot 共享的,如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,我们也可以通过设置“slot 共享组”手动指定:

 .map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");

任务槽的共享组_bilibili   有点混,需要用的时候看几分钟

任务槽和并行度的关系 

任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力 , 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置;而并行度是动态概念,也就是 TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置

举例说明:假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9 个task slot,表示集群最多能并行执行9 个同一算子的子任务。 
而我们定义word count 程序的处理操作是四个转换算子: 
source→ flatmap→ reduce→ sink 
        当所有算子并行度相同时,容易看出source 和flatmap 可以合并算子链,于是最终有三个
任务节点。 

        如果我们没有任何并行度设置,而配置文件中默认parallelism.default=1,那么程序运行的默
认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot
只有1个。9个slot只用了1个,有8个空闲

作业并行度设置为2,那么总共有6个任务,共享任务槽之后会占用2个slot。同样,就有7个slot空闲,计算资源没有充分利用。所以可以看到,设置合适的并行度才能提高效率

怎样设置并行度效率最高呢?当然是需要把所有的slot都利用起来。考虑到slot共享,我们可以直接把并行度设置为9,这样所有27个任务就会完全占用9个slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。

另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置sink算子的并行度为1。这时其他的算子并行度依然为9,所以总共会有19个子任务。根据slot共享的原则,它们最终还是会占用全部的9个slot,而sink任务只在其中一个slot上执行。

☄️作业提交流程

Standalone 会话模式作业提交流程

逻辑流图——作业流图——执行流图——物理流图

Web UI,点击作业就能看到对应的作业图

☄️ Yarn 应用模式作业提交流程 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/160011.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计 基于协同过滤算法的白酒销售系统的设计与实现 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

IP 协议的相关特性(部分)

IP 协议的报文格式 4位版本号&#xff1a; 用来表示IP协议的版本&#xff0c;现有的IP协议只有两个版本&#xff0c;IPv4&#xff0c;IPv6。 4位首部长度&#xff1a; 设定和TCP的首部长度一样 8位服务类型&#xff1a; &#xff08;真正只有4位才有效果&#xff09;&#xf…

【LeetCode 算法专题突破】双指针(⭐)

文章目录 前言1. 移动零题目描述代码 2. 复写零题目描述代码 3. 快乐数题目描述代码 4. 盛最多水的容器题目描述代码 5. 有效三角形的个数题目描述代码 6. 三数之和题目描述代码 7. 四数之和题目描述代码 总结 前言 学算法入门必学的一个章节&#xff0c;双指针算法&#xff0…

2023/10/15总结

学习总结 最近开始写项目了&#xff0c;然后写的过程中遇到了跨域问题。 为什么会出现跨域问题 由于浏览器的同源策略限制。同源策略是一种约定&#xff0c;它是浏览器最核心也是最基本的安全功能。如果缺少了同源策略&#xff0c;那么浏览器的正常功能可能都会收到影响。所谓…

雷电模拟器上使用第一个frida(四)第一个HOOK

经过上述三篇&#xff0c;已经可以使用python3.8.10编写代码&#xff0c;利用frida14.2.18和雷电模拟器9.0.60(9)&#xff0c;Android 9交互。 雷电模拟器上使用第一个frida&#xff08;一&#xff09;之安装-CSDN博客 雷电模拟器上使用第一个frida&#xff08;二&#xff09…

【git的使用方法】——上传文件到gitlab仓库

先进入到你克隆下来的仓库的目录里面 比如&#xff1a;我的仓库名字为zhuox 然后将需要上传推送的文件拷贝到你的克隆仓库下 这里的话我需要拷贝的项目是t3 输入命令ls&#xff0c;就可以查看该文件目录下的所有文件信息 然后输入git add 文件名 我这边输入的是 &#x…

windows内网渗透正向代理

内网渗透正向代理 文章目录 内网渗透正向代理1 正向代理图2 环境准备2.1 正向代理需求&#xff1a; 3 网卡配置3.1 【redream】主机3.2 【base】主机双网卡3.3 【yvkong】网卡设置 4 启动4.1【redream】网卡配置&#xff1a;4.2【base】网卡配置&#xff1a;4.3【yvkong】网卡地…

MQTT整合

MQTT整合 MQTT服务器软件筛选MQTT服务器软件mosquitto下载修改mosquitto配置,并启动mosquitto服务利用mosquitto工具测试订阅与发布可视化MQTT客户端工具MQTTX使用SpringBoot整合MQTT1.2.3.4.5.6.MQTT服务器软件筛选 MQ遥测传输(MQTT)是轻量级基于代理的发布/订阅的消息传输…

以数智化指标管理,驱动光伏能源行业的市场推进

近年来&#xff0c;碳中和、碳达峰等降低碳排放、提升环境健康度的政策和技术改进正在不断地被社会所认可和引起重视&#xff0c;也被越来越多的企业在生产运营和基础建设中列为重要目标之一。而光伏能源行业作为全球绿色能源、新能源的优秀解决方案&#xff0c;充分利用太阳能…

Android Studio SDKGradleJDK等工具的正确使用

AS在安装使用过程中可能会占用C盘大量空间&#xff0c;对于C盘容量本来就小的人来说非常不友好&#xff0c;其实我们可以自定义安装路径 software development kit安卓软件开发包 Android SDK是一种免费的专门编程语言&#xff0c;允许您创建Android应用程序。Android SDK由谷…

Day1力扣打卡

打卡记录 最长相邻不相等子序列 I&#xff08;脑筋急转弯&#xff09; 链接 思路&#xff1a;形如 11100110001 要达到最大&#xff0c;必须在重复数字选出一个&#xff0c;即在111中取一个1&#xff0c;在00中取一个0&#xff0c;以此类推最终便得到最长相邻不相等子序列。 c…

【Linux】shell运行原理及权限

主页点击直达&#xff1a;个人主页 我的小仓库&#xff1a;代码仓库 C语言偷着笑&#xff1a;C语言专栏 数据结构挨打小记&#xff1a;初阶数据结构专栏 Linux被操作记&#xff1a;Linux专栏 LeetCode刷题掉发记&#xff1a;LeetCode刷题 算法&#xff1a;算法专栏 C头疼…

Linux入门攻坚——3、基础命令学习-文件管理、别名、glob、重定向、管道、用户及组管理、权限管理

文件管理&#xff1a;cp&#xff0c;mv&#xff0c;rm cp&#xff1a;复制命令&#xff0c;copy cp [OPTION]... [-T] SRC DEST cp [OPTION]... SRC... DIRECTORY cp [OPTION]... -t DIRECTORY DEST... 如果目标不存在&#xff0c;新建DEST&#xff0c;并将…

笔试算法题ACM模式输入输出处理

1. Python input之后得到的全是string类型&#xff0c;数字需要用int(n)进行转换 读取单个数 n int(input()) 读取一串数组&#xff1a; nums [int(n) for n in input().split()] &#xff08;nums是个数组&#xff09; 读取字符串&#xff1a; stringinput().split(…

qt笔记之qml下拉标签组合框增加发送按钮发送标签内容

qt笔记之qml下拉标签组合框增加发送按钮发送标签内容 code review! 文章目录 qt笔记之qml下拉标签组合框增加发送按钮发送标签内容1.运行2.文件结构3.main.qml4.main.cc5.MyClass.h6.MyClass.cc7.CMakeLists.txt8.ComboBox.pro9.qml.qrc 1.运行 2.文件结构 3.main.qml 代码 …

【1314. 矩阵区域和】

目录 一、题目描述二、算法思想三、代码实现 一、题目描述 二、算法思想 三、代码实现 class Solution { public:vector<vector<int>> matrixBlockSum(vector<vector<int>>& mat, int k) {//先预处理数组int nmat.size();//行int mmat[0].size();…

python--短路运算,把0、空字符串和None看成 False,其他数值和非空字符串都看成 True

代码 print(3 and 4 and 5) # 5 print(5 and 6 or 7) # 6 4 > 3 and print(‘hello world’) # 输出hello world 注释&#xff1a; 在逻辑运算中&#xff0c;不一定逻辑运算符的两边都是纯表达式。也可以是数值类型的数据。 Python把0、空字符串和None看成 False&#xff…

STM32 ---- 再次学习STM32F103C8T6/STM32F409IGT6

目录 一、环境搭建及介绍 关于STM32基础介绍 新建工程 外设案例 LED流水灯 蜂鸣器 上拉电阻和下拉电阻知识 电压比较器 c语言基础知识 类型、结构体、枚举 类型int8_t int16_t int32_t 宏替换 #define 和typedef用法 结构体两种填充方法 和 命名规则 枚举用法 常用…

FPGA复习(功耗)

减小功耗 就得减小电流 电流和CF有关&#xff08; C: 电容&#xff08;被门数目和布线长度影响&#xff09; F:时钟频率&#xff09; 方法大纲 减小功耗&#xff1a;1 时钟控制 2输入控制 3减小供电电压 4双沿触发器 5修改终端 同步数字电路降低动态功耗&#xff1a;动态禁止…

langchain callback学习

文章目录 一.quickstart二.Async callbacks三.把回调消息写入文件如何提取出来正则表达式官方的方法 一.quickstart 官方文档在此&#xff1a;https://python.langchain.com/docs/modules/callbacks/ 发现自己之前用langchain都是纯纯只是跑通&#xff0c;要能灵活使用还是得深…