spark SQL 任务参数调优1

1.背景

要了解spark参数调优,首先需要清楚一部分背景资料Spark SQL的执行原理,方便理解各种参数对任务的具体影响。



一条SQL语句生成执行引擎可识别的程序,解析(Parser)、优化(Optimizer)、执行(Execution) 三大过程。其中Spark SQL 解析和优化如下图

  1. Parser模块:未解析的逻辑计划,将SparkSql字符串解析为一个抽象语法树/AST。语法检查,不涉及表名字段。

  2. Analyzer模块:解析后的逻辑计划,该模块会遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函数绑定,然后根据元数据信息Catalog对数据表中的字段和基本函数进行解析。

  3. Optimizer模块:该模块是Catalyst的核心,主要分为RBO和CBO两种优化策略,其中RBO是基于规则优化(谓词下推(Predicate Pushdown) 、常量累加(Constant Folding) 、列值裁剪(Column Pruning)),CBO是基于代价优化。

  4. SparkPlanner模块:优化后的逻辑执行计划OptimizedLogicalPlan依然是逻辑的,并不能被Spark系统理解,此时需要将OptimizedLogicalPlan转换成physical plan(物理计划),如join算子BroadcastHashJoin、ShuffleHashJoin以及SortMergejoin 。

  5. CostModel模块:主要根据过去的性能统计数据,选择最佳的物理执行计划。这个过程的优化就是CBO(基于代价优化)。

在实际Spark执行完成一个数据生产任务(执行一条SQL)的基本过程:

(1)对SQL进行语法分析,生成逻辑执行计划
(2)从Hive metastore server获取表信息,结合逻辑执行计划生成并优化物理执行计划
(3)根据物理执行计划向Yarn申请资源(executor),调度task到executor执行。
(4)从HDFS读取数据,任务执行,任务执行结束后将数据写回HDFS。

上述运行过程
过程 (2)主要是driver的处理能力
过程 (3)主要是executor 、driver的处理能力、作业运行行为

本文从作业的运行过程(2)(3)各选择一个参数介绍从而了解运行过程。

目前的spark参数以及相关生态的参数列表几百个:
Hadoop参数:https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
hive参数:Configuration Properties - Apache Hive - Apache Software Foundation
spark参数:spark 配置参数 Configuration - Spark 3.5.0 Documentation
                     spark 优化参数 Performance Tuning - Spark 3.5.0 Documentation
                     spark 执行参数 Spark SQL and DataFrames - Spark 2.0.0 Documentation

                     各个公司自定义参数:set spark.sql.insertRebalancePartitionsBeforeWrite.enabled = true

其他网上参考的参数:Hive常用参数总结-CSDN博客

参数列表

参数类型

参数

设置值

描述

资源利用

spark.driver.memory
spark.driver.cores
spark.driver.memoryOverhead
spark.executor.memory

5g

--driver-memory 5G

每个exector的内存大小,后缀"k", "m", "g" or "t"

input split

spark.hadoop.hive.exec.orc.split.strategy

spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;

spark.hadoop.mapreduce.input.fileinputformat.split.minsize;
spark.sql.files.maxPartitionBytes

BI 、ETL 、HYBRID

shuffle

spark.sql.shuffle.partitions

200

spark.default.parallelism

80, 100, 200, 300

join

1.spark.hadoop.hive.exec.orc.split.strategy 参数


            1. 参数作用:参数控制在读取ORC表时生成split的策略,影响任务执行时driver压力和mapper 数量。
            2. 参数介绍 : 参数来源于hive  :hive.exec.orc.split.strategy官方定义如下图,当任务执行开始时,ORC有三种分割文件的策略 BI 、ETL 、HYBRID(默认)
HYBRID模式:文件数过多和文件小的场景下,当文件数大于mapper count (总文件大小/hadoop默认分割大小128M) 且文件大小小于HDFS默认(128M)的大小。
ETL:生成分割文件之前首先读取ORC文件的footer(存储文件信息的文件),
BI: 直接分割文件,没有访问HDFS上的数据。

ORC文件的footer是什么?
  ORC 文件原理:全称 Optimized Row Columnar 1.ORC是一个文件格式比较高效的读取、写入、处理hive数据。(我之前理解是一个高效压缩文件)。2.序列化和压缩: intger和String 序列化。按照文件块增量的压缩。
