接上文:一文说清flink从编码到部署上线
1.引言
Apache Hadoop的Yarn是许多数据处理框架中非常流行的资源提供者。Flink的服务提交给Yarn的ResourceManager后,ResourceManager会在由Yarn的NodeManager管理的机器上动态分配运行容器。Flink在这些容器上部署自己的任务。
Yarn模式是将Flink交由Yarn来进行资源分配,因此,在启动Yarn模式时,需要保证集群上的Hadoop集群已经启动(包括HDFS和Yarn),HADOOP_HOME环境变量也已经正常配置。
2.yarn session
2.1 yarn-session.sh(开辟资源)+flink run(提交任务)
在yarn上申请一个固定的flink集群,然后所有任务都共享这个集群内的资源。会话模式同样也有–datched 或者 -d 这样的模式。 默认情况下是attached mode 绑定模式。这种模式下客户端提交一个任务到flink集群后,客户端程序会继续执行。持续跟踪任务在集群中的运行状态。如果集群上任务执行失败了,本地客户端也会显示出这些错误。而如果本地客户端应用停止了,也同样会通知集群停止对应的任务。
加上–detached后,就转变为 detached mode 解除绑定模式。这种模式下本地客户端提交任务到集群后就停止了。任务在集群中的执行状态需要由yarn或者其他的管理工具进行监控。
这种模式下会启动yarn session,并且会启动Flink的两个必要服务JobManager和Task-managers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)。
通过./bin/yarn-session.sh脚本启动YARN Session。
脚本可以携带的参数:
-n(--container):TaskManager的数量。(1.10 已经废弃)
-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-q:显示可用的YARN资源(内存,内核);
-tm:每个TaskManager容器的内存(默认值:MB)
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行。
注意:
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或–detached。
确定TaskManager数:
Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager。
2.2 启动
yarn-session.sh -tm 1024 -s 4 -d
上面的命令的意思是,每个 TaskManager 拥有4个 Task Slot(-s 4),并且被创建的每个 TaskManager 所在的YARN Container 申请 1024M 的内存,同时额外申请一个Container用以运行ApplicationMaster以及Job Manager。
TM的数量取决于并行度,如下图:
访问:http://10.86.97.191:8099/cluster/apps
2.3 提交任务
提交任务:
flink run -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql" -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
按照下面步骤进入Flink管理页面:
常用命令说明:
# 提交任务
flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
# 重新绑定另外一个yarn session
yarn-session.sh -id application_XXXX_YY
# 查看帮助
yarn-session.sh -h
会话模式在执行时会在本地创建一个临时的配置文件,默认创建在/tmp目录下。
3.Application Mode 应用模式
应用模式将在任务的启动时临时在yarn上申请一个flink集群。任务从main方法启动开始就会提交到yarn上的flink集群执行。执行完成后,集群就会立即注销。
文章:一文说清flink从编码到部署上线 用的就是这种模式。在此就不过多展开了。
flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql" -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
常用命令说明:
# 提交任务,主要是记住这个-t 参数
flink run-application -t yarn-application /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
# 查看集群上的任务
flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# 手动停止集群上的任务
flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
# 使用所有节点都能访问到的jar包来提交任务。
flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
hdfs://myhdfs/jars/my-application.jar
4.Per-job Cluster Mode 单任务模式
这种模式跟应用模式很类似,也会给每个应用在yarn上申请一个单独的flink集群。只不过这种模式下,任务是先在本地执行,构建数据处理链。构建完成后再将任务提交到flink集群上执行。
flink run -t yarn-per-job --detached -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql" -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
常用命令说明:
# 提交任务
flink run -t yarn-per-job --detached /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
# 查询集群上正在执行的任务
flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# 手动停止集群上的任务
flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
<jobId>
日志如下图所示:
5.注意
在生产环境中,一般Yarn上的资源都比较充足,优先建议使用Perjob模式,其次是Application模式。 这两种模式能够更好进行应用隔离。当然,如果集群的资源确实非常紧张,也可以使用Session模式。