1、如何跳过对某行数据的处理
第一行数据是字段名不需要处理,我们知道第一行偏移量是0(行记录的时候是从数组首地址开始,到了行标识符进行一次计数,这个计数就是行偏移量,从0开始),我们根据偏移量值进行判断,然后用中断方法把第一行数据跳过。
// 根据偏移量把第一行筛选出来:if (0== key.get()){return; // 中断方法:即不对符合条件的数据进行处理,也就是跳过这些数据不做处理}
2、接下来是对需求数据的Map处理
String[] line = value.toString().replaceAll("\"","").split(",");//对可能数组越界的字符串数据过滤:用判断把长度不符合的数组剔除if (11== line.length){//对符合要求的数据开始写出:格式---K:省市年月日(拼接),V:温度StringBuilder outKey = new StringBuilder();outKey.append(line[1]).append(line[2]).append(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDateTime.parse(line[9], DateTimeFormatter.ofPattern("d/M/yyyy HH:mm:ss"))));context.write(new Text(outKey.toString()),new IntWritable(Integer.parseInt(line[5])));}
3、接下来是reduce处理逻辑
根据业务需求写出数据
package com.yjxxt.Weather;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** Reducer中四个泛型解释* KEYIN: MapTask写出数据的key:地区年月日* VALUEIN:MapTask写出数据的value 温度(N条,因为记录了每天不同时刻的温度)* KEYOUT: Reducetask写出数据的key 地区年月日* VALUEOUT: Reducetask写出数据的value 温度最值*/
public class WeatherReducer extends Reducer <Text, IntWritable, Text, Text>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {//这里的VALUEOUT是要最高温和最低温,所以reduce要把拉取过来的温度进行比较(从map-->reduce:数据是1:N模型)int max=-100,min=100;//用比较函数找到最值for (IntWritable value:values) {max = Math.max(max, value.get());min = Math.min(max, value.get());}//将最终结果写出去:VALUEOUT也写成文本形式context.write(key,new Text("最高温度["+max+"]最低温度["+min+"]"));}
}