【基础篇】七、Flink核心概念

文章目录

  • 1、并行度
  • 2、并行度的设置
  • 3、算子链
  • 4、禁用算子链
  • 5、任务槽
  • 6、任务槽和并行度的关系

1、并行度

要处理的数据量很多时,可以把一个算子的操作(比如前面demo里的flatMap、sum),"复制"多份到多个节点,数据来了以后可以到任意一个节点执行。即将一个算子任务拆分成多个并行的子任务,再分发到不同的节点上执行,实现真正的并行计算。(好绕口,就是把一个活儿让好几个Task节点共同去做)

在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行

在这里插入图片描述

某一个算子的子任务的个数被称之为其并行度(parallelism)。 一条流水线上,几个人在同时干着打螺丝,几个人在同时处理着焊电路板。同一个程序,不同的算子,可以有不同的并行度。一个流程序的并行度,可以认为就是其所有算子中最大的并行度。如上图,source、map、window、sink四个算子,sink为1,其余为2,则这个流处理程序的并行度为2。

2、并行度的设置

方式一:代码中设置

算子后跟着调用setParallelism()方法为某一个算子设置并行度

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);   //map算子并行度为2

执行环境对象后面调setParallelism()方法设置全局并行度,对所有算子生效

env.setParallelism(2);

一般不设全局,会导致无法动态扩容。

方式二:提交应用时指令中设置

-p参数来指定当前应用程序执行的并行度,类似上面的全局设置

bin/flink run –p 2 –c com.plat.SocketStreamWordCount  ./FlinkDemo-1.0-SNAPSHOT.jar

这种和Web控制台设置一个意思:

在这里插入图片描述

方式三:配置文件中设置

在集群的配置文件flink-conf.yaml中直接更改默认并行度:

parallelism.default: 2

这个设置对于整个集群上提交的所有作业有效,初始值为1。当代码中没设置、提交时没指定,就用这个配置文件的。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数

在这里插入图片描述

最后,本地调试想看控制台界面,可创建本地环境执行对象,用于本地调试:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createlocalEnvironmentWithwebuI(new Configuration());

访问localhosr:8081,socket是特殊的,只能是1,改不了,其余算子均为4。

在这里插入图片描述
最后,这几种方式的优先级为:代码中为某算子单独设定 > 代码中执行环境对象全局设置 > 提交时指定 > 配置文件

3、算子链

一个数据流,数据在各种算子之间传输的形式可能是一对一(one-to-one)的直通(forwarding),也可能是打乱的重分区(redistributing)。

在这里插入图片描述

一对一(One-to-one,forwarding)

如上图,source算子读完数据后,可以直接发给map算子接着处理。map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,即一对一,一个算子的task和一个算子的task数据一样。特点是:

  • 数据不需要重新分区
  • 数据不需要调整顺序

重分区(Redistributing)

和一对一的直流相反,此时数据的分区会发生改变,如图中,map完数据后,直接keyBy/window(注意keyBy自身不是算子),按key分组了。也就是每一个算子的子任务task,会根据某些规则,把数据发送到不同的下游task,从而引起了数据重分区。

合并算子链

在Flink中,并行度相同一对一(one to one)算子操作,可以直接连接在一起形成一个大的任务(task),每个task又会被一个线程执行,即算子链。合并的条件:

  • 两算子并行度相等(子任务数量一样)
  • 两算子为one to one的直流关系

在这里插入图片描述

上图中Source和map之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有5个任务,由5个线程并行执行合并算子链的机制,可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

4、禁用算子链

Flink默认会按照算子链的原则进行链接合并,但有的场景不适合合并,比如:

  • 两个算子串在一起,它们的子任务task搭配形成n组(n为并行度),每组共用一个线程,但如果两个算子本身计算任务都很重,那就不适合串一起,就像两个脾气都差的人合租,此时应该断开算子链
  • 当出现错误,需要定位问题是哪个算子时,就要禁用算子链

全局禁用算子链:

//env为执行环境对象
env.disableOperatorChaining();

