mapreduce 模板代码

发布时间:2020-06-27 01:52:39 作者:jethai
来源:网络 阅读:713



jai包

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>

2.x以后就拆成一些零散的包了,没有core包了



代码:

package org.conan.myhadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
//org.apache.hadoop.mapred 老系统的包
//org.apache.hadoop.mapreduce 新系统的包 
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * ModuleMapReduce Class
 * 单纯的注释 
 */
public class ModuleMapReduce extends Configured implements Tool {

    /**
     * 
     * ModuleMapper Class 不仅有注释的功效而且你鼠标放在你注释的方法上面他会把你注释的内容显示出来,
     * 
     */
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, LongWritable, Text>

    {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {

            super.setup(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO

        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {

            super.cleanup(context);
        }

    }

    /**
     * 
     * ModuleReducer Class
     * 
     */
    public static class ModuleReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            super.setup(context);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> value,
                Context context) throws IOException, InterruptedException {
            // TODO

        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

    }

    // Driver 驱动
    // @Override //实现接口时关键字1.5和1.7的JDK都会报错,只有1.6不报错
    public int run(String[] args) throws Exception {
        Job job = parseInputAndOutput(this, this.getConf(), args);
        // 2.set job

        // step 1:set input
        job.setInputFormatClass(TextInputFormat.class);

        // step 3:set mappper class
        job.setMapperClass(ModuleMapper.class);
        // step 4:set mapout key/value class
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // step 5:set shuffle(sort,combiner,group)
        // set sort
        job.setSortComparatorClass(LongWritable.Comparator.class);
        // set combiner(optional,default is unset)必须是Reducer的子类
        job.setCombinerClass(ModuleReducer.class);
        // set grouping
        job.setGroupingComparatorClass(LongWritable.Comparator.class);
        // step 6 set reducer class
        job.setReducerClass(ModuleReducer.class);
        // step 7:set job output key/value class
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // step 8:set output format
        job.setOutputFormatClass(FileOutputFormat.class);

        // step 10: submit job
        Boolean isCompletion = job.waitForCompletion(true);// 提交job
        return isCompletion ? 0 : 1;
    }

    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
            throws IOException {
        // 输入参数的合法性
        if (args.length != 2) {
            System.err.printf(
                    "Usage: %s [generic options] <input> <output> \n ", tool
                            .getClass().getSimpleName());
      //%s表示输出字符串,也就是将后面的字符串替换模式中的%s
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }

        // 1.create job

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(ModuleMapReduce.class);
        // step 2:set input path
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);
        // step 9:set output path
        Path outputPath = new Path(args[0]);
        FileOutputFormat.setOutputPath(job, outputPath);

        return job;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new ModuleMapReduce(), args);// 返回值即为isCompletion ? 0 : 1
            System.exit(status);// System.exit(0)中断虚拟机的运行,退出应用程序,0表示没有异常正常退出。
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


倒排索引代码


输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789    112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开

要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割

package org.conan.myhadoop.mr;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReverseIndex extends Configured implements Tool {

    enum Counter {
        LINESKIP, // 出错的行
    }

    public static class Map extends Mapper {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // 读取源数据
            try {
                // 数据处理
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];
                String bnum = lineSplit[1];
                context.write(new Text(bnum), new Text(anum)); // 输出

            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1
                return;
            }
        }
    }

    public static class Reduce extends Reducer {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueString;
            String out = "";
            for (Text value : values) {
                valueString = value.toString();
                out += valueString + "|";
                System.out.println("Ruduce:key=" + key + "  value=" + value);
            }
            context.write(key, new Text(out));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = new Job(conf, "ReverseIndex"); // 任务名
        job.setJarByClass(ReverseIndex.class); // 指定Class

        FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径

        job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码
        job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
        job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式

        job.waitForCompletion(true);

        // 输出任务完成情况
        System.out.println("任务名称:" + job.getJobName());
        System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
        System.out.println("输入行数:"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_INPUT_RECORDS").getValue());
        System.out.println("输出行数:"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("跳过的行:"
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // 判断参数个数是否正确
        // 如果无参数运行则显示以作程序说明
        if (args.length != 2) {
            System.err.println("");
            System.err
                    .println("Usage: ReverseIndex < input path > < output path > ");
            System.err
                    .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");

            System.exit(-1);
        }
        // 记录开始时间
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();
        // 运行任务
        int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);

        // 输出任务耗时
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("任务开始:" + formatter.format(start));
        System.out.println("任务结束:" + formatter.format(end));
        System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");

        System.exit(res);
   }
    
}


去重代码

 //Mapper任务
      static class DDMap extends Mapper<LongWritable,Text,Text,Text>{
       private static Text line = new Text();
       protected void map(LongWritable k1,Text v1,Context context){
        line = v1;
        Text text = new Text("");
         try {
          context.write(line,text);
         } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
         } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
         }
       };
      }

    //Reducer任务
      static class DDReduce extends Reducer<Text,Text,Text,Text>{
       protected void reduce(Text k2,Iterable<Text> v2s,Context context){
        Text text = new Text("");
        try {
         context.write(k2, text);
        } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        } catch (InterruptedException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
       };
      }


参考文章;

一个经典的MapReduce模板代码,倒排索引(ReverseIndex)

http://blog.itpub.net/26400547/viewspace-1214945/

详解MapReduce实现数据去重与倒排索引应用场景案例

http://www.tuicool.com/articles/emi6Fb



推荐阅读:
  1. MapReduce计算框架
  2. MapReduce的入门

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

mapreduce代码 ce %pre

上一篇:Json和xml的拼接和解析

下一篇:互联网金融架构方案

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》