Spark2.x中如何用源码剖析SortShuffleWriter具体实现

发布时间:2021-11-15 15:14:24 作者:柒染
来源:亿速云 阅读:201

Spark2.x中如何用源码剖析SortShuffleWriter具体实现

1. 引言

在大数据处理领域,Apache Spark 是一个广泛使用的分布式计算框架。Spark 的核心优势之一是其高效的 Shuffle 机制,而 SortShuffleWriter 是 Spark 2.x 中用于实现 Shuffle 操作的关键组件之一。本文将深入剖析 SortShuffleWriter 的具体实现,通过源码分析来揭示其内部工作原理。

2. Shuffle 机制概述

在 Spark 中,Shuffle 是指将数据重新分布到不同的分区中,以便进行后续的计算。Shuffle 操作通常发生在宽依赖(Wide Dependency)的情况下,例如 reduceByKeygroupByKey 等操作。Shuffle 的性能对整个 Spark 作业的执行效率有着重要影响。

2.1 Shuffle 的两种实现方式

在 Spark 2.x 中,Shuffle 有两种主要的实现方式:

  1. HashShuffleManager:基于哈希表的 Shuffle 实现,适用于小规模数据集。
  2. SortShuffleManager:基于排序的 Shuffle 实现,适用于大规模数据集。

本文主要关注 SortShuffleManager 中的 SortShuffleWriter

3. SortShuffleWriter 概述

SortShuffleWriterSortShuffleManager 的核心组件之一,负责将数据写入磁盘并进行排序。其主要任务包括:

  1. 数据分区:将数据按照分区 ID 进行分组。
  2. 数据排序:对每个分区内的数据进行排序。
  3. 数据写入:将排序后的数据写入磁盘。

3.1 SortShuffleWriter 的类结构

SortShuffleWriter 的类结构如下:

private[spark] class SortShuffleWriter[K, V, C](
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {
  // 类实现
}

其中,handleBaseShuffleHandle 的实例,包含了 Shuffle 的相关信息;mapId 是当前任务的 ID;context 是任务上下文。

4. SortShuffleWriter 的实现细节

4.1 数据分区

SortShuffleWriter 首先需要将数据按照分区 ID 进行分组。这一过程通过 Partitioner 实现,Partitioner 是一个抽象类,定义了如何将键值对分配到不同的分区中。

private val partitioner = handle.dependency.partitioner

partitionerShuffleDependency 中的一个属性,表示当前 Shuffle 操作的分区器。

4.2 数据排序

在数据分区之后,SortShuffleWriter 会对每个分区内的数据进行排序。排序是通过 ExternalSorter 实现的,ExternalSorter 是 Spark 中用于外部排序的工具类。

private val sorter = new ExternalSorter[K, V, C](
  context, handle.dependency.aggregator, Some(handle.dependency.partitioner), keyOrdering, serializer)

ExternalSorter 的构造函数参数包括:

4.3 数据写入

排序完成后,SortShuffleWriter 会将数据写入磁盘。数据写入是通过 DiskBlockObjectWriter 实现的,DiskBlockObjectWriter 是 Spark 中用于将数据写入磁盘的工具类。

private val writer = blockManager.getDiskWriter(
  blockId, outputFile, serializer, bufferSize, writeMetrics)

DiskBlockObjectWriter 的构造函数参数包括:

4.4 数据合并

在数据写入磁盘后,SortShuffleWriter 会将多个小文件合并成一个大文件。这一过程通过 IndexShuffleBlockResolver 实现,IndexShuffleBlockResolver 是 Spark 中用于管理 Shuffle 块的工具类。

private val shuffleBlockResolver = shuffleManager.shuffleBlockResolver

IndexShuffleBlockResolver 的主要任务是管理 Shuffle 块的索引文件和数据文件。

5. SortShuffleWriter 的源码分析

5.1 write 方法

SortShuffleWriter 的核心方法是 write,该方法负责将数据写入磁盘并进行排序。write 方法的源码如下:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter.insertAll(records)
  val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, outputFile)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}

write 方法的主要步骤如下:

  1. 插入数据:通过 sorter.insertAll 方法将数据插入到 ExternalSorter 中。
  2. 获取输出文件:通过 shuffleBlockResolver.getDataFile 方法获取输出文件。
  3. 写入分区数据:通过 sorter.writePartitionedFile 方法将分区数据写入磁盘。
  4. 写入索引文件:通过 shuffleBlockResolver.writeIndexFileAndCommit 方法写入索引文件并提交。
  5. 更新 MapStatus:更新 MapStatus,表示当前任务的状态。

5.2 insertAll 方法

insertAll 方法是 ExternalSorter 的核心方法之一,负责将数据插入到 ExternalSorter 中。insertAll 方法的源码如下:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    while (records.hasNext) {
      val kv = records.next()
      buffer.insert((getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]))
      maybeSpillCollection(usingMap = false)
    }
  }
}

insertAll 方法的主要步骤如下:

  1. 判断是否需要聚合:通过 aggregator.isDefined 判断是否需要聚合操作。
  2. 插入数据:如果需要聚合,则通过 map.changeValue 方法插入数据;否则,通过 buffer.insert 方法插入数据。
  3. 可能溢出:通过 maybeSpillCollection 方法判断是否需要将数据溢出到磁盘。

