您好,登录后才能下订单哦!
Spark2.x中Shuffle演进历程及Shuffle两阶段划分是这样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
一、概述
我自己也去看了Spark2.2.0的源码,在Spark-env初始化中只保留了两种Shuffle:Sort、Tungsten-Sort,下面是源码截图:
由于Spark Shuffle是Spark的核心之核心,为了对Spark Shuffle有更全面的认识,这里先来讲解一下Spark的技术演进历程及Shuffle阶段换分。
二、Spark Shuffle技术演进历程
1.Spark0.8以前,都是采用的HashShuffle,这是最开始的Shuffle,会存在生成很多小文件的问题,文件数M*R,其中M表示ShuffleMapTask个数,R表示Result个数;
2.Spark0.8.1中,引入了Consolidation优化机制,减少了小文件的生产,文件数变成了E*(C/T)*R,其中E表示Executor个数,C表示每个Executor中可用Core的个数,T表示Task所分配的Core的个数(默认值为1)。
3.Spark 0.9 中,引入ExternalAppendOnlyMap,combine的时候,可以将数据spill到磁盘,然后通过堆排序merge;
4.Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle,稍后会详解这种机制;
5.Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle;
6.Spark 1.4 引入Tungsten-Sort Based Shuffle,将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型,这些技术决定了使用Tungsten-Sort要符合一些严格的限制,比如Shuffle dependency不能带有aggregation、输出不能排序等。由于堆外内存的管理基于JDK Sun Unsafe API,故Tungsten-Sort Based Shuffle也被称为Unsafe Shuffle;
7.Spark 1.6 Tungsten-sort并入Sort Based Shuffle;
8.Spark 2.0 Hash Based Shuffle被启用,Sort Based Shuffle成为默认Shuffle机制。
三、Spark Shuffle的两阶段
Spark中有宽依赖、窄依赖两种,宽依赖会触发Stage的划分,这时候需要进行Shuffle,需要对RDD中的每个Paritioon数据进行重新分区,Spark Shuffle主要分成了两个阶段:Shuffle Write、Shuffle Read,两个阶段分属两个人Stage,前者属于父Stage,后者属于子Stage。
结合上篇文章的图我这里给他用不同颜色进行了标识,上面的部分是stage1中的ShuffleWrite负责Shuffle阶段数据写,下面就是stage0的中的ShuffleWrite负责Shuffle阶段数据读,如图所示:
这里有一点需要说明一下:如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。
Shuffle Write阶段
shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
Shuffle Read阶段
shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task为下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。