聊聊Flink:Flink的分区机制

一、前言

flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。

1.1 Flink数据传输

  • 组件之间的通信消息传输,即Client、JobManager、TaskManager之间的信息传递,采用Akka框架(主要用作组件间的协同,如心跳检测、状态上报、指标统计、作业提交和部署等)。
  • 算子之间的流数据传输
    • 本地线程内的流数据传输(同一个SubTask中):同一个SubTask内的两个Operator(属于同一个OperatorChain)之间的数据传输是方法调用,即上游算子处理完数据后,直接调用下游算子的processElement方法。
    • 本地线程间的流数据传输(同一个TaskManager的不同SubTask中):即同一个TaskManager(JVM进程)中的不同Task(线程,本质上是SubTask)的算子之间的数据传输,通过本地内存进行数据传递,存在数据序列化和反序列过程。
    • 跨网络的流数据传输(不同TaskManager的SubTask中):采用Netty框架,通过Socket传递,也存在数据序列化和反序列过程。

flink中的重分区算子定义上下游subtask之间数据传递的方式,SubTask之间进行数据传递模式有两种,一种是one-to-oneforwarding)模式,另一种是redistributing的模式。

1.2 重分区算子数据传递的两种方式

  • One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如下图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
  • Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。

在这里插入图片描述

flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。

二、分区策略

数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了以下几种分区策略和自定义分区策略。已实现的分区策略对应的API为:
在这里插入图片描述
自定义分区策略的API为CustomPartitionerWrapper。

各个API的继承关系如下图所示:
在这里插入图片描述
ChannelSelector是分区策略的顶层接口,其决定了记录应该写入哪个逻辑通道,通道可理解为下游算子的某个实例,或下游并行算子的某个子任务。该接口的定义源码如下:
在这里插入图片描述

抽象类StreamPartitioner实现了ChannelSelector接口,是一个用于流程序的特殊的ChannelSelector,其中定义了一些通用的分区策略方法。Flink中的所有分区策略(分区器)都继承了StreamPartitioner类,并且实现了各自独有的分区规则。

三、内置分区策略

3.1 BinaryHashPartitioner

该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,我们这里不做过多讲解。

3.2 BroadcastPartitioner

广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略。

来一段代码演示下:

/*** 微信公众号:老周聊架构*/
public class PartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号dataStream.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}});//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Integer> dataStreamAfter = dataStream.broadcast();//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).print();env.execute("PartitionerTest Job");}
}

直接IDEA控制台输出:
在这里插入图片描述

从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述
执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述
对比表发现,广播分区策略将上游每个元素分别发送到了下游算子的所有分区,这种策略会把数据复制多份,向下游算子的每个分区发送一份。
在这里插入图片描述
我们把上面的任务提交到Flink,同样也可以看出前面分区前每个子任务两条数据,分区后每个子任务六条数据。
在这里插入图片描述
在这里插入图片描述
3.3 ForwardPartitioner

转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。

这里把上面的代码调整下:

dataStream.forward()

IDEA控制台输出:
在这里插入图片描述
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述

执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述

对比发现,转发分区策略将上游同一个分区的元素发送到了下游同一个分区中。使用数据流图表示如下图:

在这里插入图片描述
在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:
在这里插入图片描述
对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。
在这里插入图片描述
3.4 GlobalPartitioner

全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。

这里把上面的代码调整下:

dataStream.global()

IDEA控制台输出:
在这里插入图片描述
分区前:
在这里插入图片描述

分区后:
在这里插入图片描述

全局分区策略将上游所有分区中的所有元素发送到了下游编号为0的分区中:
在这里插入图片描述

3.5 .KeyGroupStreamPartitioner

Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。selectChannel()方法源码如下:
在这里插入图片描述
在这里插入图片描述

总的来说,Flink底层计算通道索引(分区编号)的流程如下:

  • 计算Key的HashCode值。
  • 将Key的HashCode值进行特殊的Hash处理,即MathUtils.murmurHash(keyHash),返回一个非负哈希码。
  • 将非负哈希码除以最大并行度取余数,得到keyGroupId,即Key组索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分区编号。parallelism为当前算子的并行度,即通道数量;maxParallelism为系统默认支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。

这里把上面的代码调整下:

dataStream.setParallelism(2);

dataStreamAfter.setParallelism(3);

dataStream.rebalance()

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述
分区后:

在这里插入图片描述

平衡分区策略将上游所有元素均匀发送到了下游算子的所有分区:
在这里插入图片描述

3.7 RescalePartitioner

重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。

上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。

假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。

这里把上面的代码调整下:

dataStream.rescale()

同时将第一个map算子的并行度设置为2,第二个map算子的并行度设置为4。

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述

分区后:
在这里插入图片描述
在这里插入图片描述

接下来改变map算子的并行度,将第一个map算子的并行度设置为4,第二个map算子的并行度设置为2。

在这里插入图片描述

如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。

