Flink 1.18.1+ Hadoop 3.4.0
一、准备工作
系统:Mac M1 (MacOS Sonoma 14.3.1)
JDK:jdk1.8.0_381 (注意:尽量一定要用JDK8,少用高版本)
Scala:2.12
JDK安装在本机的/opt/jdk1.8.0_381.jdk/Contents/Home下,Scala安装在/opt/scala-2.12.10下,并在.bash_profile中已配置好环境变量
export JAVA_HOME=/opt/jdk1.8.0_381.jdk/Contents/Home
export PATH=$JAVA_HOME/bin:$PATHexport SCALA_HOME=/opt/scala-2.12.10
export PATH=$SCALA_HOME/bin:$PATH
二、安装Hadoop
单纯运行Flink的话没必要安装Hadoop环境,但为了在Flink SQL中使用Hive数仓的话,还是得安装Hadoop基础环境。
2.1 下载解压
下载Hadoop 3.4.0(截止当前的最新版本)
国内镜像地址:Index of /apache/hadoop/common
将下载后的hadoop-3.4.0.tar.gz放到/opt下直接双击进行解压,如下:
2.2 配置Hadoop环境变量
打开.bash_profile(Mac下可用文本编辑器打开编辑),添加如下变量
export HADOOP_HOME=/opt/hadoop-3.4.0
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
保存使之生效:
source ~/.bash_profile
2.3 配置Hadoop配置文件
1. 编辑hadoop-env.sh
打开/opt/hadoop-3.4.0/etc/hadoop/hadoop-env.sh,添加一行:
export JAVA_HOME=/opt/jdk1.8.0_381.jdk/Contents/Home
2. 编辑core-site.xml
打开/opt/hadoop-3.4.0/etc/hadoop/core-site.xml,在<configuration>中添加如下配置:
<configuration><property><name>hadoop.tmp.dir</name><value>/opt/hdfs/tmp/</value></property><property><name>fs.default.name</name><value>hdfs://127.0.0.1:9000</value></property>
</configuration>
其中/opt/hdfs/tmp为自定义的HDFS路径。
3. 编辑hdfs-site.xml
打开/opt/hadoop-3.4.0/etc/hadoop/hdfs-site.xml,在<configuration>中添加如下配置:
<configuration><property><name>dfs.replication</name><value>1</value></property>
</configuration>
4. 编辑mapred-site.xml
打开/opt/hadoop-3.4.0/etc/hadoop/mapred-site.xml,在<configuration>中添加如下配置:
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property>
</configuration>
5. 编辑yarn-site.xml
打开/opt/hadoop-3.4.0/etc/hadoop/yarn-site.xml,在<configuration>中添加如下配置:
<configuration><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property><property><name>yarn.resourcemanager.hostname</name><value>127.0.0.1</value></property><property><name>yarn.acl.enable</name><value>0</value></property><property><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PERPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value></property>
</configuration>
2.4 配置其他工具类jar包(使用Flink SQL时才需要配置)
下载高版本jline,替换原来的旧版本,例如下载:jline-3.26.2.jar
下载地址:https://mvnrepository.com/artifact/org.jline/jline/3.26.2
将其分别放到:
/opt/hadoop-3.4.0/share/hadoop/hdfs/lib
/opt/hadoop-3.4.0/share/hadoop/yarn/lib
这两个目录下,并将原有的jline-3.9.0.jar删掉。
注意:该配置只为了解决Flink SQL使用过程中的jar包报错问题,即Hadoop自带的jline版本太低,无法适配高版本flink,如果单纯只使用Hadoop或是Spark的能力,无需进行该配置。
2.5 设置SSH免密登录
在个人目录下输入以下命令:
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 0600 ~/.ssh/id_rsa.pub
中间问是否覆盖此前的ssh,选择Y:
三、启动Hadoop
首次启动Hadoop之前必须进行Namenode格式化(以后不需要):
cd /opt/hadoop-3.4.0/sbin
hdfs namenode -format
输出如下日志信息:
运行start-all.sh直接启动,包含了启动start-dfs.sh和start-yarn.sh。出现如下信息表示启动成功:
在浏览器中输入:http://localhost:9870/
显示如下:
至此,Hadoop已正常启动。
注:关闭命令为:stop-all.sh
四、配置Flink
当前Flink最新版本为1.19.1,但目前的1.19版本仍未支持Iceberg runtime,考虑到后续可能会使用Iceberg数据湖,因此选择Flink 1.18.1。
Multi-Engine Support - Apache Iceberg
4.1 配置Flink环境变量
下载Flink 1.18.1
下载地址:Downloads | Apache Flink
将下载好的flink-1.18.1-bin-scala_2.12.tgz放到/opt下,双击进行解压。
配置环境变量,打开.bash_profile,添加如下行:
export PATH=/opt/flink-1.18.1/bin:$PATH
使之生效:
source ~/.bash_profile
注意:Mac里也可以在~/.zshrc中配置。
4.2 配置Flink 其他jar包(和hive/iceberg适配连接)
注意选择适配flink 1.18.1版本的jar包。
下载commons-cli-1.8.0.jar
地址:https://mvnrepository.com/artifact/commons-cli/commons-cli/1.8.0
下载flink-connector-hive_2.12-1.18.1.jar
地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.18.1/
下载flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
地址:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/
下载hive-exec-3.1.3.jar
地址:https://mvnrepository.com/artifact/org.apache.hive/hive-exec/3.1.3
下载iceberg-flink-runtime-1.18-1.5.2.jar
地址:https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.18/1.5.2/
下载iceberg-hive-runtime-1.5.2.jar
地址:https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.5.2/
以上jar包下载完后放到/opt/flink-1.18.1/lib下。
五、运行Flink SQL
5.1 启动Flink
运行以下命令:
cd /opt/flink-1.18.1
./bin/start-cluster.sh
输出如下信息:
在浏览器中打开:http://localhost:8081/
可以看到如下信息:
注:关闭flink的命令为:
./bin/stop-cluster.sh
5.2 启动Flink SQL
输入:
./bin/sql-client.sh embedded shell
看到如下信息表示启动成功:
可能会有如下警告信息,可忽略,原因为log4j jar包存在冲突。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.18.1/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.4.0/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
查看当前的catalogs:
show catalogs;
注:Catalog 是一个元数据存储,它提供了一种集中的方式来管理元数据信息,Catalog 存储了 Flink 中使用的所有元数据,包括表结构、分区信息、用户定义函数等。对于一个数据表的定位是 catalog名.数据库名.表名。因此首先需要创建一个 Catalog,然后在 Catalog 中创建数据库,最后在数据库中创建表。
输出:
默认只有1个default_catalog。
创建新的catalog:
CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083','clients'='5','property-version'='1','warehouse'='file:///opt/warehouse/iceberg-hive-catalog'
);
注意设置“warehouse”为自己的路径,别的不用改。
查看catalogs:
show catalogs;
输出:
显示新的catalog创建成功了。
若要退出Flink SQL直接输入:
exit;
六、相关问题
1. Hadoop启动yarn时报错。控制台错误信息:
Starting resourcemanager ERROR: Cannot set priority of resourcemanager process 20248
在/opt/hadoop-3.4.0/logs查看相关日志,具体报错信息为:
2024-07-11 10:01:08,633 ERROR org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Error starting ResourceManager
java.lang.ExceptionInInitializerError
at com.google.inject.internal.cglib.reflect.$FastClassEmitter.<init>(FastClassEmitter.java:67)
at com.google.inject.internal.cglib.reflect.$FastClass$Generator.generateClass(FastClass.java:72)
at com.google.inject.internal.cglib.core.$DefaultGeneratorStrategy.generate(DefaultGeneratorStrategy.java:25)
at com.google.inject.internal.cglib.core.$AbstractClassGenerator.create(AbstractClassGenerator.java:216)
at com.google.inject.internal.cglib.reflect.$FastClass$Generator.create(FastClass.java:64)
at com.google.inject.internal.BytecodeGen.newFastClass(BytecodeGen.java:204)
at com.google.inject.internal.ProviderMethod$FastClassProviderMethod.<init>(ProviderMethod.java:256)
at com.google.inject.internal.ProviderMethod.create(ProviderMethod.java:71)
at com.google.inject.internal.ProviderMethodsModule.createProviderMethod(ProviderMethodsModule.java:275)
at com.google.inject.internal.ProviderMethodsModule.getProviderMethods(ProviderMethodsModule.java:144)
at com.google.inject.internal.ProviderMethodsModule.configure(ProviderMethodsModule.java:123)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:340)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:349)
at com.google.inject.AbstractModule.install(AbstractModule.java:122)
at com.google.inject.servlet.ServletModule.configure(ServletModule.java:49)
at com.google.inject.AbstractModule.configure(AbstractModule.java:62)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:340)
at com.google.inject.spi.Elements.getElements(Elements.java:110)
at com.google.inject.internal.InjectorShell$Builder.build(InjectorShell.java:138)
at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:104)
at com.google.inject.Guice.createInjector(Guice.java:96)
at com.google.inject.Guice.createInjector(Guice.java:73)
at com.google.inject.Guice.createInjector(Guice.java:62)
at org.apache.hadoop.yarn.webapp.WebApps$Builder.build(WebApps.java:420)
at org.apache.hadoop.yarn.webapp.WebApps$Builder.start(WebApps.java:468)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.startWepApp(ResourceManager.java:1486)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.serviceStart(ResourceManager.java:1599)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:195)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.main(ResourceManager.java:1801)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make protected final java.lang.Class java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @6cc27570
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Method.checkCanSetAccessible(Method.java:199)
at java.base/java.lang.reflect.Method.setAccessible(Method.java:193)
at com.google.inject.internal.cglib.core.$ReflectUtils$2.run(ReflectUtils.java:56)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:318)
at com.google.inject.internal.cglib.core.$ReflectUtils.<clinit>(ReflectUtils.java:46)
... 29 more
原因:JDK版本太高,笔者之前装的是JDK 17。
解决方案:换成JDK 8,一切正常。试了网上别的一些解决方案,都不奏效。
2. Flink SQL执行时报错:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
原因:Hadoop环境变量配置有误。
解决方案:参考2.2配置Hadoop环境变量。
注:无需配置一个名称为HADOOP_CLASSPATH的变量。
注:网上另有一种方案是将flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar放到flink lib目录下,由于我们是自己单独配置了Hadoop环境,因此无需下载该jar包。
地址:https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0
3. Flink SQL启动报错:
Caused by: java.lang.ExceptionInInitializerError: Exception java.lang.NoSuchMethodError: org.jline.utils.AttributedStyle.foreground(III)Lorg/jline/utils/AttributedStyle; [in thread "main"]
at org.apache.flink.table.client.cli.parser.SyntaxHighlightStyle$BuiltInStyle.<clinit>(SyntaxHighlightStyle.java:57)
at org.apache.flink.table.client.config.SqlClientOptions.<clinit>(SqlClientOptions.java:76)
at org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59)
at org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633)
at org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615)
at org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554)
at org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3493)
at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:549)
原因:jline版本太低。
解决方案:参考2.4配置相关jar包。
4. Flink SQL执行报错:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException
原因:缺hive相关jar包,尤其是hive-exec-3.1.3.jar。
解决方案:参考4.2配置相关jar包。
参考:
Hadoop 安装教程 (Mac m1/m2版)_m1 安装hadoop-CSDN博客
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/hive/overview/
Flink集成Iceberg小小实战-腾讯云开发者社区-腾讯云