【Flink】Flink 的八种分区策略(源码解读)

Flink 的八种分区策略(源码解读)

  • 1.继承关系图
    • 1.1 接口:ChannelSelector
    • 1.2 抽象类:StreamPartitioner
    • 1.3 继承关系图
  • 2.分区策略
    • 2.1 GlobalPartitioner
    • 2.2 ShufflePartitioner
    • 2.3 BroadcastPartitioner
    • 2.4 RebalancePartitioner
    • 2.5 RescalePartitioner
    • 2.6 ForwardPartitioner
    • 2.7 KeyGroupStreamPartitioner
    • 2.8 CustomPartitionerWrapper

Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

在这里插入图片描述

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

1.继承关系图

1.1 接口:ChannelSelector

public interface ChannelSelector<T extends IOReadableWritable> {/*** 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).*/void setup(int numberOfChannels);/***根据当前的record以及Channel总数,*决定应将record发送到下游哪个Channel。*不同的分区策略会实现不同的该方法。*/int selectChannel(T record);/***是否以广播的形式发送到下游所有的算子实例*/boolean isBroadcast();
}

1.2 抽象类:StreamPartitioner

public abstract class StreamPartitioner<T> implementsChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {private static final long serialVersionUID = 1L;protected int numberOfChannels;@Overridepublic void setup(int numberOfChannels) {this.numberOfChannels = numberOfChannels;}@Overridepublic boolean isBroadcast() {return false;}public abstract StreamPartitioner<T> copy();
}

1.3 继承关系图

在这里插入图片描述

2.分区策略

2.1 GlobalPartitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

在这里插入图片描述

/*** 发送所有的数据到下游算子的第一个task(ID = 0)* @param <T>*/
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//只返回0,即只发送给下游算子的第一个taskreturn 0;}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "GLOBAL";}
}

2.2 ShufflePartitioner

随机选择一个下游算子实例进行发送。

在这里插入图片描述

/*** 随机的选择一个channel进行发送* @param <T>*/
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private Random random = new Random();@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//产生[0,numberOfChannels)伪随机数,随机发送到下游的某个taskreturn random.nextInt(numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return new ShufflePartitioner<T>();}@Overridepublic String toString() {return "SHUFFLE";}
}

2.3 BroadcastPartitioner

发送到下游所有的算子实例。

在这里插入图片描述

/*** 发送到所有的channel*/
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;/*** Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道*/@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");}@Overridepublic boolean isBroadcast() {return true;}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "BROADCAST";}
}

2.4 RebalancePartitioner

通过循环的方式依次发送到下游的 task

在这里插入图片描述

/***通过循环的方式依次发送到下游的task* @param <T>*/
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private int nextChannelToSendTo;@Overridepublic void setup(int numberOfChannels) {super.setup(numberOfChannels);//初始化channel的id,返回[0,numberOfChannels)的伪随机数nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2//则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;return nextChannelToSendTo;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "REBALANCE";}
}

2.5 RescalePartitioner

基于上下游 Operator 的并行度,将记录以循环的方式输出到下游 Operator 的每个实例。

举例:

  • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

在这里插入图片描述

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private int nextChannelToSendTo = -1;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {if (++nextChannelToSendTo >= numberOfChannels) {nextChannelToSendTo = 0;}return nextChannelToSendTo;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "RESCALE";}
}

Flink 中的执行图可以分成四层:StreamGraphJobGraphExecutionGraph物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “”,并不是一个具体的数据结构。

StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitionerRescalePartitioner 列为 POINTWISE 分配模式,其他的为 ALL_TO_ALL 分配模式。代码如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,// 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)DistributionPattern.POINTWISE,resultPartitionType);} else {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,// 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)DistributionPattern.ALL_TO_ALL,resultPartitionType);}

2.6 ForwardPartitioner

发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

在这里插入图片描述

/*** 发送到下游对应的第一个task* @param <T>*/
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "FORWARD";}
}

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner,对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

//在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();
}if (partitioner instanceof ForwardPartitioner) {//如果上下游的并行度不一致,会抛出异常if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}
}

2.7 KeyGroupStreamPartitioner

根据 key 的分组索引选择发送到相对应的下游 subtask

在这里插入图片描述

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/*** 根据key的分组索引选择发送到相对应的下游subtask* @param <T>* @param <K>*/
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}//调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}
...
}
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
.../*** 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,* 即该key发送到哪一个task*/public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}/***根据key分配一个分组id(keyGroupId)*/public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");//获取key的hashcodereturn computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}/*** 根据key分配一个分组id(keyGroupId),*/public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {//与maxParallelism取余,获取keyGroupIdreturn MathUtils.murmurHash(keyHash) % maxParallelism;}//计算分区index,即该key group应该发送到下游的哪一个算子实例public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}
...

