您好,登录后才能下订单哦!
# Hadoop中怎么实现MapReduce的数据输入
## 1. MapReduce数据输入概述
MapReduce作为Hadoop的核心计算框架,其数据处理流程始于数据输入阶段。数据输入机制决定了MapReduce作业如何读取原始数据并将其转化为可供Mapper处理的键值对形式。在Hadoop生态系统中,数据输入过程具有以下核心特点:
1. **分布式特性**:输入数据通常存储在HDFS上,被自动划分为多个数据块(默认128MB)
2. **并行处理**:每个数据分片(InputSplit)由一个独立的Mapper任务处理
3. **格式无关性**:通过InputFormat抽象类支持多种数据格式处理
4. **位置感知**:遵循"移动计算比移动数据更高效"原则,尽量在数据所在节点执行计算
## 2. 核心组件与工作机制
### 2.1 InputFormat抽象类
作为数据输入的最高层抽象,InputFormat定义了两个核心职责:
```java
public abstract class InputFormat<K, V> {
// 获取输入数据的分片信息
public abstract List<InputSplit> getSplits(JobContext context);
// 创建RecordReader用于读取分片数据
public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context);
}
InputSplit表示逻辑上的数据分片,包含以下关键信息:
public abstract class InputSplit {
public abstract long getLength();
public abstract String[] getLocations();
}
RecordReader负责将InputSplit转化为具体的键值对记录:
public abstract class RecordReader<KEYIN, VALUEIN> {
// 初始化方法
public abstract void initialize(InputSplit split, TaskAttemptContext context);
// 读取下一条记录
public abstract boolean nextKeyValue();
// 获取当前键
public abstract KEYIN getCurrentKey();
// 获取当前值
public abstract VALUEIN getCurrentValue();
// 获取进度
public abstract float getProgress();
// 关闭资源
public abstract void close();
}
处理纯文本文件的默认实现,特点包括: - 每行文本记录 - 键为LongWritable类型(字节偏移量) - 值为Text类型(行内容)
// 典型使用方式
Job job = Job.getInstance(conf);
job.setInputFormatClass(TextInputFormat.class);
处理键值对文本的特殊格式:
- 每行格式为”key[分隔符]value”
- 默认分隔符是制表符(\t)
- 可通过mapreduce.input.keyvaluelinerecordreader.key.value.separator
配置
二进制文件输入格式,支持三种类型:
1. SequenceFileInputFormat
CompositeInputFormat 用于多路径输入场景: - 支持多个输入路径的联合查询 - 通过join表达式定义数据关联方式 - 典型应用场景:Map端join操作
// 配置示例
conf.set("mapreduce.join.expr",
CompositeInputFormat.compose("inner",
KeyValueTextInputFormat.class, "/path1", "/path2"));
需要自定义输入格式的典型场景: - 处理非标准格式数据(如二进制协议) - 需要特殊分片逻辑(如不可分割的压缩格式) - 多数据源组合输入 - 需要预处理原始数据
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new CustomRecordReader();
}
// 可选:覆盖分片逻辑
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false; // 示例:禁止分片
}
}
public class CustomRecordReader extends RecordReader<LongWritable, Text> {
private LineReader in;
private LongWritable key;
private Text value;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
// 初始化读取器
FileSplit fileSplit = (FileSplit)split;
Configuration conf = context.getConfiguration();
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(path);
in = new LineReader(fileIn, conf);
}
@Override
public boolean nextKeyValue() {
// 读取下一条记录
key = new LongWritable();
value = new Text();
int bytesRead = in.readLine(value);
if (bytesRead == 0) return false;
key.set(offset); // 设置偏移量
return true;
}
// 其他方法实现...
}
job.setInputFormatClass(CustomInputFormat.class);
FileInputFormat.addInputPath(job, new Path("input/"));
通过FileInputFormat设置路径过滤器:
// 自定义过滤器
public class RegexFilter extends PathFilter {
private final String regex;
public RegexFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return path.toString().matches(regex);
}
}
// 使用配置
FileInputFormat.setInputPathFilter(job, RegexFilter.class);
conf.set("filter.pattern", ".*\\.data$");
// 添加多个输入路径
FileInputFormat.addInputPath(job, new Path("input1/"));
FileInputFormat.addInputPath(job, new Path("input2/"));
// 为不同路径设置不同InputFormat
MultipleInputs.addInputPath(job, new Path("input1/"),
TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, new Path("input2/"),
SequenceFileInputFormat.class, Mapper2.class);
通过InputSampler实现数据采样,优化分片:
// 创建采样器
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<>(0.1, 1000, 10);
// 写入分区文件
InputSampler.writePartitionFile(job, sampler);
// 配置TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
计算公式:
splitSize = max(minSize, min(maxSize, blockSize))
配置参数:
<!-- 最小分片大小 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>
</property>
<!-- 最大分片大小 -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>256000000</value>
</property>
支持压缩格式: - DEFLATE (.deflate) - gzip (.gz) - bzip2 (.bz2) - LZO (.lzo) - Snappy (.snappy)
自动检测机制:
CompressionCodecFactory codecFactory =
new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(filePath);
通过以下配置提高数据本地化:
<property>
<name>mapreduce.job.split.metainfo.maxsize</name>
<value>10000000</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>134217728</value> <!-- 匹配HDFS块大小 -->
</property>
解决方案对比:
方案 | 优点 | 缺点 |
---|---|---|
CombineFileInputFormat | 自动合并小文件 | 需要内存缓冲 |
HAR归档文件 | 减少NameNode压力 | 需要额外归档步骤 |
SequenceFile合并 | 高效二进制格式 | 不可直接查看内容 |
实现示例:
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 256000000);
应对策略: 1. 预处理阶段数据采样 2. 自定义分区器(Partitioner) 3. 使用Combiner减少数据传输 4. 倾斜键单独处理
增强鲁棒性的方法:
// 在RecordReader中实现容错逻辑
try {
// 解析记录
} catch (MalformedRecordException e) {
context.getCounter("Error", "BadRecords").increment(1);
if (skipBadRecords) continue;
else throw e;
}
通过深入理解MapReduce数据输入机制,开发者可以针对不同业务场景选择最优的输入策略,构建高效可靠的大数据处理管道。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。