hadoop 学习:mapreduce 入门案例三:顾客信息与订单信息相关联(联表)

这里的知识点在于如何合并两张表,事实上这种业务场景我们很熟悉了,这就是我们在学习 MySQL 的时候接触到的内连接,左连接,而现在我们要学习 mapreduce 中的做法

这里我们可以选择在 map 阶段和reduce阶段去做

数据:

链接: https://pan.baidu.com/s/1PH1J8SIEJA5UX0muvN-vuQ?pwd=idwx 提取码: idwx

顾客信息

 订单信息

 

编写实体类 CustomerOrder

这里我们除了顾客与订单的属性外,额外定义了一个状态,用来区分当前类是顾客信息还是订单信息

import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class CustomerOrders implements WritableComparable<CustomerOrders> {private Integer customerId;private String customerName;private Integer orderId;private String orderStatus;// 标签private String flag;@Overridepublic String toString() {return "CustomerOrders{" +"customerId=" + customerId +", customerName='" + customerName + '\'' +", orderId=" + orderId +", orderStatus='" + orderStatus + '\'' +", flag='" + flag + '\'' +'}';}public CustomerOrders() {}public CustomerOrders(Integer customerId, String customerName, Integer orderId, String orderStatus, String flag) {this.customerId = customerId;this.customerName = customerName;this.orderId = orderId;this.orderStatus = orderStatus;this.flag = flag;}public Integer getCustomerId() {return customerId;}public void setCustomerId(Integer customerId) {this.customerId = customerId;}public String getCustomerName() {return customerName;}public void setCustomerName(String customerName) {this.customerName = customerName;}public Integer getOrderId() {return orderId;}public void setOrderId(Integer orderId) {this.orderId = orderId;}public String getOrderStatus() {return orderStatus;}public void setOrderStatus(String orderStatus) {this.orderStatus = orderStatus;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic int compareTo(CustomerOrders o) {return 0;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeInt(customerId);dataOutput.writeUTF(customerName);dataOutput.writeInt(orderId);dataOutput.writeUTF(orderStatus);dataOutput.writeUTF(flag);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.customerId = dataInput.readInt();this.customerName = dataInput.readUTF();this.orderId = dataInput.readInt();this.orderStatus = dataInput.readUTF();this.flag = dataInput.readUTF();}
}

 

1. 在 reduce 阶段合并

传入两个文件

(1)map 阶段

setup方法在 map 方法前运行,找到当前数据所在文件的名称,用来区分当前这条数据是顾客信息还是订单信息

map 方法将传进来的数据包装成对象,最后已键值对的形式传给下一阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable, Text,Text,CustomerOrders> {String fileName = "";@Overrideprotected void setup(Context context) throws IOException, InterruptedException {FileSplit fileSplit = (FileSplit) context.getInputSplit();System.out.println("setup method: "+ fileSplit.getPath().toString());fileName = fileSplit.getPath().getName();System.out.println("fileName : "+fileName);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        System.out.println("map stage:");
//        System.out.println("key : "+key+"\tvalue : "+value);String[]field = value.toString().split(",");CustomerOrders customerOrders = new CustomerOrders();if (fileName.startsWith("orders")){         //订单内容customerOrders.setCustomerId(Integer.parseInt(field[2]));customerOrders.setCustomerName("");customerOrders.setOrderId(Integer.parseInt(field[0]));customerOrders.setFlag("1");customerOrders.setOrderStatus(field[3]);}else {                         //用户信息customerOrders.setCustomerId(Integer.parseInt(field[0]));customerOrders.setCustomerName(field[1]);customerOrders.setOrderId(0);customerOrders.setFlag("0");customerOrders.setOrderStatus("");}Text text = new Text(customerOrders.getCustomerId().toString());context.write(text, customerOrders);}
}

(2)reduce 阶段

这里的 reduce 方法则是,先遍历找到唯一的一个顾客信息,然后将顾客信息填充到订单信息中,再合并为一个 Text 输出

当然也可以有不同的写法,比如将每一条订单信息处理完后就写入 context 之后输出

还有就是这里的对象的赋值写的不太好,但是不能直接用=去赋值,可以使用 BeanUtils 的 copyproperties()方法去赋值

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class ReduceJoinReducer extends Reducer<Text,CustomerOrders,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {System.out.println("reduce stage: key:"+key+"  values:"+values);String customerName = "";String text = "";List<CustomerOrders> list = new ArrayList<>();for (CustomerOrders co : values){if (co.getFlag().equals("0")){customerName = co.getCustomerName();}CustomerOrders customerOrders = new CustomerOrders();customerOrders.setCustomerName(co.getCustomerName());customerOrders.setFlag(co.getFlag());customerOrders.setCustomerId(co.getCustomerId());customerOrders.setOrderStatus(co.getOrderStatus());customerOrders.setOrderId(co.getOrderId());list.add(customerOrders);}System.out.println(list);System.out.println();for (CustomerOrders co : list){if (co.getFlag().equals("1")){CustomerOrders customerOrders = new CustomerOrders();customerOrders = co;customerOrders.setCustomerName(customerName);customerOrders.setFlag("2");System.out.println(customerOrders.toString());text += customerOrders.toString()+"\t";}}System.out.println(text);System.out.println("customerName:"+customerName);context.write(key, new Text(text));}
}

