【Flink系列三】数据流图和任务链计算方式

上文介绍了如何计算并行度和slot的数量,本文介绍Flink代码提交后,如何生成计算的DAG数据流图。

程序和数据流图

  • 所有的Flink程序都是由三部分组成的:Source、Transformation和Sink
  • Source负责读取数据源,Transformation利用各种算子进行处理加工(Flink不区分transfer算子和action算子,统一都认为算子),Sink负责输出
  • 在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分
  • 每一个dataflow以一个或者多个Source开始,以一个或者多个sink结束。dataflow类似于任意的有向无环图(DAG)
  • 在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系

最终生成的数据流图

执行图(ExecutionGraph)

Flink中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph ->物理执行图

  • StreamGraph:是根据用户通过Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构
  • JobGraph:StreamGraph经过优化后生成了JobGraph,提交给JobManager的数据结构。主要的优化为,将多个符合条件的节点chain在一起作为一个节点(注意这个符合条件的计算方式)
  • ExecutionGraph: Jobanager根据JobGraph生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构
  • 物理执行图:JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的“图”, 并不是一个具体的数据结构。

用一张图表达

  1. 代码提交运行后,会在Client生成StreamGraph初始化版本,有一个操作就会生成一个算子任务
  2. keyby操作不会进行计算,只是简单的分区,aggregation操作才是计算,所以一开始keyby和Aggregation都是合并在一个算子任务中
  3. 将满足条件的算子合并成一个大任务(one-to-one),所以讲keybyAggregation 和Sink合并成一个任务
  4. 在JM上生成ExecutionGraph,按并行度将任务展开,通过ExecutionEdge连接
  5. 执行图和物理执行图已经非常相似了,目前只需要关心ExecutionGraph即可

数据传输形式

  • 一个程序中,不同的算子可能具有不同的并行度
  • 算子之间的传输数据的形式可以是one-to-one(forwarding)的模式,也可以是redistributing的模式,具体是哪一种形式,可以取决于算子的种
  1. One-to-One:Stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务生产的元素的个数、顺序相同,map、filter、flatmap等算子都是one-to-one的对应关系。
  2. Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如keyBy基于hashCode重分区、而broadcast和rebanlance会算计重新分区,这些神算子都会引起redistribute过程,而redistribute过程就类似于spark中的shuffle过程。

任务链(Operator Chains)

  • Flink采用了一种称为任务链的优化技术,可以在特定的条件下减少通过本地通信的开销。为了满足任务链的要求,必须将两个或者多个算子设为下个年头给你的并行度,通过本地转发(local forward)的方式进行连接
  • 相同并行度的one-to-one操作,Flink这样相连的算子链接在一起形成一个task,原来的算子称为里面的subtask
  • 并行度相同,并且是one-to-one操作,两个条件缺一不可

如下图,红框标注的 Forward代表数据是one-to-one的,可以进行任务合并,但是Hash和Reblance不行。(图中为了分开展示设置了不同的slotGroup)

如果不设置共享组的话,算子任务会合并

下面来看一下一个视图

大家可以看出,只有并行度相同,且one-to-one操作才能合并task

如果不想合并task呢,大家可以思考一下,这里给出答案

  1. 设置共享组(上一篇文章有介绍),但是这种方式会造成资源的浪费
  2. 通过disableOperatorChaining来设置,可以作用于env上(表达所有算子任务都不合并),作用于单个算子上时使用disableChaining或者startNewChain,具体使用看具体业务场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/213535.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux各目录结构说明

文章目录 目录说明源码放哪里?拓展:Linux里面安装软件是装在home目录还是opt目录还是/usr/local好? bin boot dev etc home lib lib64 lostfound media mnt opt proc root run sbin srv sys tmp usr var 目录说明 bin 存放二进制可执行文件&…

《Spring Cloud Alibaba 从入门到实战》分布式配置

分布式配置 1、简介 Nacos 提供用于存储配置和其他元数据的 key/value 存储,为分布式系统中的外部化配置提供服务器端和客户端支持。 Spring Cloud Alibaba Nacos Config 是 Config Server 和 Client 的替代方案,在特殊的 bootstrap 阶段,…

2023.12.4 关于 Spring Boot 统一异常处理

目录 引言 统一异常处理 异常全部监测 引言 将异常处理逻辑集中到一个地方,可以避免在每个控制器或业务逻辑中都编写相似的异常处理代码,这降低了代码的冗余,提高了代码的可维护性统一的异常处理使得调试和维护变得更加容易,通…

机器学习之无监督学习:九大聚类算法

今天,和大家分享一下机器学习之无监督学习中的常见的聚类方法。 今天,和大家分享一下机器学习之无监督学习中的常见的聚类方法。 在无监督学习中,我们的数据并不带有任何标签,因此在无监督学习中要做的就是将这一系列无标签的数…

Python实现PDF-Excel

轻松解决PDF格式转Excel(使用python实现) 实现思路: 要将PDF转换为Excel,可以使用以下步骤: 解析PDF内容:首先,需要使用Python中的第三方库(如PyPDF2、pdfminer等)来解…

