@@ -62,14 +62,8 @@ private[spark] class TaskSetManager(
   private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
   private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
 
-  // Quantile of tasks at which to start speculation
-  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
-  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
-
   val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
 
-  val speculationEnabled = conf.get(SPECULATION_ENABLED)
-
   // Serializer for closures and tasks.
   val env = SparkEnv.get
   val ser = env.closureSerializer.newInstance()
@@ -80,6 +74,12 @@ private[spark] class TaskSetManager(
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
+  val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  // Quantile of tasks at which to start speculation
+  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
+  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
+  val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
+
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
   // be re-run because the missing map data needs to be regenerated first.
@@ -1032,7 +1032,6 @@ private[spark] class TaskSetManager(
       return false
     }
     var foundTasks = false
-    val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
 
    // It's possible that a task is marked as completed by the scheduler, then the size of
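The diff above moves the speculation-related vals next to numTasks and hoists minFinishedForSpeculation out of the body of checkSpeculatableTasks, so the threshold is computed once when the TaskSetManager is constructed rather than on every periodic speculation check. A minimal standalone Scala sketch of the same idea, using hypothetical names (SpeculationTracker, shouldCheckForSpeculation) rather than Spark's actual API:

// Sketch only: values derived from fields that never change after construction
// can be precomputed as vals instead of recomputed on every call.
class SpeculationTracker(numTasks: Int, speculationQuantile: Double) {
  // Computed once at construction time, mirroring the hoisted val in the diff.
  val minFinishedForSpeculation: Int =
    math.max((speculationQuantile * numTasks).floor.toInt, 1)

  // The periodic check only reads the precomputed threshold.
  def shouldCheckForSpeculation(tasksSuccessful: Int): Boolean =
    tasksSuccessful >= minFinishedForSpeculation
}

object SpeculationTrackerExample extends App {
  val tracker = new SpeculationTracker(numTasks = 100, speculationQuantile = 0.75)
  println(tracker.minFinishedForSpeculation)     // 75
  println(tracker.shouldCheckForSpeculation(80)) // true
}

Since numTasks and speculationQuantile are fixed for the lifetime of a task set, precomputing the threshold is safe and avoids repeating the same arithmetic each time the speculation check runs.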