【大数据】Spark学习笔记

初识Spark

SparkHadoop

HadoopSpark
起源时间20052009
起源地MapReduceUniversity of California Berkeley
数据处理引擎BatchBatch
编程模型MapReduceResilient distributed Datesets
内存管理Disk BasedJVM Managed
延迟
吞吐量
优化机制手动手动
APILow levelhigh level
流处理NASpark Streaming
SQL支持Hive, ImpalaSparkSQL
Graph支持NAGraphX
机器学习支持NASparkML

Spark对比Hadoop特点

Spark优缺点

  1. Spark将运算的中间数据存放在内存, 迭代计算效率更高; 而MapReduce的中间结果需要保存到磁盘
  2. Spark容错性更高, 通过弹性分布式数据集RDD来实现高容错; 一部分数据丢失或戳错可以通过数据集的计算过程的血缘关系来实现重建; MapReduce发生错误只能重新计算
  3. Spark相比于Hadoop提供了transformationaction这两大类的多功能api, 以及流式处理Spark Streaming模块, 图计算GraphX等等; MapReduce只提供了mapreduce两种操作
  4. Spark框架和生态更加复杂, 首先有RDD, 血缘lineage, 执行时的有向无环图DAG/stage划分等, 很多时候都需要根据不同场景分别调优以达到性能要求; 而MapReduce框架及应用较为简单, 但运行较为稳定, 更适合长期稳定运行

Hadoop优缺点

优点:

  1. 高可靠性: hadoop可以按位存储和处理数据
  2. 高扩展性
  3. 高效性: Hadoop能够在节点之间动态的移动数据, 并保证各个节点的动态平衡
  4. 高容错性: Hadoop能够保存数据的多个副本, 并且能够自动将失败的任务重新分配

缺点:

  1. 不适合低延迟的数据访问
  2. 无法高效存储大量小文件
  3. 不支持多用户写入及任意文件的修改

基本概念

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  1. Application: 用户编写的Spark应用程序, 包含了driver程序以及集群上运行的程序代码, 物理机器上涉及了driver, master, worker三个节点
  2. RDD(Resilient Distributed Dataset): 弹性分布式数据集Spark中最基本的数据抽象, 代表了一个不可变, 可分区, 可并行计算的集合. RDD具有数据流模型的特点: 自动容错/位置感知性调度/和可伸缩性. RDD允许用户在执行多个查询时显示地将工作集缓存在内存中, 后续的查询能够重用工作机, 这极大地提升了查询速度. RDD包含:
    1. 分片(Partition): 即苏聚集的基本组成功单位, 对于RDD来说, 每个分片都会被一个计算任务吹了, 并决定并行计算的粒度. 用户可以在创建RDD时指定RDD的分片个数, 如果没有指定, 则会采取默认值即分配到的CPUCore个数
    2. 分区计算函数: Spark中RDD的计算是以分片为单位的, 每个RDD都会实现compute函数以达到这个目的. compute函数会对迭代器进行复合, 不需要保存每次计算结果
    3. 重建: 在部分分区数据丢失时, Spark可以通过这个以来关系重新计算丢失的分区数据, 而不是对RDD的所有分区进行计算
    4. Partitioner即RDD的分片函数: 当前Spark中实现了两种类型的分片函数, 一个是基于哈希的HashPartitioner, 另一个是基于范围的RangePartitioner. 只有对于key-value的RDD, 才会有Partitioner
    5. 优先位置(preferedlocation): 对于一个HDFS文件来说, 这个列表保存的就是每个Partition所在的块的位置, 按住奥"移数据不如移动计算"的理念, Spark在记性任务调度的时候, 会尽可能地讲计算任务分配到其所要处理的块的位置
  3. DAG: 有向无环图
  4. Task: 被发送到executor上的工作单元, 每个Task负责一个分区的数据
  5. ShuffleMapTask: 输出是shuffle所需的数据, stage的划分也以此为依据, shuffle之前的所有变换是一个stage, shuffle之后的操作是另个一个stage
  6. resultTask: 输出是计算结果
  7. Job: 一个Job包含多个RDD及作用于RDD上面的各种操作; 他包含多个task的并行计算, 可以理解为SparkRDD里面的action, 每个action的出发会生成一个job. 用户提交的job会提交给DAGSCheduler; job会被分解为Stage, Stage会被细化乘Task, Task就是每个Partition上的单个数据处理流程
  8. Stage: 是job的基本调度单位, 一个Job会分为多组Task, 每组Task被称为一个Stage就行MapStage, ReduceStage,或者也被称为TaskSet, 代表一组关联的, 相互之间没有Shuffle依赖关系的组成的任务集
  9. Partition: Partition类似hadoop的Split,计算是以partition为单位进行的
  10. Cluster Manager: 指的是在集群上获取资源的外部服务。主要有三种类型:
    1. Standalon : spark原生的资源管理,由Master负责资源的分配。
    2. Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架。
    3. Hadoop Yarn: 主要是指Yarn中的Resource Manager。

Spark 组成

  1. Spark Core: Spark核心, 所有核心功能均为Spark提供, Spark Core以RDD为数据抽象, 提供Api, 可以支持海量离线数据批处理计算
  2. SparkSQL: 基于Spark Core之上, 提供结构化数据的处理模块, 支持以SQL语言对数据的处理, 本身针对离线的计算场景, 同时基于SparkSQL, Spark提供了StructuredStreaming模块, 可以进行数据的流式计算
  3. SparkStream: 以SparkCore为基础, 提供数据的流式计算功能
  4. MLLib: 以SparkCore为基础, 进行机器学习计算
  5. GraphX: 以SparkCore为基础, 进行图计算, 提供了大量图计算相关的Api

Spark运行模式

  1. 本地模式(单机): 本地模式是以一个独立的进程, 通过多个线程来模拟整个Spark运行的环境
  2. Standalone模式(集群): Spark中各个角色以独立进程的形式存在, 并组成Spark集群环境
  3. Hadoop YARN模式(集群): Spark中的各个角色运行在YARN容器内部, 并组成Spark集群环境
  4. Kubernetes模式(容器集群): Spark中的各个角色运行在Kubernetes容器内部, 并组成Spark环境

Spark架构

在这里插入图片描述

  1. Yarn角色分配:
    1. 以资源管理层面: ResoureManger, ResoureManager
    2. 任务计算层面: ApplicationMaster, Task(容器内计算框架的工作角色)
  2. Spark角色分配:
    1. Master: 管理集群的资源
    2. Worker: 集群中任何一个可以运行spark应用代码的节点. Worker是物理节点, 可以在上面启动Executor进程 分配节点资源
    3. Driver: Spark中的Driveer是运行Applicationmain函数, 并且创建了SparkContext; 创建SparkContext的目的是为了准备Spark应用程序的运行环境. 在SparkSparkContext负责与Cluster Manager通信, 进行资源申请/任务分配和监控等. 当Excutor部分运行完毕后, Driver同时负责将SparkContext关闭 单个任务的管理
    4. Executor: 在每个Worker上为某应用启动的一个进程, 该进程负责运行Task, 并且负责将数据存在内存或磁盘上, 每个任务都有各独立的Executor. Executor是一个执行Task的容器 单个任务的执行

Standalone架构

Standalone模式Spark自带的一种集群模式, 集群由Master和Spark组成. 除了Master和Worker以外, 还可能由HistoryServer, 该进程会在Spark Application运行完成之后, 保存事件日志到HDFS, 启动HistoryServer可以查看应用相关的信息

基本使用

安装 Spark1

wget https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
tar -xvf spark-3.4.1-bin-hadoop3.tgz
sudo mv spark-3.4.1-bin-hadoop3 /usr/local/spark
vim ~/.bashrc
export SPARK_HOME="/usr/local/spark"
/usr/local/spark/bin/spark-shell

Spark shell - Spark Jobs (passnight.local)包含Spark访问界面

使用Spark实现WordCount

package com.passnight.bigdata.spark;import lombok.Cleanup;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class WordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");@Cleanup JavaSparkContext context = new JavaSparkContext(conf);JavaRDD<String> data = context.textFile("hdfs://server.passnight.local/test/word list.txt", 10);JavaPairRDD<String, Integer> result = data.flatMap(line -> Arrays.stream(line.split(" ")).iterator()).mapToPair(word -> new Tuple2<>(word, 1)) // 映射成词频.reduceByKey(Integer::sum) // 聚合词频// 排序.mapToPair(Tuple2::swap).sortByKey(false).mapToPair(Tuple2::swap);System.out.println("-".repeat(100));System.out.println(result.collect());System.out.println("-".repeat(100));}
}

