hadoop mapreduce执行过程是怎么样的

发布时间:2021-12-08 10:49:47 作者:小新
来源:亿速云 阅读:113
# Hadoop MapReduce执行过程是怎么样的

## 一、MapReduce概述

### 1.1 什么是MapReduce
MapReduce是一种分布式计算模型,由Google提出,主要用于大规模数据集(大于1TB)的并行运算。它将复杂的、运行于大规模集群上的并行计算过程高度抽象为两个函数:Map和Reduce。

### 1.2 MapReduce的核心思想
"分而治之"是MapReduce的核心思想:
- **Map阶段**:将大数据集分解为成百上千的小数据集
- **Reduce阶段**:对Map阶段的中间结果进行汇总

### 1.3 MapReduce的优势
1. 易于编程
2. 良好的扩展性
3. 高容错性
4. 适合海量数据离线处理

## 二、MapReduce架构组成

### 2.1 主要组件
| 组件 | 功能描述 |
|------|----------|
| Client | 提交MapReduce作业 |
| JobTracker | 资源管理和作业调度 |
| TaskTracker | 执行具体任务 |
| HDFS | 存储输入输出数据 |

### 2.2 角色划分
1. **JobTracker**(主节点):
   - 管理所有作业
   - 调度任务到TaskTracker
   - 监控任务执行

2. **TaskTracker**(从节点):
   - 执行Map和Reduce任务
   - 向JobTracker汇报状态

## 三、MapReduce详细执行流程

### 3.1 整体流程图
```mermaid
graph TD
    A[Input Data] --> B[Split]
    B --> C[Map Task]
    C --> D[Shuffle]
    D --> E[Reduce Task]
    E --> F[Output]

3.2 阶段分解

3.2.1 Input Split阶段

  1. 输入文件被逻辑划分为多个Split
  2. 每个Split对应一个Map任务
  3. 默认Split大小等于HDFS块大小(128MB)

关键参数

<property>
    <name>mapreduce.input.fileinputformat.split.minsize</name>
    <value>1</value>
</property>

3.2.2 Map阶段

  1. 每个Map任务处理一个Split
  2. 执行用户定义的map()函数
  3. 输出形式的中间结果

示例代码

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (String w : words) {
            word.set(w);
            context.write(word, one);
        }
    }
}

3.2.3 Shuffle阶段(核心阶段)

Map端Shuffle: 1. Partition:根据Reduce数量分区

   public int getPartition(K key, V value, int numReduceTasks) {
       return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
  1. Sort:按key排序
  2. Spill:内存缓冲区溢出到磁盘
  3. Merge:合并多个溢出文件

Reduce端Shuffle: 1. Copy:从各Map节点拉取数据 2. Merge:合并来自不同Map的数据 3. Sort:二次排序

3.2.4 Reduce阶段

  1. 执行用户定义的reduce()函数
  2. 对相同key的values集合进行处理
  3. 输出最终结果到HDFS

示例代码

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

四、关键机制详解

4.1 数据本地化优化

  1. 原则:移动计算比移动数据更高效
  2. 三级本地化策略
    • 节点本地化(最佳)
    • 机架本地化
    • 跨机架访问

4.2 容错机制

  1. TaskTracker故障

    • JobTracker重新调度任务
    • 黑名单机制
  2. Task失败

    • 自动重试(默认4次)
    • 超过重试次数则标记作业失败
  3. 推测执行

    <property>
       <name>mapreduce.map.speculative</name>
       <value>true</value>
    </property>
    

4.3 性能优化技术

4.3.1 Combiner优化

job.setCombinerClass(WordCountReducer.class);

4.3.2 压缩设置

<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>

4.3.3 内存调优

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>2048</value>
</property>

五、YARN架构下的MapReduce

5.1 组件变化

传统架构 YARN架构
JobTracker ResourceManager
TaskTracker NodeManager
- ApplicationMaster

5.2 执行流程对比

  1. 资源请求方式不同
  2. 任务调度机制变化
  3. 容错实现方式更新

六、实际案例分析

6.1 单词计数(WordCount)

完整代码示例:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

6.2 性能瓶颈分析

  1. 数据倾斜解决方案:

    • 自定义Partitioner
    • 增加Reduce数量
    • 使用Combiner
  2. Shuffle优化

    • 调整缓冲区大小
    • 优化排序算法
    • 合理设置Reduce数量

七、MapReduce的局限与发展

7.1 主要局限性

  1. 实时计算能力弱
  2. 迭代计算效率低
  3. 中间结果写磁盘影响性能

7.2 替代技术

  1. Spark:内存计算框架
  2. Flink:流批一体处理
  3. Tez:DAG执行引擎

八、最佳实践建议

  1. 输入文件处理

    • 尽量使用大文件(>128MB)
    • 避免大量小文件
  2. 参数调优

    <!-- 设置Map任务数 -->
    <property>
       <name>mapreduce.job.maps</name>
       <value>100</value>
    </property>
    
  3. 监控工具

    • JobHistory Server
    • YARN ResourceManager UI

结语

MapReduce作为Hadoop的核心计算框架,虽然在新一代计算框架面前显得效率不足,但其”分而治之”的思想仍然深刻影响着大数据处理领域。理解MapReduce的执行原理,不仅有助于优化传统Hadoop作业,更能为学习其他分布式计算框架奠定坚实基础。 “`

推荐阅读:
  1. Hadoop 部署之 Hadoop (三)
  2. Hadoop的MapReduce执行流程图

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

hadoop mapreduce

上一篇:Hadoop 2.6.0如何动态添加节点

下一篇:Hadoop基础知识点有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》