The key point here is how to join two tables. This is a scenario we already know well from MySQL: the inner join and the left join. Now we will look at how to do the same thing in MapReduce.
The join can be done either in the map stage or in the reduce stage.
Data:
Link: https://pan.baidu.com/s/1PH1J8SIEJA5UX0muvN-vuQ?pwd=idwx  extraction code: idwx
Customer data
Order data
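The actual files are behind the pan link above. Judging from how the code below parses them (the layout matches the common retail_db sample set), the rows look roughly like this; the values are made up for illustration:

customers.csv: customerId,firstName,lastName,...         e.g.  1,Mary,Smith
orders.csv:    orderId,orderDate,customerId,orderStatus  e.g.  101,2013-07-25,1,CLOSED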
Writing the entity class CustomerOrders
Besides the customer and order attributes, we define one extra field, a flag, to mark whether an instance holds a customer record or an order record.
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrders implements WritableComparable<CustomerOrders> {
    private Integer customerId;
    private String customerName;
    private Integer orderId;
    private String orderStatus;
    // flag: distinguishes a customer record ("0") from an order record ("1")
    private String flag;

    public CustomerOrders() {
    }

    public CustomerOrders(Integer customerId, String customerName, Integer orderId, String orderStatus, String flag) {
        this.customerId = customerId;
        this.customerName = customerName;
        this.orderId = orderId;
        this.orderStatus = orderStatus;
        this.flag = flag;
    }

    @Override
    public String toString() {
        return "CustomerOrders{" +
                "customerId=" + customerId +
                ", customerName='" + customerName + '\'' +
                ", orderId=" + orderId +
                ", orderStatus='" + orderStatus + '\'' +
                ", flag='" + flag + '\'' +
                '}';
    }

    public Integer getCustomerId() { return customerId; }
    public void setCustomerId(Integer customerId) { this.customerId = customerId; }
    public String getCustomerName() { return customerName; }
    public void setCustomerName(String customerName) { this.customerName = customerName; }
    public Integer getOrderId() { return orderId; }
    public void setOrderId(Integer orderId) { this.orderId = orderId; }
    public String getOrderStatus() { return orderStatus; }
    public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    @Override
    public int compareTo(CustomerOrders o) {
        return 0;
    }

    // serialization: readFields must read the fields in exactly the order write writes them
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(customerId);
        dataOutput.writeUTF(customerName);
        dataOutput.writeInt(orderId);
        dataOutput.writeUTF(orderStatus);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customerId = dataInput.readInt();
        this.customerName = dataInput.readUTF();
        this.orderId = dataInput.readInt();
        this.orderStatus = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}
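Not part of the original post: a minimal, self-contained check (the class name WritableRoundTrip is made up) that write and readFields agree, which is worth running whenever a field is added to a Writable:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        CustomerOrders before = new CustomerOrders(1, "Mary Smith", 101, "CLOSED", "1");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bos));   // serialize
        CustomerOrders after = new CustomerOrders();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))); // deserialize
        System.out.println(after);                  // should print the same field values
    }
}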
1. Joining in the reduce stage
Both files are passed in as job inputs.
(1) Map stage
The setup method runs before map; it looks up the name of the file the current split belongs to, which tells us whether each line is a customer record or an order record.
The map method wraps each incoming line in a CustomerOrders object and emits it as a key-value pair, keyed by customer ID, to the next stage.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {
    String fileName = "";

    // setup runs once before map: remember which input file this split comes from
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        System.out.println("setup method: " + fileSplit.getPath().toString());
        fileName = fileSplit.getPath().getName();
        System.out.println("fileName : " + fileName);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // System.out.println("map stage:");
        // System.out.println("key : " + key + "\tvalue : " + value);
        String[] field = value.toString().split(",");
        CustomerOrders customerOrders = new CustomerOrders();
        if (fileName.startsWith("orders")) { // order record
            customerOrders.setCustomerId(Integer.parseInt(field[2]));
            customerOrders.setCustomerName("");
            customerOrders.setOrderId(Integer.parseInt(field[0]));
            customerOrders.setFlag("1");
            customerOrders.setOrderStatus(field[3]);
        } else { // customer record
            customerOrders.setCustomerId(Integer.parseInt(field[0]));
            customerOrders.setCustomerName(field[1]);
            customerOrders.setOrderId(0);
            customerOrders.setFlag("0");
            customerOrders.setOrderStatus("");
        }
        // key by customerId so a customer and all of its orders meet in one reduce call
        Text text = new Text(customerOrders.getCustomerId().toString());
        context.write(text, customerOrders);
    }
}
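To make the reduce input concrete: all records sharing one customer ID arrive in a single reduce call, customer and order records mixed together (the order of values within a key is not guaranteed). With hypothetical values, one call might see:

key: 1
values: CustomerOrders{customerId=1, customerName='Mary', orderId=0, orderStatus='', flag='0'}
        CustomerOrders{customerId=1, customerName='', orderId=101, orderStatus='CLOSED', flag='1'}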
(2) Reduce stage
The reduce method first iterates over the values to find the single customer record, then fills the customer name into every order record and concatenates them into one Text output.
Other designs are possible; for example, each joined order record could be written to the context as soon as it is processed.
Also, the object copying here is not written well. The fields cannot simply be copied with a plain = (that only copies the reference); BeanUtils' copyProperties() method can be used instead.
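A minimal sketch of that alternative (the helper class BeanCopy is made up for illustration; it assumes commons-beanutils is on the classpath and is not what the reducer below does):

import org.apache.commons.beanutils.BeanUtils;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

public class BeanCopy {
    static CustomerOrders copyOf(CustomerOrders src) throws IOException {
        CustomerOrders dest = new CustomerOrders();
        try {
            // copyProperties(dest, orig) copies matching bean properties via getters/setters
            BeanUtils.copyProperties(dest, src);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IOException(e);
        }
        return dest;
    }
}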
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReduceJoinReducer extends Reducer<Text, CustomerOrders, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce stage: key:" + key + " values:" + values);
        String customerName = "";
        String text = "";
        List<CustomerOrders> list = new ArrayList<>();
        // Hadoop reuses the value object across iterations of this Iterable,
        // so each record must be copied into a fresh object before being kept
        for (CustomerOrders co : values) {
            if (co.getFlag().equals("0")) {
                customerName = co.getCustomerName();
            }
            CustomerOrders customerOrders = new CustomerOrders();
            customerOrders.setCustomerName(co.getCustomerName());
            customerOrders.setFlag(co.getFlag());
            customerOrders.setCustomerId(co.getCustomerId());
            customerOrders.setOrderStatus(co.getOrderStatus());
            customerOrders.setOrderId(co.getOrderId());
            list.add(customerOrders);
        }
        System.out.println(list);
        System.out.println();
        for (CustomerOrders co : list) {
            if (co.getFlag().equals("1")) {
                // note: a reference assignment, not a copy; it mutates the element
                // in list (see the copyProperties remark above)
                CustomerOrders customerOrders = co;
                customerOrders.setCustomerName(customerName);
                customerOrders.setFlag("2");
                System.out.println(customerOrders.toString());
                text += customerOrders.toString() + "\t";
            }
        }
        System.out.println(text);
        System.out.println("customerName:" + customerName);
        context.write(key, new Text(text));
    }
}
(3) Driver
Standard boilerplate: set up the job parameters.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoinDriver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // both input files: args[0] and args[1]; output directory: args[2]
        FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1]));
        Path path = new Path(args[2]);
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        // do not close fs here: FileSystem.get returns a cached instance
        // that the job still needs when it commits its output

        job.waitForCompletion(true);
    }
}
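Given the toString() of CustomerOrders, each line of the reduce output is the customer ID, a tab, then that customer's joined orders with flag set to "2". A line might look like this (the values are illustrative):

1	CustomerOrders{customerId=1, customerName='Mary', orderId=101, orderStatus='CLOSED', flag='2'}	CustomerOrders{customerId=1, customerName='Mary', orderId=102, orderStatus='PENDING', flag='2'}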
2. Joining in the map stage
Only one file is passed in as regular input; the other is distributed as a cache file (cacheFile). Note that the cache file must not be too large. A rough analogy: if you stay overnight at a friend's place and they have no spare quilt, you can bring one along; but if what your friend lacks is a house, you can hardly bring a house with you.
(1) Map stage
The setup method reads the customer data with ordinary Java I/O and stores the records in a List<CustomerOrders>.
For every order record, the map method walks through that list, using the customer ID (the join key) to find the matching customer and fill in the name (a lookup table would avoid this scan; see the sketch after the code).
import md.kb23.demo03.CustomerOrders;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class MapJoinMapper extends Mapper<LongWritable, Text, CustomerOrders, NullWritable> {
    private List<CustomerOrders> list = new ArrayList<CustomerOrders>();

    // setup runs once per task: read the cached customer file into memory
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        for (URI uri : cacheFiles) {
            System.out.println(uri.getPath());
            String currentFileName = new Path(uri).getName();
            if (currentFileName.startsWith("customers")) {
                String path = uri.getPath();
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                    String[] field = line.split(",");
                    CustomerOrders customerOrders = new CustomerOrders(Integer.parseInt(field[0]), field[1] + " " + field[2], 0, "", "");
                    list.add(customerOrders);
                }
                br.close();
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] orderField = value.toString().split(",");
        int customerId = Integer.parseInt(orderField[2]);
        // linear scan over the cached customers to find the matching record
        CustomerOrders customerOrders = null;
        for (CustomerOrders customer : list) {
            if (customer.getCustomerId() == customerId) {
                customerOrders = customer;
            }
        }
        CustomerOrders order = new CustomerOrders();
        if (customerOrders != null) {
            order.setCustomerName(customerOrders.getCustomerName());
        } else {
            order.setCustomerName("");
        }
        order.setCustomerId(customerId);
        order.setOrderStatus(orderField[3]);
        order.setFlag("1");
        order.setOrderId(Integer.parseInt(orderField[0]));
        context.write(order, NullWritable.get());
    }
}
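As noted above, the linear scan costs a pass over all customers for every order record. A HashMap keyed by customer ID gives constant-time lookups instead. A standalone illustration of the idea (class name and values are made up, not the author's code):

import java.util.HashMap;
import java.util.Map;

public class CustomerLookupDemo {
    public static void main(String[] args) {
        Map<Integer, String> nameById = new HashMap<>();
        // in setup(), instead of list.add(...):
        nameById.put(1, "Mary Smith");
        nameById.put(2, "John Doe");
        // in map(), the scan over the list collapses to a constant-time lookup:
        int customerId = 2;
        String name = nameById.getOrDefault(customerId, "");
        System.out.println(name); // John Doe
    }
}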
(2) Driver
Since the map stage already does all the work, no extra reducer is needed. Also note how the cache file is added.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        long start = System.currentTimeMillis();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(CustomerOrders.class);
        job.setMapOutputValueClass(NullWritable.class);

        Path inpath = new Path("in/demo3/orders.csv");
        FileInputFormat.setInputPaths(job, inpath);
        Path outpath = new Path("out/out5");
        FileSystem fs = FileSystem.get(outpath.toUri(), conf);
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        // map-only job: set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        // distribute the customer file to every map task as a cache file
        Path cachePath = new Path("in/demo3/customers.csv");
        job.addCacheFile(cachePath.toUri());

        job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        System.out.println("elapsed time (ms): " + (end - start));
    }
}