5.3 writePartitionedFile 方法

writePartitionedFile 方法是 ExternalSorter 的另一个核心方法,负责将分区数据写入磁盘。writePartitionedFile 方法的源码如下:

def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serializer, bufferSize, writeMetrics)
  val partitionedIterator = partitionedDestructiveSortedIterator(None)
  partitionedIterator.foreach { case (partitionId, elements) =>
    val length = writer.write(elements)
    lengths(partitionId) = length
  }
  writer.commitAndClose()
  lengths
}

writePartitionedFile 方法的主要步骤如下:

  1. 初始化长度数组:初始化一个长度为 numPartitions 的数组,用于存储每个分区的数据长度。
  2. 获取磁盘写入器:通过 blockManager.getDiskWriter 方法获取磁盘写入器。
  3. 获取分区迭代器:通过 partitionedDestructiveSortedIterator 方法获取分区迭代器。
  4. 写入数据:遍历分区迭代器,将每个分区的数据写入磁盘,并记录数据长度。
  5. 提交并关闭写入器:通过 writer.commitAndClose 方法提交并关闭写入器。

5.4 writeIndexFileAndCommit 方法

writeIndexFileAndCommit 方法是 IndexShuffleBlockResolver 的核心方法之一,负责写入索引文件并提交。writeIndexFileAndCommit 方法的源码如下:

def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
    Utils.tryWithSafeFinally {
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } {
      out.close()
    }
    val dataFile = getDataFile(shuffleId, mapId)
    if (dataFile.exists()) {
      dataFile.delete()
    }
    if (!dataTmp.renameTo(dataFile)) {
      throw new IOException(s"fail to rename ${dataTmp} to ${dataFile}")
    }
    if (!indexTmp.renameTo(indexFile)) {
      throw new IOException(s"fail to rename ${indexTmp} to ${indexFile}")
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp}")
    }
  }
}

writeIndexFileAndCommit 方法的主要步骤如下:

  1. 获取索引文件:通过 getIndexFile 方法获取索引文件。
  2. 创建临时索引文件:通过 Utils.tempFileWith 方法创建临时索引文件。
  3. 写入索引数据:将每个分区的偏移量写入临时索引文件。
  4. 重命名数据文件:将临时数据文件重命名为正式数据文件。
  5. 重命名索引文件:将临时索引文件重命名为正式索引文件。

6. SortShuffleWriter 的性能优化

SortShuffleWriter 的性能对整个 Spark 作业的执行效率有着重要影响。为了提高 SortShuffleWriter 的性能,Spark 采用了多种优化策略,包括:

  1. 内存管理:通过 ExternalSorter 管理内存,避免内存溢出。
  2. 数据压缩:通过 Serializer 对数据进行压缩,减少磁盘 I/O。
  3. 并行写入:通过多个 DiskBlockObjectWriter 并行写入数据,提高写入速度。

6.1 内存管理

ExternalSorter 通过 maybeSpillCollection 方法管理内存,当内存使用超过一定阈值时,将数据溢出到磁盘。maybeSpillCollection 方法的源码如下:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  if (usingMap) {
    if (map.estimateSize() >= _spillThreshold) {
      spill()
    }
  } else {
    if (buffer.size >= _spillThreshold) {
      spill()
    }
  }
}

maybeSpillCollection 方法的主要步骤如下:

  1. 判断内存使用:如果使用 map,则通过 map.estimateSize 方法判断内存使用;否则,通过 buffer.size 方法判断内存使用。
  2. 溢出数据:如果内存使用超过阈值,则通过 spill 方法将数据溢出到磁盘。

6.2 数据压缩

Serializer 是 Spark 中用于数据序列化和压缩的工具类。Serializer 的默认实现是 JavaSerializer,但可以通过配置使用其他序列化器,例如 KryoSerializer

private val serializer = SparkEnv.get.serializer

Serializer 的主要任务是将数据序列化为字节流,并对字节流进行压缩,以减少磁盘 I/O。

6.3 并行写入

DiskBlockObjectWriter 是 Spark 中用于将数据写入磁盘的工具类。DiskBlockObjectWriter 支持并行写入,可以通过多个 DiskBlockObjectWriter 同时写入数据,以提高写入速度。

private val writer = blockManager.getDiskWriter(
  blockId, outputFile, serializer, bufferSize, writeMetrics)

DiskBlockObjectWriter 的主要任务是将数据写入磁盘,并记录写入度量。

7. 总结

SortShuffleWriter 是 Spark 2.x 中用于实现 Shuffle 操作的关键组件之一。通过源码分析,我们深入了解了 SortShuffleWriter 的内部工作原理,包括数据分区、数据排序、数据写入和数据合并等过程。此外,我们还探讨了 SortShuffleWriter 的性能优化策略,包括内存管理、数据压缩和并行写入等。

通过对 SortShuffleWriter 的源码剖析,我们可以更好地理解 Spark 的 Shuffle 机制,并为优化 Spark 作业的性能提供有价值的参考。

推荐阅读:
  1. RDD Transformation和Action源码剖析
  2. STL源码剖析——list

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

sort shufflewriter

上一篇:Linux中文本分析awk命令怎么用

下一篇:Node.js 8.5新特性有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》