【HadoopShuffle原理剖析】基础篇二

Shuffle原理剖析

在这里插入图片描述

Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。

Shuffle过程

  • Map端的Shuffle

    Map的输出结果首先被缓存到内存,当缓存区容量到达80%(缓冲区默认100MB),就启动溢写操作。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序和合并(combine),之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自己处理的数据。

  • 在Reduce端的Shuffle过程

    Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行合并排序后交给Reduce处理

作用
  • 保证每一个Reduce任务处理的数据大致是一致的

  • Map任务输出的key相同,一定是相同分区,并且肯定是相同的Reduce处理的,保证计算结果的准确性

  • Reduce任务的数量决定了分区的数量,Reduce任务越多计算处理的并行度也就越高

    Reduce任务的数量(默认为1)可以通过:job.setNumReduceTasks(数量)

特点
  • Map端溢写时,key相同的一定是在相同的分区
  • Map端溢写时,排序减少了Reduce的全局排序的复杂度
  • Map端溢写是,合并(combiner【可选】)减少溢写文件的体积,提高了Reduce任务在Fetch数据时的效率,它是一种MapReduce优化策略
  • Reduce端计算或者输出时,它的数据都是有序的
Shuffle源码追踪
  • MapTask

    在这里插入图片描述

  • ReduceTask

    (略)

    建议阅读

数据清洗

数据清洗指将原始数据处理成有价值的数据的过程,就称为数据清洗。

企业大数据开发的基本流程:

  1. 采集数据(flume、logstash)先保存到MQ(Kafka)中
  2. 将MQ中的暂存数据存放到HDFS中保存
  3. 数据清洗(低价值密度的数据处理)存放到HDFS
  4. 算法干预(MapReduce),计算结果保存到HDFS或者HBase
  5. 计算结果的可视化展示(Echarts、HCharts)
需求

现有某系统某天的Nginx的访问日志,格式如下:

27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90

大数据处理的算法,需要参数客户端的ip地址、请求时间、资源、响应状态码

正则表达式提取数据

Regex Expression主要作用字符串匹配抽取和替换

语法
规则解释
.匹配任意字符
\d匹配任意数字
\D匹配任意非数字
\w配置a-z和A-Z
\W匹配非a-z和A-Z
\s匹配空白符
^匹配字符串的开头
$匹配字符串的末尾
规则的匹配次数
语法解释
*规则匹配0到N次
规则匹配1次
{n}规则匹配N次
{n,m}规则匹配n到m次
+规则匹配1到N次(至少一次)
应用
# 匹配手机号码 11位数值构成
\d{11}# 邮箱地址校验  @
.+@.+
使用正则表达式提取Nginx访问日志中的四项指标

测试站点:http://regex101.com

分析后得到需要的正则表达式

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"\w*\s(.*)\sHTTP\/1.1"\s(\d{3}).*$
使用MapReduce分布式并行计算框架进行数据清洗

注意: 因为数据清洗不涉及统计计算,所以MapReduce程序通常只有map任务,而没有Reduce任务

job.setNumReduceTasks(0)

实现代码

数据清洗的Mapper

