大数据之Flink(二)

4、部署模式

flink部署模式:

  • 会话模式(Session Mode)
  • 单作业模式(Per-Job Mode)
  • 应用模式(Application Mode)

区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。

4.1、会话模式

先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执行时间短的大量作业。

在这里插入图片描述

4.2、单作业模式

资源共享会导致问题,为了隔离资源要为每个提交的作业启动一个集群,即单作业模式。
在这里插入图片描述
作业完成后集群关闭,资源释放,一般借助yarn、K8s资源管理框架启动集群。实际应用首选模式

4.3、应用模式

前面两种模式,代码都在客户端上执行,由客户端提交给JobManager,导致客户端需要占用大量网络带宽,加重客户端所在节点的资源消耗。应用模式把应用提交到JobManager运行,每个提交的应用单独启动一个JobManager,执行结束后JobManager关闭。
在这里插入图片描述
总结:

  • 应有模式与单作业模式是提交作业后才创建集群
  • 单作业模式是通过客户端来提交,客户端解析出的每一个作业对应一个集群
  • 应用模式直接由JobManager执行应用程序

5、YARN运行模式

5.1、会话模式部署

yarn部署过程:

  1. 客户端把Flink应用提交给yarn的ResourceManager,yarn的ResourceManager向yarn的NodeManager申请容器
  2. Flink部署JobManager和TaskManager到容器上,在启动集群
  3. Flink根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源

配置准备

  1. 修改配置文件

     vim /etc/profile
    
  2. 添加环境变量

    export HADOOP_CLASSPATH=`hadoop classpath`
    
  3. 环境变量生效

     source /etc/profile
    
  • 会话模式部署

启动测试
在这里插入图片描述
提交jar任务
在这里插入图片描述
运行状态
在这里插入图片描述

5.2、应用模式部署(生产环境推荐)

与但作业模式类似,直接执行flink run-application命令即可,先将jar拷贝到flink根文件夹下

使用命令提交作业

 bin/flink run-application -t yarn-application -c FlinkDemo.StreamWordCount ./flinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

使用HDFS提交(生产环境推荐)

1、上传flink依赖到HDFS的flink-dist文件夹

hadoop fs -put lib/ /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist

2、上传jar包到HDFS到flink-jars文件夹

hadoop fs -mkdir /flink-jars
hadoop fs -put flinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar/ /flink-jars

3、运行jar包

bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop100:8020/flink-dist" -c  FlinkDemo.StreamWordCount hdfs://hadoop100:8020/flink-jars/flinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

历史服务器:查看停止的job的统计信息

6、flink运行时架构

以Standalone会话模式为例
在这里插入图片描述

  • 作业管理器(JobManager)

    JobManager是一个Flink集群任务管理和调度的核心,是控制应用执行的主进程,每个应用都有一JobManager。JobMaster是JobManager中最核心的组件,负责处理单独作业。JobMaster和job一一对应,多个job可运行在同一集群中,每个job有一个对应的JobMaster。

  • 资源管理器

    负责资源的分配和管理。资源主要指TaskManager的任务槽(task slot)。任务槽为flink集群中资源调配单元,包含执行计算的cpu和内存。每个task要分配到一个slot上执行。

  • 分发器

    负责提供rest接口,用来提交应用并且负责为每一个新提交的作业启动一个新的JobMaster,也会启动webUI

  • 任务管理器(TaskManager)

    TaskManager是flink中的工作进程,负责数据流的具体计算。flink集群中必须至少有一个TaskManager,每个TaskManager包含一定数量的task slot。slot为资源调度的最小单位,slot的数量限制了TaskManager能够并行处理的任务数量

7、核心概念

7.1、并行度

当要处理大量数据时,可以把算子操作“复制”多分,每个算子都可执行计算任务。一个任务就拆分成多个子任务,实现了并行计算。flink执行过程中,每个算子包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同容器中执行。

一个算子的子任务个数为其并行度。一个流程序并行度是其所有算子中最大的并行度。
在这里插入图片描述
可通过代码设置并行度(优先度最高),默认为电脑CPU的线程数。

算子.setParallelism(并行度)

全局环境设置并行度

env.setParallelism(并行度)

通过web页面/命令行提交时设置并行度(优先度低)