输出如下(省略了日志)

[(I,4), (like,2), (passnight,2), (love,2), (hadoop,2)]

RDD

RDD基本概念

  1. 分布式计算需要的机制, RDD是提供这些机制的一个抽象
    1. 分区控制
    2. Shuffle控制
    3. 数据存储/序列化/发送
    4. 数据计算
  2. RDD定义:
    1. Resilient Distributed Dataset(弹性分布式数据集): 是Spark中最基本的数据抽象, 表示一个不可变/可分区/可并行计算的集合, 三个单词分别有以下含义:
    2. Dataset: 一个数据集合, 用于存放数据
    3. Distributed: RDD中的数据是分布式存储的, 可用于分布式计算
    4. Resilient: RDD中的数据可以存储在内存中或者磁盘中
  3. RDD的数据具有以下特性:
    1. 不可变: RDD是不可变集合
    2. 分区性: 数据集合被划分为多个部分, 每个部分被称为分区 对于KV型数据可以有分区器; 且数据读取会尽量靠近数据所在地(移动计算而非数据); 分区是RDD的最小存储单位
    3. 并行性: 计算方法是并行的, 计算方法会作用在每个分区上
    4. 依赖性: RDD之间具有相互依赖的关系 RDD有血缘关系
  4. RDD在WordCount中的数据流:在这里插入图片描述

RDD创建

RDD可以通过读取文件或集合创建rdd

package com.passnight.bigdata.spark;import lombok.Cleanup;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;public class RDDCreation {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local[*]");@Cleanup JavaSparkContext context = new JavaSparkContext(conf);// 通过并行化的方式创建RDD, 默认分区数为核心数JavaRDD<Integer> rdd = context.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 3);System.out.println("-".repeat(100));System.out.println(rdd.collect());System.out.println("-".repeat(100));// 也可以通过本地文件创建; 这里的最小分区数是参考值, 而非强制值JavaRDD<String> rdd1 = context.textFile("bigdata/src/main/resources/word list.txt", 100);System.out.println("-".repeat(100));System.out.println(rdd1.getNumPartitions());System.out.println("-".repeat(100));System.out.println(rdd1.collect());System.out.println("-".repeat(100));// 从hdfs读取文件JavaRDD<String> rdd2 = context.textFile("hdfs://server.passnight.local/test/word list.txt");System.out.println("-".repeat(100));System.out.println(rdd2.getNumPartitions());System.out.println("-".repeat(100));System.out.println(rdd2.collect());System.out.println("-".repeat(100));}// 读取多个小文件JavaPairRDD<String, String> rdd3 = context.wholeTextFiles("bigdata/src/main/resources");System.out.println("-".repeat(100));System.out.println(rdd3.getNumPartitions());System.out.println("-".repeat(100));System.out.println(rdd3.collect());System.out.println("-".repeat(100));
}

输出为:

