aws emr启动standalone的flink集群

关键组件

  • Client,代码由客户端获取并做转换,之后提交给JobManger
  • JobManager,对作业进行中央调度管理,获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。
  • TaskManager,数据的处理操作
image-20240105171119363

在emr上自建standalone集群,文件分发脚本xsync

#!/bin/bash
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for host in $nodelist
doecho ==================== $host ====================for file in $@doif [ -e $file ]thenpdir=$(cd -P $(dirname $file); pwd)fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done

先进入节点中创建flink文件(root用户)

#!/bin/bash
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for node in $nodelist; doecho "Executing commands on $node"ssh -o StrictHostKeyChecking=no $node "sudo mkdir -p /usr/lib/flink/sudo chown -R hadoop:hadoop /usr/lib/flink"
done

将/usr/lib/flink/同步到其他节点上

xsync /usr/lib/flink
xsync /etc/flink/conf.dist

重新软连接

#!/bin/bash
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for node in $nodelist; doecho "Executing commands on $node"ssh -o StrictHostKeyChecking=no $node "sudo rm /usr/lib/flink/confsudo ln -s /etc/flink/conf.dist/ /usr/lib/flink/conf"
done

直接启动失败,需要指定workfile

No workers file. Please specify workers in 'conf/workers'.

root用户下写入workers配置

yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}' > /usr/lib/flink/conf/workersecho $(hostname).cn-north-1.compute.internal:8081 > /usr/lib/flink/conf/masters

jobmanager配置

# JobManager 节点地址.
jobmanager.rpc.address: $(hostname).cn-north-1.compute.internal
jobmanager.bind-host: 0.0.0.0 # default
rest.address: 0.0.0.0 # webui
rest.bind-address: 0.0.0.0

taskmanager配置

echo taskmanager.host: $(hostname).cn-north-1.compute.internal >> /usr/lib/flink/conf/flink-conf.yaml
echo taskmanager.bind-host: 0.0.0.0 >> /usr/lib/flink/conf/flink-conf.yaml

修改权限

sudo chown -R hadoop:hadoop /var/lib/flink

启动集群

  • 不知道为什么worker节点始终无法启动taskmanagerrunner,最终发现没有权限

  • 这个路径是提供webui上传jar文件用的

    mkdir: cannot create directory ‘/var/run/flink’: Permission denied
    /usr/lib/flink/bin/flink-daemon.sh: line 82: /var/run/flink: No such file or directory
    flock: 200: Bad file descriptor
    Starting taskexecutor daemon on host ip-192-168-28-247.
    /usr/lib/flink/bin/flink-daemon.sh: line 145: /var/run/flink/flink-hadoop-taskexecutor.pid: No such file or directory
    
/usr/lib/flink/bin/start-cluster.sh

在master上查看日志

在这里插入图片描述

最终taskmanager成功注册

image-20240105201137482

关闭集群

/usr/lib/flink/bin/stop-cluster.sh

jpsall脚本

#!/bin/bash
masternode=$(hostname).cn-north-1.compute.internal
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for host in $masternode $nodelist
doecho =============== $host ===============ssh $host jps
done

在yarn模式下,提交 JAR 文件后,它就会变成由 Flink JobManager 管理的作业。

  • JobManager 位于托管 Flink 会话 Application Master 进程守护程序的 YARN 节点上

集群生命周期大于job,因此实际上对应session模式

flink run --jobmanager localhost:8081 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput flink run --jobmanager localhost:8081 -c org.example.wc.WordCountBatch flinkall-1.0.0.jar

部署模式

session模式

启动flink session

  • 5.5.0 版本中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行
flink-yarn-session -d

image-20240105110801556

启动session后提交任务

flink run --jobmanager yarn-cluster -yid application_1704427099392_0001  /usr/lib/flink/examples/streaming/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 8XKQF6W01QDQCRTD, Extended Request ID: Bsi6AK3alrxV5OYL5aZhu05h/RusTGUBm9P9hRu5dFu0whCv68DKpvFjf8CYL9Wc5zSEoaL759M=)

可以设置region

  • 看起来是6.15版本的emr_flink存在问题,默认region不对
-Dfs.s3a.bucket.endpoint.region=cn-north-1

在resourcemanager的Tracking UI会跳转到flink jobmanager

但是flink的taskmanager日志并没有汇聚到yarn历史服务器中

查看session fluster的ip和端口号

image-20240105222430792

在这里插入图片描述

或者直接在提交jar界面看ip和端口地址,监听的是内网ip

  • 找到后可以在idea中注册

per-job模式

直接运行batch任务

flink run -m yarn-cluster -Dexecution.runtime-mode=BATCH flinktutorial17-1.0.jar

不同类型任务的name

image-20240105155322448

application模式

提交任务

  • 将jar拷贝到/usr/lib/flink/lib

  • 指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包

/usr/lib/flink/bin/standalone-job.sh start --job-classname org.example.wc.WordCountBatchStarting standalonejob daemon on host ip-192-168-30-184.

启动taskmanager

