自定义的wordcount
数据处理过程
- 加载jar包
查看后面的pom文件
以上为需要的jar包路径,将其导入至idea中
- Map
package com.hadoop;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
// K1 V1 K2 V2 的数据类型
public class WMap extends Mapper<LongWritable, Text, Text, IntWritable> {//Context标识上下文,比如上一节的输入以及下一节的输出,一个JOB可能存在多个MAP和多个REDUCE@Overridepublic void map(LongWritable key1, Text value1, Context context)throws IOException, InterruptedException {//获取数据,v1是输入String data = value1.toString();//逻辑:分词String[] words = data.split(" ");//v2是一个集合的形式//k2和k1的数据类型是相同的,表示一个具体的分类for (String w : words) {//这是对下文的编写,即输出// K2 V2context.write(new Text(w), new IntWritable(1));}}
}
- Reduce
package com.hadoop;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
// K3 V3 K4 V4
public class WReduce extends Reducer<Text, IntWritable, Text,IntWritable>{// 集合V3@Overrideprotected void reduce(Text k3, Iterable<IntWritable> v3, Context context)throws IOException, InterruptedException {//求和int total=0;for (IntWritable v:v3){total+=v.get();}//输入和输出必须是hadoop支持的类型context.write(k3,new IntWritable(total));}
}
- Main
package com.hadoop;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.streaming.io.InputWriter;public class Main {public static void main(String[] args) throws Exception {//1.创建job。Job job =Job.getInstance(new Configuration());//2.任务入口job.setJarByClass(Main.class);//3.指定任务的Map和输出类型job.setMapperClass(WMap.class);job.setMapOutputKeyClass(Text.class);//k2job.setMapOutputValueClass(IntWritable.class);//v2//4.指定Reduce和输出类型job.setReducerClass(WReduce.class);job.setOutputKeyClass(Text.class);//k4job.setOutputValueClass(IntWritable.class);//v4//任务输入和输出FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));//任务执行//参数true表示打印相关的日志job.waitForCompletion(true);}
}
- 打包部署执行
采用Maven进行管理
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hadoop</groupId><artifactId>Mapreduce_wordcount</artifactId><version>1.0-SNAPSHOT</version><name>Mapreduce_wordcount</name><description>wunaiieq</description><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--版本控制--><hadoop.version>2.7.3</hadoop.version></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-api</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-streaming</artifactId><version>${hadoop.version}</version></dependency></dependencies><!--构建配置--><build><plugins><plugin><!--声明--><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><!--具体配置--><configuration><archive><manifest><!--jar包的执行入口--><mainClass>com.hadoop.Main</mainClass></manifest></archive><descriptorRefs><!--描述符,此处为预定义的,表示创建一个包含项目所有依赖的可执行 JAR 文件;允许自定义生成jar文件内容--><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><!--执行配置--><executions><execution><!--执行配置ID,可修改--><id>make-assembly</id><!--执行的生命周期--><phase>package</phase><goals><!--执行的目标,single表示创建一个分发包--><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
- 运行
hadoop jar Mapreduce_wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar /input/data.txt /output/wordcount/
- 效果
结果查看
hdfs dfs -cat /output/wordcount/part-r-00000