您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Java中整体MR工作机制是怎样的
## 一、MR(MapReduce)概述
### 1.1 什么是MapReduce
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于大规模数据集(大于1TB)的并行运算。其核心思想是将计算过程分解为两个主要阶段:
- **Map阶段**:对输入数据进行分块处理
- **Reduce阶段**:对Map结果进行汇总
### 1.2 Hadoop中的实现
Apache Hadoop在开源实现中提供了完整的MapReduce框架,主要包含:
- JobTracker(主节点)
- TaskTracker(从节点)
- HDFS(分布式文件系统)
## 二、MR工作流程详解
### 2.1 整体架构图
```mermaid
graph LR
A[Input] --> B[Split]
B --> C[Map]
C --> D[Shuffle]
D --> E[Reduce]
E --> F[Output]
// 典型Mapper实现
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) {
String[] words = value.toString().split(" ");
for (String w : words) {
word.set(w);
context.write(word, one);
}
}
}
关键过程: 1. Partitioning:默认使用HashPartitioner
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
输出到HDFS的指定目录,每个Reducer产生一个输出文件:
part-r-00000
part-r-00001
...
类型 | 说明 |
---|---|
InputFormat | 定义输入数据格式 |
OutputFormat | 定义输出数据格式 |
Partitioner | 决定Mapper输出到哪个Reducer |
RecordReader | 读取输入记录 |
graph TD
A[慢任务检测] --> B[启动备份任务]
B --> C{先完成者}
C -->|备份任务| D[终止原任务]
C -->|原任务| E[终止备份任务]
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
Hadoop 1.0 | Hadoop 2.0+ |
---|---|
JobTracker | ResourceManager |
TaskTracker | NodeManager |
- | ApplicationMaster |
# Map任务数优化
mapreduce.job.maps = max(分片数, 节点数×每个节点容器数)
# Reduce任务数优化
mapreduce.job.reduces = min(节点数×每个节点容器数×0.95, 1.75×节点数×容器数)
特性 | MapReduce | Spark |
---|---|---|
执行模型 | 批处理 | 微批/流式 |
速度 | 较慢(磁盘IO) | 快(内存计算) |
API复杂度 | 较低 | 较高 |
适用场景 | 超大规模离线处理 | 迭代计算/实时处理 |
// 日志统计Mapper示例
public class LogMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void map(Object key, Text value, Context context) {
String[] parts = value.toString().split("\\t");
try {
Date time = sdf.parse(parts[0]);
Calendar cal = Calendar.getInstance();
cal.setTime(time);
context.write(new Text(cal.get(Calendar.HOUR_OF_DAY)+""), new IntWritable(1));
} catch (ParseException e) {
// 错误处理
}
}
}
使用MR实现协同过滤: 1. Map阶段计算用户向量 2. Reduce阶段计算相似度矩阵
参数 | 默认值 | 说明 |
---|---|---|
mapreduce.task.timeout | 600000ms | 任务超时时间 |
mapreduce.map.maxattempts | 4 | Map任务最大重试次数 |
mapreduce.reduce.maxattempts | 4 | Reduce任务最大重试次数 |
mapreduce.job.ubertask.enable | false | 小作业优化开关 |
注:本文基于Hadoop 3.x版本分析,不同版本实现细节可能有所差异。 “`
(实际字数:约3800字,此处为精简展示版)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。