disableChaining方法可只给某个算子设置禁用算子链,那它和它前后的算子就都不能再组成算子链(控制台上UI会显示Forward,表明本来是一对一的算子链关系)

.map(word -> Tuple2.of(word, 1L)).disableChaining();

在这里插入图片描述

startNewChain方法,从当前算子开始新链,即只和前面的算子断开,和后面的算子能串一起的话还是会串

// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

5、任务槽

Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行子任务。但每个TaskManager总的计算资源有限,并行任务越多,每个线程能分到的可用资源就越少,为了限制TaskManager能并行处理的最大任务数,提出任务槽(task slots)的概念,对TaskManager上对每个任务运行所占用的资源做出明确的划分。一锅饭,能盛6碗,谁来都夹一筷子,谁都吃不饱,因此,锅前放6个碗,也就是分为6碗饭,来一个人,就端走一碗,端没了别人就去其他锅,分到饭的六个人,也不用和别人抢,且能吃饱。这个碗就是任务槽。

在这里插入图片描述

比如一个TaskManager上有三个slot,那就把这个TaskManager的内存资源分为三份,一个插槽一份。如此,在插槽上执行一个子任务时,就相当于划定了一块内存给这个子任务专款专用,不需要和其他子任务去竞争内存资源。前面提到的合并成算子链后的5个子任务,两个TaskManager就可实现,如上图。

任务槽数量的设置

在flink安装目录的conf/flink-conf.yaml配置文件中,可以设置每个TaskManager的slot数量,默认是1个slot。

taskmanager.numberOfTaskSlots: 8

slot目前仅仅用来隔离内存,不会涉及CPU的隔离在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。(这就像合租,内存就像卧室,厕所就像CPU,三个人,三间房,但一个厕所也够用,类比CPU时间片和线程切换)

子任务task对任务槽的共享

上面讲到,一人一碗饭,一个子任务一个插槽。而插槽的共享,就是放宽了政策,不同类型的算子,它们的并行子任务允许放到同一个插槽上并行执行(注意,依旧并行)。如下图,两个TaskManager,6个插槽,每个插槽上的子任务对应的算子种类都不一样。

在这里插入图片描述

如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。slot共享的好处在于:

  • 活儿大致平均分配到了所有的TaskManager
  • slot有好几种算子的子任务,组合起来就是一个完整的作业管道或者流。此时,即使某个TaskManager宕机,其他节点也不受影响,作业继续执行

如果不希望默认的slot共享,比如需要让某个算子的task独享一个slot,就可以设置slot共享组

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("taskTest");

只有属于同一个slot共享组的子任务,才会开启slot共享,这个组默认是default,不同slot共享组之间的任务是完全隔离的,必须分配到不同的slot上。

6、任务槽和并行度的关系

  • 任务槽slot是一个静态概念,表示最大的并发上限。假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示集群最多能并行执行同一算子的9个子任务。
  • 并行度是一个动态概念,表示实际运行占用了几个。比如并行度为4,即这个算子有4个子任务task,需要放在4个插槽上。此时,并行度为4,slot为9。

Job运行时,必须插槽slot的数量必须大于等于并行度,否则任务运行失败:NoResourceAvailableException:could not acquire the minimun required resources 。注意Yarn等模式部署时,会动态申请TaskManager,申请的TM的数量 = job并行度 /每个TM的slot的数量,向上取整。

比如,某算子并行度为10,即它有10个task要放在不同的插槽上,此时插槽有9个,那就不能运行,而不是9个跑完再让第十个执行。再比如,一个Flink程序中定义了4个算子:

source→ flatmap→ reduce→ sink

前提: flink-conf.yaml中taskmanager.numberOfTaskSlots数量为3(建议为CPU核心数),假设TaskManager数量也为3,即插槽有3*3=9个

Case1:并行度parallelism.default=1

在这里插入图片描述

分析:4种算子,并行度为1 ⇒ 其中两个形成算子链算一个 ⇒ 三个子任务 ⇒ 同一作业的不同种类的算子的任务,共享任务槽 ⇒ 总共占用一个插槽,剩8个可用

Case2:全局并行度为2

在这里插入图片描述