/usr/lib/flink/bin/taskmanager.sh start

在application模式下,yarn中同样没有记录

查看jps进程

image-20240105220805065

在master上查看日志

image-20240105220010908

yarn运行模式

yarn运行模式下同样可以使用三种部署模式

session模式

flink-yarn-session -d命令参数

  • Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配 TaskManager 和 slot
-d:分离模式
-jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB
-nm(--name):配置在 YARN UI 界面上显示的任务名
-qu(--queue):指定 YARN 队列名
-tm(--taskManager):配置每个 TaskManager 所使用内存。

per-job模式

提交任务

/usr/lib/flink/bin/flink run -t yarn-per-job -c org.example.wc.WordCountBatch flinkall-1.0.0.jar-t,--target <arg>     The deployment target for the given application,which is equivalent to the "execution.target" configoption. For the "run" action the currently availabletargets are: "remote", "local", "kubernetes-session","yarn-per-job" (deprecated), "yarn-session". For the"run-application" action the currently availabletargets are: "kubernetes-application","yarn-application".

如下报错的解决

  • 在 flink 的/opt/module/flink-1.17.0/conf/flink-conf.yaml 配置文件中设置classloader.check-leaked-classloader: false

image-20240105223157373

application模式

提交任务

/usr/lib/flink/bin/flink run-application -t yarn-application -c org.example.wc.WordCountBatch flinkall-1.0.0.jar/usr/lib/flink/bin/flink run-application -t yarn-application s3://zhaojiew-bigdata/app/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput 

可以看到print输出此时在jobmanager中

image-20240105223507271

但是看起来并不支持将jar存储在s3?已知问题,怀疑和flink本身有关,因为使用s3作为input和output是没有问题的

在这里插入图片描述

可以在提交任务时指定依赖,并非任务jar

bin/flink run-application -t yarnapplication -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flinkdist" -c com.atguigu.wc.SocketStreamWordCount
hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/337362.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式】创建型-建造者模式

前言 在面向对象的软件开发中&#xff0c;构建复杂对象时经常会遇到许多挑战。一种常见的解决方案是使用设计模式&#xff0c;其中建造者模式是一个强大而灵活的选择。本文将深入探讨建造者模式的原理、结构、优点以及如何在实际项目中应用它。 一、复杂的对象 public class…

vue3学习使用笔记

1.学习参考资料 vue3菜鸟教程&#xff1a;https://www.runoob.com/vue3/vue3-tutorial.html 官方网站&#xff1a;https://cn.vuejs.org/ 中文文档: https://cn.vuejs.org/guide/introduction.html Webpack 入门教程&#xff1a;https://www.runoob.com/w3cnote/webpack-tutor…

手机离线翻译哪个好?断网翻译也能超丝滑

有时在异国他乡&#xff0c;面对语言不通的窘境&#xff0c;即便是简单的对话也变得异常困难&#xff0c;真是挑战满满&#xff01; 然而&#xff0c;能离线翻译的软件让语言障碍不再是问题&#xff0c;不必依赖网络也能轻松进行翻译啦~ 只需下载所需的语言包&#xff0c;选择…

Nginx企业级负载均衡:技术详解系列(14)—— 账户认证功能

你好&#xff0c;我是赵兴晨&#xff0c;97年文科程序员。 你有没有听说过Nginx的账户认证功能&#xff1f;这可不只是一个技术问题&#xff0c;它关系到我们上网时的安全和便利。就像家里需要一把钥匙才能进们一样&#xff0c;Nginx的账户认证功能就是确保有只有授权的人才能…

登录校验及全局异常处理器

登录校验 会话技术 会话:用户打开浏览器,访问web服务器的资源,会话建立,直到有一方断开连接,会话结束.在一次会话中可以包含多次请求和响应会话跟踪:一种维护浏览器状态的方法,服务器需要识别多次请求是否来自于同一浏览器,以便在同一次会话请求间共享数据会话跟踪方案 客户端…

华为 CANN

华为 CANN 1 介绍1.1 概述1.2 CANN 是华为昇腾计算产业的重要一环1.3 昇腾系列处理器1.4 昇腾 AI 产业1.5 从 AI 算法到产品化落地流程1.6 多样性计算架构1.7 人工智能各层级图示1.8 人工智能技术发展历史 2 CANN vs CUDA支持平台优化方向编程接口生态系统与应用性能与功能 3 C…

Qt xml学习之calculator-qml

1.功能说明&#xff1a;制作简易计算器 2.使用技术&#xff1a;qml,scxml 3.项目效果&#xff1a; 4.qml部分&#xff1a; import Calculator 1.0 //需要引用对应类的队友版本 import QtQuick 2.12 import QtQuick.Window 2.12 import QtQuick.Controls 1.4 import QtScxml…

[深度学习]yolov10+bytetrack+pyqt5实现目标追踪

【简介】 利用YOLOv10、ByteTrack和PyQt5实现目标追踪是一个强大的组合&#xff0c;可以为用户提供一个交互式的实时目标追踪界面。以下是一个简化版的实现思路描述&#xff1a; 首先&#xff0c;YOLOv10是一个先进的目标检测算法&#xff0c;能够准确识别视频或图像中的目标…

