MapReduce怎么使用

发布时间:2021-12-30 13:59:44 作者:iii
来源:亿速云 阅读:150

本篇内容主要讲解“MapReduce怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce怎么使用”吧!

  1. 什么是MR

    MR是一种分布计算模型,主要用来解决海量数据的计算问题的。它包含了两种计算函数,一个是Mapping,另外一个是Reducing。Mapping对集合内的每个目标做同一个操作,Reduceing则是遍历集合中的元素返回一个综合的结果。我们操作代码时,只需要重写map和reduce方法就行,十分简单。这两个函数的形参都是k,v对,当数据量到达10PB以上时,则会速度变慢。

  2. MR执行过程

    MR程序启动时,会把输入文件转化成<k1,v1>键值对传给map函数,有几个键值对就执行几次map函数,但不是说有几个键值对就有几个Mapper进程,这是不对的。经过map函数处理,变成<k2,v2>键值对。由<k2,v2>转变成reduce函数的输入<k2,{v2,.....}>的过程被称之为shuffle。shuffle并不是象map和reduce这样的某个函数,不是需要单独拿出节点运行的,它仅仅只是一个过程。<k2,{v2...}>进过reduce函数处理,变成了最后的输出<k3,v3>。在到达reduce函数之前,键值对的数目是不变的。

                  Map阶段

     (1).根据输入文件解析成<k1,v1>对,每一对调用一次map函数

     (2).根据自己编写的map函数,将键值对处理,变成新的<k2,v2>键值对输出 

     (3).对输出的键值对进行分区,不同分区对应着不同的Reducer进程

     (4).每个分区中的键值对,根据key进行排序,分组。然后把相同key的val放到同一个集合中。

      (5).进行规约(可选)

                     Reduce阶段

         (1).多个map函数输出的kv对,按照不同分区,传输到不同的reduce节点上。

          (2).将多个map函数输出的kv对合并,排序。根据reduce函数逻辑,处理<k2,{v2..}>,转换成新的键值对输出

         (3).输出保存文件

  3.简单例子

      Wordcount

public class WordCount {
 public static class  MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
  Text k2=new Text();
  LongWritable v2=new LongWritable();
  @Override
  protected void map(LongWritable k1, Text v1,Context context)
    throws IOException, InterruptedException {
    String[] words=v1.toString().split("\t");
    for (String string : words) {
     k2.set(string);
     v2.set(1L);
    context.write(k2, v2);
   }
  }
 }
 public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
  LongWritable v3=new LongWritable();
  @Override
  protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
   long sum=0; 
   for (LongWritable longWritable : v2s) {
    sum=sum+longWritable.get();
   }
   v3.set(sum);
   context.write(k2, v3);
  }
  
 }
 public static void main(String[] args) throws Exception {
  Configuration conf=new Configuration();
  Job job=Job.getInstance(conf, WordCount.class.getSimpleName());
  job.setJarByClass(WordCount.class);
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReduce.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  
  FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));
  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));
  
  job.waitForCompletion(true);
 } 
 
}

4.MR的序列化

   序列化就是把结构化的对象转换为字节流,在MR中,他没有用java自己的序列化,而是自己实现了一套序列化。因为相比较而言,hadoop的序列化有着诸多优点。在mr程序中,我们的参数和输出的键值对全都是实现了序列化的对象,当我们需要自订一个序列化对象,该如何操作呢?只需要实现Writable接口即可,当然key需要实现WritableComparable接口,因为需要根据key来排序和分组。

   接着有个小例子来展示序列化。就是电信流量的处理例子。

public class LiuLiang {
 public static class MyMapper extends Mapper<LongWritable, Text, Text, MyArrayWritable>{
  Text k2=new Text();
  MyArrayWritable v2=new MyArrayWritable();
  LongWritable v21=new LongWritable();
  LongWritable v22=new LongWritable();
  LongWritable v23=new LongWritable();
  LongWritable v24=new LongWritable();
  LongWritable[] values=new LongWritable[4];
  @Override
  protected void map(LongWritable k1, Text v1, Context context)
      throws IOException, InterruptedException {
    String[] words=v1.toString().split("\t");
    k2.set(words[1]);
    v21.set(Long.parseLong(words[6]));
    v22.set(Long.parseLong(words[7]));
    v23.set(Long.parseLong(words[8]));
    v24.set(Long.parseLong(words[9]));
    values[0]=v21;
    values[1]=v22;
    values[2]=v23;
    values[3]=v24;
    v2.set(values);
    context.write(k2, v2);
  }
 }
 public static class MyReduce extends Reducer<Text, MyArrayWritable, Text, Text>{
  Text v3=new Text();
  @Override
  protected void reduce(Text k2, Iterable<MyArrayWritable> v2s, Context context)
      throws IOException, InterruptedException {
    long sum1=0;
    long sum2=0;
    long sum3=0;
    long sum4=0;
    for (MyArrayWritable myArrayWritable : v2s) {
     Writable[] values= myArrayWritable.get();
     sum1=sum1+((LongWritable)values[0]).get();
     sum2=sum2+((LongWritable)values[1]).get();
     sum3=sum3+((LongWritable)values[2]).get();
     sum4=sum4+((LongWritable)values[3]).get();
   }
    v3.set("\t"+sum1+"\t"+sum2+"\t"+sum3+"\t"+sum4);
    context.write(k2, v3);
  }
 }
 public static void main(String[] args) throws Exception {
  Configuration conf=new Configuration();
  Job job=Job.getInstance(conf, LiuLiang.class.getSimpleName());
  job.setJarByClass(LiuLiang.class);
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReduce.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(MyArrayWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  
  FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));
  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));
  
  job.waitForCompletion(true);
 } 
}
class MyArrayWritable extends ArrayWritable{
 public MyArrayWritable(){
  super(LongWritable.class);
 }
 public MyArrayWritable(String[] arg0) {
  super(arg0);
 }
 
}

5.SequenceFile

    在HDFS的学习中,提到了小文件的解决方案,其中一个便是这个SequenceFile。他是一种无序存储,将kv对序列化到文件中,从而合并许多小文件并且支持压缩。缺点是必须遍历才能查看里面各个小文件。

public class SequenceFileTest {
 public static void main(String[] args) throws Exception{
  Configuration conf = new Configuration();
  FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");
  //Write(conf, fileSystem);
  Read(conf, fileSystem);
 }
 
 
 private static void Read(Configuration conf, FileSystem fileSystem) throws IOException {
  Reader reader=new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);
  Text key=new Text();
  Text val=new Text();
  while(reader.next(key, val)){
   System.out.println(key.toString()+"----"+val.toString());
  }
  IOUtils.closeStream(reader);
 }
 
 
 private static void Write(Configuration conf, FileSystem fileSystem) throws IOException {
  Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"), Text.class, Text.class);
  Collection<File> files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);
  for (File file : files) {
   Text text = new Text();
   text.set(FileUtils.readFileToString(file));
   writer.append(new Text(file.getName()), text);
  }
  IOUtils.closeStream(writer);
 }
}

到此,相信大家对“MapReduce怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. MapReduce on Hbase
  2. MapReduce 调优

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

mapreduce

上一篇:Mapreduce程序中reduce的Iterable参数问题怎么解决

下一篇:MapReduce的二次排序使用什么参数

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》