【Hadoop_06】MapReduce的概述与wc案例

  • 1、MapReduce概述
    • 1.1 MapReduce定义
    • 1.2 MapReduce优点
    • 1.3 MapReduce缺点
    • 1.4 MapReduce核心思想
    • 1.5 MapReduce进程
    • 1.6 常用数据序列化类型
    • 1.7 源码与MapReduce编程规范
  • 2、WordCount案例实操
    • 2.1 本地测试
    • 2.2 提交到集群测试

1、MapReduce概述

1.1 MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

1.2 MapReduce优点

1)MapReduce易于编程

==它简单的实现一些接口,就可以完成一个分布式程序,===这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

2)良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

4)适合PB级以上海量数据的离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

1.3 MapReduce缺点

1)不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

2)不擅长流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

3)不擅长DAG(有向无环图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

1.4 MapReduce核心思想

现在有一个需求:要统计一个文件当中每一个单词出现的总次数(并将查询结果a-p字母保存一个文件,q-z字母保存一个文件),则可以按照图示步骤

在这里插入图片描述
(1)分布式的运算程序往往需要分成至少2个阶段。map+reduce
(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。统计次数,形成键值对,<H,1>、<S,1>、<H,1>,但是次数之间不相加。
(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。将统计的次数相加求和。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

总结:分析WordCount数据流走向深入理解MapReduce核心思想。

1.5 MapReduce进程

mr、job、任务指的都是一个应用程序。例如:跑一个wordcount,可以说这是一个job或者任务。

未来在运行MapReduce程序的时候,会启动哪些进程呢?

一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。

1.6 常用数据序列化类型

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable
  • 除了string,其他的都是在java类型的基础上加上writable

1.7 源码与MapReduce编程规范

在这里插入图片描述

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

在这里插入图片描述

在这里插入图片描述

源码如下:

package org.apache.hadoop.examples;import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount
{public static void main(String[] args)throws Exception{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; i++) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));System.exit(job.waitForCompletion(true) ? 0 : 1);}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException{int sum = 0;for (IntWritable val : values) {sum += val.get();}this.result.set(sum);context.write(key, this.result);}}public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private static final IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException{StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {this.word.set(itr.nextToken());context.write(this.word, one);}}}
}
  • 上面一共有三个方法,分别是main方法,map方法和reduce方法。
  • 定义一个类,继承mapper,之后重写里面的mapper方法,实现自己的业务逻辑。

在这里插入图片描述
在这里插入图片描述

MapReduce的编程规范如下:

在这里插入图片描述
在这里插入图片描述

2、WordCount案例实操

2.1 本地测试

1)需求

在给定的文本文件中统计输出每一个单词出现的总次数
(1)输入数据

(2)期望输出数据

wenxin	2
banzhang	1
cls	2
hadoop	1
jiao	1
ss	2
xue	1
  • 可以发现上面的数据涉及首字母排序的问题。

2)需求分析

按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

在这里插入图片描述

(1)创建maven工程,MapReduceDemo

(2)在pom.xml文件中添加如下依赖

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency>
</dependencies>

(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(3)创建包名:com.wenxin.mapreduce.wordcount

在这里插入图片描述

Mapper的源码:

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public Mapper() {}protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {}protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {context.write(key, value);}protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {}public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {this.setup(context);try {while(context.nextKeyValue()) {this.map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {this.cleanup(context);}}public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public Context() {}}
}

在这里插入图片描述
在这里插入图片描述

4)编写程序

(1)编写Mapper类