package com.baizhi.dataclean;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {/*** @param key* @param value   nginx访问日志中的一行记录(原始数据)* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"\\w*\\s(.*)\\sHTTP\\/1.1\"\\s(\\d{3}).*$";String line = value.toString();final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);final Matcher matcher = pattern.matcher(line);while (matcher.find()) {// 四项关键指标  ip 请求时间 请求资源 响应状态码String clientIp = matcher.group(1);// yyyy-MM-dd HH:mm:ssString accessTime = matcher.group(2);String accessResource = matcher.group(3);String status = matcher.group(4);// 30/May/2013:17:38:21 +0800// 30/05/2013:17:38:21SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);try {Date date = sdf.parse(accessTime);SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String finalDate = sdf2.format(date);context.write(new Text(clientIp + " " + finalDate + " " + accessResource + " " + status), null);} catch (ParseException e) {e.printStackTrace();}}}
}

初始化类

package com.baizhi.dataclean;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;public class DataCleanApplication {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration(), "data clean");job.setJarByClass(DataCleanApplication.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);TextInputFormat.setInputPaths(job,new Path("file:///E:/access.log"));TextOutputFormat.setOutputPath(job,new Path("file:///E:/final"));job.setMapperClass(DataCleanMapper.class);// 注意:数据清洗通常只有map任务而没有reducejob.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.waitForCompletion(true);}
}

数据倾斜

数据分区默认策略

数据倾斜指大量的key相同的数据交由一个reduce任务统计计算,造成”闲的闲死,忙的忙死“这样的现象。不符合分布式并行计算的设计初衷的。

现象
  • 某一个reduce运行特别耗时
  • Reduce任务内存突然溢出
解决方案
  • 增大Reduce任务机器JVM的内存(硬件的水平扩展)
  • 增加Reduce任务的数量,每个Reduce任务只负责极少部分的数据处理,并且Reduce任务的数量增加提高了数据计算的并行度

Reduce任务的正确数量: 0.95或者1.75 * (NodeManage数量 * 每个节点最大容器数量)

  • 自定义分区规则Partitioner
package com.baizhi.partition;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;/*** 自定义分区规则*/
public class CustomPartitioner extends Partitioner<Text, LongWritable> {/*** @param key* @param value* @param i     numReduceTasks* @return 分区序号*/public int getPartition(Text key, LongWritable value, int i) {if (key.toString().equals("CN-GD")) return 0;else if (key.toString().equals("CN-GX")) return 1;else if (key.toString().equals("CN-HK")) return 2;else if (key.toString().equals("JP-TY")) return 3;else return 4;}
}
  • 合适使用Combiner,将key相同的value进行整合合并

在combiner合并时,v必须得能支持迭代计算,并且不能够影响Reduce任务的输入

combiner通常就是Reducer任务

// 优化策略:combiner合并操作
job.setCombinerClass(MyReducer.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387694.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过进程协作显示图像-C#

前言 如果一个软件比较复杂或者某些情况下需要拆解&#xff0c;可以考试将软件分解成两个或多个进程&#xff0c;但常规的消息传递又不能完全够用&#xff0c;使用消息共享内存&#xff0c;实现图像传递&#xff0c;当然性能这个方面我并没有测试&#xff0c;仅是一种解决思路…

Tekion 选择 ClickHouse Cloud 提升应用性能和指标监控

本文字数&#xff1a;4187&#xff1b;估计阅读时间&#xff1a;11 分钟 作者&#xff1a;ClickHouse team 本文在公众号【ClickHouseInc】首发 Tekion 由前 Tesla CIO Jay Vijayan 于 2016 年创立&#xff0c;利用大数据、人工智能和物联网等技术&#xff0c;为其汽车客户解决…

如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移

01 引言 随着大数据技术的飞速发展&#xff0c;Apache Kafka 作为一种高吞吐量、低延迟的分布式消息系统&#xff0c;已经成为企业实时数据处理的核心组件。然而&#xff0c;随着业务的扩展和技术的发展&#xff0c;企业面临着不断增加的存储成本和运维复杂性问题。为了更好地…

【数据中台】大数据管理平台建设方案(原件资料)

建设大数据管理中台&#xff0c;按照统一的数据规范和标准体系&#xff0c;构建统一数据采集&#xfe63;治理&#xfe63;共享标准、统一技术开发体系、统一接口 API &#xff0c;实现数据采集、平台治理&#xff0c;业务应用三层解耦&#xff0c;并按照统一标准格式提供高效的…

electron安装及快速创建

electron安装及快速创建 electron是一个使用 JavaScript、HTML 和 CSS 构建桌面应用程序的框架。 详细内容见官网&#xff1a;https://www.electronjs.org/zh/docs/latest/。 今天来记录下练习中的安装过程和hello world的创建。 创建项目文件夹&#xff0c;并执行npm 初始化命…

ubuntu安装tar安装 nginx最新版本

一、需要先安装依赖 apt install gcc libpcre3 libpcre3-dev zlib1g zlib1g-dev openssl libssl-dev 二、上传安装包 并解压 下载地址 nginx news tar xvf nginx-1.25.2.tar.gz 进入nginx cd nginx-1.25.2 三、编译 ./configure --prefix=/usr/local/nginx --with-htt…

Dolphinscheduler 3.2.1bug记录

问题1&#xff1a;分页只展示首页 解决方案&#xff1a; [Bug][API] list paging missing totalpage by Gallardot Pull Request #15619 apache/dolphinscheduler GitHub 问题2:Hive 数据源连接失败 解决方案&#xff1a;修改源码&#xff1a;HiveDataSourceProcessor.cla…

《深度RAG系列》 LLM 为什么选择了RAG

2023年是AIGC&#xff08;Artificial Intelligence Generated Content&#xff09;元年&#xff0c;这一年见证了人工智能生成内容领域的巨大飞跃&#xff0c;特别是大模型的爆发&#xff0c;它们在自然语言处理、图像生成、音频处理等多个领域展现出了惊人的能力。 这些预训练…

数据结构和算法入门

1.了解数据结构和算法 1.1 二分查找 二分查找&#xff08;Binary Search&#xff09;是一种在有序数组中查找特定元素的搜索算法。它的基本思想是将数组分成两半&#xff0c;然后比较目标值与中间元素的大小关系&#xff0c;从而确定应该在左半部分还是右半部分继续查找。这个…

花8000元去培训机构学习网络安全值得吗,学成后就业前景如何?

我就是从培训机构学的网络安全&#xff0c;线下五六个月&#xff0c;当时学费不到一万&#xff0c;目前已成功入行。所以&#xff0c;只要你下决心要入这一行&#xff0c;过程中能好好学&#xff0c;那这8000就花得值~ 因为只要学得好&#xff0c;工作两个多月就能赚回学费&am…

浅谈取样器之OS进程取样器

浅谈取样器之OS进程取样器 JMeter 的 OS 进程取样器&#xff08;OSProcess Sampler&#xff09;允许用户在 JMeter 测试计划中直接执行操作系统命令或脚本。这一功能对于需要集成系统级操作到性能测试场景中尤为有用&#xff0c;比如运行数据库备份脚本、调用系统维护命令或执…

存储引擎MyISAM和InnoDB

存储引擎&#xff1a;创建、查询、更新、删除 innoDB&#xff1a;64T、支持事物、不支持全文索引、支持缓存、支持外键、行级锁定 MyISAM&#xff1a;256T、不支持事物、支持全文索引、插入和查询速度快 memory&#xff1a;内存、不支持事物、不支持全文索引&#xff0c;临时…

不得不安利的程序员开发神器,太赞了!!

作为一名程序员&#xff0c;你是否常常为繁琐的后端服务而感到头疼&#xff1f;是否希望有一种工具可以帮你简化开发流程&#xff0c;让你专注于创意和功能开发&#xff1f;今天&#xff0c;我要向大家隆重推荐一款绝佳的开发神器——MemFire Cloud。它专为懒人开发者准备&…

KVM高级功能部署

KVM&#xff08;Kernel-based Virtual Machine&#xff09;是一个在Linux内核中实现的全虚拟化解决方案。除了基本的虚拟化功能外&#xff0c;KVM还提供了许多高级功能&#xff0c;以增强其性能、安全性和灵活性。以下是一些KVM的高级功能&#xff1a; 硬件加速&#xff1a; In…

基于Deap遗传算法在全量可转债上做因子挖掘(附python代码及全量因子数据)

原创文章第604篇&#xff0c;专注“AI量化投资、世界运行的规律、个人成长与财富自由"。 在4.x的时候&#xff0c;咱们分享过deap遗传算法挖掘因子的代码和数据&#xff0c;今天我们来升级到5.x中。 源码发布Quantlab4.2&#xff0c;Deap因子挖掘|gplearn做不到的咱们也…

全新微软语音合成网页版源码,短视频影视解说配音网页版系统-仿真人语音

源码介绍 最新微软语音合成网页版源码&#xff0c;可以用来给影视解说和短视频配音。它是TTS文本转语言&#xff0c;API接口和PHP源码。 这个微软语音合成接口的源码&#xff0c;超级简单&#xff0c;就几个文件搞定。用的是官方的API&#xff0c;试过了&#xff0c;合成速度…

Datawhale AI夏令营 AI+逻辑推理 Task2总结

Datawhale AI夏令营 AI逻辑推理 Task2总结 一、大语言模型解题方案介绍 1.1 大模型推理介绍 ​ 推理是建立在训练完成的基础上&#xff0c;将训练好的模型应用于新的、未见过的数据&#xff0c;模型利用先前学到的规律进行预测、分类和生成新内容&#xff0c;使得AI在实际应…

力扣SQL50 换座位

Problem: 626. 换座位 &#x1f468;‍&#x1f3eb; 参考题解 Code SELECT(CASEWHEN MOD(id, 2) ! 0 AND counts ! id THEN id 1WHEN MOD(id, 2) ! 0 AND counts id THEN idELSE id - 1END) AS id,student FROMseat,(SELECTCOUNT(*) AS countsFROMseat) AS seat_counts O…

C语言实现三子棋

通过一段时间的学习&#xff0c;我们已经能够较为熟练地使用分支语句&#xff0c;循环语句&#xff0c;创建函数&#xff0c;创建数组&#xff0c;创建随机数等。之前我们做过一个扫雷游戏&#xff0c;今天让我们再尝试创作一个三子棋游戏吧~ 一、三子棋游戏的思路 三子棋的游…

AI Agent调研--7种Agent框架对比!盘点国内一站式Agent搭建平台,一文说清差别!大家都在用Agent做什么?

代理&#xff08;Agent&#xff09;乃一种智能实体&#xff0c;具备自主环境感知与决策行动能力&#xff0c;旨在达成既定目标。作为个人或组织之数字化替身&#xff0c;AI代理执行特定任务与交易&#xff0c;其核心价值在于简化工作流程&#xff0c;削减繁复性&#xff0c;并有…