Flink之Task解析

Flink之Task解析

  对Flink的Task进行解析前,我们首先要清楚几个角色TaskManagerSlotTaskSubtaskTaskChain分别是什么

角色注释
TaskManager在Flink中TaskManager就是一个管理task的进程,每个节点只有一个TaskManager
SlotSlot就是TaskManager中的槽位,一个TaskManager中可以存在多个槽位,取决于服务器资源和用户配置,可以在槽位中运行Task实例
Task其实Task在Flink中就是一个类,其中可以包含一个或多个算子,这个取决于算子链的构成
SubTaskSubTask就是Task类的并行实例可以是一个或多个,也就是说当代码执行的那一刻开始,就根据用户所设置或者默认的并行度创建出多个SubTask
TaskChainTaskChain就是算子链,何为算子链?就是在一个Task实例中出现的串行算子,算子间必须是OneToOne模式且并行度相同.

  上面对几个角色进行了一个简单的阐述,后面会结合图解和伪代码进行讲解,这里我们以计算中比较经典wordcount为例子,伪代码如下所示:

public class FLinkWordCount {public static void main(String[] args) throws Exception {// 创建流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\// 设置并行度3env.setParallelism(3)// 读取数据文件DataStreamSource<String> streamSource = env.readTextFile("xxx");// 转大写DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())// 转成tuple2格式,计数1SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));// 按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);// 求和keyed.sum("f1")env.execute();}
}

  上面的代码中我们使用了两次map,一次keyBy,一次sum算子,我们下面就结合这几个算子进行讲解,讲解之前有两个条件需要先记住:

  • 同一个Task并行实例不能放在同一个TaskSlot上运行,一个TaskSlot上可以运行多个不同的Task并行实例
  • 同一个共享组的算子允许共享槽位,不同共享组的算子决不允许共享槽位

  上面这两句话一定要记牢,以便于后面的理解.

算子链划分及Task槽位分配

算子链划分

可以根据上面的代码理解下图:
在这里插入图片描述

上图中我们可以看到两个map组成一个task chain,keyBysum组成一个task chain,这里说一下原因,首先就是两个map的并行度是一致的,而且是OneToOne模式,所以可以将两个map绑定成一个算子链,并将其放入到一个SubTask中,而到了keyBy这里为什么不能再放入到一个task chain中,这里我们可以思考一下,keyBy时会发生什么?以spark的角度来说会发生shuffle对吧,这就导致了不能满足OneToOne的模式,简单来说我们也可以想清楚,如果keyBymap组成一个task chain那么还怎么做wordcount?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bckucHbv-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task2.png)]

通过上图应该很容易理解了.

Task槽位分配

  上面讲了关于task chain怎么划分的,为什么这样划分,这里讲一下为什么同一个Task的并行实例(SubTask)不能在同一个task slot中.其实这个也很容易就想清楚,如果同一Task的多个SubTask都出现在一个task slot中那么还有什么意义呢?当这些SubTask出现在一个task slot中时就会发生串行计算,那并行的意义也就没有了.

  同时这种机制也保证了任务的容错性,也就是说对于同一个Task一旦某一个task slot出现异常的情况,其他的task slot中的SubTask还能正常运行,如果将这些SubTask放到一个task slot中,当这个task slot出现异常情况时,就会影响整个任务的执行.

  总结来说,这种设计保证了Flink任务的隔离性、容错性、资源利用性.这里用图解的方式便于大家记忆,如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rlgqeo6A-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task3.png)]

槽位共享及算子链断/连

槽位共享

  前面讲过同一个Task的多个SubTask不能出现在一个task slot中,但是不同TaskSubTask是可以共享同一个task slot的,但是在Flink中有一个机制,就是用户(开发人员)可以自定义不同的算子间是否可以共享同一个task slot,如上面的例子中两个map的并行度一致并且符合OneToOne的模式,在正常情况下必然会会分到一个task chain中,但是Flink给用户提供了的slot group的概念,也就是说用户可以将这两个map分配到不同的slot group中,这种情况下两个map就不会划分到一个task chain中,试想一下当两个map都不允许共享同一个task slot时,怎么可能划分到同一个task chain中呢?

  伪代码如下:

public class FLinkWordCount {public static void main(String[] args) throws Exception {// 创建流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\// 设置并行度3env.setParallelism(1)// 读取数据文件DataStreamSource<String> streamSource = env.readTextFile("xxx");// 转大写DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())// 通过slotSharingGroup()将upperCaseSource作为一个分组"g1"SingleOutputStreamOperator<String> slotGroup1 = upperCaseSource.slotSharingGroup("g1");// 转成tuple2格式,计数1SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));// 通过slotSharingGroup()将mapStream作为一个分组"g3"SingleOutputStreamOperator<Tuple2<String, Integer>> slotGroup2 = mapStream.slotSharingGroup("g2");// 按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);// 求和keyed.sum("f1")env.execute();}
}

上面的代码中我们将upperCaseSourcemapStream分成了两个task slot,这样两个map就不可以共享相同的task slot,同时代码中将并行度改为了1,这样便于图解,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-saLgMu0Q-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task4.png)]
如果说集群中总task slot只有3个,并且在代码中两个map设置了不同的task slot且两个map的并行度都为3时会怎么样?很简单,提交任务时就会报错,因为提交任务所需要的资源已经超出了集群的资源.

  这里说一下对于对task slot进行分组处理的实际用处,就以代码中两个map为例子,在实际的业务中如果两个map处理的数据量都极大,如果将两个map的计算都放到一个节点的一个task slot时会发生什么?数据的积压、任务异常失败等等都有可能发生,但是有slotSharingGroup我们就可以保证同一个task slot不会承载过大的计算任务,也就达到了资源合理分配的目的.

