文章目录
- 1、 分区算子:随机分区
- 2、分区算子:轮询分区
- 3、分区算子:重缩放分区
- 4、分区算子:广播
- 5、分区算子:全局分区
- 6、自定义分区
重分区,即数据"洗牌",将数据分配到下游算子的并行子任务中。常见的分区策略有:
- 随机分区
- 轮询分区
- 重缩放分区
- 广播
- 全局分区
- 自定义分区
1、 分区算子:随机分区
调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
demo代码:socket模拟无界流,设置并行度为2,读入数据后接shuffle,再打印
public class ShuffleExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> stream = env.socketTextStream("node01", 9527);stream.shuffle().print()env.execute();}
}
可以看到,数据流经过shuffle重分区后,到输出算子的哪个子任务是随机的:
2、分区算子:轮询分区
用DataStream的.rebalance()方法,rebalance使用的是Round-Robin负载均衡算法,就像发牌,将输入流数据平均分配到下游的并行任务中去。
stream.rebalance().下游算子....
3、分区算子:重缩放分区
和轮询相似,不同的是,重缩放是局部轮询,只给和它组队的分区轮询数据,而不是给所有分区。(调用rescale方法时,其实底层也是使用Round-Robin算法进行轮询)
stream.rescale().下游算子...
4、分区算子:广播
通常,一条数据去往一个子任务去处理即可,但广播就是:调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
stream.broadcast().print();
5、分区算子:全局分区
极端的分区方式,会将数据流中的数据全都发送到下游算子的第一个子任务中去, 相当于强行让下游任务并行度变成了1(注意是相当于,不是真把下游算子并行度改成1了,只是不往其他子任务发数据了,其他有跟没有一样了)
。
stream.global().print();
发送数据:
效果:
6、自定义分区
以上的Flink分区策略都不满足需求时,可自定义分区策略,首先实现Partitioner接口,自定义分区器:
public class MyPartitioner implements Partitioner<String> {@Overridepublic int partition(String key, int numPartitions) {return Integer.parseInt(key) % numPartitions;}
}//numPartitions 下游算子并行度
//这里的分区策略,简单写个key除以算子并行度取余
//该方法返回的是分区索引
使用自定义分区器:
public class PartitionCustomDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);DataStreamSource<String> socketDS = env.socketTextStream("node01", 9527);DataStream<String> myDS = socketDS.partitionCustom(new MyPartitioner(),value -> value);myDS.print();env.execute();}
}
- 自定义分区用的api是partitionCustom
- partitionCustom方法第一个参数是自定义的分区器对象
- 第二个参数是key的选择器对象KeySelector,和keyBy方法一样,value -> value即以输入数据为key
到此,就实现了数据往下游算子的子任务分配时,按什么规则来分。以上就是Flink的常用分区策略,此外,还有一种one-to-one的分区器,带上自定义分区器,共8种: