Hadoop中怎么实现MapReduce的数据输入

发布时间:2021-12-22 17:22:10 作者:iii
来源:亿速云 阅读:173
# Hadoop中怎么实现MapReduce的数据输入

## 1. MapReduce数据输入概述

MapReduce作为Hadoop的核心计算框架,其数据处理流程始于数据输入阶段。数据输入机制决定了MapReduce作业如何读取原始数据并将其转化为可供Mapper处理的键值对形式。在Hadoop生态系统中,数据输入过程具有以下核心特点:

1. **分布式特性**:输入数据通常存储在HDFS上,被自动划分为多个数据块(默认128MB)
2. **并行处理**:每个数据分片(InputSplit)由一个独立的Mapper任务处理
3. **格式无关性**:通过InputFormat抽象类支持多种数据格式处理
4. **位置感知**:遵循"移动计算比移动数据更高效"原则,尽量在数据所在节点执行计算

## 2. 核心组件与工作机制

### 2.1 InputFormat抽象类

作为数据输入的最高层抽象,InputFormat定义了两个核心职责:

```java
public abstract class InputFormat<K, V> {
    // 获取输入数据的分片信息
    public abstract List<InputSplit> getSplits(JobContext context);
    
    // 创建RecordReader用于读取分片数据
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context);
}

2.2 InputSplit实现

InputSplit表示逻辑上的数据分片,包含以下关键信息:

  1. 分片长度(字节数)
  2. 分片位置信息(存储节点列表)
  3. 分片数据的位置标识
public abstract class InputSplit {
    public abstract long getLength();
    public abstract String[] getLocations();
}

2.3 RecordReader组件

RecordReader负责将InputSplit转化为具体的键值对记录:

public abstract class RecordReader<KEYIN, VALUEIN> {
    // 初始化方法
    public abstract void initialize(InputSplit split, TaskAttemptContext context);
    
    // 读取下一条记录
    public abstract boolean nextKeyValue();
    
    // 获取当前键
    public abstract KEYIN getCurrentKey();
    
    // 获取当前值
    public abstract VALUEIN getCurrentValue();
    
    // 获取进度
    public abstract float getProgress();
    
    // 关闭资源
    public abstract void close();
}

3. 内置输入格式实现

3.1 TextInputFormat(默认文本输入)

处理纯文本文件的默认实现,特点包括: - 每行文本记录 - 键为LongWritable类型(字节偏移量) - 值为Text类型(行内容)

// 典型使用方式
Job job = Job.getInstance(conf);
job.setInputFormatClass(TextInputFormat.class);

3.2 KeyValueTextInputFormat

处理键值对文本的特殊格式: - 每行格式为”key[分隔符]value” - 默认分隔符是制表符(\t) - 可通过mapreduce.input.keyvaluelinerecordreader.key.value.separator配置

3.3 SequenceFileInputFormat

二进制文件输入格式,支持三种类型: 1. SequenceFileInputFormat: 通用型 2. SequenceFileAsTextInputFormat: 将键值转为Text对象 3. SequenceFileAsBinaryInputFormat: 原始二进制格式

3.4 复合输入格式

CompositeInputFormat 用于多路径输入场景: - 支持多个输入路径的联合查询 - 通过join表达式定义数据关联方式 - 典型应用场景:Map端join操作

// 配置示例
conf.set("mapreduce.join.expr", 
    CompositeInputFormat.compose("inner", 
    KeyValueTextInputFormat.class, "/path1", "/path2"));

4. 自定义输入格式实现

4.1 实现场景

需要自定义输入格式的典型场景: - 处理非标准格式数据(如二进制协议) - 需要特殊分片逻辑(如不可分割的压缩格式) - 多数据源组合输入 - 需要预处理原始数据

4.2 实现步骤

4.2.1 继承InputFormat

public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) {
        return new CustomRecordReader();
    }
    
    // 可选:覆盖分片逻辑
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // 示例:禁止分片
    }
}

4.2.2 实现RecordReader

public class CustomRecordReader extends RecordReader<LongWritable, Text> {
    private LineReader in;
    private LongWritable key;
    private Text value;
    
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        // 初始化读取器
        FileSplit fileSplit = (FileSplit)split;
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream fileIn = fs.open(path);
        in = new LineReader(fileIn, conf);
    }
    
    @Override
    public boolean nextKeyValue() {
        // 读取下一条记录
        key = new LongWritable();
        value = new Text();
        int bytesRead = in.readLine(value);
        if (bytesRead == 0) return false;
        key.set(offset); // 设置偏移量
        return true;
    }
    // 其他方法实现...
}

4.2.3 配置使用

job.setInputFormatClass(CustomInputFormat.class);
FileInputFormat.addInputPath(job, new Path("input/"));

5. 高级输入控制技术

5.1 输入路径过滤

通过FileInputFormat设置路径过滤器:

// 自定义过滤器
public class RegexFilter extends PathFilter {
    private final String regex;
    
    public RegexFilter(String regex) {
        this.regex = regex;
    }
    
    @Override
    public boolean accept(Path path) {
        return path.toString().matches(regex);
    }
}

// 使用配置
FileInputFormat.setInputPathFilter(job, RegexFilter.class);
conf.set("filter.pattern", ".*\\.data$");

5.2 多路径输入处理

// 添加多个输入路径
FileInputFormat.addInputPath(job, new Path("input1/"));
FileInputFormat.addInputPath(job, new Path("input2/"));

// 为不同路径设置不同InputFormat
MultipleInputs.addInputPath(job, new Path("input1/"), 
    TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, new Path("input2/"),
    SequenceFileInputFormat.class, Mapper2.class);

5.3 输入采样与分片优化

通过InputSampler实现数据采样,优化分片:

// 创建采样器
InputSampler.Sampler<Text, Text> sampler = 
    new InputSampler.RandomSampler<>(0.1, 1000, 10);

// 写入分区文件
InputSampler.writePartitionFile(job, sampler);

// 配置TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);

6. 性能优化实践

6.1 分片大小调优

计算公式:

splitSize = max(minSize, min(maxSize, blockSize))

配置参数:

<!-- 最小分片大小 -->
<property>
    <name>mapreduce.input.fileinputformat.split.minsize</name>
    <value>0</value>
</property>

<!-- 最大分片大小 -->
<property>
    <name>mapreduce.input.fileinputformat.split.maxsize</name>
    <value>256000000</value>
</property>

6.2 压缩数据输入处理

支持压缩格式: - DEFLATE (.deflate) - gzip (.gz) - bzip2 (.bz2) - LZO (.lzo) - Snappy (.snappy)

自动检测机制:

CompressionCodecFactory codecFactory = 
    new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(filePath);

6.3 本地化优化

通过以下配置提高数据本地化:

<property>
    <name>mapreduce.job.split.metainfo.maxsize</name>
    <value>10000000</value>
</property>
<property>
    <name>mapreduce.input.fileinputformat.split.maxsize</name>
    <value>134217728</value> <!-- 匹配HDFS块大小 -->
</property>

7. 常见问题解决方案

7.1 小文件问题

解决方案对比:

方案 优点 缺点
CombineFileInputFormat 自动合并小文件 需要内存缓冲
HAR归档文件 减少NameNode压力 需要额外归档步骤
SequenceFile合并 高效二进制格式 不可直接查看内容

实现示例:

job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 256000000);

7.2 数据倾斜处理

应对策略: 1. 预处理阶段数据采样 2. 自定义分区器(Partitioner) 3. 使用Combiner减少数据传输 4. 倾斜键单独处理

7.3 格式异常处理

增强鲁棒性的方法:

// 在RecordReader中实现容错逻辑
try {
    // 解析记录
} catch (MalformedRecordException e) {
    context.getCounter("Error", "BadRecords").increment(1);
    if (skipBadRecords) continue;
    else throw e;
}

8. 未来发展趋势

  1. 向量化输入:Apache ORC/Parquet等列式存储的向量化读取
  2. 云原生存储适配:对S3、OSS等对象存储的优化支持
  3. 智能分片:基于机器学习预测的自动分片大小调整
  4. 流批一体:与Flink等流处理框架的统一输入接口

通过深入理解MapReduce数据输入机制,开发者可以针对不同业务场景选择最优的输入策略,构建高效可靠的大数据处理管道。 “`

推荐阅读:
  1. Hadoop 之 MapReduce
  2. Hadoop MapReduce

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

hadoop mapreduce

上一篇:Zabbix snmptrap配置的示例分析

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》