HBase And MapReduce举例分析

发布时间:2021-12-09 10:26:52 作者:iii
来源:亿速云 阅读:116

这篇文章主要介绍“HBase And MapReduce举例分析”,在日常操作中,相信很多人在HBase And MapReduce举例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”HBase And MapReduce举例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

在HDFS某目录文件下有多个文件内容,将这些多个文件内容中的数据通过倒排索引后将结果写入到HBase某张表中,代码如下:

1.InvertedIndexMapper

public class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{
	
	private Text keyInfo = new Text();  // 存储单词和URI的组合
	private Text valueInfo = new Text(); //存储词频
	private FileSplit split;  // 存储split对象。
	
	@Override
	protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		System.out.println(  "key-->: " +key + "\n value --> : "+value );
		//获得<key,value>对所属的FileSplit对象。
		split = (FileSplit) context.getInputSplit();
		System.out.println(  "split---> "+split.toString() );
		//System.out.println("value.tostring()---> "+  value.toString() );
		StringTokenizer itr = new StringTokenizer( value.toString());
		
		while( itr.hasMoreTokens() ){
			// key值由单词和URI组成。
			keyInfo.set( itr.nextToken()+":"+split.getPath().toString());
			//System.out.println("split.getPath().toString() --> "+  split.getPath().toString() );
			//词频初始为1
			valueInfo.set("1");
			context.write(keyInfo, valueInfo);
		}
	}
}

2.InvertedIndexCombiner

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
	
	private Text info = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		//统计词频
		int sum = 0;
		for (Text value : values) {
			sum += Integer.parseInt(value.toString() );
		}
		int splitIndex = key.toString().indexOf(":");
		//重新设置value值由URI和词频组成
		info.set( key.toString().substring( splitIndex + 1) +":"+sum );
	
		//重新设置key值为单词
		key.set( key.toString().substring(0,splitIndex));
		context.write(key, info);
	}
}

3.InvertedIndexReducer

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
	
	private Text result = new Text();
	@Override
	protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		//生成文档列表
		String fileList = new String();
		for (Text value : values) {
			fileList += value.toString()+";";
		}
		result.set(fileList);
		context.write(key, result);
	}
}

4.HBaseAndInvertedIndex

public class HBaseAndInvertedIndex {
	
	private static Path outPath;
	
	public static void main(String[] args) throws Exception {
		run();
		System.out.println( "\n\n************************");
		runHBase();
	}

	public static void run() throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf,"Hadoop-InvertedIndex");
		
		job.setJarByClass(HBaseAndInvertedIndex.class);
		
		//实现map函数,根据输入的<key,value>对生成中间结果。
		job.setMapperClass(InvertedIndexMapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setCombinerClass(InvertedIndexCombiner.class);
		job.setReducerClass(InvertedIndexReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/txt/invertedindex/"));
		DateFormat df = new SimpleDateFormat( "yyyyMMddHHmmssS" );
		String filename = df.format( new Date() );
		outPath = new Path("hdfs://192.168.226.129:9000/rootdir/invertedindexhbase/result/"+filename+"/");
		FileOutputFormat.setOutputPath(job, outPath);  

		int result = job.waitForCompletion(true) ? 0 : 1;
		
	}
	
	public static void runHBase() throws Exception {
		Configuration conf = new Configuration();
		conf = HBaseConfiguration.create(conf);
		conf.set("hbase.zookeeper.quorum", "192.168.226.129");

		Job job = Job.getInstance(conf, "HBase-InvertedIndex");
		job.setJarByClass(HBaseAndInvertedIndex.class);

		job.setInputFormatClass(KeyValueTextInputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		// 把数据写入Hbase数据库
		FileInputFormat.addInputPath(job, new Path(outPath.toString()+"/part-r-00000") );
		System.out.println( "path---> "+ outPath.toString()  );
		TableMapReduceUtil.initTableReducerJob("invertedindex",InvertedIndexHBaseReducer.class, job);
		//将数据写入HBase数据库
		//首先先检查表是否存在
		checkTable(conf);

		System.exit( job.waitForCompletion(true) ? 0 : 1 );
		
	}

	private static void checkTable(Configuration conf) throws Exception {
		Connection con = ConnectionFactory.createConnection(conf);
		Admin admin = con.getAdmin();
		TableName tn = TableName.valueOf("invertedindex");
		if (!admin.tableExists(tn)){
			HTableDescriptor htd = new HTableDescriptor(tn);
			HColumnDescriptor hcd = new HColumnDescriptor("indexKey");
			htd.addFamily(hcd);
			admin.createTable(htd);
			System.out.println("表不存在,新创建表成功....");
		}
	}

	/**
	 * 1. 因为map是从hdfs中取数据,因此没有太大变化;而reduce需要输出结果到hbase中,
	 * 		所以这里继承了TableReduce<keyin,valuein,keyout>,这里没有valueout,
	 * 			但是规定TableReduce的valueout必须是Put或者Delete实例
	 * 
	 * 2.ImmutableBytesWritable:它是一个可以用作key或value类型的字节序列,
	 * */
	public static class InvertedIndexHBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
		@Override
		protected void reduce(
				Text key,
				Iterable<Text> values,
				Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
						throws IOException, InterruptedException {
			System.out.println(  "key---> " + key.toString()  );
			//注意行健参数的书写。
			Put put = new Put(key.toString().getBytes());
			put.addColumn(Bytes.toBytes( "indexKey" ), Bytes.toBytes("indexUrlWeight"),values.iterator().next().getBytes());
			context.write(new ImmutableBytesWritable(key.getBytes()), put);
		}
	}
}

///原数据目录文件:

HBase And MapReduce举例分析

invertedindex1.txt

Hello I will Learning Hadoop
HDFS MapReduce
Other I will Learning HBase

invertedindex2.txt  :

Hello HBase
MapReduce HDFS

查看结果:scan:

hbase(main):002:0> scan 'invertedindex'
ROW                             COLUMN+CELL                                                                              
 HBase                          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex2.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex1.txt:1;                                                                      
 HDFS                           column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 Hadoop                         column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 Hello                          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 I                              column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 Learning                       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 MapReduce                      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 Other                          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
 will                           column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
                                0/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/in
                                vertedindex2.txt:1;                                                                      
9 row(s) in 0.2240 seconds

到此,关于“HBase And MapReduce举例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. Hbase内部是什么机制
  2. 怎么使用docker部署hbase

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

hbase

上一篇:MapReduce如何读写HBASE

下一篇:如何进行源码阅读神器Sourcetrail的安装使用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》