3.8 ShufflePartitioner

随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。

这里把上面的代码调整下:

dataStream.shuffle()

这里就不做过多演示了。我们下面来看下自定义分区策略。

四、自定义分区策略

自定义分区策略的API为CustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。

4.1 新建自定义分区器

新建分区器类MyCustomPartitioner并实现接口Partitioner(Object表示分区Key的数据类型),实现其中未实现的方法partition(),在该方法中添加相应的分区逻辑。

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(Object key, int numPartitions) {if (key.equals("chinese")) {return 0;} else if (key.equals("math")) {return 1;} else {return 2;}}
}

上述代码通过partition()方法取得分区编号,将Key值等于chinese的元素分配到编号为0的分区,将Key值等于math的元素分配到编号为1的分区,其余元素分配到编号为2的分区。

4.2 使用自定义分区器

调用DataStream的partitionCustom()方法传入自定义分区器类MyCustomPartitioner的实例,可以对DataStream按照自定义规则进行重新分区,代码如下:

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class CustomPartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(String value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));Map<String, Integer> map = new HashMap<>();map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));return map;}}).setParallelism(2);//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(Map<String, Integer> value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).setParallelism(3).print();env.execute("CustomPartitionerTest Job");}
}

分区前:
在这里插入图片描述

分区后:

在这里插入图片描述
自定义分区策略将上游所有元素按照自定义的规则发送到了下游的3个分区中。

把任务给到Flink上去跑,发现:
在这里插入图片描述

这是因为泛型擦除,下面的DataStream泛型需要指定类型,不能
在这里插入图片描述

小知识:

在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦除,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除

类型加好以后,再跑一下任务,会出现任务成功。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/472046.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索SAP财务管理软件:重塑企业财务管理新境界

在当今瞬息万变的商业环境中&#xff0c;企业对于财务管理的精准性、高效性和透明度要求日益增高。作为全球领先的企业管理软件解决方案提供商&#xff0c;SAP凭借其强大的财务管理软件&#xff0c;正引领着全球企业迈向财务管理的新纪元。 SAP 财务管理系统通过智能化技术&am…

数字孪生乡村:数字乡村智慧化营建思路

数字化技术已然成为全球理论和产业界关注的热点命题 &#xff0c;并广泛应用于城市规划、交通管理、工业、医疗、教育等领域&#xff0c;已经成为文化遗产保护领域最主要方式 &#xff0c;如数字非遗、数字文物、数字文旅等。 传统村落的数字化保护呈现由单一技术向多技术集成…

《FreeRTOS任务基础知识以及任务创建相关函数》

目录 1.FreeRTOS多任务系统与传统单片机单任务系统的区别 2.FreeRTOS中的任务&#xff08;Task&#xff09;介绍 2.1 任务特性 2.2 FreeRTOS中的任务状态 2.3 FreeRTOS中的任务优先级 2.4 在任务函数中退出 2.5 任务控制块和任务堆栈 2.5.1 任务控制块 2.5.2 任务堆栈…

SpringCloud基础 入门级 学习SpringCloud 超详细(简单通俗易懂)

Spring Cloud 基础入门级学习 超详细&#xff08;简单通俗易懂&#xff09; 一、SpringCloud核心组件第一代&#xff1a;SpringCloud Netflix组件第二代&#xff1a;SpringCloud Alibaba组件SpringCloud原生组件 二、SpringCloud体系架构图三、理解分布式与集群分布式集群 四、…

Photoshop(PS)——人像磨皮

1.新建一个文件&#xff0c;背景为白色&#xff0c;将图片素材放入文件中 2.利用CtrlJ 复制两个图层出来&#xff0c;选择第一个拷贝图层&#xff0c;选择滤镜---杂色---蒙尘与划痕 3.调整一下数值&#xff0c;大概能够模糊痘印痘坑&#xff0c;点击确定。 4.然后选择拷贝2图层…

core 文件

sysctl -a | grep core_pattern 查看core 的路径 linux下寻找段错误的方法 - 空水 - 博客园 /var/log/messages dmesg -T 一、dmesg命令 dmesg命令&#xff0c;用于获取程序出错时的堆栈地址&#xff0c;用grep过滤出发生崩溃的程序&#xff0c;以及对应的堆栈信息 [Thu Nov …

centos rich 美观打印日志

文章目录 步骤 1: 安装 Python 和 pip步骤 2: 安装 rich-cli步骤 3: 验证安装步骤 4: 使用 rich-cli参考 在 CentOS 上安装 rich-cli 工具&#xff0c;你可以按照以下步骤进行操作。rich-cli 是一个命令行工具&#xff0c;用于将 rich 库的功能&#xff08;例如美化输出&#x…

《动手学深度学习》中d2l库的安装以及问题解决