# 这里省略了日志和分隔符
[1, 2, 3, 4, 5, 6, 7, 8, 9]
61
[I love passnight, I like passnight, I love hadoop, I like hadoop]
2
[I love passnight, I like passnight, I love hadoop, I like hadoop]
[(file:/************/bigdata/src/main/resources/word list.txt,I love passnight
I like passnight #......................

Transformation算子

  1. Transformation算子: 返回值仍是一个RDD的算子 这类算子是lazy加载的, 如果没有action算子, 这类算子是不工作的; 如flatMap是一类典型的Transformation算子
  2. Action算子: 返回值不是RDD的算子 例如collect

map算子

功能: map算子, 是将RDD中的数字逐条处理, 返回新的RDD

class Map {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Map").setMaster("local[*]"));List<Integer> rdd = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).map(i -> i * 10).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出为:

计算结果:[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

flatMap算子

功能: 先对rdd进行map操作, 再摊平嵌套

class FlatMap {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("FlatMap").setMaster("local[*]"));List<String> rdd = context.parallelize(Arrays.asList("1 2 3", "4 5 6", "7 8 9"), 3).flatMap(line -> Arrays.stream(line.split(" ")).iterator()).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出为:

计算结果:[1, 2, 3, 4, 5, 6, 7, 8, 9]

可以看到多个数组被摊平为一个数组

reduceByKey算子

功能: 针对KV型RDD, 先对key进行分组, 然后根据提供的聚合逻辑, 完成组内数据的聚合操作

class ReduceByKey {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("ReduceByKey").setMaster("local[*]"));List<Tuple2<String, Integer>> rdd = context.parallelizePairs(Stream.of(1, 1, 1, 2, 2, 2, 3, 4, 4, 3, 10).map(i -> new Tuple2<>(String.format("值: %d", i), i)).collect(Collectors.toList()), 3).reduceByKey(Integer::sum).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[(值: 4,8), (值: 1,3), (值: 2,6), (值: 10,10), (值: 3,6)]

可以看到不同值被分组, 然后进行求和

mapToValues算子

功能: 针对二元元组RDD, 对其内部的Value进行map操作

class MapToValues {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("MapToValues").setMaster("local[*]"));List<Tuple2<String, Integer>> rdd = context.parallelizePairs(Stream.of(1, 1, 1, 2, 2, 2, 3, 4, 4, 3, 10).map(i -> new Tuple2<>(String.format("值: %d", i), i)).collect(Collectors.toList()), 3).mapValues(i -> i * 10).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[(值: 1,10), (值: 1,10), (值: 1,10), (值: 2,20), (值: 2,20), (值: 2,20), (值: 3,30), (值: 4,40), (值: 4,40), (值: 3,30), (值: 10,100)]

可以看到只有值发生了变化, 且变为了原来的10倍

groupBy算子

功能: 将RDD的数据进行分组

class GroupBy {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("GroupBy").setMaster("local[*]"));List<Tuple2<String, Iterable<Tuple2<String, Integer>>>> rdd = context.parallelizePairs(Arrays.asList(Tuple2.apply("a", 1), Tuple2.apply("b", 2), Tuple2.apply("b", 1), Tuple2.apply("a", 3), Tuple2.apply("c", 1)), 3).groupBy(Tuple2::_1).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[(c,[(c,1)]), (a,[(a,1), (a,3)]), (b,[(b,2), (b,1)])]

可以看到已经根据key分组了

ffilter算子

功能: 过滤符合条件的数据

class Filter {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Filter").setMaster("local[*]"));List<Integer> rdd = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).filter(i -> i % 2 == 0).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[0, 2, 4, 6, 8]

可以看到已将偶数都过滤出来了

distinct算子

功能: 将rdd数据去重

class Distinct {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Distinct").setMaster("local[*]"));List<Integer> rdd = context.parallelize(Arrays.asList(1, 1, 1, 2, 2, 2, 3, 3, 3), 3).distinct(2).collect();System.out.printf("计算结果:%n %s%n", rdd);List<Tuple2<String, Integer>> rdd2 = context.parallelizePairs(Arrays.asList(Tuple2.apply("a", 1),Tuple2.apply("b", 1), Tuple2.apply("b", 1),Tuple2.apply("a", 3), Tuple2.apply("a", 1)), 3).distinct(2).collect();System.out.printf("计算结果:%n %s%n", rdd2);}
}

输出结果为:

计算结果:[2, 1, 3]
计算结果:[(a,1), (a,3), (b,1)]

可以看到无论是KV型数据还是普通的数据, 都已经去重了

union算子

功能: 将两个rdd合并成一个rdd

class Union {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Union").setMaster("local[*]"));JavaRDD<Object> rdd1 = context.parallelize(IntStream.range(0, 4).boxed().collect(Collectors.toList()), 3);JavaRDD<Object> rdd2 = context.parallelize(IntStream.range(2, 7).boxed().collect(Collectors.toList()), 3);JavaRDD<Object> rdd3 = context.parallelize(IntStream.range(7, 10).boxed().map(String::valueOf).collect(Collectors.toList()), 3);List<Object> rdd = rdd1.union(rdd2).union(rdd2).union(rdd3).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[0, 1, 2, 3, 2, 3, 4, 5, 6, 2, 3, 4, 5, 6, 7, 8, 9]

可以看到可以合并数据类型, 合并也不会进行去重操作

join算子

功能: 对两个RDD执行join操作, 可以实现SQL的内连接/外连接

class Join {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Join").setMaster("local[*]"));JavaPairRDD<Integer, String> rdd1 = context.parallelizePairs(Arrays.asList(Tuple2.apply(1, "张三"), Tuple2.apply(2, "李四"),Tuple2.apply(3, "王五"), Tuple2.apply(4, "赵六")), 3);JavaPairRDD<Integer, String> rdd2 = context.parallelizePairs(Arrays.asList(Tuple2.apply(1, "生产部"), Tuple2.apply(2, "销售部")), 3);// 默认按照两个rdd的key进行关联, 不像sql无需用on添加条件List<Tuple2<Integer, Tuple2<String, String>>> join = rdd1.join(rdd2).collect();List<Tuple2<Integer, Tuple2<String, Optional<String>>>> leftOuterJoin = rdd1.leftOuterJoin(rdd2).collect();System.out.printf("计算结果(join):%n %s%n", join);System.out.printf("计算结果(leftOuterJoin):%n %s%n", leftOuterJoin);}
}

输出结果为:

计算结果(join):[(1,(张三,生产部)), (2,(李四,销售部))]
计算结果(leftOuterJoin):[(3,(王五,Optional.empty)), (4,(赵六,Optional.empty)), (1,(张三,Optional[生产部])), (2,(李四,Optional[销售部]))]

可以看到两个元组集合根据key关联在一起了, 左外连接保留了在右侧没有对应key的元组

intersection算子

功能: 求两个rdd的交集, 并返回一个rdd

class Intersection {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Intersection").setMaster("local[*]"));JavaRDD<Integer> rdd1 = context.parallelize(IntStream.range(0, 8).boxed().collect(Collectors.toList()), 3);JavaRDD<Integer> rdd2 = context.parallelize(IntStream.range(5, 7).boxed().collect(Collectors.toList()), 3);List<Integer> rdd = rdd1.intersection(rdd2).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为

计算结果:[6, 5]

可以看到只有在两个集合中都存在的才被输出

glom算子

功能: 将RDD的数据按照分区加上嵌套

class Glom {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Glom").setMaster("local[*]"));List<List<Integer>> rdd1 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).glom().collect();System.out.printf("计算结果:%n %s%n", rdd1);List<List<Integer>> rdd2 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 2).glom().collect();System.out.printf("计算结果:%n %s%n", rdd2);}
}

输出结果为:

计算结果:[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
计算结果:[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

可以看到分区数和numSlices参数相对应

groupByKey算子

功能: 针对KV型RDD, 自动按照Key分组

class GroupByKey {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("GroupByKey").setMaster("local[*]"));List<Tuple2<String, Iterable<Integer>>> rdd = context.parallelizePairs(Stream.of(1, 1, 1, 2, 2, 2, 3, 4, 4, 3, 10).map(i -> new Tuple2<>(String.format("值: %d", i), i)).collect(Collectors.toList()), 3).groupByKey().collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为

计算结果:[(值: 4,[4, 4]), (值: 1,[1, 1, 1]), (值: 2,[2, 2, 2]), (值: 10,[10]), (值: 3,[3, 3])]

可以看到已经根据key进行分组了

sortBy算子

功能: 根据输入的函数, 对RDD进行排序

class SortBy {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("SortBy").setMaster("local[*]"));List<Tuple2<String, Integer>> rdd = context.parallelize(new Random().ints(1, 100).boxed().map(integer -> Tuple2.apply(String.format("值(%d)",integer), integer)).limit(10).collect(Collectors.toList()), 3)// 若要全局有序, Partition只能设置为1, 否则只能保证分区内局部有序.sortBy(Tuple2::_2, true, 1).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[((13),13), ((15),15), ((21),21), ((46),46), ((52),52), ((55),55), ((55),55), ((66),66), ((87),87), ((90),90)]

可以看到已经根据元组的第二个元素排序了

sortByKey算子

功能: 针对KV型RDD, 按照Key进行排序

class SortByKey {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("SortByKey").setMaster("local[*]"));List<Tuple2<String, Integer>> rdd = context.parallelizePairs(new Random().ints(1, 100).boxed().map(integer -> Tuple2.apply(String.format("值(%d)", integer), integer)).limit(10).collect(Collectors.toList()), 3).sortByKey(true, 1).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[((11),11), ((25),25), ((45),45), ((63),63), ((64),64), ((65),65), ((71),71), ((77),77), ((79),79), ((98),98)]

可以看到结果已经根据key排序了

Action算子

countByKey算子

功能: 统计key出现的次数, 这个算子是

class CountByKey {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("CountByKey").setMaster("local[*]"));java.util.Map<String, Long> rdd = context.parallelizePairs(Stream.of(1, 1, 1, 2, 2, 2, 3, 4, 4, 3, 10).map(i -> new Tuple2<>(String.format("值: %d", i), i)).collect(Collectors.toList())).countByKey();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:{值: 2=3, 值: 4=2, 值: 3=2, 值: 10=1, 值: 1=3}

可以看到已经根据Key进行计数了

collect算子

功能: 将RDD各个分区内的数据, 统一手机到一个Driver中, 形成一个List对象

class Collect {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Collect").setMaster("local[*]"));List<Integer> rdd = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).collect(); // 注意使用这个算子, 要确认结果集不会太大, 否则可能会导致Driver OOMSystem.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

reduce算子:

功能: 根据传入的逻辑进行聚合

class Reduce {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Reduce").setMaster("local[*]"));Integer result = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).reduce(Integer::sum);System.out.printf("计算结果:%n %s%n", result);}
}

输出结果值为:

计算结果:45

可以看到成功实现求和

flod算子

功能: 相当于有初始值的聚合, 每个分区内都会有一个初始值, 且分区间聚合也有该初始值

class Fold {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Collect").setMaster("local[*]"));Integer result = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).reduce(Integer::sum);System.out.printf("计算结果:%n %s%n", result);}
}

输出结果为:

计算结果:85

三个分区聚合引入三个初始值, 因此三个分区聚合后的结果为[16, 25, 34], 它们再聚合, 并添加10作为初始值, 最后的结果为 10 + 16 + 25 + 34 = 85 10 + 16 + 25 + 34 = 85 10+16+25+34=85

first算子

功能: 取出rd的第一个元素

class First {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("First").setMaster("local[*]"));Integer result = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).first();System.out.printf("计算结果:%n %s%n", result);}
}

输出结果为:

计算结果:0

可以看到去除第一个元素

top算子

功能: 对RDD结果集降序排序, 取前N个

class Top {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Top").setMaster("local[*]"));List<Integer> top3 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.collectingAndThen(Collectors.toList(), (list) -> {Collections.shuffle(list);return list;})), 3).top(3);System.out.printf("计算结果:%n %s%n", top3);}
}

输出结果为:

计算结果:[9, 8, 7]

count算子

功能: 返回RDD的数据数

class Count {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Count").setMaster("local[*]"));long count = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).count();System.out.printf("计算结果:%n %s%n", count);}
}

输出结果为:

计算结果:10

takeSample算子

功能: 随机抽样RDD的数据

class TakeSample {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("TakeSample").setMaster("local[*]"));List<Integer> sample = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).takeSample(true, 3);System.out.printf("计算结果:%n %s%n", sample);}
}

输出结果为:

计算结果:[5, 4, 9]

可以看到随机取了三个rd中的元素

takeOrderd算子

功能: 对RDD进行排序后取前N个 相比于top, 可以制定排序方法

class TakeOrdered {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("TakeOrdered").setMaster("local[*]"));List<Integer> sample = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.collectingAndThen(Collectors.toList(), list -> {Collections.shuffle(list);return list;})), 3).takeOrdered(3);System.out.printf("计算结果:%n %s%n", sample);}
}

输出结果为:

计算结果:[0, 1, 2]

forEach算子

功能: 对rdd的每个元素执行所提供的操作, 但相比于map, 没有返回值 注意forEach是直接由executor执行的, 其他的算子是由Driver输出的

class ForEach {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("ForEach").setMaster("local[*]"));context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).foreach(System.out::println);}
}

输出结果为:

值: 0|值: 3|值: 6|值: 7|值: 4|值: 5|值: 1|值: 2|值: 8|值: 9|

saveAsTextFile算子

功能: 将数据结果写入到文件当中, 这个任务是由Executor执行的 支持本地文件系统, 也支持hdfs; 因为是由Executor执行的, 所以每个分区都会写一部分

class SaveAsTextFile implements Serializable {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("SaveAsTextFile").setMaster("local[*]"));context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).saveAsTextFile("result");}
}

可以看到结果成功写入到文件当中了 , 且结果的文件数量和分区数量相同