Ribbon 饥饿加载

Ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载: 一、懒加载 Ribbon 默认为懒加载即在首次启动Application…

数据结构之插入排序

目录 前言 插入排序 直接插入排序 插入排序的时间复杂度 希尔排序 前言 在日常生活中,我们不经意间会遇到很多排序的场景,比如在某宝,某东上买东西,我们可以自己自定义价格是由高到低还是由低到高,再比如在王者某…

修改移远提供的GobiNet、quectel-CM源码,使其支持有方N720 4G模块

最近在研究imx6ull linux下4G模块驱动的移植,参考的移远ec20的移植方法,添加了GobiNet驱动,编译了quectel-CM工具,并且可以正常拨号,分配到ip,如下: ping外网也没有压力,如下…

Qt12.8

使用手动连接,将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中,在自定义的槽函数中调用关闭函数 将登录按钮使用qt5版本的连接到自定义的槽函数中,在槽函数中判断ui界面上输入的账号是否为"admin",密码是否为…

使用Pytorch实现Grad-CAM并绘制热力图

这篇是我对哔哩哔哩up主 霹雳吧啦Wz 的视频的文字版学习笔记 感谢他对知识的分享 看一下这个main cnn.py的文件 那这里我为了方便 就直接从官方的torch vision这个库当中导入一些我们常用的model 比如说我这里的例子是采用的mobile net v3 large这个模型 然后这里我将pretrain设…

openEuler 20.03 (LTS-SP2) aarch64 cephadm 部署ceph18.2.0【1】离线部署 准备基础环境

准备3台虚拟机服务器(均可访问公网) 10.2.1.176 (作为操作机) 10.2.1.191 10.2.1.219 安装基础工具 yum install -y vim 配置hosts 编辑/etc/hosts,添加 10.2.1.176 ceph-176 10.2.1.191 ceph-191 10.2.1.219 ceph-219 配置免密登录…

JVM 执行引擎篇

机器码、指令、汇编语言 机器码 各种用二进制编码方式表示的指令,叫做机器指令码。开始,人们就用它采编写程序,这就是机器语言。机器语言虽然能够被计算机理解和接受,但和人们的语言差别太大,不易被人们理解和记忆&a…

【MySQL语言汇总[DQL,DDL,DCL,DML]以及使用python连接数据库进行其他操作】

MySQL语言汇总[DQL,DDL,DCL,DML] SQL分类1.DDL:操作数据库,表创建 删除 查询 修改对数据库的操作对表的操作复制表(重点)!!!!! 2.DML:增删改表中数据3.DQL:查询表中的记录…

HLS实现图像膨胀和腐蚀运算--xf_dilation和xf_erosion

一、图像膨胀和图像腐蚀概念 我们先定义,需要处理的图片为二值化图像A。图片的背景色为黑色,即像素值为0。图片的目标色为白色,即像素值为1。 再定义一个结构元S,结构元范围内所有的像素为白色,像素值为1。 1、图像的…

RedHat9中安装Mysql8.0+出现“错误:GPG 检查失败“的处理

近期通过VM安装了RedHat9,之后在RedHat9中安装Mysql8.0的时候出现了个问题:“错误:GPG 检查失败”,如图所示: 解决方案:重新导入新的秘钥即可,如下所示: rpm --import https://rep…

连接Redis报错解决方案

连接Redis报错&解决方案 问题描述:Could not connect to Redis at 127.0.0.1:6379: 由于目标计算机积极拒绝,无法连接。 问题原因:redis启动方式不正确 解决方案: 在redis根目录下打开命令行窗口,输入命令redi…

Android studio生成二维码

1.遇到的问题 需要生成一个二维码&#xff0c;可以使用zxing第三方组件&#xff0c;增加依赖。 //生成二维码 implementation com.google.zxing:core:3.4.1 2.代码 展示页面 <ImageViewandroid:id"id/qrCodeImageView"android:layout_width"150dp"an…

公有云迁移研究——AWS Translate

大纲 1 什么是Translate2 Aws Translate是怎么运作的3 Aws Translate和Google Translate的区别4 迁移任务4.1 迁移原因 5 Aws Translate的Go demo6 迁移中遇到的问题6.1 账号和权限问题&#xff1a;6.2 小语种 1 什么是Translate Translate是一种文本翻译服务&#xff0c;它使…

HttpComponents: 领域对象的设计

1. HTTP协议 1.1 HTTP请求 HTTP请求由请求头、请求体两部分组成&#xff0c;请求头又分为请求行(request line)和普通的请求头组成。通过浏览器的开发者工具&#xff0c;我们能查看请求和响应的详情。 下面是一个HTTP请求发送的完整内容。 POST https://track.abc.com/v4/tr…

安卓MediaRecorder(2)录制源码分析

文章目录 前言JAVA new MediaRecorder() 源码分析android_media_MediaRecorder.cpp native_init()MediaRecorder.java postEventFromNativeandroid_media_MediaRecorder.cpp native_setup() MediaRecorder 参数设置MediaRecorder.prepare 分析MediaRecorder.start 分析MediaRec…