207.Flink(二):架构及核心概念,flink从各种数据源读取数据,各种算子转化数据,将数据推送到各数据源

一、Flink架构及核心概念

1.系统架构

  • JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。
  • 一个job对应一个jobManager

 2.并行度

(1)并行度(Parallelism)概念

一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。

流程序的并行度 = 其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。

(2)设置并行度

对某个具体算子设置并行度:

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

全局设置并行度:

env.setParallelism(2);

提交任务时指定:

  • 通过页面上传jar的时候可以指定
  • 可以在命令行启动的时候通过 -p 3指定

flink-conf.yaml中配置:

parallelism.default: 2

优先级:

代码中具体算子 > 代码中全局 > 提交任务指定 > 配置文件中指定

3.算子链

(1)算子间的数据传输

*1)一对一(One-to-one,forwarding)

这种模式下,数据流维护着分区以及元素的顺序。它们之间不需要重新分区,也不需要调整数据的顺序。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。

*2)重分区(Redistributing)

在这种模式下,数据流的分区会发生改变。每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。

(2)合并算子链

在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分 

// 禁用算子链,该算子不会和前面和后面串在一起
.map(word -> Tuple2.of(word, 1L)).disableChaining();// 全局禁用算子链
env.disableChaining();// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

  • 当一对一的时候,每个运算量都很大,这个时候不适合串在一起。
  • 当需要定位具体问题的时候,不串在一起更容易排查问题

4.任务槽

(1)任务槽(Task Slots)概念

Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)。

TaskManager的计算资源是有限的,为了控制并发量,TaskManager对每个任务运行所占用的内存资源做出明确的划分,这就是所谓的任务槽(task slots)。

每个任务槽的大小是均等的,且任务槽之间的资源不可以互相借用。

如图,每个TaskManager有三个任务槽,每个槽运行自己的任务。槽的大小均等。

(2)任务槽数量的设置

在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中,可以设置TaskManager的slot数量,默认是1个slot。

taskmanager.numberOfTaskSlots: 8

slot目前仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,建议将slot数量配置为机器的CPU核心数。

(3)任务对任务槽的共享

在同一个作业中,不同任务节点的并行子任务可以放在同一个slot上执行

 可以共享:

  • 同一个job中,不同算子的子任务才可以共享同一个slot。这些子任务是同时运行
  • 前提是:属于同一个slot共享组,默认都是“default”

手动指定共享组:

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");

共享的好处:允许我们保存完整的作业管道。这样一来,即使某个TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行

(4)任务槽和并行度的关系

  • 任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置
  • 并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置

如果是yarn模式,申请的TaskManager的数量 = job并行度 / 每个TM的slot数量,向上取整

即:假设10个并行度的job,每个TM的slot是3个,那么需要10/3,向上取整,即需要最少4个TaskManager

二、作业提交流程

1.Standalone会话模式作业提交流程

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

  • 逻辑流图:列出并行度,算子,各算子之间关系(一对一还是需要重分区)
  • 作业图:将一对一的算子做算子链的优化,作业中间会有中间结果集
  • 执行图:将并行度展开,并标注每个并行处理的算子
  • 物理图:基本同执行图,是执行图的落地

2.Yarn应用模式作业提交流程

三、 DataStream API

DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。

1.执行环境(Execution Environment)

(1)创建执行环境

*1)StreamExecutionEnvironment.getExecutionEnvironment();

它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境

*2)StreamExecutionEnvironment.createLocalEnvironment();

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数

*3)StreamExecutionEnvironment
          .createRemoteEnvironment(
            "host",                   // JobManager主机名
            1234,                     // JobManager进程端口号
               "path/to/jarFile.jar"  // 提交给JobManager的JAR包
        );

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

 (2)执行模式(Execution Mode)

流批一体:代码api是同一套,可以指定为 批,也可以指定为 流。

通话代码配置:

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

通过命令行配置:

bin/flink run -Dexecution.runtime-mode=BATCH

(3)触发程序执行

当main()方法被调用时,并没有真正处理数据。只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。

所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

如果在一段代码里面执行多个任务,可以使用env.executeAsync();

package com.atguigu.env;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** TODO** @author cjp* @version 1.0*/
public class EnvDemo {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment
//                .getExecutionEnvironment();  // 自动识别是 远程集群 ,还是idea本地环境.getExecutionEnvironment(conf); // conf对象可以去修改一些参数//                .createLocalEnvironment()
//        .createRemoteEnvironment("hadoop102", 8081,"/xxx")// 流批一体:代码api是同一套,可以指定为 批,也可以指定为 流// 默认 STREAMING// 一般不在代码写死,提交时 参数指定:-Dexecution.runtime-mode=BATCHenv.setRuntimeMode(RuntimeExecutionMode.BATCH);env
//                .socketTextStream("hadoop102", 7777).readTextFile("input/word.txt").flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1).print();env.execute();/** TODO 关于execute总结(了解)*     1、默认 env.execute()触发一个flink job:*          一个main方法可以调用多个execute,但是没意义,指定到第一个就会阻塞住*     2、env.executeAsync(),异步触发,不阻塞*         => 一个main方法里 executeAsync()个数 = 生成的flink job数*     3、思考:*         yarn-application 集群,提交一次,集群里会有几个flink job?*         =》 取决于 调用了n个 executeAsync()*         =》 对应 application集群里,会有n个job*         =》 对应 Jobmanager当中,会有 n个 JobMaster*/
//        env.executeAsync();// ……
//        env.executeAsync();}
}

