spark
Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍。除了Map和Reduce操作之外,Spark还支持SQL查询,流数据,机器学习和图表数据处理。开发者可以在一个数据管道用例中单独使用某一能力或者将这些能力结合在一起使用。
spark架构图
Driver:是作业的主进程,是程序入口,Driver进行启动后,向master发送请求,进行注册,申请资源;
Master:进行管理所有worker节点,分配任务,收集运行信息,监控worker存活状态;
worker:与master节点通信,管理spark任务,启动Executor
Executor:一个Executor执行多个task
task:task是一个线程,具体的spark任务就是在task上,有多少分区就有多少task
- Driver进程启动之后,会进行初始化操作,并发送请求到Master:我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPUcore。
- Master接收到Driver注册之后,发送请求给worker,进行资源的调度和分配;Worker接收到master请求,启动Executor:而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。
- Executor启动后,向Driver进行反注册,然后正式执行Spark程序;先读取数据源,创建RDD:在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
- 在worker上生成RDD之后,Driver会根据RDD定义的操作,提交相应数量的Task到Executor上: Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
- 当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
- 因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
- task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。
spark任务执行流程
- 运行流程以SparkContext为程序的总入口,在sparkcontext的初始化过程中,Spark会分别创建DAGScheduler(作业调度)和TaskSchedule(任务调度)
- DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以并发执行的一组Task组成,这些Task的执行逻辑完全相同,只是作用于不同的数据。在DAGScheduler将这组Task划分完成之后,会将这组Task(TaskSets)提交到TaskScheduler。
- TaskScheduler负责Task级的调度,将DAGScheduler提交过来的TaskSet按照指定的调度实现,分别对接到不同的资源管理系统。
(1) 构建Spark Application运行环境,SparkContext向资源管理器注册,并申请Executor,资源管理器分配并启动Executor
(2) Executor发送心跳到资源管理器,进行注册;然后SparkContext构建DAG图,DAGScheduler将DAG分解成Satge和task,并发送给TaskScheduler
(3) Executor向sparkContext申请task,TaskScheduler将task发送给Executor运行,同时sparkContext将应用程序发送给Executor,运行完毕并释放资源。
如何使用Spark实现TopN的获取
聊一聊Spark实现TopN的几种方式 - 知乎 (zhihu.com)
(1)将数据使用reparation或coalesce分区成多个RDD;
(2)每个分区数据加载到内存使用局部排序算法在每个分区获取topn;
(3)将每个分区topn收集到驱动程序,并合并成一个RDD;
(4)对该RDD进行全局排序,获取最终topn;
(5)排序算法可以使用takeordered或sortby等操作;
reduceBykey vs groupByKey
-
groupbykey:根据key对RDD进行分组,直接进行shuffle操作,这样方法会将RDD所有的元素打乱重新分组,处理大数据时可能会导致性能问题。
-
reducebykey方法先将相同key元素进行合并,然后对value进行聚合操作,最终返回一个新的RDD。这样在本地聚合,在进行shuffle操作可以减少shuffle操作数量。
- reduceByKey 返回的结果是 RDD [k,v],即每个 key 对应一个 value。groupByKey 返回的结果是 RDD [k, Iterable[v]],即每个 key 对应一个可迭代的 value 集合。
// 最终结果("a", 3), ("b", 7), ("c", 5)
// 最终结果("a", Iterable(1,2)), ("b", Iterable(3,4)), ("c", Iterable(5))
SparkSQL中left outer join操作,left semi join操作
- left outer join会返回右表中与左表匹配的记录或用NULL填充,而left semi join不会返回右表中的任何列,left inner join会返回右表中与左表匹配的记录。
-
spark join三种方式
-
hash join : 依次读取 小表的数据,对于每一行数据根据 join key 进行 hash,hash 到对应的 bucket, 生成 Hash Table 中一条记录。数据缓存在内存里,如果内存放不下,需dump 到外存。然后再次扫描 大表的数据,使用相同的 hash 函数映射 HashTable 中的记录,映射成功之后再检查 join 的条件,如果匹配成功就可以将二者 join 在一起。
-
broadcast hash join:broadcast阶段: 将小表广播分发到大表所在的所有主机;hash join阶段: 在每个executor上执行单机版hash join,小表映射,大表试探
-
shuffle hash join:在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时先在shufle阶段: 分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。其次hash join阶段: 每个分区节点上的数据单独执行单机hash join算法
-
sort-merge join:首先,shufle阶段: 将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理,
第二阶段,sort阶段: 对单个分区节点的两表数据,分别进行排序。最后,merge阶段: 对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同ioin key就merge输出,否则取更小一边。比如按升序排列,某个值明显比它大了,后面肯定就不会有相等的,就不用继续比较了,节省了时间和内存。
spark参数以及参数调优
Spark是一个分布式计算框架,它有很多参数可以用来调优性能和资源利用。根据不同的场景和需求,可以选择合适的参数来优化Spark任务的执行效率。一般来说,Spark的参数可以分为以下几类:
- 资源参数:这类参数用来控制Spark任务分配和使用的资源,包括CPU核数、内存大小、磁盘空间等。例如,
spark.executor.cores
用来设置每个Executor的核数,spark.executor.memory
用来设置每个Executor的内存大小,spark.default.parallelism
用来设置默认的并行度等。 - Shuffle参数:这类参数用来控制Spark任务在进行Shuffle操作时的行为,包括Shuffle文件的缓存、压缩、分区等。例如,
spark.shuffle.file.buffer
用来设置Shuffle文件写入磁盘时的缓冲区大小,spark.shuffle.compress
用来设置是否对Shuffle文件进行压缩,spark.sql.shuffle.partitions
用来设置Shuffle操作产生的分区数等。 - 数据倾斜参数:这类参数用来解决Spark任务在进行Join或GroupBy等操作时出现的数据倾斜问题,即某些分区中的数据量过大或过小,导致计算不均衡。例如,
spark.sql.adaptive.enabled
用来开启自适应查询执行(AQE),它可以根据运行时的统计信息动态调整Shuffle分区数和Join策略等,spark.sql.broadcastTimeout
用来设置广播Join的超时时间,spark.sql.autoBroadcastJoinThreshold
用来设置广播Join的阈值等。 - 代码参数:这类参数用来优化Spark任务的代码逻辑,包括使用高效的算子、数据结构、序列化方式等。例如,使用mapPartitions代替map,使用reduceByKey代替groupByKey,使用Kryo代替Java序列化等。
spark数据倾斜
https://blog.csdn.net/zxl55/article/details/79572475
数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。
可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。
Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
- 使用Hive ETL预处理数据 :
- 过滤少数导致倾斜的key :如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。
- 提高shuffle操作的并行度 :在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。
- 两阶段聚合(局部聚合+全局聚合):方案实现原理:将原本相同的key通过map算子附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
- 将reduce join转为map join:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
- 采样倾斜key并分拆join操作:方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。
- 使用随机前缀和扩容RDD进行join:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
Spark开发调优
-
避免创建重复的RDD:之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD,进而增加了作业的性能开销。
-
尽可能复用同一个RDD:两个RDD的value数据是完全一样的,那么此时我们可以只使用key-value类型的那个RDD。
-
对多次使用的RDD进行持久化:persist(StorageLevel.MEMORY_AND_DISK_SER)
-
避免使用shuffle类算子:shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
因此可以使用Broadcast与map进行join:
-
使用map-side预聚合的shuffle操作:map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。
-
使用高性能的算子:
- 使用mapPartitions替代普通map:
- 使用foreachPartitions替代foreach:
- 使用filter之后进行coalesce操作:因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition
-
广播大变量:在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。每个Executor内存中,就只会驻留一份广播变量副本。
-
使用Kryo优化序列化性能:1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。2、将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。3、使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
Spark资源调优
- num-executors:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
- executor-memory:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
- executor-cores:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
- driver-memory:该参数用于设置Driver进程的内存。
- spark.default.parallelism:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
- spark.storage.memoryFraction:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6
- spark.shuffle.memoryFraction:参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
spark shuffle
-
未经优化的HashShuffleManager:shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。
-
优化后的HashShuffleManager:开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。**当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。**也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
-
SortShuffleManager的普通运行机制:在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
-
SortShuffleManager的bypass运行机制:此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行 排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也 就节省掉了这部分的性能开销。
spark vs mapreduce
spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。
但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。
spark 容错机制
- 调度层容错:Spark使用DAGScheduler和TaskScheduler两个调度器来管理任务的执行。当一个Stage或者一个Task失败时,调度器会尝试重新提交和执行它们,直到达到最大重试次数或者任务成功为止。
- RDD Lineage容错:Spark的RDD是基于Lineage(血统)的不可变数据结构,它记录了RDD之间的依赖关系和转换操作。当一个RDD的部分分区丢失或损坏时,Spark可以根据Lineage重新计算和恢复这些分区。根据依赖关系的不同,RDD分为窄依赖和宽依赖。窄依赖表示子RDD的每个分区最多依赖于父RDD的一个分区,这样的依赖可以在本地节点上完成计算,容错开销较低。宽依赖表示子RDD的每个分区可能依赖于父RDD的多个或全部分区,这样的依赖需要跨节点进行数据传输和计算,容错开销较高。
- Checkpoint容错:Checkpoint是一种通过将RDD的数据保存到外部存储系统(如HDFS)来断开Lineage链的方法,从而减少容错时的重算开销。Checkpoint一般适用于以下两种情况:一是当Lineage过长时,如果重算,开销太大,如PageRank等迭代算法;二是当存在宽依赖时,如果重算,会产生冗余计算,如groupByKey等聚合算法。Checkpoint需要手动调用RDD的checkpoint()方法来触发,并且需要在SparkContext中设置checkpoint目录。
spark sql过程
Spark SQL是Spark系统的核心组件,它可以将用户编写的SQL语句或者DataFrame/Dataset API转换成Spark Core的RDD操作,从而实现对结构化或者半结构化数据的高效处理。Spark SQL的执行流程主要包括以下几个步骤:
- 解析:Spark SQL使用一个叫做Catalyst的查询编译器,它可以将SQL语句或者DataFrame/Dataset API解析成一棵逻辑算子树(Logical Plan),表示用户的查询意图。
- 分析:Catalyst会对逻辑算子树进行分析,解析其中的表名、列名、数据类型等信息,生成一棵解析后的逻辑算子树(Analyzed Logical Plan)。
- 优化:Catalyst会对解析后的逻辑算子树进行优化,应用各种优化规则,如常量折叠、谓词下推、列裁剪等,生成一棵优化后的逻辑算子树(Optimized Logical Plan)。
- 物理计划:Catalyst会根据优化后的逻辑算子树,生成一个或多个物理算子树(Physical Plan),表示不同的执行策略。例如,join操作可以用SortMergeJoin或者BroadcastHashJoin来实现。Catalyst会使用一个成本模型(Cost Model)来选择最优的物理算子树(Spark Plan)。
- 代码生成:Catalyst会将最优的物理算子树转换成可执行的代码,利用Scala语言的quasiquotes功能,生成Java字节码,提高执行效率。
- 执行:Spark SQL会将生成的代码提交给Spark Core执行引擎,利用RDD的操作和调度机制,完成分布式计算。
spark checkpoint和persist和cache区别
-
cache()默认将RDD持久化到内存中,如果内存不足,可以将数据写入磁盘。
-
persist0可以让用户指定数据持久化的级别,可以将RDD持久化到内存、磁盘或者是内存和磁盘的组合中。此外,persist0还可以让用户指定数据持久化的序列化方式,这对于一些需要自定义序列化方式的对象非常有用。
-
checkpoint0方法则可以将RDD的数据写入到磁盘上,从而避免OOM的问题。此外,checkpoint方法也可以优化RDD的依赖关系,减少宽依赖的产生,提高任务的并行度,从而提高任务的执行效率。checkpoint方法会触发一个新的job来计算RDD,并将计算结果写入磁盘。因此,使用checkpoint0方法会比cache和persist方法产生更多的开销。checkpoint方法一般用于对一些重要的、频繁使用的RDD进行持久化,以提高程序的健壮性和执行效率。
spark vs zookeeper
- Spark可以利用Zookeeper实现高可用性。在Spark中,如果Master节点出现故障,那么整个集群就无法正常工作了。为了解决这个问题,Spark可以借助Zookeeper来实现Master节点的故障转移。具体来说,就是在集群中启动多个Master节点,并将它们注册到Zookeeper上。Zookeeper会选举出一个Leader Master,并将其地址通知给所有的Worker节点。当Leader Master出现故障时,Zookeeper会重新选举出一个新的Leader Master,并通知给所有的Worker节点。这样就可以保证集群在Master节点故障时仍然可以继续工作。
- Spark可以利用Zookeeper实现分布式协调服务。在Spark中,有些场景需要对分布式系统进行协调和同步,比如共享变量、广播变量、累加器等。为了实现这些功能,Spark可以借助Zookeeper来提供一致性和原子性的保证。具体来说,就是将这些需要协调的数据存储在Zookeeper上,并通过Zookeeper提供的API来进行读写操作。这样就可以保证数据在分布式环境中的一致性和可靠性。
spark中的task,stage,job之间的关系
根据算子是否Action算子,来划分job
根据算子是否需要shuffle,来划分stage
根据rdd分区数,划分task
Action操作触发一个job,jon交给DAGsheduler分解成stage
- Application:用户编写的Spark应用程序,由一个或多个Job组成。提交到Spark之后,Spark会为Application分配资源,将程序进行转换并执行。
- Job(作业):由Action算子触发生成的由一个或多个Stage组成的计算作业。
- Stage(调度阶段):每个Job会根据RDD的宽依赖被切分为多个Stage,每个Stage都包含一个TaskSet。
- TaskSet(任务集):一组关联的,但相互之间没有shuffle依赖关系的Task集合。一个TaskSet对应的调度阶段。
- Task(任务):RDD中的一个分区对应一个Task,Task是单个分区上最小的处理流程单元。
RDD
RDD 是 Spark 提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。
- 即如果某个结点上的 RDD partition 因为节点故障,导致数据丢失,那么 RDD 可以通过自己的数据来源重新计算该 partition。这一切对使用者都是透明的。
- RDD 的弹性体现在于 RDD 上自动进行内存和磁盘之间权衡和切换的机制。
RDD vs Dataframe
一般来说,如果你的数据是结构化或半结构化的,比如表格、JSON、CSV等,那么使用DataFrame会更方便,因为它可以提供更高层次的API,支持SQL查询,以及通过Catalyst优化器进行性能优化。而如果你的数据是非结构化的,比如文本、图像、二进制等,那么使用RDD会更灵活,因为它可以让你自定义转换和处理函数,以及控制数据分区和缓存等底层细节。
-
RDD 是 Spark 的基础数据抽象,它可以处理各种类型的数据,包括结构化、半结构化和非结构化数据。DataFrame 是 Spark SQL 模块提供的一种高层数据抽象,它只针对结构化或半结构化数据,需要指定数据的 schema(结构信息)。
-
RDD 是分布式的 Java 对象的集合,每个对象可以是任意类型,Spark 不关心对象的内部结构。DataFrame 是分布式的 Row 对象的集合,每个 Row 对象包含多个列,每列有名称和类型,Spark 可以根据 schema 优化数据的存储和计算。
-
RDD 提供了 low-level 的转换和行动操作,可以用函数式编程的风格来操作数据,但是不支持 SQL 语言和 DSL(特定领域语言)。DataFrame 提供了 high-level 的转换和行动操作,可以用 SQL 语言和 DSL 来操作数据,比如 select, groupby 等。
-
RDD 的优点是编译时类型安全,可以在编译时发现错误,而且具有面向对象编程的风格。DataFrame 的优点是利用 schema 信息来提升执行效率、减少数据读取以及执行计划的优化,比如 filter 下推、裁剪等。
-
RDD 的缺点是构建大量的 Java 对象占用了大量的堆内存空间,导致频繁的垃圾回收(GC),影响程序执行效率。而且数据的序列化和反序列化性能开销很大。DataFrame 的缺点是编译时类型不安全,只能在运行时发现错误,而且不具有面向对象编程的风格。
RDD创建方法
-
使用程序中的集合创建RDD
如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用**SparkContext的paralleize()**方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。
//案例:1到10累加求和(scala) val arr = Array(1,2,3,4,5,6,7,8,9,10) val rdd = sc.parallelize(arr) val sum = rdd.reduce(_+_)
-
使用本地的文件创建RDD
-
使用HDFS来创建RDD
通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。
//案例:文件字数统计 val rdd = sc.textFile("data.txt") val wordcount = rdd.map(line => line.length).reduce(_+_)
Spark的RDD操作
- transformat操作会针对已有的RDD创建一个新的RDD
- action则主要是对RDD进行最后的操作
RDD宽窄依赖(stage划分依据)
- 窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖,如map,filter,union;
- 宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle),对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段,如sort,reducebykey,groupbykey;
- spark stage划分,就是从最后一个RDD往前推算,遇到窄依赖就将其加入该stage,如果遇到宽依赖则断开。
RDD持久化
RDD 持久化是 Spark 的一个重要特性,它可以将 RDD 的数据保存在内存或磁盘中,避免重复计算。RDD 持久化的实现主要有以下几个步骤:
- 调用 RDD 的 cache() 或 persist() 方法,将 RDD 标记为需要持久化的状态。这两个方法的区别是 cache() 相当于 persist(StorageLevel.MEMORY_ONLY),而 persist() 可以指定不同的存储级别,如 MEMORY_AND_DISK、DISK_ONLY 等。
- 在第一次对 RDD 执行 action 操作时,触发 RDD 的计算,并将计算结果按照存储级别保存在各个节点的内存或磁盘中。同时,记录 RDD 的依赖关系和分区信息,以便在缓存丢失时重新计算。
- 在后续对 RDD 的操作中,如果发现 RDD 已经被持久化,就直接从内存或磁盘中读取数据,而不需要重新计算。如果缓存的数据丢失或不足,就根据 RDD 的依赖关系和分区信息重新计算缺失的分区。
spark streaming
Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。
它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。
Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交给 spark 的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的 batch 组成的。
Spark Streaming 整合 Kafka 的两种模式
- 基于Receiver的方式:这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。这种方式可以通过启用预写日志(Write Ahead Log,WAL)来实现零数据丢失,但是可能会造成数据重复消费的问题。
- 基于Direct的方式:这种方式没有使用Receiver,而是直接从Kafka中读取数据。这种方式使用了Kafka的低层次Consumer API,可以周期性地查询每个topic的每个partition中的最新offset,然后根据设定的每个partition的最大拉取速率(maxRatePerPartition)来处理每个batch。这种方式不需要预写日志,也不会造成数据重复消费的问题,但是需要自己管理offset的记录和更新。
Kafka
https://zhuanlan.zhihu.com/p/495217561
Kafka 本质上是一个 MQ(Message Queue),使用消息队列的好处?
-
解耦:允许我们独立的扩展或修改队列两边的处理过程。
-
可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
-
缓冲:有助于解决生产消息和消费消息的处理速度不一致的情况。
-
灵活性&峰值处理能力:不会因为突发的超负荷的请求而完全崩溃,消息队列能够使关键组件顶住突发的访问压力。
-
异步通信:消息队列允许用户把消息放入队列但不立即处理它。
kafka工作流程
-
生产者(Producer)向Kafka集群发送消息,消息被封装成一个ProducerRecord对象,该对象包含了要发送的主题(Topic)、分区(Partition)、键(Key)、值(Value)等信息。
-
Kafka集群根据ProducerRecord对象的信息,将消息存储到相应的主题和分区中。主题是逻辑上的消息分类单位,分区是物理上的消息存储单位,每个分区对应一个日志文件(Log),日志文件中的消息都有一个唯一的偏移量(Offset)。
-
消费者(Consumer)从Kafka集群订阅主题,并从指定的分区中拉取消息。消费者属于某个消费者组(Consumer Group),同一个消费者组内的消费者可以消费同一个主题的不同分区,同一个分区只能被同一个消费者组内的某个消费者消费,以避免重复消费。消费者会记录自己消费到了哪个分区的哪个偏移量,以便出错恢复时继续消费。
-
Kafka集群依赖于Zookeeper来保存和管理集群元数据,例如主题、分区、副本、偏移量等信息。Kafka集群中的每个节点都是一个Broker,每个Broker可以容纳多个主题和分区。每个分区都有多个副本,其中一个为主副本(Leader),负责处理读写请求,其余为从副本(Follower),负责同步主副本的数据。当主副本发生故障时,会从从副本中选举出一个新的主副本。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。Producer 生产的数据会不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费
kafka常用指令
列出现有的主题
[root@node1 ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka
创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1
查看分区信息
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
查看指定主题的详细信息
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_1
删除指定主题
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1
开启生产者
[root@node1 ~]# kafka-console-producer.sh --topic topic_1 --broker-list localhost:9020
开启消费者
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
开启消费者方式二,从头消费,不按照偏移量消费
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --from-beginning