flink的安装与使用(ubuntu)

组件版本

虚拟机:ubuntu-20.04.6-live-server-amd64.iso

flink:flink-1.18.0-bin-scala_2.12.tgz

jdk:jdk-8u291-linux-x64.tar

flink 下载

1、官网:https://flink.apache.org/downloads/

2、清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

flink 安装

1、上传文件至服务器指定路径

/usr/local/myapp/flink

2、解压文件

tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /usr/local/myapp/flink

jdk 安装

1、ubuntu 中自带了 jdk,先将其卸载

sudo apt-get remove *openjdk*
sudo apt-get autoremove

2、上传文件至服务器指定路径

/usr/local/myapp/jdk

3、解压文件

tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk

4、配置环境变量

vim /etc/profile

在文末增加配置(路径根据自身情况进行调整)

export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib

5、测试 jdk

root@vm1:/usr/local/myapp/jdk# java -version
java version "1.8.0_291"
Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291

测试 flink

1、进入到 flink 的安装路径下

cd /usr/local/myapp/flink/flink-1.18.0/

2、修改配置文件

vim conf/flink-conf.yaml

内容

jobmanager.bind-host: 0.0.0.0

3、关闭/禁用防火墙

systemctl stop ufw.service
systemctl disable ufw.service

4、启动 flink

./bin/start-cluster.sh

5、浏览器访问:http://ip:8081/

能看到内容说明正常

设置 flink 的 Standalone 模式集群并上传任务执行

1、机器规划

类型主机名IP
JobManagervm1192.168.141.120
TaskManagervm2192.168.141.121
TaskManagervm3192.168.141.122

2、设置每个服务器的机器名

vim /etc/hostname

3、设置每个服务器的 hosts 文件

vim /etc/hosts

增加三台服务器的机器名对照

192.168.141.120 vm1
192.168.141.121 vm2
192.168.141.122 vm3

使其立即生效(建议到这一步后,都重新启动下)

source /etc/hosts

4、设置服务器间的免密登录

4.1、自身免密

vm1 执行(vm2/vm3 同理)

ssh-keygen -t rsa

之后的内容全部回车即可

生成后,可在 /root/.ssh/ 中看到 id_rsa.pub 文件

通过命令设置到认证文件中

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 

重启服务器,通过命令测试是否可以免密登录自身

ssh vm1

通过 exit 命令可以退出当前的 ssh 登录

4.2、设置相互免密(以 vm1 为演示,其余服务器同理)

在 vm1 服务器中,将生成的自身密钥传输到其余两台服务器上

scp /root/.ssh/id_rsa.pub root@vm2:/root
scp /root/.ssh/id_rsa.pub root@vm3:/root

在 vm2/vm3 服务器中,将传输过来的密钥,通过命令设置到认证文件中

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 

vm1 设置完成,通过命令来测试能不能直接登录到 vm2/vm3 中

ssh vm2
ssh vm3

vm2/vm3 同理,都需要执行这些步骤:

A、生成自身密钥,添加到自身的认证文件中

B、将自身密钥传输到其余的服务器中,并在该服务器中通过命令设置自身密钥到其余服务器的认证文件中

注意:vm2 和 vm3 执行时,一个服务器完全执行结束/测试后,再进行下一个,不然会有密钥文件存在被覆盖的风险

5、设置主机时间同步

安装工具

apt-get install -y ntpdate

执行同步

ntpdate -u ntp.sjtu.edu.cn

6、配置 flink

以下以 vm1 为例,其他服务器的配置可将配置好的配置文件同步过去

6.1、masters 文件

vim masters

内容

vm1:8081

6.2、workers 文件

vim workers

内容

vm2
vm3

6.3、flink-conf.yaml 文件

vim flink-conf.yaml

内容(篇幅问题,去掉了注释)

env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMEDjobmanager.rpc.address: vm1jobmanager.rpc.port: 6123jobmanager.bind-host: 0.0.0.0jobmanager.memory.process.size: 1600mtaskmanager.bind-host: 0.0.0.0taskmanager.memory.process.size: 1728mtaskmanager.numberOfTaskSlots: 3parallelism.default: 1jobmanager.execution.failover-strategy: regionrest.port: 8081rest.address: vm1rest.bind-address: vm1blob.server.port: 45579

