DataJoin类 实现不同格式数据reduce侧连接

发布时间:2020-06-08 00:29:35 作者:Alysses1111
来源:网络 阅读:557


                      实验名称:Datajoin数据连接

实验目的:

 1、记录我的Hadoop 实验过程,我是NCU HANG TIAN BAN 的学生。将会附上完整可运行的代码。程序中框架是一套模板百度的、书上也有但是重要算法是我自己写的将会标注。 http://blog.csdn.net/wawmg/article/details/8759076 这是我参考的框架模板。

    2、提示大致浏览可看加粗部分【1、2、3、4】

实验要求:

  任务1、多个数据源的内连接

【数据样例】

输入:

factory:

factoryname addressID

Beijing Red Star 1

Shenzhen Thunder 3

Guangzhou Honda 2

Beijing Rising 1

Guangzhou Development Bank 2

Tencent 3

Bank of Beijing 1

Nanchang Univ 5

address:

addressID addressname

1 Beijing

2 Guangzhou

3 Shenzhen

4 Xian

输出:

factorynameaddressIDaddressname

Bank of Beijing1Beijing

Beijing Red Star1Beijing 

Beijing Rising1eijing 

Guangzhou Development Bank2 Guangzhou 

Guangzhou Honda2 Guangzhou

Shenzhen Thunder3 Shenzhen 

Tencent3 Shenzhen

[代码开始了]【1】

// 先是TaggedWritable类 抄的不作改动
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
/*TaggedMapOutput是一个抽象数据类型,封装了标签与记录内容
 此处作为DataJoinMapperBase的输出值类型,需要实现Writable接口,所以要实现两个序列化方法
 自定义输入类型*/
public class TaggedWritable extends TaggedMapOutput {
private Writable data;
public TaggedWritable() {
this.tag = new Text();
}

public TaggedWritable(Writable data) // 构造函数
{
this.tag = new Text(); // tag可以通过setTag()方法进行设置
this.data = data;
}

@Override
public void readFields(DataInput in) throws IOException {
tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
data.readFields(in);
}


@Override
public void write(DataOutput out) throws IOException {
tag.write(out);
out.writeUTF(this.data.getClass().getName());
data.write(out);
}


@Override
public Writable getData() {
return data;
}
}
// http://blog.csdn.net/wawmg/article/details/8759076

【2】Map阶段 算法自己写的

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinMapper extends DataJoinMapperBase {
    // 这个在任务开始时调用,用于产生标签
    // 此处就直接以文件名作为标签
    @Override
    protected Text generateInputTag(String inputFile) {
        System.out.println("inputFile = " + inputFile);
        return new Text(inputFile);
    }
    // 这里我们已经确定分割符为',',更普遍的,用户应能自己指定分割符和组键。
    // 设置组键
    @Override
        protected Text generateGroupKey(TaggedMapOutput record) {
        String tag = ((Text) record.getTag()).toString();
    
        if(tag.indexOf("factory") != -1){
        String line = ((Text) record.getData()).toString();
        String[] tokens = line.split(" ");
        int len = tokens.length - 1;
        return new Text(tokens[len]);
        }else{
            String line = ((Text) record.getData()).toString();
            String[] tokens = line.split(" ");
            return new Text(tokens[0]);
        }
    }

    // 返回一个任何带任何我们想要的Text标签的TaggedWritable
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable retv = new TaggedWritable((Text) value);
        retv.setTag(this.inputTag); // 不要忘记设定当前键值的标签
        return retv;
    }
}

【3】reduce阶段 算法也是自己写的

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinReducer extends DataJoinReducerBase {
    // 两个参数数组大小一定相同,并且最多等于数据源个数
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length < 2) return null; // 这一步,实现内联结
        String joinedStr = "";
        String dd = "  ";
        for (int i = 0; i < values.length; i++) {
             // 以逗号作为原两个数据源记录链接的分割符
            TaggedWritable tw = (TaggedWritable) values[i];
            String line = ((Text) tw.getData()).toString();
            // 将一条记录划分两组,去掉第一组的组键名。
            if( i == 0){
                String[] tokens = line.split(" ");
                dd += tokens[1];
            }
            if(i == 1){
                joinedStr += line;
                System.out.println(joinedStr);
            }
        }
        joinedStr += dd;
        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
        retv.setTag((Text) tags[1]); // 这只retv的组键,作为最终输出键。
        return retv;
    }
}

【4】Driver 驱动类 抄的不作改动

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Path in = new Path("hdfs://localhost:9000/user/c/input/*.txt");
        Path out = new Path("hdfs://localhost:9000/user/c/output2");
        JobConf job = new JobConf(conf, DataJoinDriver.class);
        job.setJobName("DataJoin");
        FileSystem hdfs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new DataJoinDriver(),
                args);
        System.exit(res);
    }

}

 最后:输出有点小问题,就是没有做排序。

推荐阅读:
  1. 十四、MapReduce--OutputFormat和RecordWriter抽象类
  2. hadoop中mapreduce的常用类(二)

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

百度 实验 目的

上一篇:滑动窗口算法

下一篇:K8S使用Ceph RBD作为后端存储

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》