Shuffle流程是什么

发布时间:2021-12-23 16:07:57 作者:iii
来源:亿速云 阅读:133

本篇内容介绍了“Shuffle流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

shuffle流程源码解读

1、从WordCountMapper类中的map方法中写出kv后,进入shuffle流程	--context.write(outK,outV);
进入TaskInputOutputContext中的write()方法			--看下就过
进入WrappedMapper.java中的mapContext.write(key, value);方法	//112行
进入TaskInputOutputContextImpl.java 中output.write(key, value);方法 	//89行
最终定位到MapTask的write()方法内,	//726行
2、重点步骤,收集器对象将kv收集到缓冲区,并在收集前将kv的分区号计算出来.
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
第一次进入该方法时,因为没有设置reduce的个数,所以最终返回的永远是0号分区
3、定位到MapTask类中的collect方法并进入    		//1082行
bufferRemaining -= METASIZE;	//计算缓冲区剩余大小,该行代码前面的代码是对kv类型的一个判断
如果bufferRemaining < 0 则开始进行溢写操作,内部是对数据的一些校验和计算
4、定位到startSpill(); --1126行  	//只有当溢写数据大小满足80%时,才会触发该操作
WordCountMapper持续往缓冲区写数据,当达到溢写条件80%时,开始溢写
5、进入到startSpill()方法内部		--MapTask类1590行
spillReady.signal(); //1602行  		--线程通信, 通知溢写线程开始干活
//执行溢写线程(MapTask内部类SpillThread)的run方法
//run方法中调用MapTask$MapOutputBuffer中的sortAndSpill()方法
直接执行下面的排序和溢写方法		--sortAndSpill()方法  	--MapTask的1605行
6、定位到1615行
final SpillRecord spillRec = new SpillRecord(partitions); //根据分区数创建溢写记录对象
--排序按照分区排序,溢写按照分区溢写

final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//获取溢写文件名称
 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619
 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),这时还没有溢写文件,只有目录

out = rfs.create(filename);		//创建执行改步后,在上述的目录下生成溢写文件spill0.out文件

Shuffle流程是什么

7、继续向下走,定位到MapTask类的1625行
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);		//溢写前排序

8、定位到1629行,进入for循环	--按照分区进行溢写

9、分析for循环内代码,看具体溢写过程
	9.1 先有一个writer对象,通过该对象来完成数据溢写
		writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
	9.2 判断是否有设置combinerRunner对象
		如果有,则按照设置的combinerRunner业务去处理;
		如果没有,则走默认的溢写规则

10、执行到1667行,即writer.close();方法,本次溢写完毕,此时我们再去看溢写文件spill0.out文件有数据

Shuffle流程是什么

11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小为:1M)) {}	//MapTask类的1685行
// 如果索引数据超过指定的内存大小,也需要溢写到文件中.(该现象一般情况很难发生.)
12、当本次溢写完毕之后,继续回到WordCountMapper类中的map方法内的context.write(outk,outv);方法处

--说明:因为我们使用本地debug模式调试,所以看不到并行的效果,只能是串行效果,因此看到的是当内存内读取满足
80%时,发生溢写操作,其实溢写并未停止,只不过我们看不到,剩余的溢写数据在20%内存进行
13、如上溢写过程,在整个mapTask中会出现N次,具体多少看数据量. 如果map中最后的数据写到缓冲区,但是没有满足
80%溢写条件的情况,最终也需要将缓冲区的数据刷写到磁盘(最后一次溢写)。

最后一次会发生在 MapTask中关闭 NewOutputCollector对象的时候.
即在该行代码处发生    output.close(mapperContext);	--MapTask的805行

14、进入output.close(mapperContext);方法内	--MapTask的732行
定位到collector.flush();方法 // 735行
-->将缓冲区的数据刷写到磁盘-->重新走sortAndSpill()方法(最后一次刷写)

Shuffle流程是什么 Shuffle流程是什么

上述流程,每发生一次溢写就会生成一个溢写小文件(溢写文件内的数据是排好序的)
最终所有的数据都写到磁盘中后,在磁盘上就是多个溢写文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢写全部完成之后,就进入归并操作		--MapTask的1527行
mergeParts();方法,进入该方法,定位到MapTask的1844行
filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local
1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out

Shuffle流程是什么

16、继续向下走,定位到MapTask的1880行
Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
   --归并后,最终输出的文件路径
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00
01/attempt_local1440922619_0001_m_000000_0/output/file.out

17、继续向下走,定位到MapTask的1882行
Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
   --归并后,最终输出文件的索引文件
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00
01/attempt_local1440922619_0001_m_000000_0/output/file.out.index

18、创建file.out 文件
	FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

19、for (int parts = 0; parts < partitions; parts++) {}	//1925行,按照分区进行归并排序

20、for循环内具体的归并操作	//1950行
	RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, mergeFactor,
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter, sortSegments,
                         null, spilledRecordsCounter, sortPhase.phase(),
                         TaskType.MAP);
21、归并后的数据写出到文件
Writer<K, V> writer = new Writer<K, V>(job, finalPartitionOut, 
keyClass, valClass, codec,spilledRecordsCounter); //1961行

//归并也可以使用combiner,但是前提条件是设置了combiner,并且溢写次数大于等于3 
if (combinerRunner == null || numSpills < minSpillsForCombine(3)) {
      Merger.writeFile(kvIter, writer, reporter, job);
} else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(kvIter, combineCollector);
}

22、归并完成
writer.close();		//1972行

Shuffle流程是什么

23、写出索引文件
spillRec.writeToFile(finalIndexFile, job);	//1986行

24、删除所有的溢写文件spill0.out spill1.out ... spill0.out,只保留最终的输出文件。
for(int i = 0; i < numSpills; i++) {
       rfs.delete(filename[i],true);
}

Shuffle流程是什么

“Shuffle流程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

推荐阅读:
  1. spark(四):shuffle
  2. shuffle和map shuffle有什么区别

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

shuffle

上一篇:ReduceTask流程是怎样的

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》