算子链断/连

  前面讲了关于将两个map进行slotSharingGroup后会将两个map划分到不同的task chain,如果有这样一个情况两个map满足OneToOne的模式且并行度相同时,我们不使用slotSharingGroup能否将两个map划分成不同的task chain?答案是当然可以的,Flink为我们提供了对应的API,伪代码如下:

public class FLinkWordCount {public static void main(String[] args) throws Exception {// 创建流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\// 设置并行度3env.setParallelism(3)// 读取数据文件DataStreamSource<String> streamSource = env.readTextFile("xxx");// 转大写DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())// 转成tuple2格式,计数1SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));// 将mapStream划分到一个新的task chain中SingleOutputStreamOperator<Tuple2<String, Integer>> newTaskChainMapStream = mapStream.startNewChain();// 按照单词分组KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);// 求和keyed.sum("f1")env.execute();}
}

在上面代码中我们调用了startNewChain()后就可以将mapStream划分到一个新的task chain中,这样的情况下,两个map既属于不同的task chain又可以共享同一个task slot,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jOIlz8uH-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task5.png)]
以上就是对于Task的讲解,如有错误欢迎指出,如有问题共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/93786.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构与算法】十大经典排序算法-希尔排序

&#x1f31f;个人博客&#xff1a;www.hellocode.top &#x1f3f0;Java知识导航&#xff1a;Java-Navigate &#x1f525;CSDN&#xff1a;HelloCode. &#x1f31e;知乎&#xff1a;HelloCode &#x1f334;掘金&#xff1a;HelloCode ⚡如有问题&#xff0c;欢迎指正&#…

数据结构之动态内存管理机制

目录 数据结构之动态内存管理机制 占用块和空闲块 系统的内存管理 可利用空间表 分配存储空间的方式 空间分配与回收过程产生的问题 边界标识法管理动态内存 分配算法 回收算法 伙伴系统管理动态内存 可利用空间表中结点构成 分配算法 回收算法 总结 无用单元收…

机器学习、cv、nlp的一些前置知识

为节省篇幅&#xff0c;不标注文章来源和文章的问题场景。大部分是我的通俗理解。 文章目录 向量关于向量的偏导数&#xff1a;雅可比矩阵二阶导数矩阵&#xff1a;海森矩阵随机变量随机场伽马函数beta分布数学术语坐标上升法协方差训练集&#xff0c;验证集&#xff0c;测试集…

SpringBoot案例 调用第三方接口传输数据

一、前言 最近再写调用三方接口传输数据的项目&#xff0c;这篇博客记录项目完成的过程&#xff0c;方便后续再碰到类似的项目可以快速上手 项目结构&#xff1a; 二、编码 这里主要介绍HttpClient发送POST请求工具类和定时器的使用&#xff0c;mvc三层架构编码不做探究 pom.x…

43、TCP报文(一)

本节内容开始&#xff0c;我们正式学习TCP协议中具体的一些原理。首先&#xff0c;最重要的内容仍然是这个协议的封装结构和首部格式&#xff0c;因为这里面牵扯到一些环环相扣的知识点&#xff0c;例如ACK、SYN等等&#xff0c;如果这些内容不能很好的理解&#xff0c;那么后续…

SASS 学习笔记

SASS 学习笔记 总共会写两个练手项目&#xff0c;成品在 https://goldenaarcher.com/scss-study 可以看到&#xff0c;代码在 https://github.com/GoldenaArcher/scss-study。 什么是 SASS SASS 是 CSS 预处理&#xff0c;它提供了变量&#xff08;虽然现在 CSS 也提供了&am…

web集群学习:搭建 LNMP应用环境

目录 LNMP的介绍&#xff1a; LNMP组合工作流程&#xff1a; FastCGI介绍&#xff1a; 1、什么是 CGI 2、什么是 FastCGI 配置LNMP 1、部署LNMP环境 2、配置LNMP环境 LNMP的介绍&#xff1a; 随着 Nginx Web 服务的逐渐流行&#xff0c;又岀现了新的 Web 服务环境组合—…

面向对象编程(OOP):Python中的抽象与封装