flink的yaml配置文件配置并行度(优先度最低)

优先级:算子指定>env指定>提交时设置>配置文件配置

7.2、算子链
  1. 一对一one-to-one(forwarding)

    这种模式数据流维护着分区以及元素的顺序。如source和map算子,source算子读取数据后直接发给map算子处理,他们之间不需要重新分区和调整顺序,保持着一对一的关系。map、source、flatMap都是这种一对一的关系。

  2. 重分区redistributing

    数据流的分区会发生改变。map和,keyBy/window算子之间,keyBy/window算子之间与sink算子之间都是重分区关系。

  • 合并算子链:将并行度相同的一对一算子操作可以直接链接在一起形成一个子任务,被一个线程执行。
7.3、任务槽

flink每一个taskManager都是一个JVM进程,它可以启动多个独立的线程来并行执行多个子任务。为了控制并发量就需要再taskManager上对每个任务运行所占用的资源做出明确的划分,就是任务槽。

每个任务槽表示taskManager拥有计算资源的固定大小的子集。这些资源用来独立执行一个子任务。

假如一个taskManager有三个slot,就会将管理的内存均分成三份,每个slot独占一份,slot不会去争抢资源。**slot仅用来隔离内存,不隔离CPU。**建议slot数量为cpu核心数,避免争抢cpu资源。

同一作业不同算子的并行子任务可以放到同一slot上执行。
在这里插入图片描述

7.4、任务槽与并行度关系

任务槽是静态概念,是指taskManager具有并发执行能力;并行度是动态概念,程序运行时的实际使用的并发能力。slot的数量是最大并行度。并行度超过slot数量flink不能运行。

使用yarn动态申请资源:申请taskManager数量=并行度/每个taskManager的slot数(向上取整)

8、作业提交流程

8.1、Standalone会话模式

在这里插入图片描述

8.2、yarn应用模式

在这里插入图片描述

9、DataStream API

DataStream API是flink核心层API。一个flink程序就是对DataStream的各种转换。代码一般由几个部分组成:
在这里插入图片描述

9.1、执行环境

1、创建环境

StreamExecutionEnvironment类的对象是所有flink程序的基础。最常用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2、程序执行

flink是由事件驱动的,只有等数据到来才会触发计算。需要显式调用执行环境的execute()方法来触发程序执行。execute()将一直等待作业完成返回一个执行结果(JobExecutionResult)。

env.execute();
9.2、源算子source
9.2.1、准备工作

数据模型

字段名数据类型说明
idString水位传感器类型
tsLong传感器记录时间戳
vcInteger水位记录

定义类

package bean;import java.util.Objects;/*** @Title: WaterSensor* @Author lizhe* @Package bean* @Date 2024/5/29 21:06* @description: 水位类*/
public class WaterSensor {public String id;public Long ts;public Integer vc;public String getId() {return id;}public WaterSensor setId(String id) {this.id = id;return this;}public Long getTs() {return ts;}public WaterSensor setTs(Long ts) {this.ts = ts;return this;}public Integer getVc() {return vc;}public WaterSensor setVc(Integer vc) {this.vc = vc;return this;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public WaterSensor() {}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) &&Objects.equals(ts, that.ts) &&Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
package source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;/*** @Title: CollectionDemo* @Author lizhe* @Package source* @Date 2024/5/29 22:06* @description:*/
public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//从数组读取数据DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 34));source.print();//直接读取数据DataStreamSource<Integer> source1 = env.fromElements(22, 4456, 66);source1.print();env.execute();}
}
9.2.2、从集合中读数据
package source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;/*** @Title: CollectionDemo* @Author lizhe* @Package source* @Date 2024/5/29 22:06* @description:*/
public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//从数组读取数据DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 34));source.print();//直接读取数据DataStreamSource<Integer> source1 = env.fromElements(22, 4456, 66);source1.print();env.execute();}
}
9.2.3、从文件中读数据

导入依赖

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.13.0</version></dependency>
package source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @Title: FileSourceDemo* @Author lizhe* @Package source* @Date 2024/5/29 22:13* @description:*/
public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("input/words.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),"filesource").print();env.execute();}
}
9.2.4、从Kafka读数据

官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

