Contents
I. Writing WordCount (Spark_scala) and submitting it to a Spark high-availability cluster
1. Project structure
2. Adding the dependencies
3. Writing the Scala WordCount
4. Packaging with Maven
5. Running the jar
6. Checking the output on HDFS
II. Writing WordCount (Spark_scala) locally, reading a local file
1. Project structure
2. Writing the Scala WordCount
3. Editing the run configuration (Edit Configurations)
4. Running LocalWordCount.scala locally
III. Writing WordCount (Spark_java) locally, reading a local file and writing the output locally
1. Project structure
2. Writing the Java WordCount
3. Editing the run configuration (Edit Configurations)
4. Checking the result after running
IV. Writing WordCount (Spark_java_lambda) locally, reading a local file and writing the output locally
1. Project structure
2. Writing the Java lambda WordCount
3. Editing the run configuration (Edit Configurations)
4. Checking the result after running
It took an evening plus a whole day, but I finally got it all working, phew~~
Before writing the code locally in IDEA you need Scala, Hadoop, and Spark set up on Windows; see the following posts:
"Installing Scala" and "Deploying Hadoop-3.3.2 and Spark-3.3.2 on Windows"
I. Writing WordCount (Spark_scala) and submitting it to a Spark high-availability cluster
First install Scala, then create a Maven project in IDEA and start writing the code.
1. Project structure
2. Adding the dependencies
<name>spark-in-action</name>
<url>http://maven.apache.org</url>

<!-- Common constants -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <spark.version>3.3.2</spark.version>
    <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
    <!-- Scala library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core, the Spark kernel; all higher-level components depend on it -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<!-- Maven mirror repositories -->
<!-- Domestic (Aliyun) mirror for dependency downloads -->
<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <layout>default</layout>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </repository>
</repositories>

<!-- Domestic (Aliyun) mirror for Maven plugin downloads -->
<pluginRepositories>
    <pluginRepository>
        <id>ali-plugin</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </pluginRepository>
</pluginRepositories>

<build>
    <pluginManagement>
        <plugins>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin that compiles the Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Plugin that builds the shaded (fat) jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
3. Writing the Scala WordCount
package cn.doitedu.day01

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    // 1. Create the SparkContext
    val sc = new SparkContext(conf)

    // 2. Create an RDD (a "magic" distributed collection that holds a description of the computation, not the data itself)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3. Operate on the RDD by calling its methods
    // ------------- Transformations (start) --------------
    // Split each line into words and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with a 1 in a tuple
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group by key and aggregate the counts
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // descending
    // ------------- Transformations (end) --------------

    // 4. Call an action
    // Write the result to an external storage system
    sorted.saveAsTextFile(args(1))

    // 5. Release resources
    sc.stop()
  }
}
4. Packaging with Maven
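A quick sketch of the packaging step (assuming Maven is on the PATH; run it from the project root, or trigger the package goal from the Maven tool window in IDEA):

mvn clean package

After the build, the target/ directory contains two jars.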
spark-in-action-1.0.jar is the fat (uber) jar: besides the project's own classes it also bundles the third-party dependencies. The jar whose name starts with original- is the thin jar, containing only the project's own classes.
5. Running the jar
First start ZooKeeper, HDFS, and the Spark high-availability cluster. The cluster here runs in standalone mode with HA, not Spark on YARN.
Create the /opt/soft/spark-3.2.3/submit directory and upload the jar into it.
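Roughly, the preparation looks like the sketch below; the install locations and the machine the jar is copied from are assumptions, so adjust them to your own layout:

# On every ZooKeeper node
zkServer.sh start
# On the HDFS NameNode
start-dfs.sh
# On the Spark master node (standalone HA cluster)
/opt/soft/spark-3.2.3/sbin/start-all.sh
# Create the submit directory and copy the fat jar into it
mkdir -p /opt/soft/spark-3.2.3/submit
# e.g. from the development machine:
# scp target/spark-in-action-1.0.jar root@node141:/opt/soft/spark-3.2.3/submit/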
Submit command:
[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1
--master spark://node141:7077 - the Spark master URL
--class cn.doitedu.day01.WordCount - the main class to run
--executor-memory 1g - memory allocated to each executor
--total-executor-cores 4 - total number of CPU cores used across all executors
./spark-in-action-1.0.jar - path to the jar to run
hdfs://node141:9000/words.txt - the value of args(0) in the code (input path)
hdfs://node141:9000/out-1 - the value of args(1) in the code (output path)
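Because this is a standalone HA cluster, --master can also list every master so the driver fails over to whichever one is active; node142 below is only a placeholder for the standby master's hostname:

--master spark://node141:7077,node142:7077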
6. Checking the output on HDFS
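One way to look at the result from the command line (assuming the hdfs client is on the PATH; saveAsTextFile writes one part-xxxxx file per partition plus a _SUCCESS marker):

hdfs dfs -ls hdfs://node141:9000/out-1
hdfs dfs -cat hdfs://node141:9000/out-1/part-00000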
II. Writing WordCount (Spark_scala) locally, reading a local file
1. Project structure
2. Writing the Scala WordCount
package cn.doitedu.day01

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Run the Spark program in local mode (used for development and debugging)
 */
object LocalWordCount {

  def main(args: Array[String]): Unit = {
    // Specify the current user as root
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") // local mode; * means one thread per core on this machine
    // 1. Create the SparkContext
    val sc = new SparkContext(conf)

    // 2. Create an RDD (a "magic" distributed collection that holds a description of the computation, not the data itself)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3. Operate on the RDD by calling its methods
    // ------------- Transformations (start) --------------
    // Split each line into words and flatten
    val words: RDD[String] = lines.flatMap(line => {
      val words = line.split(" ")
      println(words.mkString(",")) // debug output; mkString makes the array contents readable
      words
    })
    // Pair each word with a 1 in a tuple
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group by key and aggregate the counts
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // descending
    // ------------- Transformations (end) --------------

    // 4. Call an action
    // Write the result to an external storage system
    sorted.saveAsTextFile(args(1))

    // 5. Release resources
    sc.stop()
  }
}
3. Editing the run configuration (Edit Configurations)
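In the run configuration, the Program arguments field supplies args(0) (input file) and args(1) (output directory). The paths below are only placeholders; point them at your own files:

D:\data\words.txt D:\data\out1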
4. Running LocalWordCount.scala locally
Check the result:
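Note that saveAsTextFile writes a directory rather than a single file, so the path passed as args(1) ends up looking roughly like this (one part file per partition; the exact number depends on the partitioning):

out1/
    _SUCCESS
    part-00000
    part-00001

The (word, count) pairs are inside the part files, in descending order of count.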
III. Writing WordCount (Spark_java) locally, reading a local file and writing the output locally
1. Project structure
2. Writing the Java WordCount
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);

        // Split each line on whitespace and flatten into individual words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split("\\s+");
                return Arrays.asList(words).iterator();
            }
        });

        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        // Aggregate the counts by key
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Swap the original kv order: (flink,3) ----> (3,flink)
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap(); // swap key and value
            }
        });

        // Sort by count in descending order
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);

        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        result.saveAsTextFile(args[1]);
        jsc.stop();
    }
}
3. Editing the run configuration (Edit Configurations)
4. Checking the result after running
IV. Writing WordCount (Spark_java_lambda) locally, reading a local file and writing the output locally
1. Project structure
2. Writing the Java lambda WordCount
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);

        // Split each line and flatten into individual words
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));

        // JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);

        // Swap to (count, word) so we can sort by key
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);
        // Sort by count in descending order
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);

        result.saveAsTextFile(args[1]);
        jsc.stop();
    }
}
3. Editing the run configuration (Edit Configurations)
4. Checking the result after running
And that's it~~