文章目录 &#x1f340;引言&#x1f340; 类与对象&#x1f340;封装&#x1f340;继承&#x1f340;多态&#x1f340;面向对象编程的优势&#x1f340;使用面向对象编程的场景&#x1f340;实例化与构造函数&#x1f340; 成员属性和类属性&#x1f340;魔术方法&#x1f34…

初识Sentinel

目录 1.解决雪崩的方式有4种&#xff1a; 1.1.2超时处理&#xff1a; 1.1.3仓壁模式 1.1.4.断路器 1.1.5.限流 1.1.6.总结 1.2.服务保护技术对比 1.3.Sentinel介绍和安装 1.3.1.初识Sentinel 1.3.2.安装Sentinel 1.4.微服务整合Sentinel 2.流量控制 2.1.簇点链路 …

K8S调度

K8S调度 一、List-Watch 机制 controller-manager、scheduler、kubelet 通过 List-Watch 机制监听 apiserver 发出的事件&#xff0c;apiserver 通过 List-Watch 机制监听 etcd 发出的事件1.scheduler 的调度策略 预选策略/预算策略&#xff1a;通过调度算法过滤掉不满足条件…

什么是EM(最大期望值算法)

什么是EM(最大期望值算法) 在现实生活中&#xff0c;苹果百分百是苹果&#xff0c;梨百分白是梨。 生活中还有很多事物是概率分布&#xff0c;比如有多少人结了婚&#xff0c;又有多少人有工作&#xff0c; 如果我们想要调查人群中吸大麻者的比例呢&#xff1f;敏感问题很难得…

PHP8的字符串操作1-PHP8知识详解

字符串是php中最重要的数据之一&#xff0c;字符串的操作在PHP编程占有重要的地位。在使用PHP语言开发web项目的过程中&#xff0c;为了实现某些功能&#xff0c;经常需要对某些字符串进行特殊的处理&#xff0c;比如字符串的格式化、字符串的连接与分割、字符串的比较、查找等…

【Hyper-V】Windows11 家庭版怎么启用虚拟机Hyper-V

在电脑Windows11系统上启用虚拟机Hyper-V&#xff0c;打开 启用和关闭WIndows功能&#xff0c;找到其中一项Hyper-V&#xff0c;对于家庭版的系统用户来说&#xff0c;这个选项是没有的&#xff0c;接下来讲一讲怎么开启。 安装Hyper-V 创建一个文件名为Hyper-v.bat&#xff…

springcloud3 hystrix实现服务降级的案例配置2

一 服务降级的说明 1.1 服务降级说明 "服务器忙&#xff0c;请稍后在试"不让客户达等待&#xff0c;立即返回一个友好的提示。 1.2 服务降级的触发情况 1.程序运行异常&#xff1b; 2.超时&#xff1b; 3.服务熔断触发服务降级&#xff1b;4 .线程池/信号量打…

Web菜鸟教程 - Springboot接入认证授权模块

网络安全的重要性不言而喻&#xff0c;如今早已不是以前随便弄个http请求就能爬到数据的时代&#xff0c;而作为一个架构师&#xff0c;网络安全必须在产品开发之初就考虑好。因为在产品开发的后期&#xff0c;一方面是客户增多&#xff0c;压力变大&#xff0c;可供利用的时间…

[LeetCode]矩阵对角线元素的和

解题 思路 1: 循环,找到主对角线的下标和副对角线的下标,如果矩阵长或宽为奇数的时候,需要减去中间公共的那一个值,中间公共的那个数的下标为mat[mat.size()/2][mat.size()/2]副对角线的下标为 mat [i][mat.size()-i-1] class Solution { public:int diagonalSum(vector<ve…

2.阿里云对象存储OSS

1.对象存储概述 文件上传&#xff0c;是指将本地图片、视频、音频等文件上传到服务器上&#xff0c;可以供其他用户浏览或下载的过程。文件上传在项目中应用非常广泛&#xff0c;我们经常发抖音、发朋友圈都用到了文件上传功能。 实现文件上传服务&#xff0c;需要有存储的支持…

软考笔记——10.项目管理

进度管理 进度管理就是采用科学的方法&#xff0c;确定进度目标&#xff0c;编制进度计划和资源供应计划&#xff0c;进行进度控制&#xff0c;在与质量、成本目标协调的基础上&#xff0c;实现工期目标。 具体来说&#xff0c;包括以下过程&#xff1a; (1) 活动定义&#…

(stm32)低功耗模式

低功耗模式 执行哪个低功耗模式的程序判断流程 标志位设置操作一定要在WFI/WFE之前&#xff0c;调用此指令后立即进入睡眠判断流程 模式对比 睡眠模式 停止模式 待机模式

Effective C++学习笔记(8)

目录 条款49&#xff1a;了解new-handler的行为条款50&#xff1a;了解new和delete的合理替换时机条款51&#xff1a;编写new和delete时需固守常规条款52&#xff1a;写了placement new也要写placement delete条款53&#xff1a;不要轻忽编译器的警告条款54&#xff1a;让自己熟…