如何用代码提交spark任务并且获取任务权柄

在国内说所有可能有些绝对,因为确实有少数大厂技术底蕴确实没的说能做出自己的东西,但其他的至少95%数据中台平台研发方案,都是集群中有一个持久化的程序,来接收任务信息,并向集群提交任务同时获取任务的权柄,把任务的appid和日志通过套接字的方式向外提供。

对于spark任务来说无非就是两种形式,要不传过来的是个jar包,要不就是一个sql语句,其他的就是一些任务参数,整体上就和正常的web项目开发没太大差别,不同的就是服务端是以哪种方式处理任务的提交的,给大家分享我经历过的项目中用过的三种处理方式,当然这不是全部,业内确实有真东西,不过人家不开源罢了。

第一种:spark官方提供的SparkLauncherAPI

这种方式,你可以在b站上常见,但是它使用起来限制特别大,我也只用过一次,而且还是在尝试阶段就被放弃了,感觉就和一个半成品一样,最让人难受的一点是它监听任务的最终状态是4个独立的不可变枚举值,而监听程序会终止在第一个触发到的不可变枚举值,就是说如果任务先进入了完成状态,但它的最终状态是失败,那权柄只能生效到完成阶段,后面就监听不到了,观察过源码到时找到了底层更新状态的依据,但是属于受保护包下,不能被public直接调用,还有很多其他的坑,所以说像个半成品

使用它,首先导入SparkLauncherAPI的依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-launcher_2.11</artifactId> <!-- 这里要替换为你的 scala 同版本 ,spark-launcher版本不同,支持的scala也不同具体去maven官方仓库中看--><version>2.1.1</version> <!--SparkLauncherAPI要和你用的 Spark 同版本 -->
</dependency>

随后它的提交任务代码如下

