
Commit 3d3f8c8

CrazyJvm authored and rxin committed

Use pluggable clock in DAGScheduler (SPARK-2031)

DAGScheduler supports a pluggable clock, like TaskSetManager already does.

Author: CrazyJvm <[email protected]>

Closes apache#976 from CrazyJvm/clock and squashes the following commits:

6779a4c [CrazyJvm] Use pluggable clock in DAGSheduler

1 parent c7a183b commit 3d3f8c8

File tree

1 file changed

+7 -6 lines changed
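
For context, the Clock and SystemClock names imported in the diff below come from org.apache.spark.util. At the time of this commit the abstraction is assumed to look roughly like the following sketch (inferred from the getTime() calls in the diff; it is not code touched by this change):

// Assumed shape of the existing timing abstraction in org.apache.spark.util;
// shown for context only, not part of this commit's diff.
private[spark] trait Clock {
  def getTime(): Long
}

private[spark] object SystemClock extends Clock {
  def getTime(): Long = System.currentTimeMillis()
}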

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 6 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Clock, Utils}

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
-    env: SparkEnv)
+    env: SparkEnv,
+    clock: Clock = SystemClock)
   extends Logging {

   import DAGScheduler._
@@ -781,7 +782,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@ class DAGScheduler(

     def markStageAsFinished(stage: Stage) = {
       val serviceTime = stageToInfos(stage).submissionTime match {
-        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
         case _ => "Unknown"
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-      stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).completionTime = Some(clock.getTime())
       listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       runningStages -= stage
     }
@@ -1015,7 +1016,7 @@ class DAGScheduler(
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(clock.getTime())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
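
The value of the new clock: Clock = SystemClock default parameter is testability: production code keeps wall-clock behavior, while a test can inject a controllable clock instead of relying on System.currentTimeMillis(), as TaskSetManager already allows. A minimal sketch of such a test clock follows; the ManualTestClock name and its advance() helper are hypothetical, not part of this commit:

package org.apache.spark.scheduler

// Clock is private[spark], so this hypothetical test helper sits inside the
// org.apache.spark namespace.
import org.apache.spark.util.Clock

class ManualTestClock(private var now: Long = 0L) extends Clock {
  // Return the manually controlled time instead of wall-clock time.
  override def getTime(): Long = now

  // Advance the fake time by the given number of milliseconds.
  def advance(ms: Long): Unit = { now += ms }
}

// Illustrative usage: pass the clock through the new optional constructor
// argument, e.g. new DAGScheduler(..., env, new ManualTestClock()) in a
// scheduler test, then call advance() to make the submission and completion
// timestamps recorded above deterministic.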