导入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.13.6</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.13.6</version>
</dependency>

启动kafka集群
在这里插入图片描述
编写代码

package source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @Title: FileSourceDemo* @Author lizhe* @Package source* @Date 2024/5/29 22:13* @description:*/
public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("192.168.132.100:9092,192.168.132.101:9092,192.168.132.102:9092").setGroupId("test").setTopics("test").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"filesource").print();env.execute();}
}
9.2.5、flink数据类型

查看TypeInformation(实现序列化)。TypeInformation类是flink中所有类型描述符的基类。涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化、反序列化、比较器。

  • 基本类型:java基本类型及其包装类,还有void、string、date、bigDecimal、bigInteger
  • 数组类型:基本类型数组和对象数组
  • 复合数据类型:元组类型(Tuple)
  • 辅助类型:List、Map
  • 泛型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/421863.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue3使用vue-qrcode-reader实现扫码绑定设备功能

需求描述 移动端进入网站后&#xff0c;登录网站进入设备管理界面。点击添加设备&#xff0c;可以选择直接添加或者扫一扫。点击扫一扫进行扫描二维码获取设备序列号自动填充到添加设备界面的序列号输入框中。然后点击完成进行设备绑定。 安装vue-qrcode-reader 这里使用的版…

2024.9.11 作业

绘制组件制作时钟 代码&#xff1a; /*******************************************/ 文件名&#xff1a;widget.h /*******************************************/ #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent> #include &l…

MAX3483ESA+T具有±15kV ESD保护的+3.3V、低功耗收发器,适用于RS-485和RS-422通信

MAX3483ESAT具有15kV ESD保护的3.3V、低功耗收发器&#xff0c;适用于RS-485和RS-422通信。每个器件包含一个驱动器和一个接收器。MAX3483ESAT具有限摆率驱动器&#xff0c;可充分降低EMI并减少因电缆端接不当引起的反射&#xff0c;从而实现数据速率高达250kbps的无误差数据传…

【中间件】-容器编排平台Kubernetes简介

目录 什么是K8s 为什么需要K8s 什么是容器(Contianer) K8s能做什么&#xff1f; K8s的架构原理 控制平面(Control plane) kube-apiserver etcd kube-scheduler kube-controller-manager cloud-controller-manager 小结 节点组件(Node) container runtime Pod kubelet ku…

AnyChart 数据可视化框架

AnyChart 数据可视化框架 AnyChart 是一个灵活的 JavaScript&#xff08;HTML5、SVG、VML&#xff09;图表框架&#xff0c;适合任何需要数据可视化的解决方案。 目录 下载并安装开始插件将 AnyChart 与 TypeScript 结合使用将 AnyChart 与 ECMAScript 6 结合使用技术集成贡献…

Anolis OS 7.9(龙蜥操作系统)上Oracle12C Release 2 (12.2)打补丁

本文的oracle使用的是单实例环境 一、打补丁前环境准备 1、确保make, ar, ld,和 nm四个可执行命令在$PATH中 export PATH$PATH:/bin2、查看已装的Oracle的OPatch版本 #切换到oracle用户 su - oracle#进入到数据库的安装目录下的opatch目录 cd /ora01/app/oracle/product/12…

JS_函数声明

JS中的方法,多称为函数,函数的声明语法和JAVA中有较大区别 函数说明 函数没有权限控制符不用声明函数的返回值类型,需要返回在函数体中直接return即可,也无需void关键字参数列表中,无需数据类型调用函数时,实参和形参的个数可以不一致声明函数时需要用function关键字函数没有…

github actions CICD简单使用案例

参考&#xff1a; https://developer.aliyun.com/article/1540773 https://github.com/ViggoZ/producthunt-daily-hot/blob/main/.github/workflows/generate_markdown.yml 1、创建github项目 目录&#xff1a; .github/workflows/fetch-news.yml actions执行yaml&#xff08;…

C语言 | Leetcode C语言题解之第397题整数替换

题目&#xff1a; 题解&#xff1a; //第一种动态规划:超时 // class Solution { // public: // int integerReplacement(int n) { // vector<int>dp(n1,0); // dp[1]0; // for(int i2;i<n;i){ // if(i%20){ // …

Vue接入高德地图并实现基本的路线规划功能