7、启动集群

只需在 vm1 上启动集群模式即可

root@vm1:/usr/local/myapp/flink/flink-1.18.0# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.

可以看到 vm2/vm3 的也会被启动,不需要手动去 vm2/vm3 再启动一次了

可以通过 java 的 jps 命令查看程序是否启动成功了

vm1 上

在这里插入图片描述

vm2 上

在这里插入图片描述

vm3 上

在这里插入图片描述

从图上可以分析出是以 Standalone 的集群模式启动了,其中 vm1 是 JobManager,vm2/vm3 是 TaskManager

8、页面查看状态

浏览器输入地址:http://192.168.141.120:8081/

可看到主页面

在这里插入图片描述

9、自定义一个任务

idea 创建一个 maven 项目

9.1、依赖及插件

<properties><flink.version>1.18.0</flink.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

9.2、程序内容

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;// 无界流
public class UnboundStreamJob {public static void main(String[] args) throws Exception {//1 获取flink运行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//2.加载数据源为dataStream  ,绑定客户机的9999端口,将这个网络端口发送的数据加载为dataStreamDataStreamSource<String> dataStream = environment.socketTextStream("192.168.141.122", 9999, "\n");//3.执行多个转换算子 ,SingleOutputStreamOperator是DataStreamSource子类SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, String>() {@Override//value:表示一个待处理的数据,在这里就是一行字符串//out:  用于输出结果的工具对象public void flatMap(String value, Collector<String> out) throws Exception {//拆分value,通过out输出结果String[] words = value.split("//s+");   //去除一个或多个空格for (String word : words) {out.collect(word);}}})  //执行一行字符串拆分为多个单词.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}) //将多个单词转换为(单词,1) 这种tuple2对象.keyBy(0)  //根据单词为key分组,0表示tuple2中的第一个属性,也就是单词.sum(1);//统计每组单词的个数,  1表示tuple2中第2个属性,也就是次数//4.通过sink算子输出结果result.print();//5.发布执行environment.execute("flinkWordCount"); //为任务起别名}}

9.3、程序说明

与 vm3 所在的 IP 为 192.168.141.122 在 9999 端口上进行 socket 通信,程序接收到消息后,进行计算并输出到控制台中

10、在 vm3 上开启一个 socket 通信(这一步一定要在上传任务之前进行)

netcat -lk 9999

11、提交任务(WebUI 方式)

11.1、打包刚才的程序,将打包好的 jar 包复制到某个好找的路径

11.2、打开网页中的 Submit New Job 选项,并点击 Add New

在这里插入图片描述

11.3、选择刚才打包的 jar 包进行上传,之后点击该 jar 包,填写启动类的路径,之后点击 Submit 提交按钮

在这里插入图片描述

11.4、正常情况下,任务就发布完成了,可以在 Task Managers 查看哪个节点的 Free Slots 相比 All Slots 减少了一个,那么这个节点的服务器就是执行该任务的服务器

在这里插入图片描述

12、提交任务(命令方式)

12.1、上传 jar 包到服务器中(任意一个服务器都行)

root@vm1:/usr/local/myapp/flink/task# ls
demo01-1.0-SNAPSHOT.jar

12.2、添加到任务中

../flink-1.18.0/bin/flink run -d -c xx.xx.xx.UnboundStreamJob demo01-1.0-SNAPSHOT.jar

说明:需要指定启动类

12.3、看到下面的信息,说明提交任务完成

Job has been submitted with JobID a893314f5efbb93bf3e6edefa578fd35

13、测试

13.1、点击该服务器,其中的 Stdout 就是控制台输出的地方

我们在 vm3 中开启的 socket 通信中,发送一条消息

在这里插入图片描述

13.2、回到页面中,刷新下控制台输出,会发现多了一个输出信息

在这里插入图片描述

13.3、至此,测试就完成了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/180927.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux进行时】磁盘文件结构

磁盘 上篇文章&#xff0c;我们提及文件是存放在磁盘当中&#xff0c;本篇文件我们来了解一下磁盘的结构&#xff01;&#xff01;&#xff01; 磁盘的概念&#xff1a; ❓什么是磁盘&#xff1f; &#x1f4a1;磁盘&#xff08;disk&#xff09;是指利用磁记录技术存储数据…

图解系列--理解L3交换机的性能与功能

04.01 何为 L3 交换机 L3交换机是一种在L2 交换机的基础上增加了路由选择功能的网络硬件&#xff0c;能够通过基于ASIC 和 FPGA 的硬件处理高速实现网络功能和转发分组。L2 是指 OSI 参考模型中的L2, 也就是数据链路层。L2 交换机能够基于该层主要编址的 MAC 地址&#xff0c;…

【Linux】:重定向和用户缓冲区

重定向和用户缓冲区 一.输出重定向1.现象2.系统调用接口 二.缓冲区1.引子2.刷新 三.回答引例 文件描述符对应匹配规则&#xff1a;从0下标开始&#xff0c;寻找最小的没有被使用的数组位置&#xff0c;它就是新的文件描述符(fd)。 一.输出重定向 1.现象 在这里我们向1号文件内…

[C++进阶篇]STL中vector的使用

一、vector的介绍 1.vector的介绍 vector是表示可变大小数组的序列容器。vector也采用的连续存储空间来存储元素&#xff0c;就是可以采用下标对vector的元素进行访问&#xff0c;和数组一样。它的大小是可以动态改变的。 2.重要的接口组成 二、 vector迭代器的使用 2.1 ve…

Spring Boot 整合SpringSecurity和JWT和Redis实现统一鉴权认证

&#x1f4d1;前言 本文主要讲了Spring Security文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#xff1a;努力…

fastapi-Headers和Cookies

在FastAPI中&#xff0c;Headers是一个特殊的类型&#xff0c;用于处理HTTP请求头&#xff08;Headers&#xff09;。Headers允许你接收、访问和修改HTTP请求中的头部信息。 使用Headers&#xff0c;你可以在FastAPI的路由视图中将请求头作为参数接收&#xff0c;并对它们进行…

京东数据平台:2023年9月京东智能家居行业数据分析

鲸参谋监测的京东平台9月份智能家居市场销售数据已出炉&#xff01; 9月份&#xff0c;智能家居市场销售额有小幅上涨。根据鲸参谋电商数据分析平台的相关数据显示&#xff0c;今年9月&#xff0c;京东平台智能家居的销量为37万&#xff0c;销售额将近8300万&#xff0c;同比增…

静态、友好、内在:解析C++中的这些特殊元素和对象复制的优化

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; &#x1f354;前言&#xff1a; 前面我们学习了C中关于类与对象的许多知识点&#xff0c;今天我们继续学习类与对象&#xff0c;最后再总结一下类与对象中的一些关键字内容&#xff0c;以及需要注意的细节。满满的干货…

3D高斯泼溅(Splatting)简明教程

在线工具推荐&#xff1a; Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 3D场景编辑器 3D 高斯泼溅&#xff08;Splatting&#xff09;是用于实时辐射场渲染的 3D 高斯分布描述的一种光栅化技术&#xff0c;它允许实时渲染从小图像样…

Windows10安装Anaconda与Pytorch的记录

这是一篇关于安装Anaconda和Pytorch的记录与复盘&#xff0c;写的原因是我电脑恢复系统之后东西全没了&#xff0c;再装Pytorch的时候一脸懵逼忘了怎么弄了&#xff0c;写篇记录以备我下一次安装。 1、Anaconda的安装 1.1、Anaconda安装包下载 下载链接: Free Download | An…

基于 Amazon EC2 和 Amazon Systems Manager Session Manager 的堡垒机的设计和自动化实现

文章目录 1. 背景2. 云上堡垒机设计2.1 安全设计2.2 高可用和弹性设计2.3 监控告警设计2.4 自动化部署设计2.4.1 堡垒机代码设计2.4.2 Session Manager 配置设计2.4.3 堡垒机 IAM 角色设计 3. 部署堡垒机3.1 堡垒机部署架构图3.2 堡垒机自动化部署 4. 堡垒机使用场景4.1 堡垒机…

SpringBoot集成JPA实现分页和CRUD

SpringBoot集成JPA实现分页和CRUD 文章目录 SpringBoot集成JPA实现分页和CRUDpom.xmlapplication.propertiesaddCategory.jspeditCategory.jsphello.jsplistCategory.jspCategoryCategoryDAOCategoryServiceCategoryServiceImplPage4NavigatorRedisConfigCategoryControllerHel…

JavassmMYSQL宠物领养系统08465-计算机毕业设计项目选题推荐(附源码)

目 录 摘要 1 绪论 1.1课题背景及意义 1.2研究现状 1.3ssm框架介绍 1.3论文结构与章节安排 2 宠物领养系统系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据流程 3.3.2 业务流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例分析 …

【数智化案例展】某国际高端酒店品牌——呼叫中心培训数智化转型项目

‍ 维音案例 本项目案例由维音投递并参与数据猿与上海大数据联盟联合推出的《2023中国数智化转型升级创新服务企业》榜单/奖项”评选。 大数据产业创新服务媒体 ——聚焦数据 改变商业 培训是呼叫中心管理的重要环节&#xff0c;由于员工流动性强、培训需求多样、考核流程繁琐…

竞赛 深度学习猫狗分类 - python opencv cnn

文章目录 0 前言1 课题背景2 使用CNN进行猫狗分类3 数据集处理4 神经网络的编写5 Tensorflow计算图的构建6 模型的训练和测试7 预测效果8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习猫狗分类 ** 该项目较为新颖&a…

【H.264】RTP h264 码流 实例解析分析 3 : webrtc

【srs】SRS检测IBMF还是annexb 【H.264】RTP h264 码流 实例解析分析 2 : mediasoup收包 mediasoup 并没完整解析rtp包的内容,可能与mediasoup 只需要转发,不需要解码有关系。 webrtc 本身都是全的。 m98代码,先说关键: webrtc的VideoRtpDepacketizer 第一:对RTPVideoType…

操作系统——初始文件管理(王道视频p58)

1.总体概述&#xff1a; 这一节&#xff0c;主要是 作为 后续 “文件系统”的引子 我认为可以思考的点&#xff1a; &#xff08;1&#xff09;文件之间的逻辑结构——windows中采用根什么的“树状结构”&#xff0c;而文件在外存中的实际物理结构又是什么样的 &#xff08…

《向量数据库指南》——用了解向量数据库Milvus Cloud搭建高效推荐系统

了解向量数据库 ANN 搜索是关系型数据库无法提供的功能。关系型数据库只能用于处理具有预定义结构、可直接比较值的表格型数据。因此,关系数据库索引也是基于这一点来比较数据。但是 Embedding 向量无法通过这种方式直接相互比较。因为我们不知道向量中的每个值代表什么意思,…

趋势:实时的stable diffusion

视频中使用了实时模型&#xff1a;只需2~4 个步骤甚至一步即可生成768 x 768分辨率图像。 这项技术可以把任意的stable diffusion模型转为实时模型。 潜在一致性模型 LCM LCM 只需 4,000 个训练步骤&#xff08;约 32 个 A100 GPU 一小时&#xff09;即可从任何预训练的SD模型中…

【RtpSeqNumOnlyRefFinder】webrtc m98: ManageFrameInternal 的帧决策过程分析

Jitterbuffer(FrameBuffer)需要组帧以后GOP内的参考关系 JeffreyLau 大神分析 了组帧原理而参考关系(RtpFrameReferenceFinder)的生成伴随了帧决策 FrameDecisionFrameDecision 影响力 帧的缓存。调用 OnAssembledFrame 传递已经拿到的RtpFrameObject 那么,RtpFrameObject…