当我们在按照《动手学深度学习》这本书或者网课学习时会有需要导入d2l库的使用。​d2I是一个与《动手学深度学习》(Dive into Deep Learning&#xff09;一书配套的开源教学库&#xff0c;它包含了作者李沐设计的深度学习相关代码和示例。这个库旨在帮助读者通过实践经验来理解…

【大模型实战篇】vLLM的由来以及大模型部署、推理加速实践

1. 问题背景分析及vLLM的由来 大模型毫无疑问&#xff0c;在工作、生活中已经逐渐扮演越来越重要的角色。但大模型的尺寸一般都比较大&#xff0c;处理一个大模型请求的成本可能比传统关键字查询高出 10 倍。推理的成本代价较高&#xff0c;因此提高大模型服务系统的吞吐量&…

常用在汽车PKE无钥匙进入系统的高度集成SOC芯片:CSM2433

CSM2433是一款集成2.4GHz频段发射器、125KHz接收器和8位RISC&#xff08;精简指令集&#xff09;MCU的SOC芯片&#xff0c;用在汽车PKE无钥匙进入系统里。 什么是汽车PKE无钥匙进入系统&#xff1f; 无钥匙进入系统具有无钥匙进入并且启动的功能&#xff0c;英文名称是PKE&…

路由器基本原理与配置

一 &#xff0c; 路由是什么&#xff1f; 从源主机到目标主机的转发过程&#xff1b; 二 &#xff0c; 路由器 &#xff08;1&#xff09;路由器的工作原理 路由器是一种三层设备&#xff0c;是使用IP地址寻址&#xff0c;实现从源IP到达目标IP地址的端到端的服务&#xff0c…

NPOI 实现Excel模板导出

记录一下使用NPOI实现定制的Excel导出模板&#xff0c;已下实现需求及主要逻辑 所需Json数据 对应参数 List<PurQuoteExportDataCrInput> listData [{"ItemName": "电缆VV3*162*10","Spec": "电缆VV3*162*10","Uom":…

element plus的表格内容自动滚动

<el-table:data"tableData"ref"tableRef"borderstyle"width: 100%"height"150"><el-table-column prop"date" label"名称" width"250" /><el-table-column prop"name" label&…

【Linux网络编程】简单的UDP网络程序

目录 一&#xff0c;socket编程的相关说明 1-1&#xff0c;sockaddr结构体 1-2&#xff0c;Socket API 二&#xff0c;基于Udp协议的简单通信 一&#xff0c;socket编程的相关说明 Socket编程是一种网络通信编程技术&#xff0c;它允许两个或多个程序在网络上相互通信&…

AI 写作(九)实战项目二:智能新闻报道(9/10)

一、项目概述 在当今信息爆炸的时代&#xff0c;新闻传播行业正面临着前所未有的挑战与机遇。随着科技的飞速发展&#xff0c;人们获取信息的渠道日益多样化&#xff0c;对新闻的时效性、准确性和个性化需求也不断提高。在这样的背景下&#xff0c;AI 写作在智能新闻报道中的重…

针对gitgitee的使用

1.下载git 链接 打开终端&#xff0c;桌面鼠标右键 2.配置密钥 登录gitee。 设置密钥 查看官方文档 跟着教程 复制最后的输出进行密钥添加 验证是否添加成功 3.创建&连接远程仓库 创建仓库 git终端进行配置 远程仓库克隆到本地 桌面终端clone,克隆他人|自己的仓库到本地…

DNS批量解析管理软件有什么用

在复杂的网络环境中&#xff0c;DNS批量解析管理软件犹如一把功能强大的钥匙&#xff0c;开启了高效网络管理的大门&#xff0c;为网络运营和维护带来了诸多便利。 1、对于网络服务提供商而言&#xff0c;DNS批量解析管理软件极大地提高了工作效率 传统的DNS解析管理方式在处…

二叉树遍历的非递归实现和复杂度分析

一&#xff0c;用栈实现二叉树先序遍历 1&#xff0c;原理 我用自己的口水话解释一下&#xff1a;准备一个栈&#xff0c;从根节点开始&#xff0c;先判断栈是否为空&#xff0c;如果否&#xff0c;就弹出一个元素&#xff0c;对弹出元素进行自定义处理&#xff0c;再将它的左…

ElasticSearch-全文检索(一)基本介绍

简介 Elasticsearch&#xff1a;官方分布式搜索和分析引擎 | Elastic 全文搜索属于最常见的需求&#xff0c;开源的Elasticsearch是目前全文搜索引擎的首选。 它可以快速地储存、搜索和分析海量数据。维基百科、StackOverflow、Github都采用它 Elastic的底层是开源库Lucene。但…

65 mysql 的 表元数据锁

前言 这里我们来看一下 mysql 这边的 元数据锁, 术语称之为 MDL 我们这里 忽略它的实现, 我们仅仅看 其具体的使用的地方 因为它的实现 也就可以理解为另外一个 表排他锁, 具体的实现来说 和表排他锁 类似 我们这里 仅仅去了解 在各种类型的语句中 MDL 的使用的地方 lock …