This post walks through the main implementation of a goods recommendation system: an item-based collaborative-filtering pipeline built as a chain of Hadoop MapReduce jobs.
1. Computing user preference values for goods
Code implementation
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep1 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // Convert the behavior into a preference value
            double like = 0.0;
            if (split.length >= 3) {
                String str = split[2].toLowerCase();
                if (str.equals("paysuccess")) {          // payment succeeded
                    like = 0.3;
                } else if (str.equals("addreview")) {    // review added
                    like = 0.3;
                } else if (str.equals("createorder")) {  // order created
                    like = 0.2;
                } else if (str.equals("addcar")) {       // added to cart
                    like = 0.15;
                } else {                                 // page view
                    like = 0.05;
                }
            }
            // key = user:goods, value = preference
            Text outkey = new Text(split[0] + ":" + split[1]);
            DoubleWritable outvalue = new DoubleWritable(like);
            context.write(outkey, outvalue);
        }
    }

    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum with BigDecimal to avoid losing precision
            BigDecimal sum = new BigDecimal(0.0);
            for (DoubleWritable value : values) {
                BigDecimal v = new BigDecimal(value.get());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Mapper and its output key/value types
        job.setMapperClass(GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Reducer and its output key/value types
        job.setReducerClass(GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run
        job.waitForCompletion(true);
        return 0;
    }
}
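For illustration, suppose data/userLog.log holds comma-separated lines of the form userId,goodsId,behavior (the ids and behaviors below are made-up sample values, not taken from the real log):

    11,375,PaySuccess
    11,375,AddReview
    11,203,Scan

Step 1 keys each record by user:goods and sums the preferences, so the output would be the tab-separated lines

    11:203    0.05
    11:375    0.6

because the payment (0.3) and the review (0.3) for goods 375 add up to 0.6.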
2. Arranging the preference data into a preference matrix
Code implementation
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep2 extends Configured implements Tool {

    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            // The goods id becomes the output key
            Text outkey = new Text(split[1]);
            // The user id and preference value form the output value
            Text outvalue = new Text(split[0] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all user:preference pairs for one goods id
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1); // drop the trailing comma
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        job.waitForCompletion(true);
        return 0;
    }
}
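Step 2 only re-keys that output: each goods id collects all of its user:preference pairs into one line, which is the row format the GoodsStep6 mapper later expects, e.g. (illustrative ids):

    375    11:0.25,5:0.25,4:0.55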
3. Counting goods co-occurrences
Code implementation
Cartesian product
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;

public class GoodsStep3 extends Configured implements Tool {

    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input key is "user:goods"; emit user -> goods
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1]);
            context.write(outkey, outvalue);
        }
    }

    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect all goods one user interacted with, then emit every
            // ordered pair of distinct goods (the Cartesian product)
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                list.add(value.toString());
            }
            for (String g1 : list) {
                for (String g2 : list) {
                    if (!g1.equals(g2)) {
                        Text outkey = new Text(g1);
                        Text outvalue = new Text(g2);
                        context.write(outkey, outvalue);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        job.waitForCompletion(true);
        return 0;
    }
}
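A quick illustration with made-up ids: if user 11 interacted with goods 375, 203 and 961, the reducer receives the list [375, 203, 961] for that user and emits every ordered pair of distinct goods:

    375    203
    375    961
    203    375
    203    961
    961    375
    961    203

Each emitted pair means "these two goods appeared together in one user's history".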
Co-occurrence counts
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep4 extends Configured implements Tool {

    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "goods1<TAB>goods2"; emit "goods1:goods2" -> 1
            String[] split = value.toString().split("\t");
            String outkey = split[0] + ":" + split[1];
            context.write(new Text(outkey), new IntWritable(1));
        }
    }

    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count how many times each goods pair co-occurs
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        job.waitForCompletion(true);
        return 0;
    }
}
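Step 4 counts identical pairs across all users, so the output holds one count per ordered pair, e.g. (sample values):

    375:203    1
    375:90     2

meaning goods 375 and 90 showed up together in two different users' histories.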
4. Building the goods co-occurrence matrix
Code implementation
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep5 extends Configured implements Tool {

    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input key is "goods1:goods2", the value is the co-occurrence count.
            // Emit key = first goods, value = "second goods:count".
            String goods = key.toString();
            String[] split = goods.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join all "goods:count" entries into one matrix row per goods id
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1); // drop the trailing comma
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        job.waitForCompletion(true);
        return 0;
    }
}
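Step 5 again only regroups: all counts whose left-hand goods id matches are folded into one row of the co-occurrence matrix, matching the sample format quoted in the GoodsStep6 comment:

    375    203:1,961:1,91:1,90:2,89:1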
5. Computing recommendation values
Code implementation
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class GoodsStep6 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Input from step 2 (preference matrix):
     *   375    11:0.25,5:0.25,4:0.55
     *   goods  user:preference
     * Input from step 5 (co-occurrence matrix):
     *   375    203:1,961:1,91:1,90:2,89:1
     *   goods  goods:co-occurrence count
     * Output:
     *   user:goods    recommendation value
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            for (String str : split) {
                // key = goods, value = "user:preference" or "goods:count"
                context.write(key, new Text(str));
            }
        }
    }

    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Preference entries [user -> preference]
            HashMap<String, String> like = new HashMap<>();
            // Co-occurrence entries [goods -> count]
            HashMap<String, String> same = new HashMap<>();
            for (Text value : values) {
                String data = value.toString();
                String[] split = data.split(":");
                // Preference values contain a decimal point, counts do not
                if (split[1].contains(".")) {
                    like.put(split[0], split[1]);
                } else {
                    same.put(split[0], split[1]);
                }
            }
            for (Map.Entry<String, String> l : like.entrySet()) {
                for (Map.Entry<String, String> s : same.entrySet()) {
                    // user preference value
                    BigDecimal lvalue = new BigDecimal(l.getValue());
                    // goods co-occurrence count
                    BigDecimal svalue = new BigDecimal(s.getValue());
                    // key = user:co-occurring goods, value = preference * count
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = lvalue.multiply(svalue).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        job.waitForCompletion(true);
        return 0;
    }
}
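For one goods id the reducer thus sees both its preference column and its co-occurrence row and multiplies them pairwise. A worked example with the sample rows above: goods 375 carries the preference 11:0.25 and the co-occurrence 90:2, so the partial recommendation of goods 90 to user 11 contributed through goods 375 is 0.25 × 2 = 0.5, written as

    11:90    0.5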
6. Accumulating recommendation values and cleaning the data
Code implementation
Recommendation value accumulation
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep7 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass "user:goods -> partial recommendation value" straight through
            context.write(key, value);
        }
    }

    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum all partial recommendation values with BigDecimal to keep precision
            BigDecimal sum = new BigDecimal(0.0);
            for (Text value : values) {
                BigDecimal v = new BigDecimal(value.toString());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        job.waitForCompletion(true);
        return 0;
    }
}
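Step 7 folds the partial values per user:goods key into one total. If goods 90 were also reachable for user 11 through a second goods with a partial value of 0.3 (a hypothetical number), the two step 6 lines 11:90 0.5 and 11:90 0.3 would be summed into the single line 11:90 0.8.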
Data cleaning
First, collect the user:goods pairs that have exactly one successful payment.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep8 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Only keep log lines whose behavior is a successful payment
            String[] split = value.toString().split(",");
            boolean paySuccess = split[2].toLowerCase().equals("paysuccess");
            if (paySuccess) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep the user:goods pairs that were paid successfully exactly once
            int num = 0;
            for (IntWritable i : values) {
                num++;
            }
            if (num == 1) {
                context.write(key, new IntWritable(num));
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        job.waitForCompletion(true);
        return 0;
    }
}
Then remove those already-paid pairs from the accumulated recommendation values.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsStep8_2 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass through both step 7 (user:goods -> recommendation value)
            // and step 8.1 (user:goods -> 1) records
            context.write(key, value);
        }
    }

    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // A key that appears in both inputs carries more than one value and is
            // dropped, which removes recommendations for goods the user already paid for.
            // Equivalent counting approach kept from the original draft:
            // int num = 0;
            // String outvalue = "";
            // for (Text value : values) {
            //     outvalue = value.toString();
            //     num++;
            // }
            // if (num == 1) context.write(key, new Text(outvalue));
            Iterator<Text> iter = values.iterator();
            Text outvalue = iter.next();
            if (!iter.hasNext()) {
                context.write(key, outvalue);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        job.waitForCompletion(true);
        return 0;
    }
}
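Continuing the made-up sample: if step 7 produced 11:375 1.2 but step 8.1 also recorded 11:375 1 because user 11 already paid for goods 375, the key 11:375 arrives with two values in this job and is dropped, while a key such as 11:90 that only exists in the step 7 output keeps its recommendation value.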
7. Writing the results to the database
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GoodsStep9 {

    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai",
                "briup", "briup");
        Statement statement = conn.createStatement();
        FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
        BufferedReader br = new BufferedReader(new InputStreamReader(open));
        String line;
        while ((line = br.readLine()) != null) {
            // Each line looks like "11:512<TAB>0.25", i.e. user:goods<TAB>recommendation value
            String[] str = line.split("\t");
            String[] uid_gid = str[0].split(":");
            // Delete any old recommendation for this user/goods pair, then insert the new one
            String sql = "delete from recommend where customerId = '" + uid_gid[0]
                    + "' and bookId = '" + uid_gid[1] + "'";
            String sql2 = "insert into recommend(customerId, bookId, recommendNum) values ('"
                    + uid_gid[0] + "','" + uid_gid[1] + "'," + str[1] + ")";
            statement.addBatch(sql);
            statement.addBatch(sql2);
            statement.executeBatch();
        }
        br.close();
        statement.close();
        conn.close();
    }
}
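The concatenated SQL strings above work for this demo, but a PreparedStatement keeps the parameters properly escaped and avoids rebuilding the statement text for every line. A minimal sketch of the same delete-then-insert export under that assumption (the class name GoodsStep9Prepared is hypothetical; the table layout and connection settings are taken from the code above):

package zb.grms;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: same export as GoodsStep9.toDB(), but with parameterized statements
public class GoodsStep9Prepared {

    public static void main(String[] args) throws Exception {
        toDB();
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai",
                "briup", "briup");
        PreparedStatement del = conn.prepareStatement(
                "delete from recommend where customerId = ? and bookId = ?");
        PreparedStatement ins = conn.prepareStatement(
                "insert into recommend(customerId, bookId, recommendNum) values (?, ?, ?)");
        BufferedReader br = new BufferedReader(new InputStreamReader(
                fs.open(new Path("src/main/resources/step8_2/part-r-00000"))));
        String line;
        while ((line = br.readLine()) != null) {
            String[] str = line.split("\t");       // "user:goods<TAB>recommendation value"
            String[] uid_gid = str[0].split(":");
            del.setString(1, uid_gid[0]);          // remove any previous recommendation
            del.setString(2, uid_gid[1]);
            del.executeUpdate();
            ins.setString(1, uid_gid[0]);          // insert the freshly computed value
            ins.setString(2, uid_gid[1]);
            ins.setDouble(3, Double.parseDouble(str[1]));
            ins.executeUpdate();
        }
        br.close();
        del.close();
        ins.close();
        conn.close();
    }
}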
8. The workflow
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GoodsMain extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsMain(), args);
            // After all MapReduce jobs finish, export the result to MySQL
            GoodsStep9.toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        // The input path could also be taken from the command line:
        // String inpath = new String("inpath");
        // Path path = new Path(conf.get(inpath));
        Path path = new Path("data/userLog.log");
        Path outpath = new Path("src/main/resources/step1");
        Path outpath2 = new Path("src/main/resources/step2");
        Path outpath3 = new Path("src/main/resources/step3");
        Path outpath4 = new Path("src/main/resources/step4");
        Path outpath5 = new Path("src/main/resources/step5");
        Path outpath6 = new Path("src/main/resources/step6");
        Path outpath7 = new Path("src/main/resources/step7");
        Path outpath8 = new Path("src/main/resources/step8");
        Path outpath9 = new Path("src/main/resources/step8_2");

        // Configure every MapReduce step as a Job
        // step 1: behavior log -> preference values
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, path);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, outpath);

        // step 8.1: user:goods pairs that were already paid for
        Job job8 = Job.getInstance(conf);
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(GoodsStep8.GS8Mapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(IntWritable.class);
        job8.setReducerClass(GoodsStep8.GS8Reducer.class);
        job8.setOutputKeyClass(Text.class);
        job8.setOutputValueClass(IntWritable.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8, path);
        job8.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job8, outpath8);

        // step 2: preference matrix
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsStep2.GS2Mapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsStep2.GS2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job2, outpath);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2, outpath2);

        // step 3: Cartesian product of goods per user
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsStep3.GS3Mapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(GoodsStep3.GS3Reducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job3, outpath);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3, outpath3);

        // step 4: co-occurrence counts
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(GoodsStep4.GS4Mapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setReducerClass(GoodsStep4.GS4Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4, outpath3);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4, outpath4);

        // step 5: co-occurrence matrix
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setMapperClass(GoodsStep5.GS5Mapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(GoodsStep5.GS5Reducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job5, outpath4);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5, outpath5);

        // step 6: preference * co-occurrence = partial recommendation values
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(GoodsStep6.GS6Mapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(Text.class);
        job6.setReducerClass(GoodsStep6.GS6Reducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(DoubleWritable.class);
        job6.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job6, outpath2, outpath5);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6, outpath6);

        // step 7: accumulate recommendation values
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setMapperClass(GoodsStep7.GS7Mapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(GoodsStep7.GS7Reducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(DoubleWritable.class);
        job7.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job7, outpath6);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7, outpath7);

        // step 8.2: remove goods the user has already paid for
        Job job9 = Job.getInstance(conf);
        job9.setJarByClass(this.getClass());
        job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job9.setMapOutputKeyClass(Text.class);
        job9.setMapOutputValueClass(Text.class);
        job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job9.setOutputKeyClass(Text.class);
        job9.setOutputValueClass(Text.class);
        job9.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job9, outpath7, outpath8);
        job9.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job9, outpath9);

        // Wrap every Job in a ControlledJob
        ControlledJob cj = new ControlledJob(conf);
        cj.setJob(job);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(conf);
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(conf);
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(conf);
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(conf);
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(conf);
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(conf);
        cj8.setJob(job8);
        ControlledJob cj9 = new ControlledJob(conf);
        cj9.setJob(job9);

        // Declare the dependencies between the jobs
        cj2.addDependingJob(cj);
        cj3.addDependingJob(cj);
        cj4.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj2);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj6);
        cj9.addDependingJob(cj7);
        cj9.addDependingJob(cj8);

        // Build the workflow controller
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cj);
        jobs.addJob(cj2);
        jobs.addJob(cj3);
        jobs.addJob(cj4);
        jobs.addJob(cj5);
        jobs.addJob(cj6);
        jobs.addJob(cj7);
        jobs.addJob(cj8);
        jobs.addJob(cj9);

        // Start the controller: it runs every MapReduce step in dependency order
        Thread t = new Thread(jobs);
        t.start();
        while (true) {
            if (jobs.allFinished()) {
                System.out.println("All jobs finished");
                System.out.println(jobs.getSuccessfulJobList());
                jobs.stop();
                return 0;
            } else if (jobs.getFailedJobList().size() > 0) {
                System.out.println("A job failed");
                System.out.println(jobs.getFailedJobList());
                jobs.stop();
                return -1;
            }
        }
    }
}
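Reading the addDependingJob calls, the workflow runs the jobs in this dependency order:

    step1 -> step2 and step3 (both read the step1 output)
    step3 -> step4 -> step5
    step2 + step5 -> step6 -> step7
    step7 + step8.1 -> step8.2 (step8.1 reads the raw log and has no prerequisites)

GoodsStep9.toDB() then exports the step8.2 result to MySQL after ToolRunner.run returns in main.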