2.源算子(Source)

从Flink1.12开始,主要使用流批统一的新Source架构:

DataStreamSource<String> stream = env.fromSource(…)

(1)创建pojo对象

需要空参构造器,所有属性的类型都是可以序列化的

package com.atguigu.bean;import java.util.Objects;/*** TODO** @author cjp* @version 1.0*/
public class WaterSensor {public String id;//水位传感器类型public Long ts;//传感器记录时间戳public Integer vc;//水位记录// 一定要提供一个 空参 的构造器public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) &&Objects.equals(ts, that.ts) &&Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}

(2)从集合中读取数据

package com.atguigu.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** TODO** @author cjp* @version 1.0*/
public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO 从集合读取数据DataStreamSource<Integer> source = env.fromElements(1,2,33); // 从元素读
//                .fromCollection(Arrays.asList(1, 22, 3));  // 从集合读source.print();env.execute();}
}

(3)从文件读取数据

先添加配置:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.17.0</version></dependency>
package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** TODO** @author cjp* @version 1.0*/
public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 从文件读: 新Source架构FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/word.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource").print();env.execute();}
}
/**** 新的Source写法:*   env.fromSource(Source的实现类,Watermark,名字)**/

(4)从Socket读取数据

DataStream<String> stream = env.socketTextStream("localhost", 7777);

(5)从Kafka读取数据

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version>
</dependency>
package com.atguigu.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/*** TODO** @author cjp* @version 1.0*/
public cl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/139136.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实验五 熟悉 Hive 的基本操作

实验环境&#xff1a; 1.操作系统&#xff1a;CentOS 7。 2.Hadoop 版本&#xff1a;3.3.0。 3.Hive 版本&#xff1a;3.1.2。 4.JDK 版本&#xff1a;1.8。 实验内容与完成情况&#xff1a; &#xff08;1&#xff09;创建一个内部表 stocks&#xff0c;字段分隔符为英文逗号…

爬虫 — Scrapy 框架(一)

目录 一、介绍1、同步与异步2、阻塞与非阻塞 二、工作流程三、项目结构1、安装2、项目文件夹2.1、方式一2.2、方式二 3、创建项目4、项目文件组成4.1、piders/__ init __.py4.2、spiders/demo.py4.3、__ init __.py4.4、items.py4.5、middlewares.py4.6、pipelines.py4.7、sett…

BOM与DOM--记录

BOM基础&#xff08;BOM简介、常见事件、定时器、this指向&#xff09; BOM和DOM的区别和联系 JavaScript的DOM与BOM的区别与用法详解 DOM和BOM是什么&#xff1f;有什么作用&#xff1f; 图解BOM与DOM的区别与联系 BOM和DOM详解 JavaScript 中的 BOM&#xff08;浏览器对…

睿趣科技:抖音开通蓝V怎么操作的

在抖音这个充满创意和活力的社交媒体平台上&#xff0c;蓝V认证成为了许多用户的梦想之一。蓝V认证不仅是身份的象征&#xff0c;还可以增加用户的影响力和可信度。但是&#xff0c;要在抖音上获得蓝V认证并不是一件容易的事情。下面&#xff0c;我们将介绍一些操作步骤&#x…

小米笔试题——01背包问题变种

这段代码的主要思路是使用动态规划来构建一个二维数组 dp&#xff0c;其中 dp[i][j] 表示前 i 个产品是否可以组合出金额 j。通过遍历产品列表和可能的目标金额&#xff0c;不断更新 dp 数组中的值&#xff0c;最终返回 dp[N][M] 来判断是否可以组合出目标金额 M。如果 dp[N][M…

Android studio安卓生成APK文件安装包方法

1.点击Build->Generate Signed Bundle/APK 2.选择APK 3.首次生成&#xff0c;没有jks文件&#xff0c;就点击Create new。再次生成&#xff0c;直接点Next 4.选择创建jks文件路径 5.点击Next 6.选择release 7.生成完成的apk安装包路径

【论文阅读 08】Adaptive Anomaly Detection within Near-regular Milling Textures

2013年&#xff0c;太老了&#xff0c;先不看 比较老的一篇论文&#xff0c;近规则铣削纹理中的自适应异常检测 1 Abstract 在钢质量控制中的应用&#xff0c;我们提出了图像处理算法&#xff0c;用于无监督地检测隐藏在全局铣削模式内的异常。因此&#xff0c;我们考虑了基于…

uniapp小程序点击按钮直接退出小程序效果demo(整理)

点击按钮直接退出小程序 <navigator target"miniProgram" open-type"exit">退出小程序</navigator>