OC IOS 文件解压缩预览

热很。。热很。。。。夏天的城市只有热浪没有情怀。。。 来吧&#xff0c;come on。。。 引用第三方库&#xff1a; pod SSZipArchive 开发实现&#xff1a; 一、控制器实现 头文件控制器定义&#xff1a; // // ZipRarViewController.h // // Created by carbonzhao on 2…

更新mirh connect 内置derby数据库密码

更新mirh connect 内置derby数据库密码 1、下载derby连接客户端 https://archive.apache.org/dist/db/derby/ 选择任意版本即可&#xff0c;比如 https://archive.apache.org/dist/db/derby/db-derby-10.14.2.0/db-derby-10.14.2.0-bin.zip 2、连接mirh文件数据库 1、把mi…

12 FreeRTOS 调试与优化

1、调试 1.1 打印 在FreeRTOS工程中使用了microlib&#xff0c;里面实现了printf函数。 只需要实现一下以下函数即可使用printf。 int fputc(int ch; FILE *f); 假如要从串口实现打印函数&#xff1a; int fputc( int ch, FILE *f ) {//指定串口USART_TypeDef* USARTx USAR…

黑马一站制造数仓实战2

问题 DG连接问题 原理&#xff1a;JDBC&#xff1a;用Java代码连接数据库 Hive/SparkSQL&#xff1a;端口有区别 可以为同一个端口&#xff0c;只要不在同一台机器 项目&#xff1a;一台机器 HiveServer&#xff1a;10000 hiveserver.port 10000 SparkSQL&#xff1a;10001…

如何克隆非默认分支

直接git clone下来的我们知道是默认分支&#xff0c;那如何克隆其他分支呢&#xff1a; 比如这个&#xff0c;我们想克隆AdvNet。 我们可以在本地文件夹打开Git Bash 依次输入&#xff1a; git clone --branch AdvNet https://github.com/wgcban/SemiCD.git cd SemiCD git b…

Java基础入门day62

day62 AJAX 概念 AJAX&#xff1a; Asynchronous Javascript And XML AJAX是一种无需重新加载整个网页的情况下&#xff0c;能够更新部分网页的技术 AJAX是一种用于创建快速动态网页的技术 通过在后台与服务器进行少量数据交换&#xff0c;AJAX可以使网页实现异步更新 传统…

源码编译安装LAMP(安装apeche mysql php 论坛 网站 巨详细版)

目录 一.LAMP架构相关概述 1.各组件作用 Linux&#xff08;平台&#xff09; Apache&#xff08;前台&#xff09; MySQL&#xff08;后台&#xff09; PHP/Perl/Python&#xff08;中间连接&#xff09; 总结 二.编译安装Apache httpd服务 1.关闭防火墙&#xff0c;将…

docker一键部署EFK系统(elasticsearch filebeat kibana metricbeat es-head)

EFK日志系统搭建 EFK日志系统介绍功能需求搭建elasticsearch集群规划前提部署核对证书及权限 EFK日志系统介绍 Elasticsearch 是一个实时的、分布式的可扩展的搜索引擎&#xff0c;允许进行全文、结构化搜索&#xff0c;它通常用于索引和搜索大量日志数据&#xff0c;也可用于…

【计算机网络】——物理层(图文并茂)

物理层 一.物理层概述1.物理层要实现的功能2.物理层接口特征1.机械特性2.电气特性3.功能特性4.过程特性 二.物理层下面的传输媒体1.传输媒体的分类2.导向型传输媒体1.同轴电缆2.双绞线3.光纤 3.非导向型传输媒体1.无线电波2.微波3.红外线4.激光5.可见光 三.传输方式1.串行传输与…

Kali : 安装Google Chrome 浏览器和ChromeDriver

目录 一、安装Google Chrome 浏览器 1、下载Google Chrome 2、安装Chrome 3、安装依赖包 二、安装ChromeDriver 1、查看Chrome版本 ​2、下载ChromeDriver 3、解压下载包 4、设置全局访问 5、赋予可执行权限 6、验证chromedriver 7、程序测试 一、安装Google Chrom…

Kafka系列之高频面试题

基础 简介 特点&#xff1a; 高吞吐、低延迟&#xff1a;kafka每秒可以处理几十万条消息&#xff0c;延迟最低只有几毫秒&#xff0c;每个Topic可以分多个Partition&#xff0c;Consumer Group对Partition进行Consumer操作可扩展性&#xff1a;Kafka集群支持热扩展持久性、可…

WEB攻防-JAVAWEB项目常见漏洞

知识点 1.JavaWeb常见安全及代码逻辑 2.目录遍历&身份验证&逻辑&JWT 3.访问控制&安全组件&越权&三方组件 本篇主要了解以上问题在javaweb中的呈现&#xff0c; 第一个重点理解URL与javaweb代码框架的对应方式&#xff0c;java在没有代码的情况下是很难…