一、前言
flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。
1.1 Flink数据传输
- 组件之间的通信消息传输,即Client、JobManager、TaskManager之间的信息传递,采用Akka框架(主要用作组件间的协同,如心跳检测、状态上报、指标统计、作业提交和部署等)。
- 算子之间的流数据传输
- 本地线程内的流数据传输(同一个SubTask中):同一个SubTask内的两个Operator(属于同一个OperatorChain)之间的数据传输是方法调用,即上游算子处理完数据后,直接调用下游算子的processElement方法。
- 本地线程间的流数据传输(同一个TaskManager的不同SubTask中):即同一个TaskManager(JVM进程)中的不同Task(线程,本质上是SubTask)的算子之间的数据传输,通过本地内存进行数据传递,存在数据序列化和反序列过程。
- 跨网络的流数据传输(不同TaskManager的SubTask中):采用Netty框架,通过Socket传递,也存在数据序列化和反序列过程。
flink中的重分区算子定义上下游subtask之间数据传递的方式,SubTask之间进行数据传递模式有两种,一种是one-to-one(forwarding)模式,另一种是redistributing的模式。
1.2 重分区算子数据传递的两种方式
- One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如下图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
- Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。
flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。
二、分区策略
数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了以下几种分区策略和自定义分区策略。已实现的分区策略对应的API为:
自定义分区策略的API为CustomPartitionerWrapper。
各个API的继承关系如下图所示:
ChannelSelector是分区策略的顶层接口,其决定了记录应该写入哪个逻辑通道,通道可理解为下游算子的某个实例,或下游并行算子的某个子任务。该接口的定义源码如下:
抽象类StreamPartitioner实现了ChannelSelector接口,是一个用于流程序的特殊的ChannelSelector,其中定义了一些通用的分区策略方法。Flink中的所有分区策略(分区器)都继承了StreamPartitioner类,并且实现了各自独有的分区规则。
三、内置分区策略
3.1 BinaryHashPartitioner
该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,我们这里不做过多讲解。
3.2 BroadcastPartitioner
广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略。
来一段代码演示下:
/*** 微信公众号:老周聊架构*/
public class PartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号dataStream.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}});//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Integer> dataStreamAfter = dataStream.broadcast();//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).print();env.execute("PartitionerTest Job");}
}
直接IDEA控制台输出:
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
执行分区策略后,每个元素所属的分区如下:
对比表发现,广播分区策略将上游每个元素分别发送到了下游算子的所有分区,这种策略会把数据复制多份,向下游算子的每个分区发送一份。
我们把上面的任务提交到Flink,同样也可以看出前面分区前每个子任务两条数据,分区后每个子任务六条数据。
3.3 ForwardPartitioner
转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。
这里把上面的代码调整下:
dataStream.forward()
IDEA控制台输出:
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
执行分区策略后,每个元素所属的分区如下:
对比发现,转发分区策略将上游同一个分区的元素发送到了下游同一个分区中。使用数据流图表示如下图:
在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:
对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。
3.4 GlobalPartitioner
全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。
这里把上面的代码调整下:
dataStream.global()
IDEA控制台输出:
分区前:
分区后:
全局分区策略将上游所有分区中的所有元素发送到了下游编号为0的分区中:
3.5 .KeyGroupStreamPartitioner
Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。selectChannel()方法源码如下:
总的来说,Flink底层计算通道索引(分区编号)的流程如下:
- 计算Key的HashCode值。
- 将Key的HashCode值进行特殊的Hash处理,即MathUtils.murmurHash(keyHash),返回一个非负哈希码。
- 将非负哈希码除以最大并行度取余数,得到keyGroupId,即Key组索引。
- 使用公式keyGroupId×parallelism/maxParallelism得到分区编号。parallelism为当前算子的并行度,即通道数量;maxParallelism为系统默认支持的最大并行度,即128。
3.6 RebalancePartitioner
平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。
这里把上面的代码调整下:
dataStream.setParallelism(2);
dataStreamAfter.setParallelism(3);
dataStream.rebalance()
IDEA控制台输出:
分区前:
分区后:
平衡分区策略将上游所有元素均匀发送到了下游算子的所有分区:
3.7 RescalePartitioner
重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。
上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。
假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。
这里把上面的代码调整下:
dataStream.rescale()
同时将第一个map算子的并行度设置为2,第二个map算子的并行度设置为4。
IDEA控制台输出:
分区前:
分区后:
接下来改变map算子的并行度,将第一个map算子的并行度设置为4,第二个map算子的并行度设置为2。
如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。
3.8 ShufflePartitioner
随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。
这里把上面的代码调整下:
dataStream.shuffle()
这里就不做过多演示了。我们下面来看下自定义分区策略。
四、自定义分区策略
自定义分区策略的API为CustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。
4.1 新建自定义分区器
新建分区器类MyCustomPartitioner并实现接口Partitioner(Object表示分区Key的数据类型),实现其中未实现的方法partition(),在该方法中添加相应的分区逻辑。
/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(Object key, int numPartitions) {if (key.equals("chinese")) {return 0;} else if (key.equals("math")) {return 1;} else {return 2;}}
}
上述代码通过partition()方法取得分区编号,将Key值等于chinese的元素分配到编号为0的分区,将Key值等于math的元素分配到编号为1的分区,其余元素分配到编号为2的分区。
4.2 使用自定义分区器
调用DataStream的partitionCustom()方法传入自定义分区器类MyCustomPartitioner的实例,可以对DataStream按照自定义规则进行重新分区,代码如下:
/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class CustomPartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(String value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));Map<String, Integer> map = new HashMap<>();map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));return map;}}).setParallelism(2);//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(Map<String, Integer> value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).setParallelism(3).print();env.execute("CustomPartitionerTest Job");}
}
分区前:
分区后:
自定义分区策略将上游所有元素按照自定义的规则发送到了下游的3个分区中。
把任务给到Flink上去跑,发现:
这是因为泛型擦除,下面的DataStream泛型需要指定类型,不能
小知识:
在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦除,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除。
类型加好以后,再跑一下任务,会出现任务成功。