文章目录
- 问题引出和分析
- 需求
- 代码实现
- 运行结果
- 分区总结
问题引出和分析
要求将统计结果按照条件输出到不同的文件中(分区),比如将统计结果按照手机归属地不同省份输出到不同的文件中。
默认的分区:
默认的分区是根据key的hashCode对Reduce Tasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
我们可以自定义Partition:
需求
将上一次的统计结果按照手机归属地不同省份输出到不同文件中(分区)
数据集:
我们期望手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
代码实现
我们在前面的基础上,增加一个分区类(代码执行顺序:从map方法出来,经过分区,去了reduce方法):
package com.atxiaoyu.mapreduce.Partition;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {//text是手机号String phone=text.toString();String prePhone=phone.substring(0,3); //取出手机号前三位int partition;if("136".equals(prePhone)){partition=0;}else if ("137".equals(prePhone)){partition=1;}else if ("138".equals(prePhone)){partition=2;}else if ("139".equals(prePhone)){partition=2;}else {partition=4;}return partition;}
}
然后在driver类中再添加两行建立连接:
job.setPartitionerClass(ProvincePartitioner.class);job.setNumReduceTasks(5);
运行结果
有5个输出文件:
每一个文件的内容与我们分析的一致,按手机号前三位分到了不同的文件中:
分区总结
我们自定义的分区数和在driver类中的job.setNumReduceTasks()
设置的分区数之间有一定的关系。
(1)如果ReducerTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-00xx;
(2)如果1<ReducerTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如果ReducerTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReducerTask,最终也就只会产生一个结果文件part-r-0000;
(4)分区号必须从0开始,逐一累加。
比如说我们自定义分区数为5,则
(1)job.setNumReduceTasks(1):
程序会正常运行,只不过只会产生一个输出文件。
(2)job.setNumReduceTasks(2);
会报错。
(3)job.setNumReduceTasks(6);
大于5,程序会正常运行,会产生空文件。