passnight@passnight-s600:~/project/note/spring/result$ ll
total 36
drwxr-xr-x  2 passnight passnight 4096 114 14:34 ./
drwxrwxr-x 15 passnight passnight 4096 114 14:34 ../
-rw-r--r--  1 passnight passnight    6 114 14:34 part-00000
-rw-r--r--  1 passnight passnight   12 114 14:34 .part-00000.crc
-rw-r--r--  1 passnight passnight    6 114 14:34 part-00001
-rw-r--r--  1 passnight passnight   12 114 14:34 .part-00001.crc
-rw-r--r--  1 passnight passnight    8 114 14:34 part-00002
-rw-r--r--  1 passnight passnight   12 114 14:34 .part-00002.crc
-rw-r--r--  1 passnight passnight    0 114 14:34 _SUCCESS
-rw-r--r--  1 passnight passnight    8 114 14:34 ._SUCCESS.crc

分区操作算子

mapPartitions算子

功能: 同map一样, 但一次操作一整个分区的数据 这样可以极大减少网络io次数

class MapPartitions {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("MapPartitions").setMaster("local[*]"));List<Integer> rdd = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).mapPartitions(integerIterator -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(integerIterator, 0), false).map(integer -> integer * 10).iterator()).collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

可以看到所有元素的值都变为了原来的10倍

foreachPartitions算子

功能: 同forEach一样, 但一次操作整个分区的数据

class ForeachPartitions {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("ForeachPartitions").setMaster("local[*]"));context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).mapPartitions(integerIterator -> Stream.generate(integerIterator::next).map(integer -> integer * 10).iterator()).foreachPartition(it -> System.out.printf("值: %s|", it));}
}

输出结果为:

值: java.util.Spliterators$1Adapter@8c66b7c|值: java.util.Spliterators$1Adapter@6e74b18e|值: java.util.Spliterators$1Adapter@6cb5e424|

partitionBy算子

功能: 对RDD进行自定义分区操作

class PartitionBy {public static void main(String[] args) {@CleanupJavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("PartitionBy").setMaster("local[*]"));List<List<Tuple2<Integer, String>>> rdd = context.parallelizePairs(Stream.of(1, 1, 1, 2, 2, 2, 3, 4, 4, 3, 10).map(i -> new Tuple2<>(i, String.format("值: %d", i))).collect(Collectors.toList())).partitionBy(new Partitioner() {@Overridepublic int numPartitions() {return 2;}@Overridepublic int getPartition(Object key) {assert key instanceof Integer;Integer k = (Integer) key;return k > 3 ? 1 : 0;}}).glom().collect();System.out.printf("计算结果:%n %s%n", rdd);}
}

输出结果为:

计算结果:[[(1,值: 1), (1,值: 1), (1,值: 1), (2,值: 2), (2,值: 2), (2,值: 2), (3,值: 3), (3,值: 3)], [(4,值: 4), (4,值: 4), (10,值: 10)]]

可以看到大于3和小于3的分为了两组

repartition算子

功能: 改变分区的数量 注意添加分区可能会导致shuffle, 进而影响到性能, 因此尽量不要改变分区大小, 更不要增大分区

class Repartition {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Repartition").setMaster("local[*]"));List<List<Integer>> rdd1 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).glom().collect();System.out.printf("计算结果:%n %s%n", rdd1);List<List<Integer>> rdd2 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 2).repartition(5).glom().collect();System.out.printf("计算结果:%n %s%n", rdd2);List<List<Integer>> rdd3 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 2).repartition(1).glom().collect();System.out.printf("计算结果:%n %s%n", rdd3);}
}

输出结果为:

计算结果:[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]计算结果:[[1, 6], [2, 7], [3, 8], [4, 9], [0, 5]]计算结果:[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

colalesce算子

功能: 修改分区大小 同repartition相比, 它有一个安全机制, 需要打开shuffle才能增加分区

class Coalesce {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Coalesce").setMaster("local[*]"));List<List<Integer>> rdd1 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).glom().collect();System.out.printf("计算结果:%n %s%n", rdd1);List<List<Integer>> rdd2 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 2).coalesce(5).glom().collect();System.out.printf("计算结果:%n %s%n", rdd2);List<List<Integer>> rdd3 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 2).coalesce(1).glom().collect();System.out.printf("计算结果:%n %s%n", rdd3);}
}

输出结果为:

计算结果:[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
计算结果:[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
计算结果:[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

RDD持久化

缓存

  1. rdd之间的血缘关系
    1. rdd之间相互迭代计算, 通过老的rdd计算生成rdd, 新的rdd生成之后老的rdd会被释放以节约内存空间
  2. rdd持久化技术
    1. 新的rdd生成后老的rdd会被释放, 而倘若一个rdd会被使用多次, 这样就要重新计算, 此时可以通过将其持久化到磁盘上来节约计算资源
    2. Spark中可以通过cache方法将其缓存到内存中, 和persist将其持久化到磁盘上 persist也可以只持久化到内存或多个内存副本中
    3. 可以通过unpresisit来主动清理缓存

下面是一个例子, rdd1和rdd2会被使用两次

public class RddCache {@SneakyThrowspublic static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Distinct").setMaster("local[*]"));JavaRDD<Integer> rdd1 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()));JavaRDD<Integer> rdd2 = rdd1.map(x -> x * 10);rdd2.cache(); // 将rdd保存下来JavaRDD<String> rdd3 = rdd2.map(String::valueOf);Integer sum = rdd2.reduce(Integer::sum);String expression = rdd3.reduce(String::concat);System.out.printf("计算结果:%n %s%n", sum);System.out.printf("计算结果:%n %s%n", expression);TimeUnit.DAYS.sleep(1);}
}

在管理界面, 可以看到DAG图:

在这里插入图片描述

在这里插入图片描述

由图可知rdd1和rdd2被计算了2次; 在将rdd缓存下来之后, rdd1和rdd2就只被计算了1次

在这里插入图片描述

在这里插入图片描述

CheckPoint

  1. CheckPoint技术类似cache一样, 也是将RDD的数据保存起来, 但只支持硬盘存储.
  2. CheckPoint 在设计上被认为是安全的, 因此不会保留血缘关系 保留血缘关系在丢失后可以重新计算
  3. CheckPoint存储RDD是集中存储的, 不像Cache是分散存储的 例如将CheckPoint存储到HDFS, 并由HDFS保证其完整性
class CheckPoint {@SneakyThrowspublic static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("CheckPoint").setMaster("local[*]"));context.setCheckpointDir("checkpoint");JavaRDD<Integer> rdd1 = context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()));JavaRDD<Integer> rdd2 = rdd1.map(x -> x * 10);rdd2.checkpoint();JavaRDD<String> rdd3 = rdd2.map(String::valueOf);Integer sum = rdd2.reduce(Integer::sum);String expression = rdd3.reduce(String::concat);System.out.printf("计算结果:%n %s%n", sum);System.out.printf("计算结果:%n %s%n", expression);TimeUnit.DAYS.sleep(1);}
}

可以看到rdd2被缓存下来了

在这里插入图片描述

并且任务2直接从CheckPoint开始执行

在这里插入图片描述

共享变量

广播变量

在这里插入图片描述

  1. 假设一个变量需要被多个分区使用, 可以将该变量标记为广播变量
  2. 若两个分区处于同一个进程, 分区二在请求共享变量的时候, 会被通知可以从同进程的其他线程中获取

广播变量的使用

class Broadcast {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Distinct").setMaster("local[*]"));Map<Integer, String> nameMap = Map.of(3, "张三", 4, "李四", 5, "王五", 6, "赵六");List<Tuple3<Integer, String, Integer>> scoreMap = Arrays.asList(Tuple3.apply(3, "语文", 100),Tuple3.apply(3, "数学", 100),Tuple3.apply(4, "语文", 100),Tuple3.apply(4, "数学", 100),Tuple3.apply(5, "语文", 100),Tuple3.apply(5, "数学", 100),Tuple3.apply(5, "英语", 100),Tuple3.apply(4, "英语", 100),Tuple3.apply(3, "英语", 100));org.apache.spark.broadcast.Broadcast<Map<Integer, String>> broadcastNameMap = context.broadcast(nameMap);System.out.printf("计算结果:%n %s%n", context.parallelize(scoreMap).map(tuple -> Tuple3.apply(nameMap.get(tuple._1()), tuple._2(), tuple._3())).collect());System.out.printf("计算结果:%n %s%n", context.parallelize(scoreMap).map(tuple -> Tuple3.apply(broadcastNameMap.getValue().get(tuple._1()), tuple._2(), tuple._3())).collect());}
}
  1. 以上面的例子为例, broadcast可以将变量封装为广播变量; 这样就可以节约部分情况下变量的传播
  2. 使用广播变量而不使用RDD的原因在于: 使用RDD可能会导致shuffle, 进而使得性能降低 上例中, 假设不是Map而是遍历list找到到对应的key的话, 遍历过程rdd的每个元素都要一次网络io; 广播变量可以一次性传输全量io