2.8 CustomPartitionerWrapper

通过 Partitioner 实例的 Partition 方法(自定义的)将记录输出到下游。

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;Partitioner<K> partitioner;KeySelector<T, K> keySelector;public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {this.partitioner = partitioner;this.keySelector = keySelector;}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance(), e);}//实现Partitioner接口,重写partition方法return partitioner.partition(key, numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "CUSTOM";}
}

比如:

public class CustomPartitioner implements Partitioner<String> {// key: 根据key的值来分区// numPartitions: 下游算子并行度@Overridepublic int partition(String key, int numPartitions) {return key.length() % numPartitions;//在此处定义分区策略}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/272851.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全栈的自我修养 ———— css中常用的布局方法flex和grid

在项目里面有两种常用的主要布局:flex和grid布局&#xff08;b站布局&#xff09;&#xff0c;今天分享给大家这两种的常用的简单方法&#xff01; 一、flex布局1、原图2、中心对齐3、主轴末尾或者开始对其4、互相间隔 二、grid布局1、基本效果2、加间隔3、放大某一个元素 一、…

Linux第74步_“设备树”下的LED驱动

使用新字符设备驱动的一般模板&#xff0c;以及设备树&#xff0c;驱动LED。 1、添加“stm32mp1_led”节点 打开虚拟机上“VSCode”&#xff0c;点击“文件”&#xff0c;点击“打开文件夹”&#xff0c;点击“zgq”&#xff0c;点击“linux”&#xff0c;点击“atk-mp1”&am…

基于GAN对抗网进行图像修复

一、简介 使用PyTorch实现的生成对抗网络&#xff08;GAN&#xff09;模型&#xff0c;包括编码器&#xff08;Encoder&#xff09;、解码器&#xff08;Decoder&#xff09;、生成器&#xff08;ResnetGenerator&#xff09;和判别器&#xff08;Discriminator&#xff09;。…

LCR 112. 矩阵中的最长递增路径【leetcode】/dfs+记忆化搜索

LCR 112. 矩阵中的最长递增路径 给定一个 m x n 整数矩阵 matrix &#xff0c;找出其中 最长递增路径 的长度。 对于每个单元格&#xff0c;你可以往上&#xff0c;下&#xff0c;左&#xff0c;右四个方向移动。 不能 在 对角线 方向上移动或移动到 边界外&#xff08;即不允…

前端覆盖率报告生成

前端精准测试是精准测试体系的一部分&#xff0c;但是由于前端项目比较灵活&#xff0c;各种框架&#xff0c;脚手架再加上开发同学写的不够规范&#xff0c;所以投入产出比较低&#xff0c;这部分内容在网上的资料也比较少。为了完善我们的精准测试体系&#xff0c;今年做了前…

21、状态模式(行为性模式)

版本一、get状态指针 #include <iostream> using namespace std;//前置声明 class Context;//状态 class State{ public://4个状态virtual void toUp (Context& context){ }virtual void toDown (Context& context){ }virtual void toLeft (Context& cont…

学习和认知的四个阶段,以及学习方法分享

本文分享学习的四个不同的阶段&#xff0c;以及分享个人的一些学习方法。 一、学习认知的四个阶段 我们在学习的过程中&#xff0c;总会经历这几个阶段&#xff1a; 第一阶段&#xff1a;不知道自己不知道&#xff1b; 第二阶段&#xff1a;知道自己不知道&#xff1b; 第三…

命名实体识别,根据实体计算准确率、召回率和F1

文章目录 简介数据格式介绍准确率、召回率和F1评估评估代码评估结果 进一步阅读参考 简介 使用大模型训练完命名实体识别的模型后&#xff0c;发现不知道怎么评估实体识别的准确率、召回率和F1。于是便自己实现了代码&#xff0c;同时提供了完整可运行的项目代码。 完整代码&…

SpringBoot快速入门(介绍,创建的3种方式,Web分析)

目录 一、SpringBoot介绍 二、SpringBootWeb快速入门 创建 定义请求处理类 运行测试 三、Web分析 一、SpringBoot介绍 我们可以打开Spring的官网(Spring | Home)&#xff0c;去看一下Spring的简介&#xff1a;Spring makes Java simple。 Spring发展到今天已经形成了一种…

学会Web UI框架--Bootstrap,快速搭建出漂亮的前端界面

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属的专栏&#xff1a;前端泛海 景天的主页&#xff1a;景天科技苑 文章目录 Bootstrap1.Bootstrap介绍2.简单使用3.布局容器4.Bootstrap实现轮播…

机器学习|KNN和Kmeans

KNN和Kmeans KNN KNN-K个最近的邻居&#xff0c;而K是可人先预设出来的。 所谓近朱者赤&#xff0c;近墨者黑。 可以选取离当前最近的K个样本来作为辅助判断&#xff0c;因为本样本和最近的K个样本应该是处于一种相似的状态。 以下是一个苹果和梨的识别任务。 图上会出现一个未…

011-keep-alive详解

keep-alive详解 1、简介2、keep-alive的使用效果未使用keep-alive的效果图使用keep-alive的效果图include和exclude指定是否缓存某些组件使用keep-alive的钩子函数执行顺序问题 3、keep-alive的应用场景举例4、总结 1、简介 keep-alive 是 Vue 的内置组件&#xff0c;当它包裹…

Elasticsearch架构原理

一. Elasticsearch架构原理 1、Elasticsearch的节点类型 在Elasticsearch主要分成两类节点&#xff0c;一类是Master&#xff0c;一类是DataNode。 1.1 Master节点 在Elasticsearch启动时&#xff0c;会选举出来一个Master节点。当某个节点启动后&#xff0c;然后使用Zen D…

理清关系简化LeetCode题库第3047题求交集区域内的最大正方形面积问题求解

3047. 求交集区域内的最大正方形面积 难度&#xff1a;中等 问题描述&#xff1a; 在二维平面上存在 n 个矩形。给你两个下标从 0 开始的二维整数数组 bottomLeft 和 topRight&#xff0c;两个数组的大小都是 n x 2 &#xff0c;其中bottomLeft[i]和topRight[i]分别代表第i个…

爬虫(五)

1. 前端JS相关 三元运算 v1 条件 ? 值A : 值B; # 如果条件成立v1值A&#xff0c;不成立v1等于值Bres 1 1 ? 99 : 88 # res99特殊的逻辑运算 v1 11 || 22 # Ture v2 9 || 14 # 9 v3 0 || 15 # 15 v3 0 || 15 || "zhangfei" # 15赋值和…

Java二叉树 (2)

&#x1f435;本篇文章将对二叉树的一些基础操作进行梳理和讲解 一、操作简述 int size(Node root); // 获取树中节点的个数int getLeafNodeCount(Node root); // 获取叶子节点的个数int getKLevelNodeCount(Node root,int k); // 获取第K层节点的个数int getHeight(Node r…

【Claude3】利用Python中完成对Bedrock上的Claude的API调用

文章目录 1. 前期准备工作2. 安装和配置AWS CLI v23. 使用AWS configure命令配置AWS凭据4. 安装访问Bedrock的SDK5. 访问Amazon Bedrock UI6. 订阅Bedrock上的Claude模型7. 通过CLI命令列出所有可用的Claude模型8. 向Claude 3 Sonnet on Bedrock生成文本9. 参考链接 1. 前期准备…

云原生架构设计:分布式消息队列技术解析

消息队列是在消息传输过程中保存消息的容器&#xff0c;消息队列管理器在将消息从源到目标时充当中间人的角色&#xff0c;消息队列的主要目的是提供路由并保证消息的可靠传递。如果发送消息时接收者不可用&#xff0c;那消息队列就会保留消息&#xff0c;直到下次成功消费为止…

Excel 快速填充/输入内容

目录 一. Ctrl D/R 向下/右填充二. 批量输入内容 一. Ctrl D/R 向下/右填充 ⏹如下图所示&#xff0c;通过快捷键向下和向右填充数据 &#x1f914;当选中第一个单元格之后&#xff0c;可以按住Shift后&#xff0c;再选中最后一个单元格&#xff0c;可以选中第一个单元格和最…

CleanMyMac X4.14.7永久免费Mac电脑清理和优化软件

CleanMyMac X 是一款功能强大的 Mac 清理和优化软件&#xff0c;适合以下几类人群使用&#xff1a; 需要定期清理和优化 Mac 的用户&#xff1a;随着时间的推移&#xff0c;Mac 设备上可能会积累大量的无用文件、缓存和垃圾&#xff0c;导致系统运行缓慢。CleanMyMac X 的智能扫…