您好,登录后才能下订单哦!
这篇文章将为大家详细讲解有关hadoop中如何利用mapreduce实现wordcount和电影评分预测,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
mapreduce中map指映射,reduce指的是归约。
mapreduce是一个以key-value形式来处理数据的编程模型,它使用map将一组key-value映射为另一组key-value,
通过底层传递给reduce,在reduce中,它将所有map过程传递过来的key-value进行归约,相同的key值,value值会放在一起。mapreduce内部还会对reduce过程中的key值进行一次排序。
一.WordCount
public class WordCount { // public static final String HDFS = "hdfs://localhost:8888"; public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b"); //自定义Map类型执行 "映射"这一部分 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { //mapreduce中,Text相当于String类型,IntWritable相当于Int类型 //LongWritable是实现了WritableComparable的一个数据类型。 private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override //重写父类map()函数 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取一行数据 String line = value.toString(); //将该行字符全部变为小写 line = line.toLowerCase(); //根据定义好的正则表达式拆分一行字符串。 Matcher matcher = DELIMITER.matcher(line); while(matcher.find()){ //将分解的一个个单词类型转化为Text。 word.set(matcher.group()); //将相应的key-value值传入。key值为单词,value值为1. context.write(word,one); } } } //自定义Combine过程先对本地进行的map进行一次reduce过程,减少传递给主机的数据量. public static class Combine extends Reducer <Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //遍历同一个key值的所有value,所有的value放在同一个Iterable中。 for (IntWritable line : values) { sum += line.get(); } IntWritable value = new IntWritable(sum); //将key-value按照指定的输出格式输出。 context.write(key, value); } } public static class Reduce extends Reducer <Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable line : values) { sum += line.get(); } IntWritable value = new IntWritable(sum); context.write(key, value); } } public static void main(String[] args) throws Exception { JobConf conf = WordCount.config(); String input = "data/1.txt"; String output = HDFS + "/user/hdfs/wordcount"; //自定义HDFS文件操作工具类 HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf); //移除存在的文件否则会报文件生成文件已存在的错误 hdfs.rmr(output); Job job = new 
Job(conf); job.setJarByClass(WordCount.class); //设置输出的key值类型 job.setOutputKeyClass(Text.class); //设置输出的value值类型 job.setOutputValueClass(IntWritable.class); job.setMapperClass(WordCount.Map.class); job.setCombinerClass(WordCount.Combine.class); job.setReducerClass(WordCount.Reduce.class); job.setInputFormatClass(TextInputFormat.class); //设置输出的格式,这里使用的是自定义的FileOutputFormat类,见下文。 job.setOutputFormatClass(ParseTextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static JobConf config() { JobConf conf = new JobConf(WordCount.class); conf.setJobName("WordCount"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); // conf.set("io.sort.mb", ""); return conf; } }
自定义文件输出格式
import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V>{ protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } /** * Write the object to the byte stream, handling Text as a special * case. 
* @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } } public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get("mapred.textoutputformat.separator", ":"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } } }
二.电影评分预测
整个算法的实现中使用了slope one算法来预测评分,此处自定义的输出类与上文一致。
输入文件格式为userId::movieId::score
package main.java.org.conan.myhadoop.recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;

/**
 * Driver of the movie-rating prediction pipeline: wires the input and output
 * paths of the five steps together and runs {@code Step1} .. {@code Step5} in order.
 */
public class Recommend {

    /** Root URI of the HDFS instance used by every step. */
    public static final String HDFS = "hdfs://localhost:8888";

    /** Splits a line on tab or comma. */
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /** Splits a line on tab, comma or colon. */
    public static final Pattern STRING = Pattern.compile("[\t,:]");

    /**
     * IDs of all movies seen in the rating data (filled by Step3).
     * NOTE(review): shared through a static field, so presumably this only works
     * when all tasks run inside one JVM — confirm against the deployment mode.
     */
    public static List<String> movieList = new ArrayList<String>();

    /** Per user: {@code "movieId,score"} of the first non-zero rating seen (filled by Step4). */
    public static Map<String, String> userScore = new HashMap<String, String>();

    /**
     * Runs the whole pipeline.
     *
     * @param args optional; when exactly two arguments are given, {@code args[0]}
     *             is the rating data file and {@code args[1]} the HDFS-relative
     *             output path of the last step
     * @throws Exception if any step fails
     */
    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();

        String in = "logfile/4.txt";
        String out = HDFS + "/user/hdfs/recommend" + "/step5";
        if (args.length == 2) {
            in = args[0];
            out = HDFS + args[1];
            System.out.println(out);
        }

        // Rating data file.
        path.put("data", in);