累加器

假设要累加分布式对象的数量, 若没有变量共享, 每个分区都会有一个累加器, 进而导致累加的数量少于实际的数量, 下面是一个累加器的例子

class Accumulator {public static void main(String[] args) {@Cleanup JavaSparkContext context = new JavaSparkContext(new SparkConf().setAppName("Accumulator").setMaster("local[*]"));AtomicLong count = new AtomicLong(0); // 不适用累加器context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).map(x -> {count.incrementAndGet();System.out.println("计算过程: " + count.get());return x;}).collect();System.out.printf("计算结果:%n %s%n", count.get());LongAccumulator countAccumulator = context.sc().longAccumulator();// 累加器context.parallelize(IntStream.range(0, 10).boxed().collect(Collectors.toList()), 3).map(x -> {countAccumulator.add(1);System.out.println("累加器计算过程: " + countAccumulator.value());return x;}).collect();System.out.printf("累加器计算结果:%n %s%n", countAccumulator.value());}
}

输出结果为:

计算过程: 1
计算过程: 1
计算过程: 1
计算过程: 2
计算过程: 2
计算过程: 3
计算过程: 2
计算过程: 3
计算过程: 3
计算过程: 4
计算结果:0累加器计算过程: 1
累加器计算过程: 2
累加器计算过程: 3
累加器计算过程: 1
累加器计算过程: 2
累加器计算过程: 3
累加器计算过程: 4
累加器计算过程: 1
累加器计算过程: 2
累加器计算过程: 3
累加器计算结果:10

可以看到每个分区都有一份累加器的拷贝(Executor的拷贝), 并且结算结果是单独的一份拷贝(Driver的拷贝) 传递是值传递, 而非引用传递, 分布式环境下也无法实现引用传递; 但是如果使用Accumulator的话, 尽管各个分区都是值传递, 但是最后累加的结果会作用在Drive的父拷贝上 注意, 多一个rdd被创创建多次, 会导致accumulator被执行多次, 可以使用cache解决这个问题

Spark Scheduler

  1. Spark的计算调度是Spark Scheduler完成的, 而任务的调度又先后关系, 基于这些关系形成的DAG划分Stage, 调度中Spark Scheduler将每个任务发到指定的节点运行
  2. 基本概念
    1. Action: 流水线的开关, 只有执行了Action算子, 前面的Transformation算子才会开始执行

    2. Job: 任务, 一个Action会产生一个job

    3. DAG: 有向无环图, 这里特指RDD间血缘关系形成的有向无环图 在运行时, 会生成带有分区关系的DAG

    4. 宽依赖: 父RDD的一个分区, 将数据发给子RDD的多个分区 此过程也被成为shuffle

    5. 窄依赖: 父RDD的一份分区, 全部将数据发送给子RDD的一个分区

    6. 如下图所示,所有都是窄依赖 子节点接受多个父节点也属于窄依赖在这里插入图片描述

    7. 该图所有的情况都是宽依赖 可以看到所有的RDD都存在分叉在这里插入图片描述

    8. Stage: stage是通过宽依赖划分的, 一个宽依赖会划分出一个新的Stage, 因此Stage内部一定是窄依赖

内存迭代计算

在这里插入图片描述

  1. Spark调度器会根据DAG, 按照宽窄以来划分DAG阶段
  2. Spark调度器会尽量将窄依赖划分为一个任务, 这样可以减少网络交互IO
  3. 如下图, Task1-Task3都可以在一个分区上完成计算, 因此Spark可以将这些算子调度在同一个内存计算管道
  4. 倘若Task3-Task6都在Executor上, 他们之间的数据交互也是通过内存 底层是本地回环网络

Spark并行度

  1. 定义: 同一时间内, 同时运行的Task数量
  2. 全局并行度可以通过spark.default.parallelism配置 可以在启动参数/配置文件/SparkConf对象中配置

Spark任务调度

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  1. DAG Scheduler: 处理逻辑的DAG图, 最后得到逻辑上的Task划分
  2. Task Scheduler: 基于DAG Scheduler调度处的逻辑划分, 决定任务实际在那些物理Executor上执行, 以及监控管理他们的运行

Spark SQL

基本使用

  1. SparkSQL是Spark的一个模块, 用于处理海量的结构化数据
  2. SparkSQL特点
    1. 融合性: SQL可以无缝集成在代码中, 随时用SQL处理数据
    2. 统一数据访问: 一套标准API课读写不同数据源
    3. Hive兼容: 可以使用SparkSQL直接计算并生成Hive数据表
    4. 标准化连接: 支持标准化JDBC/ODBC连接, 方便和各种数据库进行交互
  3. SparkSQL和Hive的异同点
    1. 相同点:
      1. Spark和Hive都是分布式SQL计算引擎
      2. 都可以运行在YARN之上
    2. SparkSQL特点
      1. 内存计算; 底层基于SparkRDD
      2. 无元数据管理
    3. Hive特点
      1. 磁盘计算, 底层基于MapReduce
      2. 元数据管理基于MetaStore
  4. SparkSQL中的数据抽象:
    1. DataFrame, 是以二维表数据结构存储 类似于Pandas, 但是是分布式存储
    2. SchemaRDD对象: 类似于RDD, 对RDD修改使其支持SQL
    3. DataSet对象: 用于Java/Scala语言, 带有泛型特性
  5. SparkSession: 类似于SparkContext, 是Spark的入口对象
    1. 可以用作SparkSQL入口对象
    2. 也可以用于SparkCore编程, 因为可以通过SparkSession获取SparkContext对象

读取csv

对于订单数据

id,user_id,commodity_code,count,money
2,user1,00001,2,200
3,user1,00001,2,200
4,user1,00001,2,200
9,user1,00001,2,200
10,user1,00001,2,200
11,user1,00001,2,200
12,user1,00001,2,200
13,user1,00001,2,200
15,user1,00001,2,200
18,user1,00001,20,200

可以通过spark读取

public class SparkSQLBase {public static void main(String[] args) throws AnalysisException {// 创建SparkSession对象SparkSession spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate();// 通过SparkSession获取SparkContextSparkContext context = spark.sparkContext();Dataset<Row> df = spark.read().csv("bigdata/src/main/resources/order.csv").toDF("id", "user_id", "commodity_code", "count", "money");df.printSchema();df.show();// 创建表df.createTempView("order");// 写sqlspark.sql("SELECT * FROM order limit 3;").show();// 使用dsl风格写sqldf.where("count=20").show();}
}

输出为

root|-- id: string (nullable = true)|-- user_id: string (nullable = true)|-- commodity_code: string (nullable = true)|-- count: string (nullable = true)|-- money: string (nullable = true)+---+-------+--------------+-----+-----+
| id|user_id|commodity_code|count|money|
+---+-------+--------------+-----+-----+
| id|user_id|commodity_code|count|money|
|  2|  user1|         00001|    2|  200|
|  3|  user1|         00001|    2|  200|
|  4|  user1|         00001|    2|  200|
|  9|  user1|         00001|    2|  200|
| 10|  user1|         00001|    2|  200|
| 11|  user1|         00001|    2|  200|
| 12|  user1|         00001|    2|  200|
| 13|  user1|         00001|    2|  200|
| 15|  user1|         00001|    2|  200|
| 18|  user1|         00001|   20|  200|
+---+-------+--------------+-----+-----++---+-------+--------------+-----+-----+
| id|user_id|commodity_code|count|money|
+---+-------+--------------+-----+-----+
| id|user_id|commodity_code|count|money|
|  2|  user1|         00001|    2|  200|
|  3|  user1|         00001|    2|  200|
+---+-------+--------------+-----+-----++---+-------+--------------+-----+-----+
| id|user_id|commodity_code|count|money|
+---+-------+--------------+-----+-----+
| 18|  user1|         00001|   20|  200|
+---+-------+--------------+-----+-----+

DataFrame