分析:三种算子,并行度为2 ⇒ 其中两个形成算子链算一个 ⇒六个子任务 ⇒ 插槽共享 ⇒ 总共占用2个插槽,剩7个可用 ⇒ 计算机资源利用不充分,设置合适的并行度才能提高效率

Case3:全局并行度为9

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 插槽共享 ⇒ 占九个

在这里插入图片描述

Case4:全局set为9,但sink算子set为1

在这里插入图片描述

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 29 + 11 = 19个子任务 ⇒ 插槽共享

最后,可以看到,整个流处理程序的并行度,就是所有算子并行度的最大值,因为这代表了程序运行所需要的插槽slot的数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/159551.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

phpcms_v9模板制作及二次开发常用代码

0:调用最新文章,带所在版块 {pc:get sql"SELECT a.title, a.catid, b.catid, b.catname, a.url as turl ,b.url as curl,a.id FROM v9_news a, v9_category b WHERE a.catid b.catid ORDER BY a.id DESC " num"15" cache"300"} {lo…

postman接口测试

HTTP的接口测试工具有很多,可以进行http请求的方式也有很多,但是可以直接拿来就用,而且功能还支持的不错的,我使用过的来讲,还是postman比较上手。 优点: 1、支持用例管理 2、支持get、post、文件上传、响…

计网面试复习自用

五层: 应用层:应用层是最高层,负责为用户提供网络服务和应用程序。在应用层,用户应用程序与网络进行交互,发送和接收数据。典型的应用层协议包括HTTP(用于网页浏览)、SMTP(用于电子邮…

如何提高企业工作微信的管理效率?

微信作为一款拥有数亿用户的软件,其使用频率在全国范围内居高不下。随着企业的不断发展,微信在工作中的应用也变得越来越广泛。为了更好地服务客户并提升业务效益,企业通常会为新入职员工配置工作微信以便于业务沟通和客户服务。然而&#xf…

iCloud涨价不用慌!学会使用群晖生态将本地SSD“上云”

文章目录 前言本教程解决的问题是:按照本教程方法操作后,达到的效果是想使用群晖生态软件,就必须要在服务端安装群晖系统,具体如何安装群晖虚拟机请参考: 1. 安装并配置synology drive1.1 安装群辉drive套件1.2 在局域…

存在已打开的MicrosoftEdge浏览器,无法执行安装

存在问题:UiBot Creator 安装Chrome扩展时,存在已打开的MicrosoftEdge浏览器,无法执行安装。 解决办法: 打开MicrosoftEdge浏览器,然后在浏览器页面右上角打开“…”图标 第二步,打开“…”图标之后&…

MAC上使用Wireshark常见问题

文章目录 介绍正文启动异常-Permission denied解决方法 过滤协议和地址指定源地址和目的地址调整 time format 介绍 简单记录Wireshark在日常使用过程中的遇到的小case。 正文 Wireshark相较于tcpdump使用较为简单,交互也更为友好。 点击Start即可启动抓包 启动…

【OpenCv光流法进行运动目标检测】

opencv系列文章目录 文章目录 opencv系列文章目录前言一、光流法是什么?二、光流法实例1.C的2.C版本3.python版本 总结 前言 随着计算机视觉技术的迅猛发展,运动目标检测在图像处理领域中扮演着至关重要的角色。在现实世界中,我们常常需要追…

Mysql5.7大限将至升级Mysql 8.0过程记录(未完)

一、前言 时间很快,到2023年10月底,MySQL 5.7就到了它的EOL(End of Life),届时将不会提供任何补丁,无法应对潜在的安全风险;是时候和 MySQL 5.7 说再见了!!!&…

C++语言实现网络爬虫详细代码

当然&#xff01;下面是一个用C语言实现的基本网络爬虫的详细代码示例&#xff1a; #include <iostream> #include <string> #include <curl/curl.h> size_t writeCallback(void* contents, size_t size, size_t nmemb, std::string* output) {size_t totalS…

强化科技创新“辐射力”,中国移动的数智化大棋局

作者 | 曾响铃 文 | 响铃说 丝滑流畅的5G连接、每时每刻的数字生活服务、无处不在的智能终端、拟人交流的AI助手、梦幻般的XR虚拟现实、直接感受的裸眼3D…… 不知不觉&#xff0c;那个科幻片中的世界&#xff0c;越来越近。 数智化新世界的“气氛”&#xff0c;由一个个具…

GEE 18:基于GEE平台的土地荒漠化监测与分析【论文复现】

Desertification 1. 研究背景1.1 参考论文1.2 参数获取1.2.1 NDVI1.2.2 Albedo1.2.3 Normalizing indices1.2.4 Calculating the quantitative relationship1.2.5 Calculating DDI2. GEE2.1 数据2.2 GEE code2.2.1 Study region2.2.2 Reomove cloud for Landsat-82.2.3 Calcula…

CISA 彻底改变了恶意软件信息共享:网络安全的突破

在现代网络安全中&#xff0c;战术技术和程序&#xff08;TTP&#xff09;的共享对于防范网络事件至关重要。 因此&#xff0c;了解攻击向量和攻击类型之间的关联如今是让其他公司从其他公司遭受的 IT 事件中受益&#xff08;吸取经验教训&#xff09;的重要一步。 美国主要网…

PyTorch入门教学——使用PyCharm创建一个PyTorch项目

首先需要创建好PyTorch的虚拟环境&#xff0c;步骤&#xff1a;PyTorch入门教学——简介与环境配置-CSDN博客打开PyCharm&#xff0c;新建项目&#xff0c;选择项目的存放位置。选择先前配置的解释器&#xff0c;也就是虚拟环境中的解释器。&#xff08;记住创建的虚拟环境所在…

【Express】服务端渲染(模板引擎 EJS)

EJS&#xff08;Embedded JavaScript&#xff09;是一款流行的模板引擎&#xff0c;可以用于在Express中创建动态的HTML页面。它允许在HTML模板中嵌入JavaScript代码&#xff0c;并且能够生成基于数据的动态内容。 下面是一个详细的讲解和示例&#xff0c;演示如何在Express中…

Linux:Mac VMware Fusion13以及CentOS7安装包

Linux&#xff1a;Mac VMware Fusion13以及CentOS7安装包 1. Mac VMware Fusion132. CentOS7安装包3. 安装 1. Mac VMware Fusion13 下载官网地址&#xff1a;https:www.vmware.com/products/fusion/fusion-evaluation.html 2. CentOS7安装包 注意是m芯片需要使用arm架构的i…

手动下载/安装Xcode的simulator

目录 前言解决方案1.获取simulator包下载地址1.1 Apple后台1.2 手动 2.使用三方下载工具下载3.使用命令安装simulator 前言 Xcode某个版本更新之后不带iOS的Simulator,导致全新下载一个Xcode后没法编译项目.公司的网又很坑,每次断掉点重试都重新下载,导致完全没法下下来.特别影…

论文阅读:Seeing in Extra Darkness Using a Deep-Red Flash

论文阅读&#xff1a;Seeing in Extra Darkness Using a Deep-Red Flash 今天介绍的这篇文章是 2021 年 ICCV 的一篇 oral 文章&#xff0c;主要是为了解决极暗光下的成像问题&#xff0c;通过一个深红的闪光灯补光。实现了暗光下很好的成像效果&#xff0c;整篇文章基本没有任…

Go-Python-Java-C-LeetCode高分解法-第十周合集

前言 本题解Go语言部分基于 LeetCode-Go 其他部分基于本人实践学习 个人题解GitHub连接&#xff1a;LeetCode-Go-Python-Java-C 欢迎订阅CSDN专栏&#xff0c;每日一题&#xff0c;和博主一起进步 LeetCode专栏 我搜集到了50道精选题&#xff0c;适合速成概览大部分常用算法 突…

FutureTask的测试使用和方法执行分析

FutureTask类图如下 java.util.concurrent.FutureTask#run run方法执行逻辑如下 public void run() {if (state ! NEW ||!RUNNER.compareAndSet(this, null, Thread.currentThread()))return;try {Callable<V> c callable;if (c ! null && state NEW) {V res…