目录 一、申请密钥 二、安装依赖 三、代码实现 四、运行截图 五、官方文档 一、申请密钥 登录高德开放平台&#xff0c;点击我的应用&#xff0c;先添加新应用&#xff0c;然后再添加Key。 如图所示填写对应的信息&#xff0c;系统就会自动生成。 二、安装依赖 npm i am…

学生签到系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;学生管理&#xff0c;教师管理&#xff0c;签到信息管理&#xff0c;学生签到管理&#xff0c;班课信息管理&#xff0c;加入班课管理&#xff0c;课程信息管理 微信端账号功能包括&#xff1a;系统首…

C++(三)----内存管理

1.C/C内存分布 看下面这个问题&#xff08;考考你们之前学的咋样&#xff09;&#xff1a; int globalVar 1; static int staticGlobalVar 1; void Test() {static int staticVar 1;int localVar 1;int num1[10] {1, 2, 3, 4};char char2[] "abcd";char* pCh…

JDK 安装及配置教程(Windows)【安装】

文章目录 一、 下载1. 官网下载2. 其它渠道 二、 安装三、 配置四、 验证五、 双 JDK 环境 软件 / 环境安装及配置目录 一、 下载 1. 官网下载 安装地址&#xff1a;https://www.oracle.com/ 打开浏览器输入网址 https://www.oracle.com/index.html&#xff0c;进入 Oracle …

Python——turtle库(海龟绘图)介绍与使用

一、概述 在 Python 中&#xff0c;海龟绘图提供了一个实体“海龟”形象&#xff08;带有画笔的小机器动物&#xff09;&#xff0c;假定它在地板上平铺的纸张上画线。 二、运行环境 本文运行环境&#xff1a;Windows11&#xff0c;Python3.11&#xff0c;Pycharm2023.1.4 使…

哈佛斯坦福大学团队联合发布病理基础模型CHIEF,全面提升癌症诊断的准确性|顶刊精析·24-09-12

小罗碎碎念 今日顶刊&#xff1a;Nature 今天精读的这篇文章于24-09-04发表于Nature&#xff0c;作者来自哈佛大学、斯坦福大学。 作者角色作者姓名单位名称&#xff08;英文&#xff09;单位名称&#xff08;中文&#xff09;第一作者Xiyue WangDepartment of Biomedical Info…

如何快速清理Docker中的停止容器?

如何快速清理Docker中的停止容器? 方法一:使用`docker container prune`方法二:结合`docker ps`和`docker rm`注意(这些命令慎用,确定容器不需要之后再执行)💖The Begin💖点点关注,收藏不迷路💖 Docker容器在停止后可能会占用不必要的磁盘空间。如何清理这些停止的…

k8s以及prometheus

#生成控制器文件并建立控制器 [rootk8s-master ~]# kubectl create deployment bwmis --image timinglee/myapp:v1 --replicas 2 --dry-runclient -o yaml > bwmis.yaml [rootk8s-master ~]# kubectl expose deployment bwmis --port 80 --target-port 80 --dry-runclient…

第 9 章图像分割

图像分割是将一幅图像分割成有意义区域的过程。区域可以是图像的前景与背景或图像中一些单独的对象。这些区域可以利用一些诸如颜色、边界或近邻相似性等特征进行构建。本章中&#xff0c;我们将看到一些不同的分割技术。 9.1 图割&#xff08;Graph Cut&#xff09; 图论中的…

Hive中的分区表与分桶表详解

目录 分区表和分桶表 分区表 分区表基本语法 1. 创建分区表 2. 分区表读写数据 1&#xff09;写数据 &#xff08;1&#xff09;LOAD &#xff08;2&#xff09;INSERT 2&#xff09;读数据 3. 分区表基本操作 1&#xff09;查看所有分区信息 2&#xff09;增加分区 …

iOS 知识点记录

王巍 博客地址&#xff1a;OneVs Den git地址&#xff1a;onevcat (Wei Wang) GitHub 江湖人称喵神&#xff0c;目前就职于line。喵神的博客涉及方面比较广, 有Obejctive-C, Swift, SwiftUI, Unity等等。博客内容很有深度&#xff0c;非常值得关注。 戴铭 博客地址&#xff1…