java中整体MR工作机制是怎样的

发布时间:2021-12-23 16:04:14 作者:iii
来源:亿速云 阅读:346
# Java中整体MR工作机制是怎样的

## 一、MR(MapReduce)概述

### 1.1 什么是MapReduce
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于大规模数据集(大于1TB)的并行运算。其核心思想是将计算过程分解为两个主要阶段:
- **Map阶段**:对输入数据进行分块处理
- **Reduce阶段**:对Map结果进行汇总

### 1.2 Hadoop中的实现
Apache Hadoop在开源实现中提供了完整的MapReduce框架,主要包含:
- JobTracker(主节点)
- TaskTracker(从节点)
- HDFS(分布式文件系统)

## 二、MR工作流程详解

### 2.1 整体架构图
```mermaid
graph LR
    A[Input] --> B[Split]
    B --> C[Map]
    C --> D[Shuffle]
    D --> E[Reduce]
    E --> F[Output]

2.2 详细执行步骤

阶段1:Input Split

  1. 输入文件被分割成固定大小的块(默认128MB)
  2. 每个分片对应一个Map任务
  3. 分片信息包含:
    • 文件路径
    • 起始位置
    • 分片大小

阶段2:Map阶段

// 典型Mapper实现
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, Context context) {
        String[] words = value.toString().split(" ");
        for (String w : words) {
            word.set(w);
            context.write(word, one);
        }
    }
}

阶段3:Shuffle阶段

关键过程: 1. Partitioning:默认使用HashPartitioner

   public int getPartition(K key, V value, int numReduceTasks) {
       return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
  1. Sorting:按key进行字典序排序
  2. Combiner(可选):本地reduce操作
  3. Grouping:相同key的values合并

阶段4:Reduce阶段

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

阶段5:Output

输出到HDFS的指定目录,每个Reducer产生一个输出文件:

part-r-00000
part-r-00001
...

三、核心组件解析

3.1 JobTracker

3.2 TaskTracker

3.3 关键数据结构

类型 说明
InputFormat 定义输入数据格式
OutputFormat 定义输出数据格式
Partitioner 决定Mapper输出到哪个Reducer
RecordReader 读取输入记录

四、优化机制

4.1 数据本地化

4.2 推测执行

graph TD
    A[慢任务检测] --> B[启动备份任务]
    B --> C{先完成者}
    C -->|备份任务| D[终止原任务]
    C -->|原任务| E[终止备份任务]

4.3 内存管理

五、YARN架构下的MR

5.1 组件变更

Hadoop 1.0 Hadoop 2.0+
JobTracker ResourceManager
TaskTracker NodeManager
- ApplicationMaster

5.2 工作流程

  1. 客户端提交MR作业
  2. ResourceManager分配Container
  3. 启动ApplicationMaster
  4. AM向RM申请资源
  5. NM启动Map/Reduce任务

六、性能调优实践

6.1 参数优化示例

# Map任务数优化
mapreduce.job.maps = max(分片数, 节点数×每个节点容器数)

# Reduce任务数优化
mapreduce.job.reduces = min(节点数×每个节点容器数×0.95, 1.75×节点数×容器数)

6.2 常见问题处理

  1. 数据倾斜
    • 自定义Partitioner
    • 增加Reducer数量
  2. OOM错误
    • 调整JVM参数
    • 增加容器内存

七、MR与Spark对比

特性 MapReduce Spark
执行模型 批处理 微批/流式
速度 较慢(磁盘IO) 快(内存计算)
API复杂度 较低 较高
适用场景 超大规模离线处理 迭代计算/实时处理

八、实际应用案例

8.1 日志分析系统

// 日志统计Mapper示例
public class LogMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    public void map(Object key, Text value, Context context) {
        String[] parts = value.toString().split("\\t");
        try {
            Date time = sdf.parse(parts[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(time);
            context.write(new Text(cal.get(Calendar.HOUR_OF_DAY)+""), new IntWritable(1));
        } catch (ParseException e) {
            // 错误处理
        }
    }
}

8.2 推荐系统矩阵计算

使用MR实现协同过滤: 1. Map阶段计算用户向量 2. Reduce阶段计算相似度矩阵

九、未来发展趋势

  1. 与Kubernetes集成
  2. 向量化计算支持
  3. 更细粒度的资源调度

附录:关键配置参数表

参数 默认值 说明
mapreduce.task.timeout 600000ms 任务超时时间
mapreduce.map.maxattempts 4 Map任务最大重试次数
mapreduce.reduce.maxattempts 4 Reduce任务最大重试次数
mapreduce.job.ubertask.enable false 小作业优化开关

注:本文基于Hadoop 3.x版本分析,不同版本实现细节可能有所差异。 “`

(实际字数:约3800字,此处为精简展示版)

推荐阅读:
  1. WAF的工作机制
  2. 什么是MySQL的整体架构

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java

上一篇:Flume基础架构是什么

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》