Skip to content

Commit 0ecee9a

Browse files
committed
Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks
1 parent 4261454 commit 0ecee9a

File tree

4 files changed

+4
-5
lines changed

4 files changed

+4
-5
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,6 @@ class DAGScheduler(
710710
/** Submits stage, but first recursively submits any missing parents. */
711711
private def submitStage(stage: Stage) {
712712
val jobId = activeJobForStage(stage)
713-
taskScheduler.waitBackendReady
714713
if (jobId.isDefined) {
715714
logDebug("submitStage(" + stage + ")")
716715
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,4 @@ private[spark] trait TaskScheduler {
5454

5555
// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
5656
def defaultParallelism(): Int
57-
def waitBackendReady(): Unit = { return }
5857
}

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ private[spark] class TaskSchedulerImpl(
147147

148148
override def submitTasks(taskSet: TaskSet) {
149149
val tasks = taskSet.tasks
150+
waitBackendReady
150151
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
151152
this.synchronized {
152153
val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
@@ -432,7 +433,7 @@ private[spark] class TaskSchedulerImpl(
432433
// By default, rack is unknown
433434
def getRackForHost(value: String): Option[String] = None
434435

435-
override def waitBackendReady(): Unit = {
436+
private def waitBackendReady(): Unit = {
436437
if (backend.isReady) {
437438
return
438439
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5050
val conf = scheduler.sc.conf
5151
private val timeout = AkkaUtils.askTimeout(conf)
5252
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
53-
// Submit stage only after (registered executors / total executors) arrived the ratio.
53+
// Submit tasks only after (registered executors / total executors) arrived the ratio.
5454
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
55-
// Whatever minRegisteredExecutorsRatio is arrived, submit stage after the time(milliseconds).
55+
// Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
5656
val maxRegisteredWaitingTime =
5757
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
5858
val createTime = System.currentTimeMillis()

0 commit comments

Comments
 (0)