        // Step 1: input directory and result.
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        path.put("Step1Output", path.get("Step1Input") + "/step1");

        // Step 2: reads the step 1 result.
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input") + "/step2");

        // Step 3: reads the raw rating data.
        path.put("Step3Input1", path.get("data"));
        path.put("Step3Output", path.get("Step1Input") + "/step3");

        // Step 4: reads the results of steps 2 and 3.
        path.put("Step4Input1", path.get("Step2Output"));
        path.put("Step4Input2", path.get("Step3Output"));
        path.put("Step4Output", path.get("Step1Input") + "/step4");

        // Step 5: reads the step 4 result and writes the final output.
        path.put("Step5Input", path.get("Step4Output"));
        path.put("Step5Output", out);

        // Step 1: build each user's rating matrix from the rating file.
        Step1.run(path);
        // Step 2: compute the item co-occurrence matrix from the step 1 result.
        Step2.run(path);
        // Step 3: collect all rated movies and emit every user's rating for
        // every movie, 0 where the user has not rated it.
        Step3.run(path);
        // Step 4: compute every user's rating for every movie from steps 2 and 3.
        Step4.run(path);
        // Step 5: normalise the output format.
        Step5.run(path);

        System.exit(0);
    }

    /**
     * Builds the job configuration shared by all steps and registers the Hadoop
     * site files as resources.
     *
     * @return a new {@code JobConf}
     */
    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommand");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
//求出用户对物品的评分矩阵,即得出用户对电影 的评分矩阵 //每一行数据代表一个用户对所有打分电影的结果 //key值为userId, value值为movieID:score,movieId:score public class Step1 { public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, Text, Text> { private final static Text k = new Text(); private final static Text v = new Text(); @Override public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String[] tokens = value.toString().split("::"); String itemID = tokens[1]; String pref = tokens[2]; k.set(tokens[0]); v.set(itemID + ":" + pref); output.collect(k, v); } } public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String value= ""; int num = 0; while (values.hasNext()) { num++; value += values.next(); value += ","; if( num >= 400 ){ value = value.substring(0, value.length() - 1); Text v = new Text(value); output.collect(key, v); value = ""; num = 0; break; } } if(num != 0){ value = value.substring(0, value.length() - 1); Text v = new Text(value); output.collect(key, v); } } } public static void run(Map<String, String> path) throws IOException { JobConf conf = Recommend.config(); String input = path.get("Step1Input"); String output = path.get("Step1Output"); HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); // hdfs.rmr(output); hdfs.rmr(input); hdfs.mkdirs(input); hdfs.copyFile(path.get("data"), input); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(Text.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setMapperClass(Step1_ToItemPreMapper.class); conf.setReducerClass(Step1_ToUserVectorReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(input)); 
FileOutputFormat.setOutputPath(conf, new Path(output)); RunningJob job = JobClient.runJob(conf); while (!job.isComplete()) { job.waitForCompletion(); } } }
//根据第一步的 结果求出物品的同现矩阵 //算法方面,没有太好的算法处理两个for循环,就在求物品同现矩阵的时候使用一个随机数,得出一个movieA:movieB的结果 public class Step2 { public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> { private final static Text k = new Text(); private final static DoubleWritable v = new DoubleWritable(); // private final static IntWritable v = new IntWritable(1); @Override public void map(LongWritable key, Text values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException { String[] tokens = Recommend.DELIMITER.split(values.toString()); for (int i = 1; i < tokens.length; i++) { String itemID = tokens[i].split(":")[0]; // for (int j = 1; j < i+1; j++) { // String itemID2 = tokens[j].split(":")[0]; // double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]); //// if(sum<0.5) break; //// if(sum>4.5) break; // k.set(itemID + ":" + itemID2+":"); // v.set(sum); // output.collect(k, v); // k.set(itemID2 + ":" + itemID+":"); // v.set(sum); // output.collect(k, v); // // } Random random = new Random(); int j; j = random.nextInt(tokens.length - 1) + 1; String itemID2 = tokens[j].split(":")[0]; double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]); k.set(itemID + ":" + itemID2+":"); v.set(sum); output.collect(k, v); } } } public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> { private DoubleWritable result = new DoubleWritable(); @Override public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException { double sum = 0; int count = 0; while (values.hasNext()) { sum += values.next().get(); count++; } sum = sum/count*1.0; DecimalFormat df = new DecimalFormat("#.0000"); sum = Double.valueOf(df.format(sum)); // 
System.out.println(key+"---count----"+count+"-------"+sum); result.set(sum); output.collect(key, result); } } public static void run(Map<String, String> path) throws IOException { JobConf conf = Recommend.config(); String input = path.get("Step2Input"); String output = path.get("Step2Output"); HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); hdfs.rmr(output); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(DoubleWritable.class); conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class); // conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class); conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(input)); FileOutputFormat.setOutputPath(conf, new Path(output)); RunningJob job = JobClient.runJob(conf); while (!job.isComplete()) { job.waitForCompletion(); } } }
//取所有用户评过分的电影,并输出每位用户对每部电影的评分,未评过则记为0 //此处因为没有一个专门的电影记录为文件,所以就只能从数据文件中获取到所有的电影ID。 //并将所有的电影ID维持在一个线性表中,但是当数据文件过大时,每次读取一条数据都要从线性表中判断该电影是否已经记录 //,导致效率会越来越低 //并且维持一个静态map记录每个用户对的第一部评过分的电影,以此作为标准,使用物品同现矩阵进行计算 public class Step3 { public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> { private final static Text k = new Text(); private final static Text v = new Text(); private String flag; //判断读取的数据集 // private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); flag = split.getPath().getParent().getName();// 判断读的数据集 } @Override public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { String[] tokens = values.toString().split("::"); // System.out.println(flag); // System.out.println(tokens.length); // // for(int i = 0;i < tokens.length;i++){ // System.out.println(tokens[i]); // } // 获取所有的电影数据,应该有一个文件记录所有的电影信息,就不用判断是否包含直接添加 if( !Recommend.movieList.contains(tokens[1]) ){ Recommend.movieList.add(tokens[1]); } // if(flag.equals("movie")){ // Recommend.movieList.add(tokens[0]); // } // else{ k.set(tokens[0]); v.set(tokens[1] + "," + tokens[2]); context.write(k, v); // } } } public static class Step4_AggregateAndRecommendReducer extends Reducer<Text, Text, Text, Text> { private final static Text v = new Text(); @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map userMovieList = new HashMap(); for(Text line : values){ String[] tokens = Recommend.DELIMITER.split(line.toString()); userMovieList.put(tokens[0], tokens[1]); } for(int i = 0; i < Recommend.movieList.size();i++){ // System.out.println("key---->" + key); // System.out.println("value---->" + v); if(!userMovieList.containsKey(Recommend.movieList.get(i))){ 
v.set(Recommend.movieList.get(i) + "," + 0); context.write(key, v); } else{ v.set(Recommend.movieList.get(i) + "," + userMovieList.get(Recommend.movieList.get(i))); context.write(key, v); } } } } public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { JobConf conf = Recommend.config(); String input1 = path.get("Step3Input1"); // String input2 = path.get("Step3Input2"); String output = path.get("Step3Output"); HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); hdfs.rmr(output); Job job = new Job(conf); job.setJarByClass(Step3.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Step3.Step4_PartialMultiplyMapper.class); job.setReducerClass(Step3.Step4_AggregateAndRecommendReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input1)); FileOutputFormat.setOutputPath(job, new Path(output)); do{ job.waitForCompletion(false); }while(!job.isComplete()); } }
//根据第二步与第三步的结果计算出每位用户对每部电影的评分 //根据第三步结果,读取数据,当发现用户对某部电影的评分为0的时候, //根据第二步得到的map获取数据和物品同现矩阵计算得出用户对电影的评分 public class Step4 { public static class Step4Update_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> { private String flag;// A同现矩阵 or B评分矩阵 @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); flag = split.getPath().getParent().getName();// 判断读的数据集 // System.out.println(flag); } @Override public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { String[] tokens = Recommend.DELIMITER.split(values.toString()); if (flag.equals("step2")) {// 同现矩阵 // System.out.println(tokens.length); // for(int i = 0; i < tokens.length;i++){ // System.out.println(tokens[i]); // } // String[] v1 = tokens[0].split(":"); // String itemID1 = v1[0]; // String itemID2 = v1[1]; // String num = tokens[1]; // // Text k = new Text(itemID1); // Text v = new Text("A:" + itemID2 + "," + num); String[] v1 = tokens[0].split(":"); Text k = new Text(v1[0]); Text v = new Text("M:" + v1[1] + "," + tokens[1]); context.write(k, v); // System.out.println(k.toString() + " " + v.toString()); } else if (flag.equals("step3")) {// 评分矩阵 // System.out.println(tokens.length); // for(int i = 0; i < tokens.length;i++){ // System.out.println(tokens[i]); // } // String[] v2 = tokens[1].split(","); //// String itemID = tokens[0]; //// String userID = v2[0]; //// String pref = v2[1]; if(Double.parseDouble(tokens[2]) != 0 && !Recommend.userScore.containsKey(tokens[0])){ Recommend.userScore.put(tokens[0], tokens[1] + "," + tokens[2]); } //// Text k = new Text(tokens[1]); Text v = new Text("U:" + tokens[0] + "," + tokens[2]); context.write(k, v); // System.out.println(k.toString() + " " + v.toString()); } } } public static class Step4Update_AggregateReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, 
Context context) throws IOException, InterruptedException { // System.out.println("key--->" + key); Map movie = new HashMap(); Text k; Text v; //Map user = new HashMap(); List list = new ArrayList(); for (Text line : values) { list.add(line.toString()); // System.out.println(line.toString()); String[] tokens = Recommend.STRING.split(line.toString()); if(tokens[0].equals("M")){ // System.out.println(tokens[1]); // System.out.println(tokens[2]); movie.put(tokens[1], tokens[2]); } } for(int i = 0;i < list.size();i++) { String[] tokens = Recommend.STRING.split((String) list.get(i)); //System.out.println(tokens[0]); if(tokens[0].equals("U")){ if(Double.parseDouble(tokens[2]) == 0 ){ String userScore = (String) Recommend.userScore.get(tokens[1]); String[] temps = Recommend.STRING.split(userScore); k = new Text(key); // System.out.println("useid"+tokens[1]+"movie score"+temps[1]); // System.out.println("movie id"+movie.get(temps[0])); double temp = 0; if(movie.get(temps[0]) != null){ Double.parseDouble((String) movie.get(temps[0])); } double score = Double.parseDouble(temps[1])+temp; v = new Text(tokens[1] + "," + score); } else{ k = new Text(key); v = new Text(tokens[1] + "," + tokens[2]); } // System.out.println("key-->" + k); // System.out.println("value-->" + v); context.write(k, v); } } // System.out.println(key.toString() + ":"); // // Map<String, String> mapA = new HashMap<String, String>(); // Map<String, String> mapB = new HashMap<String, String>(); // // for (Text line : values) { // String val = line.toString(); // System.out.println(val); // // if (val.startsWith("A:")) { // String[] kv = Recommend.DELIMITER.split(val.substring(2)); // mapA.put(kv[0], kv[1]); // // } else if (val.startsWith("B:")) { // String[] kv = Recommend.DELIMITER.split(val.substring(2)); // mapB.put(kv[0], kv[1]); // // } // } // // double result = 0; // Iterator<String> iter = mapA.keySet().iterator(); // while (iter.hasNext()) { // String mapk = iter.next();// itemID // // int num = 
Integer.parseInt(mapA.get(mapk)); // Iterator<String> iterb = mapB.keySet().iterator(); // while (iterb.hasNext()) { // String mapkb = iterb.next();// userID // double pref = Double.parseDouble(mapB.get(mapkb)); // result = num * pref;// 矩阵乘法相乘计算 // // Text k = new Text(mapkb); // Text v = new Text(mapk + "," + result); // context.write(k, v); // System.out.println(k.toString() + " " + v.toString()); // } // } } } public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { JobConf conf = Recommend.config(); String input1 = path.get("Step4Input1"); String input2 = path.get("Step4Input2"); String output = path.get("Step4Output"); HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); hdfs.rmr(output); Job job = new Job(conf); job.setJarByClass(Step4.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Step4.Step4Update_PartialMultiplyMapper.class); job.setReducerClass(Step4.Step4Update_AggregateReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2)); FileOutputFormat.setOutputPath(job, new Path(output)); do{ job.waitForCompletion(false); }while(!job.isComplete()); } }
//对最后的数据输出格式做一遍规范。 public class Step5 { public static class Step5_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> { @Override public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { // System.out.println("run"); // System.out.println("key--->" + key); String[] tokens = Recommend.DELIMITER.split(values.toString()); Text k = new Text(tokens[1]); Text v; if(Double.parseDouble(tokens[2]) == 0){ v = new Text(tokens[0] + "::"); } else{ v = new Text(tokens[0] + "::" + tokens[2]); } context.write(k, v); } } public static class Step5_AggregateReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text line : values) { Text k = new Text(key.toString()); context.write(k, line); } } } public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { JobConf conf = Recommend.config(); String input = path.get("Step5Input"); String output = path.get("Step5Output"); HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); hdfs.rmr(output); Job job = new Job(conf); job.setJarByClass(Step5.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Step5.Step5_PartialMultiplyMapper.class); job.setReducerClass(Step5.Step5_AggregateReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(ParseTextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); do{ job.waitForCompletion(false); }while(!job.isComplete()); System.out.println("---------------------end--------------------"); } }
关于hadoop中如何利用mapreduce实现wordcount和电影评分预测就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。