Flink之Partitioner(分区规则)

Flink之Partitioner(分区规则)

方法注释
global()全部发往1个task
broadcast()广播(前面的文章讲解过,这里不做阐述)
forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事
shuffle()随机分配(只是随机,同Spark的shuffle不同)
rebalance()轮询分配,默认机制就是rebalance()
recale()一般是下游task是上游task的并行度的倍数时,在生成job时,会将下游中的某几个subtask和上游的某个subtask绑成一组,然后在组内上游subtask以轮询的方式将数据发送给下游的subtask.
partitionCustom自定义分区器(这里不做演示,后续会单独写一个自定义分区器的内容)
keyBy()根据数据key的HashCode进行Hash分配
  • global

    global在实际业务场景中使用的不是很多,一般都是需要全局数据汇总的时候才会用到.global就是将上游的数据全部发往下游的第一个subtask中,也就是说下游设置再多的并行度是没意义的,所以使用global的时候,下游的task的并行度都是1.
    在这里插入图片描述
    这里结合代码看一下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置并行度为3,且设置数据分区方式为globalDataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).global();// 切分字符串,设置并行度为1SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(1);//......env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看代码中upperCaseMapStreamsplitFlatMapStream之间数据的发送方式
    在这里插入图片描述

  • forward

    forward其实就是一对一发送数据,和之前讲解Task的文章中提到的算子之间OneToOne的模式是一样的,就是可以将forward理解为同一个task chain[算子链]中算子之间的数据传输方式,但是使用forward的前提是上下游的算子并行度是一致的也就是上下游的subtask数量保持一致,图解如下:
    在这里插入图片描述

    代码内容如下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为forward分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).forward();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(3).startNewChain(); // 这里加上.startNewChain是为了在WebUI能看到效果,因为upperCaseMapStream和splitFlatMapStream的并行度是一致的,不加startNewChain默认的机制会将两者划分到同一个算子链中,就看不到实际的效果了.// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • shuffle

    通过前面WebUI的图片我们可以看到,从Socket数据源将数据发送到第一个map的时候使用的是默认的rebalance方式,也就是轮询发送的方式,而这里说的shuffle虽然也是一对多的发送方式,但是发送数据时是随机的,举个例子,上游有3subtask,下游有5subtask,数据源有5条数据,上游中的某一个subtask向下游发送数据时,是随机发送的,下游的5subtask并不是每个都一定能接受到数据,可能有的接收到1条,有的接收到2条,有的接收到3条数据,这就是shuffle发送数据的方式.

    如果说上两个operator并行度一致,上游选择了shuffle发送数据的方式,那么两个operator会绑定成一个task chain么?不会,因为shuffle的数据发送方式就已经导致两个operator不是OneToOne的模式了.
    在这里插入图片描述
    代码示例:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为shuffle分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).shuffle();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(7)// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • Rebalance

    rebalance就是Flink默认的数据分发机制,直白的讲就是给每个小朋友一人一块糖果,直到发完为止,不偏不倚,这个不细说了,没什么可讲的.
    在这里插入图片描述

  • recale

    关于recale前面说到了是组内的方式进行轮询分发数据,这里就以图解的方式进行讲解,便于理解.

    Flink任务启动时,如果发现上下游中使用了recale分发数据的方式就会将上下游的subtask进行分组绑定,如上游有2个subtask,下游有四个subtask,就会将上游的一个subtask和下游的两个subtask进行绑定,如下图:
    在这里插入图片描述

    当上下游对应的subtask分组后,上下游组内的subtak就会以组内轮询的方式发送数据,如下图:
    在这里插入图片描述

  • keyBy

    keyBy使用的HASH分区方式,实际是hashCode() + murmurHash()的组合方式,这个在源码的KeyGroupRangeAssignment类中是可以看到的,简单来说根据keyhash值模除以下游的最大并行度(return MathUtils.murmurHash(keyHash) % maxParallelism;).

    关于keyBy的使用应该都很熟悉了,这里直接给大家看演示结果吧,如下图:
    在这里插入图片描述

以上就是对Flink中分区规则的讲解.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/92991.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

玩转VS code 之 C/C++ 环境配置篇

