使用CDH6.3.2安装了hadoop集群,但是CDH不支持flink的安装,网上有CDH集成flink的文章,大都比较麻烦;但其实我们只需要把flink的作业提交到yarn集群即可,接下来以CDH yarn为基础,flink on yarn模式的配置步骤。
一、部署flink
1、下载解压
官方下载地址:Downloads | Apache Flink
注意:CDH6.3.2是使用的scala版本是2.11(可以去CHD中spark目录lib下,看一下scala版本),所以下载的flink也要scala_2.11版本的。
2、解压
cd /data/softs tar -zxvf flink-1.14.5-bin-scala_2.11.tgz
#修改名称
mv softs/flink-1.14.5 /data/flink-yarn
3、修改flink配置
vim conf/flink-conf.yaml
#配置java环境变量
env.java.home: /usr/local/jdk1.8.0_281/
#以下为高可用配置
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:8020/flink/yarn/ha
high-availability.zookeeper.quorum: master1:2181,node1:2181,node2:2181
high-availability.zookeeper.path.root: /flink-yarn
high-availability.cluster-id: /cluster_flink_yarn
4、修改操作用户(针对以session模式启动flink)
vim bin/yarn-session.sh
#操作hdfs的用户
export HADOOP_USER_NAME=hdfs
5、分发到其它节点
将配置好的flink分发到其它两个节点(我的集群是三个节点)
scp -r flink-yarn node1:/data/
scp -r flink-yarn node2:/data/
6、配置全局环境变量
想要让 Flink 服务运行与 YARN 之上,首先需要让 Flink 能够发现 YARN 和 HDFS 的相关配置,因此,需要通过HADOOP_CLASSPATH、HADOOP_CONF_DIR 属性来指定 Hadoop 配置文件所在目录;
因此需要在各个节点配置这两个属性的去全局变量。
vim /etc/profile
#添加如下两行
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.yarn/
#刷新
source /etc/profile
7、设置归属用户
因为flink需要将作业提交到yarn集群上,即需要访问或者操作hadoop集群,所以需要有hdfs用户的权限(CDH集群默认hdfs用户有操作hadoop的权限),所以要将flink的归属用户设置为hdfs,且后续都必须用hdfs用户提交flink的作业。在各个节点执行如下操作:
chown -R hdfs:hdfs flink-yarn
二、提交flink作业
1、上传作业jar包
这里使用的是一个单词统计的jar包,使用时需要传入一个服务器IP作为监听的对象
rz flink-on-k8s-demo-1.0-SNAPSHOT.jar
2、在被监听服务器上发送消息
#在172.16.12.103 这台服务器上执行,并输入单词
nc -lk 7777
3、使用application模式启动flink作业
./bin/flink run-application -t yarn-application \ #指定flink作业的启动方式
-c com.yale.StreamWordCount \ #指定程序的入口类
../softs/flink-on-k8s-demo-1.0-SNAPSHOT.jar \ #程序jar包
172.16.12.103 #入参(被监听的服务器IP)
4、查看作业执行情况
打开yarn的webUI
可以看到一个正在运行的任务,点击 applicationId 进去,可以看到有两个容器,
点击logs进去
再点击taskmanager.out,可以看到单词统计的结果,说明成功了!!
三、遇到的问题
1、org.apache.flink.client.deployment.ClusterDeploymentException
答:flink的scala版本和CDH的scala版本不一致,将flink换成scala_2.11版本。
2、Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME
答:在flink-conf.yaml文件中添加env.java.home属性指定java home。