From fa746ed65a8ac685edd33c79159521398d99aa69 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Sun, 3 Aug 2014 23:59:05 -0700 Subject: [PATCH 1/2] Remove support for waiting for executors in standalone mode. Current code waits until some minimum fraction of expected executors have registered before beginning scheduling. The current code in standalone mode suffers from a race condition (SPARK-2635). This race condition could be fixed, but this functionality is easily achieved by the user (they can use the storage status to determine how many executors are up, as described by @pwendell in #1462) so adding the extra complexity to the scheduler code is not worthwile. --- .../spark/scheduler/SchedulerBackend.scala | 10 ++++++ .../spark/scheduler/TaskSchedulerImpl.scala | 15 ++------- .../CoarseGrainedSchedulerBackend.scala | 31 ++----------------- .../cluster/SparkDeploySchedulerBackend.scala | 1 - docs/configuration.md | 5 +-- .../cluster/YarnClientSchedulerBackend.scala | 9 ++---- .../cluster/YarnClusterSchedulerBackend.scala | 9 ++---- 7 files changed, 22 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index e41e0a9841691..246b14d0125c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -30,5 +30,15 @@ private[spark] trait SchedulerBackend { def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException + + /** + * TaskSchedulerImpl will wait to begin scheduling tasks until this method returns true. + * Subclasses can override this method to ensure no tasks are scheduled until sufficient + * resources (e.g., enough executors) are alive. + * + * Waiting until sufficient resources are ready before scheduling tasks can improve performance + * for a few reasons; for example, if all tasks are scheduled on a small number of executors, + * memory-persisted data may overflow the available memory on those executors. + */ def isReady(): Boolean = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d2f764fc22f54..9cbd425867420 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -151,7 +151,9 @@ private[spark] class TaskSchedulerImpl( } override def postStartHook() { - waitBackendReady() + while (!backend.isReady()) { + Thread.sleep(100) + } } override def submitTasks(taskSet: TaskSet) { @@ -479,17 +481,6 @@ private[spark] class TaskSchedulerImpl( // By default, rack is unknown def getRackForHost(value: String): Option[String] = None - - private def waitBackendReady(): Unit = { - if (backend.isReady) { - return - } - while (!backend.isReady) { - synchronized { - this.wait(100) - } - } - } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9f085eef46720..620d165c75341 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -47,19 +47,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + val totalRegisteredExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) - // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) - if (minRegisteredRatio > 1) minRegisteredRatio = 1 - // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). - val maxRegisteredWaitingTime = - conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) - val createTime = System.currentTimeMillis() - var ready = if (minRegisteredRatio <= 0) true else false class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -94,12 +85,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) - if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) { - ready = true - logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " + - executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() + - ", minRegisteredExecutorsRatio: " + minRegisteredRatio) - } + totalRegisteredExecutors.incrementAndGet() makeOffers() } @@ -268,19 +254,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } - override def isReady(): Boolean = { - if (ready) { - return true - } - if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { - ready = true - logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + - "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime) - return true - } - false - } - // Add filters to the SparkUI def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) { if (proxyBase != null && proxyBase.nonEmpty) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index a28446f6c8a6b..6d284e50ae712 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -97,7 +97,6 @@ private[spark] class SparkDeploySchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { - totalExpectedExecutors.addAndGet(1) logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) } diff --git a/docs/configuration.md b/docs/configuration.md index 2a71d7b820e5f..469a403775dca 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -760,7 +760,8 @@ Apart from these, the following properties are also available, and may be useful to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of executors has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config - spark.scheduler.maxRegisteredExecutorsWaitingTime + spark.scheduler.maxRegisteredExecutorsWaitingTime. Only valid when using YARN + for scheduling. @@ -768,7 +769,7 @@ Apart from these, the following properties are also available, and may be useful 30000 Maximum amount of time to wait for executors to register before scheduling begins - (in milliseconds). + (in milliseconds). Only valid when using YARN for scheduling. diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index f8fb96b312f23..0d7af608263b4 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -27,14 +27,9 @@ import scala.collection.mutable.ArrayBuffer private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + extends YarnSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { - if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { - minRegisteredRatio = 0.8 - ready = false - } - var client: Client = null var appId: ApplicationId = null var checkerThread: Thread = null @@ -84,7 +79,7 @@ private[spark] class YarnClientSchedulerBackend( logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedExecutors.set(args.numExecutors) + totalExpectedExecutors = args.numExecutors client = new Client(args, conf) appId = client.runApp() waitForApp() diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 0ad1794d19538..9fee8681567f3 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -25,12 +25,7 @@ import org.apache.spark.util.IntParam private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { - - if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { - minRegisteredRatio = 0.8 - ready = false - } + extends YarnSchedulerBackend(scheduler, sc.env.actorSystem) { override def start() { super.start() @@ -40,6 +35,6 @@ private[spark] class YarnClusterSchedulerBackend( } // System property can override environment variable. numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) - totalExpectedExecutors.set(numExecutors) + totalExpectedExecutors = numExecutors } } From 7ef3d7179f4ea8a684a2bf07c0348f047369e676 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Mon, 4 Aug 2014 00:07:08 -0700 Subject: [PATCH 2/2] Added missing file --- .../cluster/YarnSchedulerBackend.scala | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala new file mode 100644 index 0000000000000..b08650692e8d6 --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import akka.actor.ActorSystem + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl + +/** + * Subclass of CoarseGrainedSchedulerBackend that handles waiting until sufficient resources + * are registered before beginning to schedule tasks (needed by both Yarn scheduler backends). + */ +private[spark] class YarnSchedulerBackend( + scheduler: TaskSchedulerImpl, + actorSystem: ActorSystem) + extends CoarseGrainedSchedulerBackend(scheduler, actorSystem) { + + // Submit tasks only after (registered executors / total expected executors) + // is equal to at least this value (expected to be a double between 0 and 1, inclusive). + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0.8) + if (minRegisteredRatio > 1) minRegisteredRatio = 1 + // Regardless of whether the required number of executors have registered, return true from + // isReady() after this amount of time has elapsed. + val maxRegisteredWaitingTime = + conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) + private val createTime = System.currentTimeMillis() + + var totalExpectedExecutors: Int = _ + + override def isReady(): Boolean = { + val registeredExecutors = totalRegisteredExecutors.get() + if (registeredExecutors >= totalExpectedExecutors * minRegisteredRatio) { + logInfo("Sufficient resources registered with YarnSchedulerBackend to begin scheduling " + + s"tasks: $registeredExecutors registered executors out of $totalExpectedExecutors " + + s"total expected executors (min registered executors ratio of ${minRegisteredRatio})" + true + } else if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { + logInfo("YarnSchedulerBackend is ready to begin scheduling, even though insufficient" + + "resources have been registered, because the maximum waiting time of " + + s"$maxRegisteredWaitingTime milliseconds has elapsed ($registeredExecutors of " + + s"${minRegisteredRatio * totalExpectedExecutors} required executors have registered)") + true + } else { + false + } + } +}