  1. DataFrame是一个二维表结构, 因此由以下三点构成:
    1. 行: 数据行
    2. 列: 记录一个列的数据, 并且描述一个列的信息
    3. 表结构: 描述表的结构
  2. DataFrame具体组成
    1. StructType: 描述整个DataFrame的表结构
    2. StructField: 描述一个列的信息
  3. 数据层面组成:
    1. Row: 记录一整行的数据
    2. Column: 记录一个列的数据, 并且包含列的信息

通过StructType构建DataSet

    @Testpublic void buildFromRdd() throws AnalysisException {@Cleanup JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());JavaRDD<Row> rdd = context.textFile("src/main/resources/traffic.txt", 10).map(line -> line.split("\t")).map(words -> RowFactory.create(Long.parseLong(words[0]), words[1]));Dataset<Row> df = spark.createDataFrame(rdd, DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("phone_number", DataTypes.LongType, true),DataTypes.createStructField("ip", DataTypes.StringType, true))));df.printSchema();// 展示数据// 展示前20条数据, 并且不截断数据df.show(20, false);// 将DataSet注册为临时表; 这样就可以查询了df.createTempView("traffic");spark.sql("SELECT * FROM traffic where phone_number < 14589530085").show();}

从不同数据源读取数据(以CSV为例)

    @Testpublic void buildFromSparkSql() throws AnalysisException {Dataset<Row> df = spark.read().format("csv").option("header", false).option("sep", "\t").option("encoding", StandardCharsets.UTF_8.name()).schema(DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("phone_number", DataTypes.LongType, true),DataTypes.createStructField("ip", DataTypes.StringType, true)))).load("src/main/resources/traffic.txt");df.printSchema();df.createTempView("traffic");spark.sql("SELECT * FROM traffic where phone_number < 14589530085").show();}

DataFrame操作

数据准备

    private final static SparkSession spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate();private final static Dataset<Row> df = spark.read().format("csv").option("header", false).option("sep", "\t").option("encoding", StandardCharsets.UTF_8.name()).schema(DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("phone_number", DataTypes.LongType, true),DataTypes.createStructField("ip", DataTypes.StringType, true),DataTypes.createStructField("host", DataTypes.StringType, true),DataTypes.createStructField("up", DataTypes.LongType, false),DataTypes.createStructField("down", DataTypes.LongType, false),DataTypes.createStructField("code", DataTypes.IntegerType, false)))).load("src/main/resources/traffic.txt");

通过DSL语法操作

    @Testpublic void dslStyleQuery() {df.select("ip", "code").filter(df.col("phone_number").lt(14589530085L)).limit(10).show();}

输出为

+--------------+----+
|            ip|code|
+--------------+----+
| 110.11.174.29| 200|
| 21.234.130.14| 200|
| 90.242.200.96| 404|
|  68.99.109.14| 200|
|148.227.226.79| 404|
|153.178.25.132| 200|
| 191.49.192.31| 500|
| 10.60.145.193| 500|
|  52.122.13.63| 500|
| 203.82.225.65| 500|
+--------------+----+