Android Key/Trust Store研究+ssl证书密钥

前言&#xff1a;软件搞环境涉及到了中间件thal trustzone certificate key&#xff0c;翻译过来是thal信任区域证书密钥 &#xff0c;不明白这是什么&#xff0c;学习一下 ssl证书密钥 SSL密钥是SSL加密通信中的重要组成部分。SSL证书通过加密算法生成&#xff0c;用于保护网…

Oracle 11g RAC部署笔记

搭了三次才搭好&#xff0c;要记录一下。 1. Oracle 11g RAC部署的相关步骤以及需要的包&#xff0c;可以参考这里。 Oracle 11g RAC部署_12006142的技术博客_51CTO博客Oracle 11g RAC部署&#xff0c;Oracle11gRAC部署操作环境&#xff1a;CentOS7.4Oracle11.2.0.4一、主机网…

解决老版本Oracle VirtualBox 此应用无法在此设备上运行问题

问题现象 安装华为eNSP模拟器的时候&#xff0c;对应的Oracle VirtualBox-5.2.26安装的时候提示兼容性问题&#xff0c;无法进行安装&#xff0c;具体版本信息如下&#xff1a; 软件对应版本备注Windows 11专业工作站版22H222621eNSP1.3.00.100 V100R003C00 SPC100终结正式版…

利用优化算法提高爬虫任务调度效率

目录 一、任务调度优化的重要性 二、选择合适的优化算法 三、建立任务调度模型 四、设计适应性函数 五、算法实施和调优 六、性能评估和优化结果分析 代码示例 总结 随着网络信息的爆炸式增长&#xff0c;网络爬虫在信息获取和数据挖掘等领域的应用越来越广泛。然而&am…

Arduino程序设计(十一)8×8 共阳极LED点阵显示(74HC595)

88 共阳极LED点阵显示 前言一、74HC595点阵模块1、74HC595介绍2、74HC595工作原理3、1088BS介绍4、74HC595点阵模块 二、点阵显示实验1、点阵显示初探2、点阵显示进阶3、点阵显示高阶3.1 点阵显示汉字&#xff08;方法1&#xff09;3.2 点阵显示汉字&#xff08;方法2&#xff…

conda的安装和使用

参考资料&#xff1a; https://www.bilibili.com/read/cv8956636/?spm_id_from333.999.0.0 https://www.bilibili.com/video/BV1Mv411x775/?spm_id_from333.999.0.0&vd_source98d31d5c9db8c0021988f2c2c25a9620 目录 conda是啥以及作用conda的安装conda的启动conda的配置…

2023华为杯D题——基于Kaya模型的碳排放达峰实证研究

一、前言 化石能源是推动现代经济增长的重要生产要素&#xff0c;经济生产活动与碳排放活动密切相关。充分认识经济增长与碳排放之间的关系对转变生产方式&#xff0c;确定碳达峰、碳中和路径极为必要。本研究在对经济增长与碳排放关系现有研究梳理的基础上&#xff0c;系统地分…

【二叉树魔法:链式结构与递归的纠缠】

本章重点 二叉树的链式存储二叉树链式结构的实现二叉树的遍历二叉树的节点个数以及高度二叉树的创建和销毁二叉树的优先遍历和广度优先遍历二叉树基础oj练习 1.二叉树的链式存储 二叉树的链式存储结构是指&#xff0c;用链表来表示一棵二叉树&#xff0c;即用链来指示元素的逻辑…

23. 图论 - 图的由来和构成

文章目录 图的由来图的构成Hi, 你好。我是茶桁。 从第一节课上到现在,我基本上把和人工智能相关的一些数学知识都教给大家了,终于来到我们人工智能数学的最后一个部分了,让我们从今天开始进入「图论」。 图论其实是一个比较有趣的领域,因为微积分其实更多的是对应连续型的…

iOS——KVC(键值编码)

键值编码&#xff08;KVC&#xff09; KVC&#xff08;Key Value Coding&#xff09;是一种允许以字符串形式间接操作对象属性的方式。 最基本的KVC是由NSKeyValueCoding协议提供支持&#xff0c;最基本的操作属性如下&#xff1a; setValue: 属性值 forKey: 属性名&#xff…

微信小程序之项目基本结构、页面的基础及宿主环境

文章目录 前言一、基本组成结构基本组成小程序页面的组成部分JSON配置文件作用 二、页面基础pagesWXML和HTML的区别WXSS和CSS的区别小程序中js文件分类 三、小程序宿主环境总结 前言 微信小程序的项目基本结构、页面的基础及宿主环境 一、基本组成结构 基本组成 新建一个微信…

迁移 MySQL 数据到 OceanBase 集群

使用 mysqldump 将 mysql的表结构和数据同步到 OceanBase 的MySQL 租户中 Mysql数据库导出 mysqldump -h127.0.0.1 -P3306 -uroot –p --single-transaction --hex-blob --routines --events --triggers --set-gtid-purgedOFF --databases teller >teller.sql mysql> …