您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# 如何进行DAGScheduler源码解读
## 一、前言
DAGScheduler是Apache Spark核心调度层的关键组件,负责将逻辑执行计划(RDD依赖关系)转换为物理执行计划(Stage划分与任务调度)。本文将从源码层面深入解析DAGScheduler的工作原理,帮助开发者理解Spark作业调度的核心机制。
## 二、环境准备
### 1. 源码获取
```bash
git clone https://github.com/apache/spark.git
cd spark/core/src/main/scala/org/apache/spark/scheduler
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
...
) extends Logging {
// 关键成员变量
private[scheduler] val nextStageId = new AtomicInteger(0)
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
...
}
Driver Program
↓
DAGScheduler (Stage划分)
↓
TaskScheduler (集群任务调度)
↓
SchedulerBackend (资源协商)
↓
Executor (任务执行)
// 核心调用链
dagScheduler.runJob()
→ submitJob()
→ handleJobSubmitted()
→ createResultStage()
→ getOrCreateParentStages()
private def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
Stage类型 | 触发条件 | 特点 |
---|---|---|
ResultStage | 执行Action操作 | 包含最终计算函数 |
ShuffleMapStage | 存在Shuffle依赖 | 输出Shuffle数据 |
// 事件循环处理核心
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") {
override def onReceive(event: DAGSchedulerEvent): Unit = {
event match {
case JobSubmitted(...) => dagScheduler.handleJobSubmitted(...)
case StageCancelled(...) => dagScheduler.handleStageCancellation(...)
case ... // 其他事件处理
}
}
}
// 任务生成关键代码
private[scheduler] def submitMissingTasks(
stage: Stage,
jobId: Int): Unit = {
// 1. 确定需要计算的分区
val partitionsToCompute = stage.findMissingPartitions()
// 2. 创建Task集合
val tasks: Seq[Task[_]] = stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { p =>
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, ...)
}
case stage: ResultStage =>
partitionsToCompute.map { p =>
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, ...)
}
}
// 3. 提交任务
taskScheduler.submitTasks(new TaskSet(...))
}
// 最大重试次数配置
spark.stage.maxConsecutiveAttempts = 4
// 本地性优先级排序
val locs = taskIdToLocations(taskId).map {
case TaskLocation(host, executorId) =>
(host, executorId) match {
case (Some(h), Some(e)) => Seq(ExecutorCacheTaskLocation(h, e))
case (Some(h), None) => Seq(HostTaskLocation(h))
case _ => Nil
}
}
// 资源调整事件处理
case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
handleExecutorLost(execId)
# 启用DEBUG日志
log4j.logger.org.apache.spark.scheduler.DAGScheduler=DEBUG
// 通过SparkListener跟踪
sc.addSparkListener(new SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
println(s"Stage ${stageSubmitted.stageInfo.stageId} submitted")
}
})
可能原因: - 持久化策略不当 - Shuffle依赖配置错误
诊断方法: - 分析Stage任务执行时间分布 - 检查Shuffle Read/Write指标
通过对DAGScheduler源码的系统解读,我们可以深入理解: 1. RDD物理执行计划的生成逻辑 2. Spark作业的调度生命周期 3. 分布式计算中的容错处理机制 4. 性能优化的核心切入点
建议读者结合Spark UI和实际调试经验,逐步构建完整的调度系统认知模型。
参数 | 默认值 | 说明 |
---|---|---|
spark.default.parallelism | 8 | 默认并行度 |
spark.stage.maxConsecutiveAttempts | 4 | Stage最大重试次数 |
spark.locality.wait | 3s | 本地性等待时间 |
”`
注:本文基于Spark 3.3.x版本源码分析,实际阅读时请对应具体版本。建议通过IDE的代码导航功能辅助阅读,重点关注类继承关系和关键方法调用链。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。