PS&#xff1a;俺是菜鸟&#xff0c;整理和踩坑试错花了不少时间&#xff0c;如果这篇文章对您有用的话&#xff0c;请麻烦您留下免费的赞赞&#xff0c;赠人玫瑰&#xff0c;手留余香&#xff0c;码字踩坑不易&#xff0c;望三连支持 上一篇&#xff1a;玩转 VS code 之下载篇…

激活函数总结(十):激活函数补充(Identity、LogSigmoid、Bent Identity)

激活函数总结&#xff08;十&#xff09;&#xff1a;激活函数补充 1 引言2 激活函数2.1 Identity激活函数2.2 LogSigmoid激活函数2.3 Bent Identity激活函数 3. 总结 1 引言 在前面的文章中已经介绍了介绍了一系列激活函数 (Sigmoid、Tanh、ReLU、Leaky ReLU、PReLU、Swish、…

【Spring系列篇--关于IOC的详解】

目录 面试经典题目&#xff1a; 1. 什么是spring&#xff1f;你对Spring的理解&#xff1f;简单来说&#xff0c;Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架。 2.什么是IoC&#xff1f;你对IoC的理解&#xff1f;IoC的重要性?将实例化对象的权利从程序员…

Rust软件外包开发语言的特点

Rust 是一种系统级编程语言&#xff0c;强调性能、安全性和并发性的编程语言&#xff0c;适用于广泛的应用领域&#xff0c;特别是那些需要高度可靠性和高性能的场景。下面和大家分享 Rust 语言的一些主要特点以及适用的场合&#xff0c;希望对大家有所帮助。北京木奇移动技术有…

Windows上使用FFmpeg实现本地视频推送模拟海康协议rtsp视频流

场景 Nginx搭建RTMP服务器FFmpeg实现海康威视摄像头预览&#xff1a; Nginx搭建RTMP服务器FFmpeg实现海康威视摄像头预览_nginx rtmp 海康摄像头_霸道流氓气质的博客-CSDN博客 上面记录的是使用FFmpeg拉取海康协议摄像头的rtsp流并推流到流媒体服务器。 如果在其它业务场景…

【Axure高保真原型】JS日期选择器筛选中继器表格

今天和大家分享JS日期选择器筛选中继器表格的原型模板&#xff0c;通过调用浏览器的日期选择器&#xff0c;所以可以获取真实的日历效果&#xff0c;具体包括哪一年二月份有29天&#xff0c;几号对应星期几&#xff0c;都是真实的&#xff0c;获取日期值后&#xff0c;通过交互…

Unity C# 之 Azure 微软SSML语音合成TTS流式获取音频数据以及表情嘴型 Animation 的简单整理

Unity C# 之 Azure 微软SSML语音合成TTS流式获取音频数据以及表情嘴型 Animation 的简单整理 目录 Unity C# 之 Azure 微软SSML语音合成TTS流式获取音频数据以及表情嘴型 Animation 的简单整理 一、简单介绍 二、实现原理 三、注意事项 四、实现步骤 五、关键代码 一、简…

JVS开源基础框架:平台基本信息介绍

JVS是面向软件开发团队可以快速实现应用的基础开发脚手架&#xff0c;主要定位于企业信息化通用底座&#xff0c;采用微服务分布式框架&#xff0c;提供丰富的基础功能&#xff0c;集成众多业务引擎&#xff0c;它灵活性强&#xff0c;界面化配置对开发者友好&#xff0c;底层容…

嵌入式:ARM Day4

一、自己编写代码实现三盏灯点亮 源码&#xff1a; .text .global _start _start: 进行一次初始化bl RCC_INITbl LED1_INITbl LED2_INITbl LED3_INITb looploop: 循环开关灯bl LED1_ONbl delay_1sbl LED1_OFFbl delay_1sbl LED2_ONbl delay_1sbl LED2_OFFbl delay_1sbl…

kafka线上问题优化

如何防止消息丢失 生产者&#xff1a; 使用同步发送把ack设成1或者all&#xff08;非0&#xff0c;0可能会出现消息丢失的情况&#xff09;&#xff0c;并且设置同步的分区数>2 消费者&#xff1a;把自动提交改成手动提交 如何防止重复消费 在防止消息丢失的方案中&#…