(3)driver 启动

基本操作,设置好各个参数

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(CustomerOrders.class);job.setReducerClass(ReduceJoinReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.setInputPaths(job ,new Path[]{new Path(args[0]),new Path(args[1])});Path path = new Path(args[2]);FileSystem fs = FileSystem.get(path.toUri(),conf);if (fs.exists(path)){fs.delete(path, true);}FileOutputFormat.setOutputPath(job,path);fs.close();job.waitForCompletion(true);
}}

2. 在 map 阶段合并

传入一个文件,另一个文件以缓存文件cachefile的形式传入,这种方法要注意,cachefile的大小不能太大,可以形象的打个比方,你去朋友家做客,晚上朋友家没有被子,你捎带个被子过去,这是可以的,但是如果说你朋友缺个房子,你不能捎带个房子过去对吧。

(1)map 阶段

setup方法使用 io 流的方法将顾客信息读取进来,使用 List<CustomerOrders>去存储

map 方法对于每个订单信息都遍历一次列表,通过顾客编号这一关联属性去找到对应的顾客信息并填充进去

import md.kb23.demo03.CustomerOrders;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.codehaus.jackson.map.util.BeanUtil;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;public class MapJoinMapper extends Mapper<LongWritable, Text, CustomerOrders, NullWritable> {private List<CustomerOrders> list = new ArrayList<CustomerOrders>();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {URI[] cashFiles = context.getCacheFiles();for (URI uri : cashFiles){System.out.println(uri.getPath());String currentFileName = new Path(uri).getName();if (currentFileName.startsWith("customers")){String path = uri.getPath();BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));String line;while ((line = br.readLine())!=null){System.out.println(line);String[] field = line.split(",");CustomerOrders customerOrders = new CustomerOrders(Integer.parseInt(field[0]),field[1]+" "+field[2],0,"","");list.add(customerOrders);}}}}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[]orderField = value.toString().split(",");int customerId = Integer.parseInt(orderField[2]);CustomerOrders customerOrders = null;for (CustomerOrders customer : list){if (customer.getCustomerId()==customerId){customerOrders=customer;}}CustomerOrders order = new CustomerOrders();if (customerOrders!=null){order.setCustomerName(customerOrders.getCustomerName());}else {order.setCustomerName("");}order.setCustomerId(customerId);order.setOrderStatus(orderField[3]);order.setFlag("1");order.setOrderId(Integer.parseInt(orderField[0]));context.write(order, null);}
}

 

(2)driver 启动

