hadoop的map函数使用方法是什么

发布时间:2021-12-10 09:56:19 作者:iii
来源:亿速云 阅读:278

这篇文章主要介绍“hadoop的map函数使用方法是什么”,在日常操作中,相信很多人在hadoop的map函数使用方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”hadoop的map函数使用方法是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

大表关联小表时可以使用hadoop的DistributedCache把小标缓存到内存中,由hadoop分发这些内存到每台需要map操作的服务器上进行数据的清洗,关联。

例如有这样一份数据用户登陆信息login:

1,0,20121213
2,0,20121213
3,1,20121213
4,1,20121213
1,0,20121114

第一列是用户id,二列是性别,第三列是登陆时间 。

需要将表中的用户id,替换成用户的名字,性别替换成汉字,然后统计他的登陆次数。

其中users表为:

1,张三,hubei
3,王五,tianjin
4,赵六,guangzhou
2,李四,beijing

sex表为:

0,男
1,女       

map函数中进行维表的关联,输出为姓名,性别为key,登陆1次为value。

public class Mapclass extends Mapper<LongWritable, Text, Text, Text> {
    private Map<String, String> userMap = new HashMap<String, String>();
    private Map<String, String> sexMap = new HashMap<String, String>();
    private Text oKey = new Text();
    private Text oValue = new Text();
    private String[] kv;
    @Override
    protected void setup(Context context) {
        BufferedReader in = null;
        // 从当前作业中获取要缓存的文件
        try {
            Path[] paths = DistributedCache.getLocalCacheFiles(context
                    .getConfiguration());
            String uidNameAddr = null;
            String sidSex = null;
            for (Path path : paths) {
                if (path.toString().contains("users")) {
                    in = new BufferedReader(new FileReader(path.toString()));
                    while (null != (uidNameAddr = in.readLine())) {
                        userMap.put(uidNameAddr.split(",", -1)[0],
                                uidNameAddr.split(",", -1)[1]);
                    }
                } else if (path.toString().contains("sex")) {
                    in = new BufferedReader(new FileReader(path.toString()));
                    while (null != (sidSex = in.readLine())) {
                        sexMap.put(sidSex.split(",", -1)[0],
                                sidSex.split(",", -1)[1]);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        kv = value.toString().split(",");
        // map join: 在map阶段过滤掉不需要的数据
        if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {
            oKey.set(userMap.get(kv[0]) + "," + sexMap.get(kv[1]));
            oValue.set("1");
            context.write(oKey, oValue);
        }
    }
}

reduce函数:

public class Reduce extends Reducer<Text, Text, Text, Text> {
    private Text oValue = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int sumCount = 0;
        for (Text val : values) {
            sumCount += Integer.parseInt(val.toString());
        }
        oValue.set(String.valueOf(sumCount));
        context.write(key, oValue);
    }
}

main函数为:

public class MultiTableJoin extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "MultiTableJoin");
        job.setJobName("MultiTableJoin");
        job.setJarByClass(MultiTableJoin.class);
        job.setMapperClass(Mapclass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
                args).getRemainingArgs();
        // 我们把第1、2个参数的地址作为要缓存的文件路径
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),
                job.getConfiguration());
        DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(),
                job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[2]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] arg0) throws Exception {
        String[] args = new String[4];
        args[0] = "hdfs://172.16.0.87:9000/user/jeff/decli/sex";
        args[1] = "hdfs://172.16.0.87:9000/user/jeff/decli/users";
        args[2] = "hdfs://172.16.0.87:9000/user/jeff/decli/login";
        args[3] = "hdfs://172.16.0.87:9000/user/jeff/decli/out";
        int res = ToolRunner.run(new Configuration(), new MultiTableJoin(),
                args);
        System.exit(res);
    }
}

计算的输出为:

张三,男    2
李四,男    1
王五,女    1
赵六,女    1

到此,关于“hadoop的map函数使用方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. 【Hadoop】Map和Reduce个数问题
  2. hadoop中map如何输出

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

map hadoop

上一篇:怎么在Hadoop-1.2.1中跑wordcount

下一篇:HIVE的数据存储在哪

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》