package com.wenxin.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;/*** @author Susie-Wen* @version 1.0* @description:* @date 2023/12/13 9:56*/
/*
KEYIN,map阶段输入的key的类型:LongWritable
VALUEINT,map阶段输入的value的类型:Text
KEYOUT,map阶段输出的Key的类型:Text
VALUEOUT,map阶段输出的value类型:IntWritable*/
public class WordCountMapper<map> extends Mapper<LongWritable, Text,Text, IntWritable> {//private Text outK=new Text();private IntWritable outV=new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {/*LongWritable key,输入的key,偏移量Text value,输入的valueContext context,对应的上下文*///1.获取一行String line = value.toString();//2.对一行数据进行切割(因为原始数据使用的是空格,因此这里使用空格切割)String[] words = line.split(" ");//3.循环写出for(String word:words){//封装outKoutK.set(word);//写出context.write(outK,outV);}}
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(2)编写Reducer类

package com.wenxin.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*** @author Susie-Wen* @version 1.0* @description:* @date 2023/12/13 9:56*/
/*
KEYIN,reduce阶段输入的key的类型:Text
VALUEINT,reduce阶段输入的value的类型:IntWritable
KEYOUT,reduce阶段输出的Key的类型:Text
VALUEOUT,reduce阶段输出的value类型:IntWritable*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {IntWritable outV=new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum =0;//定义一个变量,进行累加//传进来的值:wenxin,(1,1)for(IntWritable value:values){sum +=value.get();//累加,不能直接加上value,因为value是IntWritable类型,要使用get方法}outV.set(sum);//写出context.write(key,outV);}
}

在这里插入图片描述

(3)编写Driver驱动类

  • driver当中有7步,都是固定的;其次需要注意不要导错包了!
package com.atguigu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取配置信息以及获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 可以看到hadoop默认会对数据进行排序
  • 如果此时再次点击运行的话,会报错,显示输出路径存在;因此对于mapreduce程序,如果输出路径存在了,就会报错。

5)本地测试

(1)需要首先配置好HADOOP_HOME变量以及Windows运行依赖

(2)在IDEA/Eclipse上运行程序

在这里插入图片描述

在这里插入图片描述

2.2 提交到集群测试

集群上测试

(1)用maven打jar包,需要添加的打包插件依赖

<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

注意:如果工程上显示红叉。在项目上右键->maven->Reimport刷新即可。

(2)将程序打成jar包
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(3)修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/home/wenxin/module/hadoop-3.1.3路径。

(4)启动Hadoop集群

[root@hadoop102 hadoop-3.1.3]sbin/start-dfs.sh
[root@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

在这里插入图片描述

在这里插入图片描述

(5)执行WordCount程序
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

[root@hadoop102 hadoop-3.1.3]$ hadoop jar  wc.jarcom.wenxin.mapreduce.wordcount.WordCountDriver /user/wenxin/input /user/wenxin/output

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/218667.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HPM6750系列--第九篇 GPIO详解(基本操作)

一、目的 在之前的博文中我们主要介绍了不同系统不同开发编译调试环境的配置和操作&#xff08;命令行方式、Visual Studio Code、Segger Embedded Studio for RISC-V&#xff09;&#xff0c;以帮助大家准备好学习环境为目的&#xff0c;但是未涉及到芯片本身以及外设的讲解。…

苹果计划将全球1/4的IPhone产能转移至印度

KlipC报道&#xff1a;据相关人士报道&#xff0c;苹果希望在未来2到3年内每年在印度生产超过5000万部iphone&#xff0c;要是该计划得以实现&#xff0c;印度将占领全球iPhone产量的四分之一。 KlipC的分析师Alex Su表示&#xff1a;“此次iPhone15推出是苹果印度制造计划的一…

设计模式详解---策略模式

1. 策略模式简介 策略模式&#xff08;Strategy Pattern&#xff09;是一种行为型设计模式&#xff0c;用于在运行时根据不同的情境选择不同的算法或策略。该模式将算法封装成独立的类&#xff0c;使得它们可以相互替换&#xff0c;而且可以独立于客户端使用它们的方式。 1.1.…

m_map导入本地地形数据

m_map绘制地形图时&#xff0c;虽然自带有1的地形图以及从NOAA下载的1分的地形图&#xff08;详见&#xff1a;Matlab下地形图绘图包m_map安装与使用&#xff09;&#xff0c;但有时需要对地形图分辨率的要求更高&#xff0c;便无法满足。 此时&#xff0c;需要导入本地地形数…

二蛋赠书十一期:《TypeScript入门与区块链项目实战》

前言 大家好&#xff01;我是二蛋&#xff0c;一个热爱技术、乐于分享的工程师。在过去的几年里&#xff0c;我一直通过各种渠道与大家分享技术知识和经验。我深知&#xff0c;每一位技术人员都对自己的技能提升和职业发展有着热切的期待。因此&#xff0c;我非常感激大家一直…

Toyota Programming Contest 2023#8(AtCoder Beginner Contest 333)

A - Three Threes 题目大意&#xff1a;给你一个整数n&#xff0c;将这个数n输出n次。 呃呃 B - Pentagon 题目大意&#xff1a;给你一个正五边形ABCDE&#xff0c;给你任意两条边&#xff0c;判断是否相等 主要问题要判断一下内边&#xff1a;AD&#xff0c;AC&#xff0c;…

MIT6.5840-2023-Lab2C: Raft-Persistence

前置知识 见上一篇 Lab2A。 实验内容 实现 RAFT&#xff0c;分为四个 part&#xff1a;leader election、log、persistence、log compaction。 实验环境 OS&#xff1a;WSL-Ubuntu-18.04 golang&#xff1a;go1.17.6 linux/amd64 Part 2C: persistence 大部分的bug都与这…

KubeKey 离线部署 KubeSphere v3.4.1 和 K8s v1.26 实战指南

作者&#xff1a;运维有术 前言 知识点 定级&#xff1a;入门级了解清单 (manifest) 和制品 (artifact) 的概念掌握 manifest 清单的编写方法根据 manifest 清单制作 artifactKubeKey 离线集群配置文件编写KubeKey 离线部署 HarborKubeKey 离线部署 KubeSphere 和 K8sKubeKey…

C++初阶-list类的模拟实现

list类的模拟实现 一、基本框架1.1 节点类1.2 迭代器类1.3 list类 二、构造函数和析构函数2.1 构造函数2.2 析构函数 三、operator的重载和拷贝构造3.1 operator的重载3.2 拷贝构造 四、迭代器的实现4.1 迭代器类中的各种操作4.1 list类中的迭代器 五、list的增容和删除5.1 尾插…

javacv的视频截图功能

之前做了一个资源库的小项目&#xff0c;因为上传资源文件包含视频等附件&#xff0c;所以就需要时用到这个功能。通过对视频截图&#xff0c;然后作为封面缩略图&#xff0c;达到美观效果。 首先呢&#xff0c;需要准备相关的jar包&#xff0c;之前我用的是低版本的1.4.2&…

Tomcat-安装部署(源码包安装)

一、简介 Tomcat 是由 Apache 开发的一个 Servlet 容器&#xff0c;实现了对 Servlet 和 JSP 的支持&#xff0c;并提供了作为Web服务器的一些特有功能&#xff0c;如Tomcat管理和控制平台、安全域管理和Tomcat阀等。 简单来说&#xff0c;Tomcat是一个WEB应用程序的托管平台…

基于Nexus搭建Maven私服基础入门

什么是Nexus&#xff1f;它有什么优势&#xff1f; 要了解为什么需要nexus的存在&#xff0c;我们不妨从以下几个问题来简单了解一下: 为什么需要搭建私服&#xff1f;如果没有私服会出现什么问题&#xff1f; 对于企业开发而言&#xff0c;如果没有私服&#xff0c;我们所有…

十九)Stable Diffusion使用教程:ai室内设计案例

今天我们聊聊如何通过SD进行室内设计装修。 方式一:controlnet的seg模型 基础起手式: 选择常用算法,抽卡: 抽到喜欢的图片之后,拖到controlnet里: 选择seg的ade20k预处理器,点击爆炸按钮,得到seg语义分割图,下载下来: 根据语义分割表里的颜色值,到PS里进行修改: 语…

SoloLinker第一次使用记录,解决新手拿到板子的无所适从

本文目录 一、简介二、进群获取资料2.1 需要下载资料2.2 SDK 包解压 三、SDK 编译3.1 依赖安装3.2 编译配置3.3 启动编译3.4 编译后的固件目录 四、固件烧录4.1 RV1106 驱动安装4.2 打开烧录工具4.3 进入boot 模式&#xff08;烧录模式&#xff09;4.4 烧录启动固件4.5 烧录升级…

大型网站架构演进过程

架构演进 大型网站的技术挑战主要来自于庞大的用户&#xff0c;高并发的访问和海量的数据&#xff0c;任何简单的业务一旦需要处理数以P计的数据和面对数以亿计的用户&#xff0c;问题就会变得很棘手。大型网站架构主要就是解决这类问题。 架构选型是根据当前业务需要来的&…

RRC下的NAS层

无线资源控制&#xff08;Radio Resource Control&#xff0c;RRC&#xff09;&#xff0c;又称为无线资源管理&#xff08;RRM&#xff09;或者无线资源分配&#xff08;RRA&#xff09;&#xff0c;是指通过一定的策略和手段进行无线资源管理、控制和调度&#xff0c;在满足服…

数据库 02-03 补充 SQL的子查询(where,from),子查询作为集合来比较some,exists,all(某一个,存在,所有)

子查询&#xff1a; where字句的子查询&#xff1a; 通常用in关键字&#xff1a; 举个例子&#xff1a; in关键字&#xff1a; not in 关键字&#xff1a; in 也可以用于枚举集合&#xff1a; where中可以用子查询来作为集合来筛选元祖。 some&#xff0c;all的运算符号…

Spring cloud - 断路器 Resilience4J

其实文章的标题应该叫 Resilience4J&#xff0c;而不是Spring Cloud Resilience4J&#xff0c;不过由于正在对Spring cloud的一系列组件进行学习&#xff0c;为了统一&#xff0c;就这样吧。 概念区分 首先区分几个概念 Spring cloud 断路器&#xff1a;Spring Cloud的官网对…

elementui + vue2实现表格行的上下移动

场景&#xff1a; 如上&#xff0c;要实现表格行的上下移动 实现&#xff1a; <el-dialogappend-to-bodytitle"条件编辑":visible.sync"dialogVisible"width"60%"><el-table :data"data1" border style"width: 100%&q…

Flutter在Visual Studio Code上首次创建运行应用

一、创建Flutter应用 1、前提条件 安装Visual Studio Code并配置好运行环境 2、开始创建Flutter应用 1)、打开Visual Studio Code 2)、打开 View > Command Palette。 3)、在搜索框中输入“flutter”&#xff0c;弹出内容如下图所示&#xff0c;选择“ Flutter: New Pr…