E-commerce Recommendation System

This post documents the main steps in implementing a product recommendation system with Hadoop MapReduce.

1. Obtaining user preference values for products

Each line of the user behavior log is turned into a preference score according to the behavior type: paySuccess (payment succeeded) and addReview (review added) count 0.3, createOrder 0.2, addCar (added to cart) 0.15, and anything else (a plain page view) 0.05. The job below keys every record by user:product and sums the scores for each key.

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep1 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // Convert the behavior into a preference value
            double like = 0.0;
            if (split.length >= 3) {
                String str = split[2].toLowerCase();
                if (str.equals("paysuccess")) {          // payment succeeded
                    like = 0.3;
                } else if (str.equals("addreview")) {    // review added
                    like = 0.3;
                } else if (str.equals("createorder")) {  // order created
                    like = 0.2;
                } else if (str.equals("addcar")) {       // added to cart
                    like = 0.15;
                } else {                                 // page view
                    like = 0.05;
                }
            }
            // key = user:product, value = the preference value (summed later in the reducer)
            Text outkey = new Text(split[0] + ":" + split[1]);
            DoubleWritable outvalue = new DoubleWritable(like);
            context.write(outkey, outvalue);
        }
    }

    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // Use BigDecimal to avoid floating-point precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (DoubleWritable value : values) {
                BigDecimal v = new BigDecimal(value.get());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
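
To make the data flow concrete, here is a small hand-worked example. The column order of userLog.log (userId,productId,behavior) is inferred from the split indexes in the mapper, and the ids below are made up for illustration:

11,512,createOrder   ->  11:512	0.2
11,512,browse        ->  11:512	0.05
5,375,paySuccess     ->  5:375	0.3

After the reducer sums the values per user:product key, the step1 output contains lines such as:

11:512	0.25
5:375	0.3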

2. Organizing the preference data into a preference matrix

This job (GoodsStep2) regroups the output of the previous step by product: the mapper swaps the key from user:product to just the product id, and the reducer concatenates all user:preference entries for one product into a single line, giving one row of the preference matrix per product.

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep2 extends Configured implements Tool {

    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            // Use the product id as the output key
            Text outkey = new Text(split[1]);
            // Combine the user id and the preference value into the output value
            Text outvalue = new Text(split[0] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Concatenate all user:preference entries for this product into one matrix row
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep2.GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep2.GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
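
Using the sample row quoted in the GoodsStep6 class comment further below, one row of the preference matrix looks like this: the product id as the key, followed by every user:preference pair collected for that product.

375	11:0.25,5:0.25,4:0.55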

3. Counting product co-occurrences

For each user, the products that user interacted with are paired up with one another (a Cartesian product that skips pairing a product with itself); a second job then counts how often every ordered pair occurs across all users.

Code implementation

Cartesian product:
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;

public class GoodsStep3 extends Configured implements Tool {

    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Input key is user:product; re-key by user, with the product as the value
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1]);
            context.write(outkey, outvalue);
        }
    }

    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Collect all products of one user, then emit every ordered pair except self-pairs
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                list.add(value.toString());
            }
            for (String g1 : list) {
                for (String g2 : list) {
                    if (!g1.equals(g2)) {
                        Text outkey = new Text(g1);
                        Text outvalue = new Text(g2);
                        context.write(outkey, outvalue);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
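
As an illustration (product ids reused from the samples above), if one user's step1 records cover the products 375, 203 and 961, the reducer emits every ordered pair except a product paired with itself:

375	203
375	961
203	375
203	961
961	375
961	203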
Co-occurrence counts:
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep4 extends Configured implements Tool {

    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is a co-occurring pair: product1<TAB>product2
            String[] split = value.toString().split("\t");
            String outkey = split[0] + ":" + split[1];
            context.write(new Text(outkey), new IntWritable(1));
        }
    }

    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Count how many times this pair occurs
            int sum = 0;
            for (IntWritable i : values) {
                sum += 1;
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the reducer
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
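
Each pair from the previous job becomes the key product1:product2 with the value 1, and the reducer adds the ones up. For example, if two different users both interacted with products 375 and 90 while only one user interacted with 375 and 203, the output would contain (counts chosen to match the sample in the GoodsStep6 class comment):

375:90	2
375:203	1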

4. Building the product co-occurrence matrix

This job (GoodsStep5) groups the pair counts from the previous job by the first product of each pair, so that every output line lists one product together with all of its co-occurring products and their counts, which is one row of the co-occurrence matrix.

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep5 extends Configured implements Tool {

    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String goods = key.toString();
            String[] split = goods.split(":");
            // key = the first product of the pair, value = "second product:count"
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Concatenate every co-occurring product of this product into one matrix row
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
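
KeyValueTextInputFormat hands the mapper the key 375:90 and the value 2; the mapper re-keys the record by the first product, and the reducer joins everything belonging to one product into a single row, matching the sample quoted in the GoodsStep6 class comment:

375:90	2    ->  375	90:2
375:203	1    ->  375	203:1
...
375	203:1,961:1,91:1,90:2,89:1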

5. Computing recommendation values

This job (GoodsStep6) reads both the preference matrix (section 2) and the co-occurrence matrix (section 4), joins them on the product key, and multiplies each user's preference value by each co-occurrence count, emitting a partial recommendation value for every user:product combination.

Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class GoodsStep6 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Step 2 output:
     *   375	11:0.25,5:0.25,4:0.55
     *   product	user:preference
     * Step 5 output:
     *   375	203:1,961:1,91:1,90:2,89:1
     *   product	product:co-occurrence count
     * Output of this step:
     *   user:product	recommendation value
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            for (String str : split) {
                // key = product, value = either user:preference or product:co-occurrence count
                context.write(key, new Text(str));
            }
        }
    }

    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Preference map: user -> preference value
            HashMap<String, String> like = new HashMap<>();
            // Co-occurrence map: product -> co-occurrence count
            HashMap<String, String> same = new HashMap<>();
            for (Text value : values) {
                String data = value.toString();
                String[] split = data.split(":");
                // Preference values contain a decimal point, co-occurrence counts do not
                if (split[1].contains(".")) {
                    like.put(split[0], split[1]);
                } else {
                    same.put(split[0], split[1]);
                }
            }
            for (Map.Entry<String, String> l : like.entrySet()) {
                for (Map.Entry<String, String> s : same.entrySet()) {
                    // The user's preference value
                    BigDecimal lvalue = new BigDecimal(l.getValue());
                    // The product co-occurrence count
                    BigDecimal svalue = new BigDecimal(s.getValue());
                    // key = user:co-occurring product
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = lvalue.multiply(svalue).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep6.GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep6.GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and paths (step2 and step5 outputs)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
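
A worked example using the sample rows from the class comment: for product 375, user 11 has the preference value 0.25 and product 90 co-occurs 2 times, so the reducer emits the partial recommendation value 0.25 * 2 = 0.5 for the key 11:90; every other user/co-occurring-product combination is produced the same way:

11:90	0.5
11:203	0.25
5:90	0.5
...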

6. Accumulating recommendation values and cleaning the data

GoodsStep7 adds up the partial recommendation values per user:product key. GoodsStep8 and GoodsStep8_2 then clean the result by removing the products a user has already paid for, so that purchased items are not recommended again.

Code implementation

Recommendation value accumulation:
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep7 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Identity mapper: pass user:product and its partial value straight through
            context.write(key, value);
        }
    }

    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum the partial recommendation values with BigDecimal to avoid precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (Text value : values) {
                BigDecimal v = new BigDecimal(value.toString());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep7.GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep7.GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
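
Because a user can reach the same candidate product through several different co-occurring products, the previous job usually emits the same user:product key more than once; this job simply sums those partial values. A hypothetical illustration (the 0.3 contribution is made up):

11:90	0.5   (contributed via product 375)
11:90	0.3   (contributed via another product)
->  11:90	0.8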
Data cleaning
First, find the user:product pairs that already have exactly one successful payment:
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep8 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // Keep only the records whose behavior is a successful payment
            boolean paySuccess = split[2].toLowerCase().equals("paysuccess");
            if (paySuccess) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable i : values) {
                num++;
            }
            // Emit only the user:product pairs paid for exactly once
            if (num == 1) context.write(key, new IntWritable(num));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep8.GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep8.GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
Then remove those already-purchased pairs from the accumulated recommendation data:
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsStep8_2 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Identity mapper over the combined step7 and step8 outputs
            context.write(key, value);
        }
    }

    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//            int num = 0;
//            String outvalue = "";
//            for (Text value : values) {
//                outvalue = value.toString();
//                num ++;
//            }
//            if (num == 1) context.write(key, new Text(outvalue));
            // Keep keys with a single value: pairs present in both the step7 and
            // step8 outputs (already purchased) get two values and are dropped.
            Iterator<Text> iter = values.iterator();
            Text outvalue = iter.next();
            if (!iter.hasNext()) {
                context.write(key, outvalue);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and paths (step7 and step8 outputs)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
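
In the combined input of this job, a user:product pair that was already purchased shows up twice, once with its recommendation value from step7 and once with the marker 1 from step8, so it gets two values in the reducer and is dropped; a pair that only appears in step7 keeps its single value and is written out. Hypothetical illustration:

11:512	0.25  and  11:512	1   ->  dropped (already purchased)
11:90	0.8                      ->  kept as 11:90	0.8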

7. Writing to the database

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GoodsStep9 {

    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
        Statement statement = null;
        FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
        BufferedReader br = new BufferedReader(new InputStreamReader(open));
        String line = "";
        while ((line = br.readLine()) != null) {
            // Each line looks like: 11:512	0.25  (user:product	recommendation value)
            String[] str = line.split("\t");
            String[] uid_gid = str[0].split(":");
            statement = conn.createStatement();
            // Delete any old recommendation for this user/product, then insert the new one
            String sql = "delete from recommend where customerId = '" + uid_gid[0]
                    + "' and bookId = '" + uid_gid[1] + "'";
            String sql2 = "insert into recommend(customerId, bookId, recommendNum) values ('"
                    + uid_gid[0] + "','" + uid_gid[1] + "'," + str[1] + ")";
            statement.addBatch(sql);
            statement.addBatch(sql2);
            statement.executeBatch();
        }
    }
}
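
For the sample output line quoted in the code comment, 11:512	0.25, the loop issues roughly the following two statements; the recommend table with the columns customerId, bookId and recommendNum is assumed to already exist in the tmall database:

delete from recommend where customerId = '11' and bookId = '512'
insert into recommend(customerId, bookId, recommendNum) values ('11','512',0.25)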

8. Workflow

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GoodsMain extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsMain(), args);
            GoodsStep9.toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
//        String inpath = new String("inpath");
//        Path path = new Path(conf.get(inpath));
        Path path = new Path("data/userLog.log");
        Path outpath = new Path("src/main/resources/step1");
        Path outpath2 = new Path("src/main/resources/step2");
        Path outpath3 = new Path("src/main/resources/step3");
        Path outpath4 = new Path("src/main/resources/step4");
        Path outpath5 = new Path("src/main/resources/step5");
        Path outpath6 = new Path("src/main/resources/step6");
        Path outpath7 = new Path("src/main/resources/step7");
        Path outpath8 = new Path("src/main/resources/step8");
        Path outpath9 = new Path("src/main/resources/step8_2");

        // Build the Job configuration for every MR step
        // step1
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, path);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, outpath);

        // step8
        Job job8 = Job.getInstance(conf);
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(GoodsStep8.GS8Mapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(IntWritable.class);
        job8.setReducerClass(GoodsStep8.GS8Reducer.class);
        job8.setOutputKeyClass(Text.class);
        job8.setOutputValueClass(IntWritable.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8, path);
        job8.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job8, outpath8);

        // step2
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsStep2.GS2Mapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsStep2.GS2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job2, outpath);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2, outpath2);

        // step3
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsStep3.GS3Mapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(GoodsStep3.GS3Reducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job3, outpath);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3, outpath3);

        // step4
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(GoodsStep4.GS4Mapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setReducerClass(GoodsStep4.GS4Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4, outpath3);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4, outpath4);

        // step5
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setMapperClass(GoodsStep5.GS5Mapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(GoodsStep5.GS5Reducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job5, outpath4);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5, outpath5);

        // step6
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(GoodsStep6.GS6Mapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(Text.class);
        job6.setReducerClass(GoodsStep6.GS6Reducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(DoubleWritable.class);
        job6.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job6, outpath2, outpath5);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6, outpath6);

        // step7
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setMapperClass(GoodsStep7.GS7Mapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(GoodsStep7.GS7Reducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(DoubleWritable.class);
        job7.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job7, outpath6);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7, outpath7);

        // step9 (the final cleaning job, GoodsStep8_2)
        Job job9 = Job.getInstance(conf);
        job9.setJarByClass(this.getClass());
        job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job9.setMapOutputKeyClass(Text.class);
        job9.setMapOutputValueClass(Text.class);
        job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job9.setOutputKeyClass(Text.class);
        job9.setOutputValueClass(Text.class);
        job9.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job9, outpath7, outpath8);
        job9.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job9, outpath9);

        // Wrap each Job in a ControlledJob
        ControlledJob cj = new ControlledJob(conf);
        cj.setJob(job);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(conf);
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(conf);
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(conf);
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(conf);
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(conf);
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(conf);
        cj8.setJob(job8);
        ControlledJob cj9 = new ControlledJob(conf);
        cj9.setJob(job9);

        // Declare the dependencies between jobs
        cj2.addDependingJob(cj);
        cj3.addDependingJob(cj);
        cj4.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj2);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj6);
        cj9.addDependingJob(cj7);
        cj9.addDependingJob(cj8);

        // Create the workflow controller and register all jobs
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cj);
        jobs.addJob(cj2);
        jobs.addJob(cj3);
        jobs.addJob(cj4);
        jobs.addJob(cj5);
        jobs.addJob(cj6);
        jobs.addJob(cj7);
        jobs.addJob(cj8);
        jobs.addJob(cj9);

        // Start the controller: it runs every MR job in dependency order
        Thread t = new Thread(jobs);
        t.start();
        while (true) {
            if (jobs.allFinished()) {
                System.out.println("All jobs completed");
                System.out.println(jobs.getSuccessfulJobList());
                jobs.stop();
                return 0;
            } else if (jobs.getFailedJobList().size() > 0) {
                System.out.println("Some jobs failed");
                System.out.println(jobs.getFailedJobList());
                jobs.stop();
                return -1;
            }
        }
    }
}
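
The addDependingJob calls above encode the following execution order; JobControl starts a job only after all of its prerequisites have finished:

step1 -> step2
step1 -> step3 -> step4 -> step5
{step2, step5} -> step6 -> step7
step8 runs independently, reading the raw log
{step7, step8} -> step8_2, after which GoodsStep9.toDB() loads the result into MySQL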

Summary

The complete pipeline is: behavior log -> preference values -> preference matrix -> co-occurrence pairs and counts -> co-occurrence matrix -> partial recommendation values -> accumulation and cleaning -> MySQL, with all MapReduce jobs chained together by JobControl and the database load performed by GoodsStep9.