每天一道leetcode:1926. 迷宫中离入口最近的出口(图论中等广度优先遍历)

今日份题目&#xff1a; 给你一个 m x n 的迷宫矩阵 maze &#xff08;下标从 0 开始&#xff09;&#xff0c;矩阵中有空格子&#xff08;用 . 表示&#xff09;和墙&#xff08;用 表示&#xff09;。同时给你迷宫的入口 entrance &#xff0c;用 entrance [entrancerow, …

配置vscode

配置vscode 设置相关 网址&#xff1a;https://code.visualstudio.com/ 搜索不要用百度用这个&#xff1a;cn.bing.com 1.安装中文包 Chinese (Simplified) (简体中文) 2.安装 open in browser 3.安装主题 Atom One Dark Theme 4. 安装图标样式 VSCode Great Icons 5.安装 L…

使用navicat连接postgresql报错问题解决

使用navicat连接postgresql报错问题解决 一、问题现象&#xff1a; 最近使用Navicat来连接postgreSQL数据库&#xff0c;发现连接不上&#xff0c;报错信息如下&#xff1a; 自己百度了一下&#xff0c;发现pgsql 15版本以后&#xff0c;有些系统表的列名改了&#xff0c;pg_…

HTTPS 的加密流程

目录 一、HTTPS是什么&#xff1f; 二、为什么要加密 三、"加密" 是什么 四、HTTPS 的工作过程 1.对称加密 2.非对称加密 3.中间人攻击 4.证书 总结 一、HTTPS是什么&#xff1f; HTTPS (Hyper Text Transfer Protocol Secure) 是基于 HTTP 协议之上的安全协议&…

关于Android Studio Http Proxy设置

对敌人最大的蔑视就是沉默。--鹿丸 我们使用Android Studio 开始构建的时候会有卡顿的情况&#xff0c;甚至死机&#xff0c;也就是所谓的【android studio】构建卡住问题&#xff0c;如果依赖库类都是国内的&#xff0c;检查是否开启了代理 这个地方选择下面的自动代理 国内…

Visual Studio 2019 c++ 自定义注释 ----doxygen

可加入C 也可自定义。 <?xml version"1.0" encoding"utf-8"?> <CodeSnippets xmlns"http://schemas.microsoft.com/VisualStudio/2005/CodeSnippet"><CodeSnippet Format"1.0.0"><Header><Title>注释…

打开vim的语法高亮

在一个Ubuntu中自带的vim版本是8.2.4919&#xff0c;默认就是开始了语法高亮的&#xff0c;打开一个Java文件效果如下&#xff1a; 它不仅仅对Java文件有语法高亮&#xff0c;对很多的文件都有&#xff0c;比如vim的配置文件也有语法高亮&#xff0c;有语法高亮时读起来会容易…

【左神算法刷题班】第17节:在有序二维数组中查找目标值、等于目标字符串的子序列个数

第17节 题目1&#xff1a;在有序二维数组中查找目标值 给定一个每一行有序、每一列也有序&#xff0c;整体可能无序的二维数组 再给定一个数num&#xff0c; 返回二维数组中有没有num这个数 例子 数组如下&#xff0c;找 6 是否存在。 1 3 5 7 2 4 6 13 3 9 14 …

[C++]笔记-函数的栈空间(避免栈空间溢出)

错误1 当数组的内存占用较大时,会引发异常 #include <iostream> using namespace std; int main() {char buff[2000000];cout << (int)buff[sizeof(buff) - 1] << endl; 错误 2 当调用次数较小的时候,栈内存还没有满,可以输出,该地址相减除以1024等于100,就…

CANoe自动化工程的搭建

基于XMLCAPL建立自动化工程 1、导入ini文件2、新建 Test Environment3、报告类型4、代码编写 1、导入ini文件 工程的配置的文件&#xff0c;配置DUT相关信息&#xff0c;具体视工程而编写内容。 2、新建 Test Environment 1、新建XML测试用例环境 2、导入XML测试用例文件 …