package com.wy;import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;import java.io.*;
import java.util.concurrent.CountDownLatch;public class Main {public static void main(String[] args) throws IOException, InterruptedException {// 使用SparkLauncher提交任务,将任务需要的环境进行封装//这里只是核心使用需要的设置,sparkLauncher还支持其他的方法设置其他的内容需要的自己看SparkLauncher sparkLauncher = new SparkLauncher().setSparkHome("D:\\mydevtool\\spark-2.1.1-hadoop2.7")//这个程序最终运行的服务器需要同时存在一个spark的install路径.setAppResource("D:\\test\\myscala-1.0.jar") //你的任务jar包.setMainClass("test.Test1")//任务主类.setMaster("local")//master 或者 yarn.setAppName("代码提交")//任务名称.addAppArgs("D:\\test\\123.txt")//任务的主类入参,这里是个可变参.setVerbose(false);/*这里注释是写一个伪代码,意在你可以处理出任务运行配置,比如内存资源等,传递给sparkLauncherfor (Map.Entry<String, String> conf : otherConfigParams.entrySet()) {sparkLauncher.setConf(conf.getKey(), conf.getValue());}*//*这里注释是写一个伪代码,意在你可以处理出任务主类入参后传递给sparkLauncherif (mainParams.length != 0) {launcher.addAppArgs(mainParams);}*///同步时,必须使用CountDownLatch 不然监听程序是异步的,拿不到任务权柄CountDownLatch countDownLatch = new CountDownLatch(1);// 启动应用并获取任务权柄,并传入一个监听类,监听任务不同状态时的事件SparkAppHandle sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() {//任务运行状态改变的时候触发的操作@Overridepublic void stateChanged(SparkAppHandle sparkAppHandle) {//状态发生变更时,此时将任务id拿出来if ( sparkAppHandle.getAppId() != null ){System.out.println("任务状态:"+sparkAppHandle.getState().toString());System.out.println("任务ID:"+sparkAppHandle.getAppId());}//诸如此类,你可以按照你的需求定义任务不同状态下要干的事if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.RUNNING)==0){System.out.println("任务开始运行");}//诸如此类,你可以按照你的需求定义任务不同状态下要干的事if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.FINISHED)==0){System.out.println("任务正常完成");countDownLatch.countDown();}if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.FAILED)==0){System.out.println("任务发生错误");countDownLatch.countDown();}if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.KILLED)==0){System.out.println("任务被终止");countDownLatch.countDown();}}//任务的上下文发生变动时的事件,一般不用@Overridepublic void infoChanged(SparkAppHandle sparkAppHandle) {}});//这里的日志的代码,但是输出不能放在这里,按照整体来讲应该要有一个阻塞方法任务开始运行获取流,它实际使用起来发现只有放在监听里面才能正常获取日志,但是开头也说了,监听的生命周期有问题,这也是最终放弃使用的原因之一,你要是只想体验一下,把这部分代码放在监听中在开始run的状态下开始输出就行BufferedReader reader = null;try{String line;reader = new BufferedReader(new InputStreamReader(sparkLauncher.launch().getInputStream(),"UTF-8"));while ((line = reader.readLine()) != null) {System.out.println("日志流在输出:"+line);}} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);} finally {}countDownLatch.await();}
}

第二种:Runtime直接启动脚本

这种方式是最简单的,也是大部分中小项目用的方式,和第一种方式一样的是程序运行在集群中,将submit的日志直返回,使用方通过判断的方式处理出任务的appid和url

public static void main(String[] args) {String command = "spark-submit --master yarn .......";Process p = null;String line = null;try {p = Runtime.getRuntime().exec(command);BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));while ((line = br.readLine()) != null){//这里将日志输出出去}br.close();} catch (IOException e) {e.printStackTrace();}}

第三种:kyuubi

在开源spark之上,业内有一个叫kyuubi的二次开发引擎,专门做sql查询的,它对sql开发做了很多优化,比如文件聚合、最终一次数据整理等,此外提供了专门的接口来查询任务的信息,一般不投入大成本又能达到效果的就用kyuubi了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/503567.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql--基础篇--数据类型(整数,浮点数,日期,枚举,二进制,空间类型等)

MySQL提供了多种数据类型&#xff0c;用于定义表中列的数据格式。选择合适的数据类型不仅可以提高查询性能&#xff0c;还能确保数据的完整性和准确性。 一、数值类型 数值类型用于存储整数、浮点数和定点数。根据精度和范围的不同&#xff0c;数值类型可以分为以下几类&…

nlp培训重点-2

1. 贝叶斯公式 import math import jieba import re import os import json from collections import defaultdictjieba.initialize()""" 贝叶斯分类实践P(A|B) (P(A) * P(B|A)) / P(B) 事件A&#xff1a;文本属于类别x1。文本属于类别x的概率&#xff0c;记做…

sunrays-framework(太阳射线框架搭建)

文章目录 1.基本环境搭建1.创建项目sunrays-framework2.新增忽略文件3.删除src目录4.交给Git管理 2.sunrays-dependencies模块&#xff1a;统一管理依赖1.创建模块2.不要交给父模块管理&#xff0c;这是独立的&#xff01;&#xff01;&#xff01;3.删除src目录4.pom.xml统一管…

JVM vs JDK vs JRE

JVM是Java虚拟机的缩写&#xff0c; 用于实现Java的一次编译&#xff0c;处处运行。 Java代码写成.class后&#xff0c;由本地的虚拟机运行。 JDK&#xff08;Java Development Kit&#xff09;是一个功能齐全的 Java 开发工具包&#xff0c;供开发者使用。 JDK包含了JRE。…

Android修改开机动画路径

frameworks\base\cmds\bootanimation\BootAnimation.cpp 路径的定义 优先查找的顺序

select下拉框,首次进入页面没有显示value的情况

bug场景&#xff1a; 类似这种bug情况排查如下&#xff1a; 首先 理解含义 options就是存放键值对的&#xff0c;id就是key&#xff0c;对上了它就自动把label显示 而且如果你用来当作key和label的字段&#xff0c;与后端返回的不一致&#xff0c;还可以进行更改 其次 排查接…

Redis中的主从/Redis八股

四、Redis主从 1.搭建主从架构 不像是负载均衡&#xff0c;这里是主从&#xff0c;是因为redis大多数是读少的是写 步骤 搭建实例&#xff08;建设有三个实例&#xff0c;同一个ip不同端口号&#xff09; 1&#xff09;创建目录 我们创建三个文件夹&#xff0c;名字分别叫700…

Mysql--基础篇--函数(字符串函数,日期函数,数值函数,聚合函数,自定义函数及与存储过程的区别等)

MySQL提供了丰富的内置函数&#xff0c;涵盖了字符串处理、数值计算、日期和时间操作、聚合统计、控制流等多种功能。这些函数可以帮助你简化SQL查询&#xff0c;提升开发效率。 除了内置函数&#xff0c;MySQL还支持自定义函数&#xff08;User-Defined Functions&#xff09;…

【linux系统之redis6】redis的安装与初始化

下载redis的linux对应的安装包&#xff0c;并上传到linux虚拟机里面 解压压缩包 tar -zxzf redis-6.2.6.tar.gz解压后&#xff0c;进入redis文件 cd redis-6.2.6执行编译 make && make install看到下图&#xff0c;就说明redis安装成功了 默认的安装路径&#xff0c…

怎么管理电脑usb接口,分享四种USB端口管理方法

怎么管理电脑usb接口&#xff0c;分享四种USB端口管理方法 USB接口作为电脑重要的外部接口&#xff0c;方便了数据传输和设备连接。 然而&#xff0c;不加管理的USB接口也可能带来安全隐患&#xff0c;例如数据泄露、病毒传播等。 因此&#xff0c;有效管理电脑USB接口至关重…

[开源]自动化定位建图系统

系统状态机&#xff1a; 效果展示&#xff1a; 1、 机器人建图定位系统-基础重定位&#xff0c;定位功能演示 2、 机器人建图定位系统-增量地图构建&#xff0c;手动回环检测演示 3、… 开源链接&#xff1a; https://gitee.com/li-wenhao-lwh/lifelong-backend Qt人机交互…

重新整理机器学习和神经网络框架

本篇重新梳理了人工智能&#xff08;AI&#xff09;、机器学习&#xff08;ML&#xff09;、神经网络&#xff08;NN&#xff09;和深度学习&#xff08;DL&#xff09;之间存在一定的包含关系&#xff0c;以下是它们的关系及各自内容,以及人工智能领域中深度学习分支对比整理。…

PyTorch 框架实现线性回归:从数据预处理到模型训练全流程

系列文章目录 01-PyTorch新手必看&#xff1a;张量是什么&#xff1f;5 分钟教你快速创建张量&#xff01; 02-张量运算真简单&#xff01;PyTorch 数值计算操作完全指南 03-Numpy 还是 PyTorch&#xff1f;张量与 Numpy 的神奇转换技巧 04-揭秘数据处理神器&#xff1a;PyTor…

Elasticsearch:优化的标量量化 - 更好的二进制量化

作者&#xff1a;来自 Elastic Benjamin Trent 在这里&#xff0c;我们解释了 Elasticsearch 中的优化标量量化以及如何使用它来改进更好的二进制量化 (Better Binary Quantization - BBQ)。 我们的全新改进版二进制量化 (Better Binary Quantization - BBQ) 索引现在变得更强大…

科普CMOS传感器的工作原理及特点

在当今数字化成像的时代&#xff0c;图像传感器无疑是幕后的关键 “功臣”&#xff0c;它宛如一位神奇的 “光影魔法师”&#xff0c;通过光电效应这一奇妙的物理现象&#xff0c;将光子巧妙地转换成电荷&#xff0c;为图像的诞生奠定基础。而在众多类型的图像传感器中&#xf…

IDEA中Maven依赖包导入失败报红的潜在原因

在上网试了别人的八个问题总结之后依然没有解决&#xff1a; IDEA中Maven依赖包导入失败报红问题总结最有效8种解决方案_idea导入依赖还是报红-CSDN博客https://blog.csdn.net/qq_43705131/article/details/106165960 江郎才尽之后突然想到一个原因&#xff1a;<dep…

Java100道面试题

1.JVM内存结构 1. 方法区&#xff08;Method Area&#xff09; 方法区是JVM内存结构的一部分&#xff0c;用于存放类的相关信息&#xff0c;包括&#xff1a; 类的结构&#xff08;字段、方法、常量池等&#xff09;。字段和方法的描述&#xff0c;如名称、类型、访问修饰符…

虚表 —— 隐藏行(简单版)

因为隐藏行改变了listview内部行号处理机制&#xff0c;需要处理大量细节&#xff0c;如listview内部用于传递行号的各种消息、通知等、封装的各种读取行号的函数等。 所以在工作量很大&#xff0c;一处纰漏可能导致重大bug的情况下&#xff0c;仅对隐藏行功能进行了简单封装&…

UDP -- 简易聊天室

目录 gitee&#xff08;内有详细代码&#xff09; 图解 MessageRoute.hpp UdpClient.hpp UdpServer.hpp Main.hpp 运行结果&#xff08;本地通信&#xff09; 如何分开对话显示&#xff1f; gitee&#xff08;内有详细代码&#xff09; chat_room zihuixie/Linux_Lear…

python制作翻译软件

本文复刻此教程&#xff1a;制作属于自己的翻译软件-很简单【Python】_哔哩哔哩_bilibili 一、明确需求&#xff08;以搜狗翻译为例&#xff09; &#xff08;1&#xff09;网址&#xff1a;https://fanyi.sogou.com/text &#xff08;2&#xff09; 数据&#xff1a;翻译内容…