这里我们在 map 阶段已经将事情都做完了,就不用再额外写一个 reduce 了,另外就是注意一下 cachefile 的添加方法

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MapJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {long start = System.currentTimeMillis();Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(MapJoinDriver.class);job.setMapperClass(MapJoinMapper.class);job.setMapOutputKeyClass(CustomerOrders.class);job.setMapOutputValueClass(NullWritable.class);Path inpath = new Path("in/demo3/orders.csv");FileInputFormat.setInputPaths(job,inpath);Path outpath = new Path("out/out5");FileSystem fs = FileSystem.get(outpath.toUri(),conf);if(fs.exists(outpath)){fs.delete(outpath,true);}FileOutputFormat.setOutputPath(job,outpath);//设置 reduce 阶段任务数job.setNumReduceTasks(0);Path cashPath = new Path("in/demo3/customers.csv");job.addCacheFile(cashPath.toUri());job.waitForCompletion(true);long end = System.currentTimeMillis();System.out.println("程序运行时间:"+(end-start));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/111326.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java版工程项目管理系统源码+系统管理+系统设置+项目管理+合同管理+二次开发

工程项目各模块及其功能点清单 一、系统管理 1、数据字典&#xff1a;实现对数据字典标签的增删改查操作 2、编码管理&#xff1a;实现对系统编码的增删改查操作 3、用户管理&#xff1a;管理和查看用户角色 4、菜单管理&#xff1a;实现对系统菜单的增删改查操…

人工智能会成为人类的威胁吗?马斯克、扎克伯格、比尔·盖茨出席

根据消息人士透露&#xff0c;此次人工智能洞察论坛将是一次历史性的聚会&#xff0c;吸引了来自科技界的许多重量级人物。与会者们将共同探讨人工智能在科技行业和社会发展中的巨大潜力以及可能带来的挑战。 埃隆马斯克&#xff0c;特斯拉和SpaceX的首席执行官&#xff0c;一直…

如何提高视频清晰度?视频调整清晰度操作方法

现在很多小伙伴通过制作短视频发布到一些短视频平台上记录生活&#xff0c;分享趣事。但制作的视频有些比较模糊&#xff0c;做视频的小伙伴应该都知道&#xff0c;视频画质模糊不清&#xff0c;会严重影响观众的观看体验。 通过研究&#xff0c;总结了以下几点严重影响的点 …

Android12之ABuffer数据处理(三十四)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药. 更多原创,欢迎关注:Android…

Nacos集群搭建

集群结构 三个nacos节点的地址&#xff1a; 节点ipportnacos1127.0.0.18845nacos2127.0.0.18846nacos3127.0.0.18847 集群步骤 搭建集群的基本步骤&#xff1a; 搭建数据库&#xff0c;初始化数据库表结构 下载nacos安装包 配置nacos 启动nacos集群 nginx反向代理 初始化…

02调制+滤波器+冲激函数的傅立叶变换

目录 一、调制方式 1.1 什么是调制&#xff1f; 1.2 为什么要调制&#xff1f; 1.3 如何调制&#xff1f; 1.4 调制包含的信号类型&#xff1f; 1. 消息信号 2. 载波信号 3. 调制信号 1.5 调制类型&#xff1f; 1. 调幅 2. 调频 3. 调相 4. 模拟脉冲调制 5. 脉冲…

WSL Opencv with_ffmpeg conan1.60.0

我是ubuntu18. self.options[“opencv”].with_ffmpeg True 关键是gcc版本需要conan支持&#xff0c;比如我的是&#xff1a; compilergcc compiler.version7.5 此外还需要安装系统所需库&#xff1a; https://qq742971636.blog.csdn.net/article/details/132559789 甚至来…

C# NetTopologySuite+ProjNet 任意图形类型坐标转换

添加引用&#xff1a;NetTopologySuite、ProjNet、ProjNet.SRID Program.cs文件&#xff1a; using ProjNet.CoordinateSystems; using ProjNet.CoordinateSystems.Transformations; using ProjNet.SRID; using System; using System.Collections.Generic; using System.Linq;…

unordered-------Hash

✅<1>主页&#xff1a;我的代码爱吃辣&#x1f4c3;<2>知识讲解&#xff1a;数据结构——哈希表☂️<3>开发环境&#xff1a;Visual Studio 2022&#x1f4ac;<4>前言&#xff1a;哈希是一种映射的思想&#xff0c;哈希表即使利用这种思想&#xff0c;…

前端基础1——HTML标记语言

文章目录 一、基本了解二、HTML常用标签2.1 文本格式化标签2.2 列表标签2.3 超链接标签2.4 图片标签2.5 表格标签2.6 表单标签2.6.1 提交表单2.6.2 下拉表单2.6.3 按钮标签 2.7 布局标签 一、基本了解 网页组成&#xff08;index.html页面&#xff09;&#xff1a; HTML标记语言…

Verilog开源项目——百兆以太网交换机(一)架构设计与Feature定义

Verilog开源项目——百兆以太网交换机&#xff08;一&#xff09;架构设计与Feature定义 &#x1f508;声明&#xff1a;未经作者允许&#xff0c;禁止转载 &#x1f603;博主主页&#xff1a;王_嘻嘻的CSDN主页 &#x1f511;全新原创以太网交换机项目&#xff0c;Blog内容将聚…

23.8.11.用apifox端口号与java接口链接的时候少了个/导致连接不成功。

用apifox端口号与java接口链接的时候少了个/导致连接不成功。 原因分析&#xff0c;因为拼接的位置少了个/ 如图所示

【Java转Go】快速上手学习笔记(六)之网络编程篇一

目录 TCP一个简单案例server.go 服务端client.go 客户端 HTTPserver.go 服务端client.go 客户端 RPC一个很简单的示例server.go 服务端client.go 客户端 WebSocketserver.go 服务端client.go 客户端 完整代码server.go 服务端client.go 客户端 go往期文章笔记&#xff1a; 【J…

(笔记四)利用opencv识别标记视频中的目标

预操作&#xff1a; 通过cv2将视频的某一帧图片转为HSV模式&#xff0c;并通过鼠标获取对应区域目标的HSV值&#xff0c;用于后续的目标识别阈值区间的选取 img cv.imread(r"D:\data\123.png") img cv.cvtColor(img, cv.COLOR_BGR2HSV) plt.figure(1), plt.imshow…

开始MySQL之路——MySQL 事务(详解分析)

MySQL 事务概述 MySQL 事务主要用于处理操作量大&#xff0c;复杂度高的数据。比如说&#xff0c;在人员管理系统中&#xff0c;你删除一个人员&#xff0c;你即需要删除人员的基本资料&#xff0c;也要删除和该人员相关的信息&#xff0c;如信箱&#xff0c;文章等等&#xf…

打造互动体验:品牌 DTC 如何转变其私域战略

越来越多的品牌公司选择采用DTC 模式与消费者进行互动&#xff0c;而非仅仅销售产品。通过与消费者建立紧密联系&#xff0c;DTC模式不仅可以提供更具成本效益的规模扩张方式&#xff0c;还能够控制品牌体验、获取宝贵的第一方数据并提升盈利能力。然而DTC模式的经济模型比许多…

Docker创建Consul并添加权限控制

一、部署Consul 1、拉取镜像&#xff1a; docker pull consul:<consul-version> 2、运行 docker run --name consul1 -p 8300:8300/tcp -p 8301:8301/tcp -p 8301:8301/udp -p 8302:8302/tcp -p 8302:8302/udp -p 8500:8500 -p 8600:8600/tcp -p 8600:8600/udp -v /h…

数据结构——栈

栈 栈的理解 咱们先不管栈的数据结构什么&#xff0c;先了解栈是什么&#xff0c;栈就像一个桶一样&#xff0c;你先放进去的东西&#xff0c;被后放进的的东西压着&#xff0c;那么就需要把后放进行的东西拿出才能拿出来先放进去的东西&#xff0c;如图1&#xff0c;就像图1中…

Android Studio调试出现错误时,无法定位错误信息解决办法

做项目时运行项目会出现问题&#xff0c;但是找不到具体位置&#xff0c;如下图所示&#xff1a;感觉是不是很懵逼~&#xff0c;Log也没有显示是哪里的问题 解决方案&#xff0c;在右侧导航栏中选择Gradle——app——build&#xff0c;然后点击运行 运行结果如下&#xff0c;很…

C#: Json序列化和反序列化,集合为什么多出来一些元素?

如下面的例子&#xff0c;很容易看出问题&#xff1a; 如果类本身的无参构造函数&#xff0c; 就添加了一些元素&#xff0c;序列化&#xff0c;再反序列化&#xff0c;会导致元素增加。 如果要避免&#xff0c;必须添加&#xff1a; new JsonSerializerSettings() { Object…