通过SQL操作

    @Testpublic void sqlStyleQuery() throws AnalysisException {
//        // 创建全局临时试图, 可以跨session共享
//        df.createGlobalTempView()
//        // 同createTempView, 但是视图存在则替换;
//        df.createOrReplaceGlobalTempView();df.createTempView("traffic");spark.sql("SELECT code, count(*) FROM traffic group by code").show();}

输出为

+----+--------+
|code|count(1)|
+----+--------+
| 500|      38|
| 404|      25|
| 200|      37|
+----+--------+

SparkSQL实现WordCount

通过rdd分词

    @Testpublic void wordCount_buildFromRDD() throws AnalysisException {@Cleanup JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());JavaRDD<Row> rdd = context.textFile("hdfs://server.passnight.local/test/word list.txt", 10).flatMap(line -> Arrays.stream(line.split(" ")).iterator()).map(RowFactory::create);Dataset<Row> df = spark.createDataFrame(rdd, DataTypes.createStructType(List.of(DataTypes.createStructField("word", DataTypes.StringType, false))));df.createTempView("words");spark.sql("SELECT word, count(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show();}

输出为

+---------+---+
|     word|cnt|
+---------+---+
|        I|  4|
|     love|  2|
|passnight|  2|
|     like|  2|
|   hadoop|  2|
+---------+---+

通过functions分词

    @Testpublic void wordCount_buildFromSparkSql() throws AnalysisException {Dataset<Row> words = spark.read().text("hdfs://server.passnight.local/test/word list.txt");words.printSchema();Dataset<Row> df2 = words.withColumn("value", functions.explode(functions.split(words.col("value"), " ")));df2.createTempView("words");df2.groupBy("value").count().orderBy("count").show();}

输出为

+---------+-----+
|    value|count|
+---------+-----+
|     love|    2|
|passnight|    2|
|     like|    2|
|   hadoop|    2|
|        I|    4|
+---------+-----+

写api

@Testpublic void writeText() {// text只能写出一列数据, 因此要将df转化为一列df.select(functions.concat_ws("---",functions.col("ip"),functions.col("up"),functions.col("down"))).write().mode("overwrite").format("text").save("data.txt");}@Testpublic void writeCsv() {df.select(functions.col("ip"),functions.col("up"),functions.col("down")).write().mode("overwrite").option("sep", ",").option("header", true).format("csv").save("data.csv");}@Testpublic void writeJson() {df.select(functions.col("ip"),functions.col("up"),functions.col("down")).write().mode("overwrite").format("json").save("data.json");}@Testpublic void writeParquet() {df.select(functions.col("ip"),functions.col("up"),functions.col("down")).write().mode("overwrite").format("parquet").save("data.parquet");}

UDF

  1. 在SparkSQL分析处理数据时, 往往需要使用到函数; 而SparkSQL自带的函数可能无法覆盖全部的需求, 因此SparkSQL可以通过自定义UDF来实现自定义函数
  2. 在Hive中, UDF分为以下三类
    1. UDF(User Defined Function)函数:
      1. 1对1关系, 输入一个值, 经过函数输出后输出一个值
      2. 在Hive中集成UDF类, 方法名称为evaluate, 返回值不能为void; 本质上是一个方法
    2. UDAF(User Defined Aggregation Function)聚合函数
      1. 多对1关系, 输入多个值输出一个值, 通常与groupBy一起使用
    3. UDTF(User Defined Table-Generating Function)函数
      1. 1对多的关系, 输入一个值, 输出多个值 类似flatMap

基本UDF

  1. UDF的使用有以下三种方式
    1. 调用spark.udf().register()后, 通过funcions.callUDF调用
    2. 通过funcitons.udf的返回值调用
    3. 调用spark.udf().register()后, 直接在SQL中调用
    @Testpublic void basicUdf() throws AnalysisException {Dataset<Row> df = spark.createDataFrame(IntStream.range(0, 10).boxed().map(RowFactory::create).collect(Collectors.toList()),DataTypes.createStructType(List.of(DataTypes.createStructField("value", DataTypes.IntegerType, false))));// 注册一个udf, 名称为`timeTen`// `timeTen`名称可以可以用于SQL风格调用// dsl风格通过`functions.callUDF()`调用// 也可以通过functions.udf注册, 这样可以直接通过返回的方法调用spark.udf().register("timeTen", (UDF1<Integer, Integer>) x -> 10 * x, DataTypes.IntegerType);// dsl风格, 使用`functions.callUDF()`调用UserDefinedFunction timeTen = functions.udf((UDF1<Integer, Integer>) x -> x * 10, DataTypes.IntegerType);df.withColumn("value", functions.callUDF("timeTen", functions.col("value"))).show();// dsl风格, 使用`functions.udf()`返回值df.withColumn("value", timeTen.apply(functions.col("value"))).show();// SQL风格, 直接在SQL中调用df.createTempView("values");spark.sql("select timeTen(value) from values").show();}

输出为

+-----+
|value|
+-----+
|    0|
|   10|
|   20|
|   30|
|   40|
|   50|
|   60|
|   70|
|   80|
|   90|
+-----++-----+
|value|
+-----+
|    0|
|   10|
|   20|
|   30|
|   40|
|   50|
|   60|
|   70|
|   80|
|   90|
+-----++--------------+
|timeTen(value)|
+--------------+
|             0|
|            10|
|            20|
|            30|
|            40|
|            50|
|            60|
|            70|
|            80|
|            90|
+--------------+

返回数组的UDF

    @Testpublic void arrayUdf() {Dataset<Row> df = spark.createDataFrame(IntStream.range(0, 10).boxed().map(RowFactory::create).collect(Collectors.toList()),DataTypes.createStructType(List.of(DataTypes.createStructField("value", DataTypes.IntegerType, false))));UserDefinedFunction toArray = functions.udf((UDF1<Integer, List<Integer>>) x -> Arrays.asList(x, x, x, x, x), DataTypes.createArrayType(DataTypes.IntegerType));df.withColumn("value", toArray.apply(functions.col("value"))).show();}

输出为

+---------------+
|          value|
+---------------+
|[0, 0, 0, 0, 0]|
|[1, 1, 1, 1, 1]|
|[2, 2, 2, 2, 2]|
|[3, 3, 3, 3, 3]|
|[4, 4, 4, 4, 4]|
|[5, 5, 5, 5, 5]|
|[6, 6, 6, 6, 6]|
|[7, 7, 7, 7, 7]|
|[8, 8, 8, 8, 8]|
|[9, 9, 9, 9, 9]|
+---------------+

返回Map类型的UDF

    @Testpublic void mapUdf() {Dataset<Row> df = spark.createDataFrame(IntStream.range(0, 10).boxed().map(RowFactory::create).collect(Collectors.toList()),DataTypes.createStructType(List.of(DataTypes.createStructField("value", DataTypes.IntegerType, false))));UserDefinedFunction toArray = functions.udf((UDF1<Integer, Map<Integer, String>>) x -> Map.of(x, String.valueOf(x)), DataTypes.createMapType(DataTypes.IntegerType, DataTypes.StringType));df.withColumn("value", toArray.apply(functions.col("value"))).show();}

输出为

+--------+
|   value|
+--------+
|{0 -> 0}|
|{1 -> 1}|
|{2 -> 2}|
|{3 -> 3}|
|{4 -> 4}|
|{5 -> 5}|
|{6 -> 6}|
|{7 -> 7}|
|{8 -> 8}|
|{9 -> 9}|
+--------+

窗口函数

  1. 窗口函数: 窗口函数既显示聚合前的数据又显示聚合后的数据 即显示每一行都显示聚合结果
public class WindowTest {private final static SparkSession spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate();private final static Dataset<Row> df = spark.read().format("csv").option("header", false).option("sep", "\t").option("encoding", StandardCharsets.UTF_8.name()).schema(DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("phone_number", DataTypes.LongType, true),DataTypes.createStructField("ip", DataTypes.StringType, true),DataTypes.createStructField("host", DataTypes.StringType, true),DataTypes.createStructField("up", DataTypes.LongType, false),DataTypes.createStructField("down", DataTypes.LongType, false),DataTypes.createStructField("code", DataTypes.IntegerType, false)))).load("src/main/resources/traffic.txt");@BeforeClasspublic static void setUpClass() throws AnalysisException {df.createTempView("traffic");}@Testpublic void aggregationWindow() {spark.sql("SELECT *, AVG(down) OVER() AS avg_down FROM traffic").show();}@Testpublic void orderWindow() {spark.sql("SELECT *, RANK() OVER(ORDER BY down DESC) AS rank_down," +"DENSE_RANK() OVER(PARTITION BY code ORDER BY down DESC) AS dense_rank_down," +"ROW_NUMBER() OVER(ORDER BY down) AS row_number_down FROM traffic").show();}
}

输出结果为

+------------+---------------+--------------------+----+----+----+--------+
|phone_number|             ip|                host|  up|down|code|avg_down|
+------------+---------------+--------------------+----+----+----+--------+
| 14591430480| 206.175.250.82|        web-49.28.cn|6652|4853| 200| 4597.34|
| 14576404331|  110.11.174.29|       lt-91.duxu.cn|3691|9180| 200| 4597.34|
| 14582487728|  21.234.130.14|    desktop-19.13.cn|2797|1428| 200| 4597.34|
| 14596521336| 149.125.91.187|    db-46.guiying.cn|4742|3870| 500| 4597.34|
| 15964887988|201.254.165.183|desktop-12.guiyin...|8266|8951| 500| 4597.34|
| 14582499209|  90.242.200.96|         db-36.05.cn|3686|1143| 404| 4597.34|
| 14505200322|   68.99.109.14|       web-94.40.org|6978|4684| 200| 4597.34|
| 15057102608|  73.31.103.153|desktop-77.zhongw...|1180| 785| 404| 4597.34|
| 15961211597| 159.244.71.102|       web-89.dh.net|8526|4965| 500| 4597.34|
| 15311413947|   60.85.30.231|  db-34.shenhuang.cn|1942|9698| 500| 4597.34|
| 13755692548| 148.227.226.79|   srv-89.fanggao.cn|3049|2243| 404| 4597.34|
| 15512665231| 172.147.244.20| lt-73.taoguiying.cn|9494| 151| 200| 4597.34|
| 13671972925| 153.178.25.132|        srv-73.31.cn|3311|1452| 200| 4597.34|
| 18142899590|  31.150.73.196|        web-49.nl.cn|2909| 277| 500| 4597.34|
| 15013760479|   94.26.117.22|      email-47.63.cn|5645|4756| 200| 4597.34|
| 15696235979|   80.73.193.75|        lt-91.lei.cn|9845|1267| 404| 4597.34|
| 15678423363| 171.44.202.193|         db-37.99.cn|7496|7354| 200| 4597.34|
| 13313631905|  191.49.192.31|     laptop-78.nd.cn|3037|3070| 500| 4597.34|
| 15911783755|   208.18.32.83|       db-48.yao.net|4846|4935| 404| 4597.34|
| 14589530086| 57.193.203.100|   srv-25.mingcao.cn|1861|3034| 200| 4597.34|
+------------+---------------+--------------------+----+----+----+--------++------------+---------------+--------------------+----+----+----+---------+---------------+---------------+
|phone_number|             ip|                host|  up|down|code|rank_down|dense_rank_down|row_number_down|
+------------+---------------+--------------------+----+----+----+---------+---------------+---------------+
| 14576404331|  110.11.174.29|       lt-91.duxu.cn|3691|9180| 200|        5|              1|             96|
| 15848614274|221.193.203.253|  web-81.duanxiao.cn|8474|8973| 200|        6|              2|             95|
| 15133313425|   3.41.203.203|    laptop-65.fan.cn|1979|8496| 200|       12|              3|             89|
| 14722781518|  41.33.230.230|laptop-66.zhangze...|3840|8161| 200|       15|              4|             86|
| 13985614323| 105.154.67.146|    laptop-67.jie.cn|5629|7876| 200|       18|              5|             83|
| 15025055835|   57.67.224.58|   web-46.weiliao.cn|1093|7830| 200|       19|              6|             82|
| 15708423956|   52.173.24.63|  web-36.yangchao.cn| 201|7774| 200|       20|              7|             81|
| 15550368967|  181.80.90.147|  laptop-44.yanli.cn|4669|7770| 200|       21|              8|             80|
| 15281538689| 173.204.178.87|    web-83.weimin.cn|7082|7718| 200|       23|              9|             78|
| 13618975336|   178.12.49.98|      srv-13.min.net|9290|7560| 200|       24|             10|             77|
| 14765852831|  121.98.240.15|   laptop-81.fang.cn|3595|7556| 200|       25|             11|             76|
| 15678423363| 171.44.202.193|         db-37.99.cn|7496|7354| 200|       27|             12|             74|
| 13434369051|   220.82.35.57| srv-03.longqian.org|2160|6650| 200|       30|             13|             71|
| 14554599007|    52.35.92.91| web-91.xiulanlai.cn|6098|5124| 200|       38|             14|             63|
| 14591430480| 206.175.250.82|        web-49.28.cn|6652|4853| 200|       42|             15|             59|
| 15013760479|   94.26.117.22|      email-47.63.cn|5645|4756| 200|       43|             16|             58|
| 14505200322|   68.99.109.14|       web-94.40.org|6978|4684| 200|       46|             17|             55|
| 13913021809| 190.111.163.19|        srv-83.jn.cn|8296|4349| 200|       48|             18|             53|
| 15108282222|   56.175.78.40|   laptop-17.mao.net|8846|3830| 200|       55|             19|             46|
| 18788825153|   61.76.43.152|        lt-48.81.net|8101|3812| 200|       56|             20|             45|
+------------+---------------+--------------------+----+----+----+---------+---------------+---------------+

SparkSQL执行流程

  1. RDD的执行流程RDD->DAGScheduler->TaskSceduler->Worker

  2. 与RDD不同的是, SparkSQL会对写完的代码执行自动优化; 以提高代码执行效率

  3. SparkSQL可以自动优化而RD不行的原因

    1. RDD仅包含数据而不包含格式; DataFrame是有结构的二维表结构
    2. SparkSQL的优化器为Catalyst优化器
  4. Catalyst执行流程:

    1. API
      Catalyst
      RDD
      Cluster
    2. API层接受SQL语句, Catalyst解析SQL并生对应的RDD执行计划, 并由集群执行
    

Catalyst优化器

具体流程

元数据
API
未解析的逻辑计划
逻辑计划
优化的逻辑计划
物理执行计划
RDD
  1. 解析SQL, 生成AST: 在这里插入图片描述

  2. 在AST中加入元数据信息, 便于后续优化 如score.id -> id#1#L; 表score.id的id为1, 类型为Long

  3. 进行优化, 主要的友发方式有谓词下推列值裁剪

    1. 谓词下推: 尽量下推谓词操作, 这样可以减少操作时候的数据量
    2. 列值裁剪: 在断言下推后执行裁剪, 裁剪掉不需要的列, 进而减少需要处理的数据量
  4. 生成执行计划: 根据上述过程生成的优化后的AST, 生成物理计划, 从而生成RDD来执行

引用

^:


  1. Downloads | Apache Spark ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/233217.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL之视图外连接、内连接和子查询的使用

目录 一、视图 1.1 含义 1.2 操作 1.3 SQL数据 二、连接查询案例 &#xff08;1&#xff09;查询" 01 "课程比" 02 "课程成绩高的学生的信息及课程分数 &#xff08;2&#xff09;查询同时存在" 01 "课程和" 02 "课程的情况 &a…

docker安裝gocd-server,并配置gitlab授权登录

gocd的地址&#xff1a;Installing GoCD server on Windows | GoCD User Documentation gocd文档&#xff1a;GitHub - gocd/docker-gocd-server: Docker server image for GoCD 一、docker拉取gocd镜像 #拉取server镜像 docker pull gocd/gocd-server:v21.1.0docker pull g…

java SSM水质历史数据可视化设计myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM水质历史数据可视化设计是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主…

QT常用控件使用及布局

QT常用控件使用及布局 文章目录 QT常用控件使用及布局1、创建带Ui的工程2、ui界面介绍1、界面设计区2、对象监视区3、对象监属性编辑区4、信号与槽5、布局器6、控件1、Layouts1、布局管理器2、布局的dome 2、Spacers3、Buttons4、项目视图组(Item Views)5、项目控件组(Item Wid…

“数据要素×”正式来袭|美创“全栈能力、深入场景”保障数据价值安全释放

千呼万唤&#xff0c;1月4日&#xff0c;国家数据局等17部门联合印发的《“数据要素”三年行动计划&#xff08;2024—2026年&#xff09;》&#xff08;下称《三年行动计划》&#xff09;正式发布&#xff01; 作为国家数据局成立以来公开发布的首个重磅文件&#xff0c;《三年…

大数据StarRocks(四) :常用命令

这次主要介绍生产工作中使用Starrocks时的常用命令 4.1 连接StarRocks 4.1.1 Linux命令行连接 [roothadoop1011 fe]# yum install mysql -y [roothadoop1011 fe]# mysql -h hadoop101 -uroot -P9030 -p4.1.2 Windows客户端 DBeaver 连接 4.2 常用命令 4.2.1 查看状态 1. 查…

uniapp获取手机当前信息及应用版本

appVersion 是app端查询的数据信息 appWgtVersion 是浏览器端查询的数据信息 onLoad() {const systemInfo uni.getSystemInfoSync();console.log(systemInfo);// #ifdef H5const uniAppVersion systemInfo.appVersion;// #endif// #ifndef H5const uniAppVersion systemIn…

案例分享:Qt多国语言输入法软键盘

若该文为原创文章&#xff0c;转载请注明出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/135346374 红胖子(红模仿)的博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬结…

vue-springboot基于java的实验室安全考试系统

本系统为用户而设计制作实验室安全考试系统&#xff0c;旨在实现实验室安全考试智能化、现代化管理。本实验室安全考试管理自动化系统的开发和研制的最终目的是将实验室安全考试的运作模式从手工记录数据转变为网络信息查询管理&#xff0c;从而为现代管理人员的使用提供更多的…

模板管理支持批量操作,DataEase开源数据可视化分析平台v2.2.0发布

2024年1月8日&#xff0c;DataEase开源数据可视化分析平台正式发布v2.2.0版本。 这一版本的功能升级包括&#xff1a;在“模板管理”页面中&#xff0c;用户可以通过模板管理的批量操作功能&#xff0c;对已有模板进行快速重新分类、删除等维护操作&#xff1b;数据大屏中&…

大数据 - Doris系列《二》- Doris安装(亲测成功版)

目录 &#x1f436;2.1 安装前准备 &#x1f959;1.设置系统最大文件打开句柄数 >启动一个程序的时候&#xff0c;打开文件的数量就是句柄数 &#x1f959;2.设置文件包含限制一个进程可以拥有的VMA(虚拟内存区域)的数量 &#x1f959;3.时钟同步 &#x1f959;4.关闭交…

labelImg的安装与使用

目录 1、查看本机是否安装labelImg 2、安装labelImg 3、创建自己的数据集 3.1 建立新文件夹 3.2 打开labelImg 注意&#xff1a;出现闪退的情况处理。 4、文件格式转换 4.1 修改文件夹路径 4.2 新建datasets文件夹 4.3 修改图片路径 4.4 执行 1、查看本机是否安装la…

uniCloud 云函数

相对于云函数&#xff0c;官方更推荐使用 云对象 新建云函数 编辑云函数 uniCloud-aliyun/cloudfunctions/hello_func/index.js use strict; exports.main async (event, context) > {let {name} eventreturn 你好&#xff0c;${name}! };云函数接收的参数从event中解构获…

部署可道云网盘的一个漏洞解决

目录 1漏洞展示 2.防范措施 1漏洞展示 因为可道云网盘的上传文档有保存在 /data/Group/public/home/文档/ 中,当别有用心之人知道个人部署的域名与上次的文件后&#xff0c;可以进行访问拿到uid。例我在我部署的网盘上上次一个aa.php 文件&#xff0c;然后拿来演示 然后通过…

密码学中的Hash函数

目录 一. 介绍 二. hash函数的五个基本性质 &#xff08;&#xff11;&#xff09;压缩性 &#xff08;&#xff12;&#xff09;正向计算简单性 &#xff08;&#xff13;&#xff09;逆向计算困难性 &#xff08;&#xff14;&#xff09;弱无碰撞性 &#xff08;&…

RabbitMQ(八)消息的序列化

目录 一、为什么需要消息序列化&#xff1f;二、常用的消息序列化方式1&#xff09;Java原生序列化&#xff08;默认&#xff09;2&#xff09;JSON格式3&#xff09;Protobuf 格式4&#xff09;Avro 格式5&#xff09;MessagePack 格式 三、总结 RabbitMQ 是一个强大的消息中间…

安全基础~信息搜集3

文章目录 知识补充APP信息搜集php开发学习理解漏洞 知识补充 端口渗透总结 python Crypto报错&#xff1a;https://blog.csdn.net/five3/article/details/86160683 APP信息搜集 1. AppInfoScanner 移动端(Android、iOS、WEB、H5、静态网站)信息收集扫描工具 使用教程 演示&…

【Harmony OS - 网络请求】

在一个应用开发中&#xff0c;网络请求是必不可少的&#xff0c;我们一般用的fetch、axios来进行http请求&#xff0c;在鸿蒙中也可以通过createHppt来发生一个http请求&#xff0c;它们都是异步请求返回的Promise&#xff0c;下面我们将介绍’ohos.net.http’和axios这两种方式…

网络端口(包括TCP端口和UDP端口)的作用、定义、分类,以及在视频监控和流媒体通信中的定义

目 录 一、什么地方会用到网络端口&#xff1f; 二、端口的定义和作用 &#xff08;一&#xff09;TCP协议和UDP协议 &#xff08;二&#xff09;端口的定义 &#xff08;三&#xff09;在TCP/IP体系中&#xff0c;端口(TCP和UDP)的作用 &#xff08;…

SSM框架学习笔记01 | 注解开发

文章目录 1. 注解形式定义bean2.纯注解开发3.bean管理4. 依赖注入5. 第三方bean管理总结 1. 注解形式定义bean Compoenet ControllerServiceRepository 配合代码块 <context:component-scan /> 使用 2.纯注解开发 Configuration ComponentScan AnnotationConfigApplicati…