文件结构:三级结构:stripes 存在具体的数据行组(索引、数据行、stripe footer 的信息),file footer 文件的辅助信息(stripe的列表、每个stripe行数、列的数据类型、列上聚合信息 最大值最小值),psotscipt 文件的压缩参数和压缩后的大小。

       3.使用方法和场景: 因此ETL模式下读取的file footer是每个orc文件块的辅助信息。对于一些较大的ORC表,footer可能非常大,ETL模式下读取大量hdfs的数据信息切分文件,导致driver的开销压力过大,这种情况适用BI模式比较合适。
    一些配合使用参数 如:spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; spark.hadoop.mapreduce.input.fileinputformat.split.minsize; map输入最小最大分割块,maxsize 和minsize在输入端控制ORC文件的分割合并。当spark 从hive表中读取数据是会创建一个HadoopRDD的实例,HadoopRDD根据computeSplitSize方法分割文件(org.apache.hadoop.mapreduce.lib.input.FileInputFormat ) Math.max(minSize, Math.min(maxSize, blockSize) 源代码Source code,因此文件表的小文件过多3M大小,根据公式一个小文件就是一个split分割生成大量的patitions,导致tasks数量就巨大,整个任务性能瓶颈可能在读取资源数据缓解。

文件分割源码

   spark.sql.files.maxPartitionBytes  单partition的最大字节数, 为了防止把已经设置好的分割块再次合并,可以将 set spark.hadoopRDD.targetBytesInPartition=-1。

2.spark.sql.shuffle.partitions


    参数作用: 在任务有shuffle时候(join或者聚合场景下)控制partitions的数量。
    参数介绍:

Property Name

Default

meaning

链接

翻译

不同点

共同点

spark.sql.shuffle.partitions

200

Configures the number of partitions to use when shuffling data for joins or aggregations.

Spark SQL and DataFrames - Spark 2.0.0 Documentation

Spark SQL中shuffle过程中Partition的数量

仅适用于DataFrame ,group By, join 触发数据shuffle,因此这些数据转换后的结果会导致分区大小需要通过Spark.sql.shuffle.partitions 中设置的值。
如果任务没有join 或者聚合操作,参数设置不会生效。

配置shuffle partitions 的数量

spark.default.parallelism

For distributed shuffle operations like reduceByKeyand join, the largest number of partitions in a parent RDD. For operations like parallelizewith no parent RDDs, it depends on the cluster manager:

  • Local mode: number of cores on the local machine

  • Mesos fine grained mode: 8

  • Others: total number of cores on all executor nodes or 2, whichever is larger

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

Configuration - Spark 3.5.0 Documentation

1.reduceByKey
指定分区数  val rdd2 = rdd1.reduceByKey( _ + _, 10)
不指定分区数val rdd1 = rdd2.reduceByKey(_ + _ )
2.join 
val rdd3 = rdd1.join(rdd2),rdd3里Partition的数量由父rdd中最多的Partition数量决定,因此使用join算子时,应增加父rdd中的Partition数量。

1.若当前RDD执行shuffle操算子如reducebykey 和join ,则为在父RDD中最大的partition数。
2.若当前RDD没有上一个RDD则集群管理器分配
  2.1 本地模式:机器核数
  2.2 Mesos上 8
  2.3 所有executor的核数或者是2的最大值

spark.default.parallelism 是随 RDD 引入的,当用户未设置时候,返回reduceByKey(), groupByKey(), join() 转换的默认分区数,仅适用于RDD。

参数用法:在提交作业的通过 --conf 来修改这两个设置的值,方法如下:或者
         spark-submit --conf spark.sql.shuffle.partitions=300 --conf spark.default.parallelism=300
                     sqlContext.setConf("spark.sql.shuffle.partitions", "300")
                     sqlContext.setConf("spark.default.parallelism", "300”)

参数介绍2.0:chatGPT3.5 的答案

     理解spark的并行度:

  1.  资源的并行  exector数和cpu core数

  2.  数据的并行  spark作业在各个stage的task 的数量是并行执行,task数量设置成Spark Application总CPU core数量的2~3倍,同时尽量提升Spark运行效率和速度;

    
     扩展: flink 的并行度

参考文档:
1.Spark SQL底层执行流程详解(好文收藏)-腾讯云开发者社区-腾讯云  spark 执行原理
2.ORC 参数:Configuration Properties - Apache Hive - Apache Software Foundation
3.ORC文件定义: LanguageManual ORC - Apache Hive - Apache Software Foundation
4.oRC解读: 深入理解ORC文件结构-CSDN博客
5.hadoop input: How does Spark SQL decide the number of partitions it will use when loading data from a Hive table? - Stack Overflow
6.文件分割:从源码看Spark读取Hive表数据小文件和分块的问题 - 掘金, How does Spark SQL decide the number of partitions it will use when loading data from a Hive table? - Stack Overflow
7.spark手册:How to Set Apache Spark Executor Memory - Spark By {Examples}
8.并行: performance - What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? - Stack Overflow
9.flink的并行 : 并行执行 | Apache Flink
10.reducebykey :scala - reduceByKey: How does it work internally? - Stack Overflow
11.key values : 4. Working with Key/Value Pairs - Learning Spark [Book]
12.spark并行:    Spark调优之 -- Spark的并行度深入理解(别再让资源浪费了)_spark并行度-CSDN博客
13.场景:  spark SQL 任务参数调优1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/146869.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析:数据分析篇

文章目录 第一章 科学计算库Numpy1.1 认识Ndarray1.2 Ndarray的属性1.3 Numpy中的数据类型1.4 Numpy数组1.4.1 Numpy数组的创建1.4.2 Numpy数组的基本索引和切片1.4.3 Numpy布尔索引1.4.4 数组运算和广播机制1.4.5 Numpy数组的赋值和Copy复制1.4.6 Numpy数组的形状变换1.4.7 Nu…

【ROS入门】使用 ROS 动作(Action)机制实现目标请求、进度与完成结果的反馈

文章结构 任务要求话题模型实现步骤定义action文件按照固定格式创建action文件编辑配置文件编译生成中间文件 编写服务端和客户端vscode配置服务端客户端编译配置文件执行 任务要求 使用 ROS 动作(Action)机制实现目标请求、进度与完成结果的反馈: 创建服务端&…

推荐算法——Apriori算法原理

0、前言: 首先名字别读错:an pu ruo ao rui 【拼音发音】Apriori是一种推荐算法推荐系统:从海量数据中,帮助用户进行信息的过滤和选择。主要推荐方法有:基于内容的推荐、协同过滤推荐、基于关联规则的推荐、基于知识的…

leetCode 53.最大子数和 图解 + 贪心算法/动态规划+优化

53. 最大子数组和 - 力扣(LeetCode) 给你一个整数数组 nums ,请你找出一个具有最大和的连续子数组(子数组最少包含一个元素),返回其最大和。 子数组 是数组中的一个连续部分。 示例 1: 输入…

字符串,字符数组,类型转换,整数越界,浮点数,枚举

目录 自动类型转换 强制类型转换 数据类型 sizeof 数据类型所占字节数 整数越界 浮点数 字符型 字符串变量 ​编辑字符串的输入输出 main函数的参数 ,argc,argv 单个字符输入输出 putchar getchar strlen,strcmp,strcat,strchr,strstr strlen 求字…

BASH shell脚本篇4——函数

这篇文章介绍下BASH shell中的函数。之前有介绍过shell的其它命令,请参考: BASH shell脚本篇1——基本命令 BASH shell脚本篇2——条件命令 BASH shell脚本篇3——字符串处理 函数是代码重用的最重要方式。Bash函数可以定义为一组命令,在b…

【STM32 CubeMX】移植u8g2(一次成功)

文章目录 前言一、下载u8g2源文件二、复制和更改文件2.1 复制文件2.2 修改文件u8g2_d_setup文件u8g2_d_memory 三、编写oled.c和oled.h文件3.1 CubeMX配置I2C3.2 编写文件oled.holed.c 四、测试代码main函数测试代码 总结 前言 在本文中,我们将介绍如何在STM32上成…

电脑显示系统错误怎么办?

有时我们在开机时会发现电脑无法开机,并显示系统错误,那么这该怎么办呢?下面我们就一起来了解一下。 方法1. 替换SAM文件解决问题 1. 重启电脑并进入安全模式。 Win8/10系统:在启动电脑看到Windows标志时,长按电源键…

gradle中主模块/子模块渠道对应关系通过配置实现

前言: 我们开发过程中,经常会面对针对不同的渠道,要产生差异性代码和资源的场景。目前谷歌其实为我们提供了一套渠道包的方案,这里简单描述一下。 比如我主模块依赖module1和module2。如果主模块中声明了2个渠道A和B&#xff0c…

【Java 进阶篇】JDBC ResultSet 遍历结果集详解

在Java数据库编程中,经常需要执行SQL查询并处理查询结果。ResultSet(结果集)是Java JDBC中用于表示查询结果的关键类之一。通过遍历ResultSet,我们可以访问和操作从数据库中检索的数据。本文将详细介绍如何使用JDBC来遍历ResultSe…

手把手教你做个智能加湿器(一)

一、前言 目前常见的加湿器类电子产品一般是由PCBA和外壳组成,我们将从PCB设计,然后编写软件,接着设计外壳,设计出一个完整的产品出来。 需要使用到软件: Altium Designer 17 SolidWorks 2019 Keil 4 二…

Error: node: unknown or unsupported macOS version: :dunno 错误解决

一、原因 今天安装 brew install node报错了,错误信息如下: 二、解决方案 1)查找homebrew-cask安装位置 echo $(brew --repo homebrew/homebrew-cask) // 输出 /opt/homebrew/Library/Taps/homebrew/homebrew-cask2)使用 gi…

selenium下载安装 -- 使用谷歌驱动碰到的问题

安装教程参考: http://c.biancheng.net/python_spider/selenium.html 1. 谷歌浏览器和谷歌驱动版本要对应(但是最新版本谷歌对应的驱动是没有的,因此要下载谷歌历史其他版本): 谷歌浏览器历史版本下载: https://www.chromedownloads.net/chrome64win/谷歌浏览器驱动下载: http:…

计算机网络分层结构

一、OSI参考模型(法定标准) 1.由国际标准化组织(ISO)提出的开放系统互连(OSI)参考模型 2.OSI七层结构: 3.通信过程: 4.各层功能 应用层-能和用户交互产生网络流量(需要联网)的程序,常见协议有文件传输(FTP)、电子邮件(SMTP)、万维网(HTTP)…

SpringCloud(一)Eureka、Nacos、Feign、Gateway

文章目录 概述微服务技术对比 Eureka服务远程调用服务提供者和消费者Eureka注册中心搭建注册中心服务注册服务发现Ribbon负载均衡负载均衡策略饥饿加载 NacosNacos与Eureka对比Nacos服务注册Nacos服务分集群存储NacosRule负载均衡服务实例权重设置环境隔离 Nacos配置管理配置热…

算法精品讲解(2)——DP问题入门(适合零基础者,一看就会)

目录 前言 DP问题它是什么(了解) 从中学的例题谈起 再来说一下,DP问题的核心思想(理解) DP问题的解决方法 先说方法论: 再说具体的例子 例一: 例二: 例三: DP和…

使用ExLlamaV2在消费级GPU上运行Llama2 70B

Llama 2模型中最大也是最好的模型有700亿个参数。一个fp16参数的大小为2字节。加载Llama 270b需要140 GB内存(700亿* 2字节)。 只要我们的内存够大,我们就可以在CPU上运行上运行Llama 2 70B。但是CPU的推理速度非常的慢,虽然能够运行,速度我…

postgresql-管理数据表

postgresql-管理数据表 创建表数据类型字段约束表级约束模式搜索路径 修改表添加字段删除字段添加约束删除约束修改字段默认值修改字段数据类型重命名字段重命名表 删除表 创建表 在 PostgreSQL 中,使用 CREATE TABLE 语句创建一个新表: CREATE TABLE …

量化交易全流程(四)

本节目录 数据准备(数据源与数据库) CTA策略 数据源: 在进行量化分析的时候,最基础的工作是数据准备,即收集数据、清理数据、建立数据库。下面先讨论收集数据的来源,数据来源可分为两大类:免…

nodejs开发环境搭建

Nodejs是一个开源的、跨平台JavaScript运行时环境,其使用V8引擎对JavaScript脚本执行解释,在前后端分离的应用架构设计中,其既能支持web页面服务应用的开发、也能支持后端接口服务应用的开发,类似于Java语言的J2EE运行时环境&…