diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index e41e0a9841691..246b14d0125c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -30,5 +30,15 @@ private[spark] trait SchedulerBackend { def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException + + /** + * TaskSchedulerImpl will wait to begin scheduling tasks until this method returns true. + * Subclasses can override this method to ensure no tasks are scheduled until sufficient + * resources (e.g., enough executors) are alive. + * + * Waiting until sufficient resources are ready before scheduling tasks can improve performance + * for a few reasons; for example, if all tasks are scheduled on a small number of executors, + * memory-persisted data may overflow the available memory on those executors. + */ def isReady(): Boolean = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d2f764fc22f54..9cbd425867420 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -151,7 +151,9 @@ private[spark] class TaskSchedulerImpl( } override def postStartHook() { - waitBackendReady() + while (!backend.isReady()) { + Thread.sleep(100) + } } override def submitTasks(taskSet: TaskSet) { @@ -479,17 +481,6 @@ private[spark] class TaskSchedulerImpl( // By default, rack is unknown def getRackForHost(value: String): Option[String] = None - - private def waitBackendReady(): Unit = { - if (backend.isReady) { - return - } - while (!backend.isReady) { - synchronized { - this.wait(100) - } - } - } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9f085eef46720..620d165c75341 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -47,19 +47,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + val totalRegisteredExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) - // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) - if (minRegisteredRatio > 1) minRegisteredRatio = 1 - // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). - val maxRegisteredWaitingTime = - conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) - val createTime = System.currentTimeMillis() - var ready = if (minRegisteredRatio <= 0) true else false class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -94,12 +85,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) - if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) { - ready = true - logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " + - executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() + - ", minRegisteredExecutorsRatio: " + minRegisteredRatio) - } + totalRegisteredExecutors.incrementAndGet() makeOffers() } @@ -268,19 +254,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } - override def isReady(): Boolean = { - if (ready) { - return true - } - if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { - ready = true - logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime) - return true - } - false - } - // Add filters to the SparkUI def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) { if (proxyBase != null && proxyBase.nonEmpty) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index a28446f6c8a6b..6d284e50ae712 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -97,7 +97,6 @@ private[spark] class SparkDeploySchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { - totalExpectedExecutors.addAndGet(1) logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) } diff --git a/docs/configuration.md b/docs/configuration.md index 2a71d7b820e5f..469a403775dca 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -760,7 +760,8 @@ Apart from these, the following properties are also available, and may be useful to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of executors has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config - spark.scheduler.maxRegisteredExecutorsWaitingTime + spark.scheduler.maxRegisteredExecutorsWaitingTime. Only valid when using YARN + for scheduling. @@ -768,7 +769,7 @@ Apart from these, the following properties are also available, and may be useful 30000 Maximum amount of time to wait for executors to register before scheduling begins - (in milliseconds). + (in milliseconds). Only valid when using YARN for scheduling. diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index f8fb96b312f23..0d7af608263b4 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -27,14 +27,9 @@ import scala.collection.mutable.ArrayBuffer private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + extends YarnSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { - if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { - minRegisteredRatio = 0.8 - ready = false - } - var client: Client = null var appId: ApplicationId = null var checkerThread: Thread = null @@ -84,7 +79,7 @@ private[spark] class YarnClientSchedulerBackend( logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedExecutors.set(args.numExecutors) + totalExpectedExecutors = args.numExecutors client = new Client(args, conf) appId = client.runApp() waitForApp() diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 0ad1794d19538..9fee8681567f3 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -25,12 +25,7 @@ import org.apache.spark.util.IntParam private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { - - if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { - minRegisteredRatio = 0.8 - ready = false - } + extends YarnSchedulerBackend(scheduler, sc.env.actorSystem) { override def start() { super.start() @@ -40,6 +35,6 @@ private[spark] class YarnClusterSchedulerBackend( } // System property can override environment variable. numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) - totalExpectedExecutors.set(numExecutors) + totalExpectedExecutors = numExecutors } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala new file mode 100644 index 0000000000000..b08650692e8d6 --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import akka.actor.ActorSystem + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl + +/** + * Subclass of CoarseGrainedSchedulerBackend that handles waiting until sufficient resources + * are registered before beginning to schedule tasks (needed by both Yarn scheduler backends). + */ +private[spark] class YarnSchedulerBackend( + scheduler: TaskSchedulerImpl, + actorSystem: ActorSystem) + extends CoarseGrainedSchedulerBackend(scheduler, actorSystem) { + + // Submit tasks only after (registered executors / total expected executors) + // is equal to at least this value (expected to be a double between 0 and 1, inclusive). + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0.8) + if (minRegisteredRatio > 1) minRegisteredRatio = 1 + // Regardless of whether the required number of executors have registered, return true from + // isReady() after this amount of time has elapsed. + val maxRegisteredWaitingTime = + conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) + private val createTime = System.currentTimeMillis() + + var totalExpectedExecutors: Int = _ + + override def isReady(): Boolean = { + val registeredExecutors = totalRegisteredExecutors.get() + if (registeredExecutors >= totalExpectedExecutors * minRegisteredRatio) { + logInfo("Sufficient resources registered with YarnSchedulerBackend to begin scheduling " + + s"tasks: $registeredExecutors registered executors out of $totalExpectedExecutors " + + s"total expected executors (min registered executors ratio of ${minRegisteredRatio})" + true + } else if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { + logInfo("YarnSchedulerBackend is ready to begin scheduling, even though insufficient" + + "resources have been registered, because the maximum waiting time of " + + s"$maxRegisteredWaitingTime milliseconds has elapsed ($registeredExecutors of " + + s"${minRegisteredRatio * totalExpectedExecutors} required executors have registered)") + true + } else { + false + } + } +}