您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# 如何进行MapReduce中的JobSplit源码分析
## 一、引言
MapReduce作为Hadoop的核心计算框架,其任务划分(JobSplit)机制直接影响分布式计算的效率。本文将通过源码层面深入分析JobSplit的实现原理、关键流程和核心类,帮助开发者理解任务划分的内部机制。
## 二、JobSplit概述
### 2.1 基本概念
JobSplit是MapReduce作业执行前的关键阶段,主要完成:
- 输入数据分片(InputSplit)的生成
- 确定每个Map任务处理的数据范围
- 优化数据本地化(Data Locality)
### 2.2 核心价值
```java
// 典型分片信息示例
public class FileSplit implements InputSplit {
private Path file; // 文件路径
private long start; // 起始偏移量
private long length; // 分片长度
private String[] hosts; // 数据所在节点
}
org.apache.hadoop.mapreduce.split
org.apache.hadoop.mapreduce.lib.input
# 推荐使用远程调试
export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
调用栈示例:
JobSubmitter.submitJobInternal()
→ JobWriter.writeSplits()
→ InputFormat.getSplits()
→ FileInputFormat.getSplits()
FileInputFormat.getSplits()
核心逻辑:
public List<InputSplit> getSplits(JobContext job) throws IOException {
// 1. 获取最小分片大小(minSize)和最大分片大小(maxSize)
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// 2. 遍历所有输入文件生成分片
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
// 3. 计算分片大小(splitSize)
long splitSize = computeSplitSize(minSize, maxSize, blockSize);
// 4. 生成实际分片
while (bytesRemaining / splitSize > SPLIT_SLOP) {
splits.add(makeSplit(path, start, splitSize, blkLocations[blkIndex]));
}
}
}
// 分片大小计算公式
protected long computeSplitSize(long minSize, long maxSize, long blockSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
通过BlockLocation
获取数据位置信息:
public class BlockLocation {
private String[] hosts; // 物理节点地址
private String[] names; // 主机名:端口
private long offset; // 块在文件中的偏移量
private long length;
}
CompressedInputFormat
会强制单个文件不分割:
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec = new CompressionCodecFactory().getCodec(file);
return (codec == null);
}
CombineFileInputFormat
实现策略:
public List<InputSplit> getSplits(JobContext job) throws IOException {
// 合并小文件直到达到splitSize
while (remainingLength > 0) {
long splitLength = Math.min(maxSize, remainingLength);
splits.add(new CombineFileSplit(paths, offsets, lengths));
}
}
合理设置分片大小:
<!-- mapred-site.xml -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>134217728</value> <!-- 128MB -->
</property>
自定义InputFormat示例:
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new CustomRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false; // 禁止文件分割
}
}
分片不均问题:
getSplits()
实现数据本地化失效:
# 查看分片信息
hadoop job -view-split job_xxx
通过源码分析我们可以理解: 1. JobSplit是逻辑划分而非物理切割 2. 分片大小受blockSize、minSize、maxSize三重影响 3. 数据本地化通过BlockLocation实现
附录: - Hadoop源码仓库 - 相关类图:
[InputFormat] <|-- [FileInputFormat]
[FileInputFormat] <|-- [TextInputFormat]
[FileInputFormat] <|-- [CombineFileInputFormat]
”`
注:实际分析时应结合具体Hadoop版本,本文基于2.7.x版本分析。建议通过单元测试TestFileInputFormat
进行验证性学习。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。