From 3f8c941faba134edfbc1a3c2ee7d00615d82e010 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Wed, 28 May 2014 13:35:13 +0800
Subject: [PATCH 01/13] submit stage after (configured ratio of) executors have
 been registered

---
 .../apache/spark/scheduler/DAGScheduler.scala |  1 +
 .../spark/scheduler/SchedulerBackend.scala    |  1 +
 .../spark/scheduler/TaskScheduler.scala       |  1 +
 .../spark/scheduler/TaskSchedulerImpl.scala   | 10 ++++++++++
 .../CoarseGrainedSchedulerBackend.scala       | 19 +++++++++++++++++++
 .../cluster/SparkDeploySchedulerBackend.scala |  1 +
 6 files changed, 33 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b3ebaa547de0d..9a42b8f429c8e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -710,6 +710,7 @@ class DAGScheduler(
   /** Submits stage, but first recursively submits any missing parents. */
   private def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
+    taskScheduler.waitBackendReady
     if (jobId.isDefined) {
       logDebug("submitStage(" + stage + ")")
       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 6a6d8e609bc39..e41e0a9841691 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
     throw new UnsupportedOperationException
 
+  def isReady(): Boolean = true
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 819c35257b5a7..3f42f4b989673 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -54,4 +54,5 @@ private[spark] trait TaskScheduler {
   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
   def defaultParallelism(): Int
 
+  def waitBackendReady(): Unit = {return}
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 17292b4c15b8b..3566fc7f3107f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,6 +431,16 @@ private[spark] class TaskSchedulerImpl(
 
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
+  override def waitBackendReady():Unit={
+    if(backend.isReady){
+      return
+    }
+    while(!backend.isReady){
+      synchronized{
+        this.wait(100)
+      }
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e47a060683a2d..4191f9182ac17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,9 +46,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  val registeredRatio = conf.getDouble("spark.executor.registeredRatio", 0)
+  val maxRegisteredWaitingTime = conf.getInt("spark.executor.maxRegisteredWaitingTime", 10000)
+  val createTime = System.currentTimeMillis()
+  var ready = if(registeredRatio==0)true else false
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +88,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
+        if (executorActor.size >= totalExecutors.get() * registeredRatio) {
+          ready = true
+        }
         makeOffers()
       }
 
@@ -244,6 +252,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready){
+      return true
+    }
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+      ready = true
+      return true
+    }
+    return false
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c07b3f7b695a..266229f703942 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
+    totalExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
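Patch 01 contains the whole mechanism in miniature: `CoarseGrainedSchedulerBackend` flips a `ready` flag once enough executors have registered against a configured ratio, or unconditionally once a wall-clock deadline passes, and `TaskSchedulerImpl.waitBackendReady` polls `backend.isReady` every 100 ms before a stage is submitted. The sketch below distills that gating pattern into a self-contained program; `ReadinessGate` and the demo numbers are illustrative, not Spark code. One design note: the patch's `synchronized { this.wait(100) }` is never paired with a `notify`, so it behaves as a 100 ms sleep; the sketch uses `Thread.sleep` for the same effect.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Minimal sketch of the gate this patch introduces (illustrative names):
// become ready once registered/expected reaches minRatio, or once maxWaitMs
// has elapsed since construction, whichever comes first.
class ReadinessGate(expectedWorkers: Int, minRatio: Double, maxWaitMs: Long) {
  private val registered = new AtomicInteger(0)
  private val createTime = System.currentTimeMillis()

  def register(): Unit = { registered.incrementAndGet() }

  def isReady: Boolean =
    registered.get() >= expectedWorkers * minRatio ||
      (System.currentTimeMillis() - createTime) >= maxWaitMs

  // Same 100 ms polling loop as waitBackendReady in the patch.
  def await(): Unit = while (!isReady) Thread.sleep(100)
}

object ReadinessGateDemo {
  def main(args: Array[String]): Unit = {
    val gate = new ReadinessGate(expectedWorkers = 4, minRatio = 0.75, maxWaitMs = 10000)
    val registrar = new Thread(new Runnable {
      def run(): Unit = (1 to 3).foreach { _ => Thread.sleep(200); gate.register() }
    })
    registrar.start()
    gate.await() // returns once 3 of 4 workers (>= 75%) have registered
    println("backend ready, submitting stages")
  }
}
```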
From 37f7dc281d3959ec8b7199889358f03c47781914 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Mon, 16 Jun 2014 16:17:45 +0800
Subject: [PATCH 02/13] support yarn mode (percentage style)

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 ++++----
 .../scheduler/cluster/YarnClientSchedulerBackend.scala    | 6 ++++++
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4191f9182ac17..ff57372dcb72d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -50,10 +50,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  val registeredRatio = conf.getDouble("spark.executor.registeredRatio", 0)
-  val maxRegisteredWaitingTime = conf.getInt("spark.executor.maxRegisteredWaitingTime", 10000)
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredRatio", 0)
+  val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredWaitingTime", 10000)
   val createTime = System.currentTimeMillis()
-  var ready = if(registeredRatio==0)true else false
+  var ready = if (minRegisteredRatio <= 0) true else false
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -88,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
-        if (executorActor.size >= totalExecutors.get() * registeredRatio) {
+        if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
           ready = true
         }
         makeOffers()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 039cf4f276119..1e49a34a0b02f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
+    totalExecutors.set(args.numExecutors)
+    // reset default minRegisteredRatio for yarn mode
+    if (minRegisteredRatio == 0) {
+      minRegisteredRatio = 0.9
+      ready = false
+    }
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()

From e7b6272ffc42d28395049f8839ecdaa607d78eef Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Tue, 17 Jun 2014 17:55:35 +0800
Subject: [PATCH 03/13] support yarn-cluster

---
 .../scala/org/apache/spark/SparkContext.scala | 11 +++-
 .../cluster/YarnClusterSchedulerBackend.scala | 56 +++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)
 create mode 100644 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0678bdd02110e..1023ecffe6a18 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1524,7 +1524,16 @@ object SparkContext extends Logging {
             throw new SparkException("YARN mode not available ?", e)
           }
         }
-        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+        val backend = try {
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
+          }
+        }
         scheduler.initialize(backend)
         scheduler
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000000000..f8cc4e8517f02
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
+      arrayBuf: ArrayBuffer[String]) {
+    if (System.getenv(envVar) != null) {
+      arrayBuf += (optionName, System.getenv(envVar))
+    } else if (sc.getConf.contains(sysProp)) {
+      arrayBuf += (optionName, sc.getConf.get(sysProp))
+    }
+  }
+
+  override def start() {
+    super.start()
+    val argsArrayBuf = new ArrayBuffer[String]()
+    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
+      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
+    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+    totalExecutors.set(args.numExecutors)
+    // reset default minRegisteredRatio for yarn mode
+    if (minRegisteredRatio == 0) {
+      minRegisteredRatio = 0.9
+      ready = false
+    }
+  }
+}

From 812c33cc8ee0aba1eba8fc113aa0fc1899796edd Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Thu, 19 Jun 2014 11:08:18 +0800
Subject: [PATCH 04/13] Fix driver losing --num-executors option in
 yarn-cluster mode

---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6244332f23737..bd9eed57301a5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances",args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false,
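Patches 02-04 wire the gate into YARN: the client backend records the requested executor count from `ClientArguments`, and the application master re-exports `--num-executors` as `spark.executor.instances` so code running in the driver can see it. Patch 03's `SparkContext` change deserves a note: the YARN backends are compiled in a separate module, so core can only instantiate `YarnClusterSchedulerBackend` reflectively by class name. A minimal sketch of that pattern, with stand-in classes (`Backend`, `Sched`, `Ctx` are illustrative, not Spark's types):

```scala
// Reflective loading, as in patch 03's SparkContext: look the class up by
// name, pick the two-argument constructor, and instantiate it.
trait Backend { def isReady(): Boolean }
class Sched
class Ctx

class YarnBackend(sched: Sched, ctx: Ctx) extends Backend {
  override def isReady(): Boolean = true
}

object ReflectiveLoad {
  def main(args: Array[String]): Unit = {
    val (sched, ctx) = (new Sched, new Ctx)
    val backend =
      try {
        val clazz = Class.forName("YarnBackend") // fully qualified name in real code
        val cons = clazz.getConstructor(classOf[Sched], classOf[Ctx])
        cons.newInstance(sched, ctx).asInstanceOf[Backend]
      } catch {
        case e: Exception =>
          throw new RuntimeException("YARN mode not available?", e)
      }
    println(backend.isReady())
  }
}
```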
From 6cfb9ece6bdc608d198b013510e53e2b8dc6e965 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Fri, 20 Jun 2014 13:04:27 +0800
Subject: [PATCH 05/13] Code style, revert default minRegisteredRatio of yarn
 to 0, driver gets --num-executors in yarn/alpha

---
 .../scala/org/apache/spark/scheduler/TaskScheduler.scala | 2 +-
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala   | 9 +++++----
 .../cluster/CoarseGrainedSchedulerBackend.scala          | 4 ++--
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 1 +
 .../scheduler/cluster/YarnClientSchedulerBackend.scala   | 5 -----
 .../scheduler/cluster/YarnClusterSchedulerBackend.scala  | 5 -----
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +-
 7 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 3f42f4b989673..324dd0f0acdfd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -54,5 +54,5 @@ private[spark] trait TaskScheduler {
   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
   def defaultParallelism(): Int
 
-  def waitBackendReady(): Unit = {return}
+  def waitBackendReady(): Unit = { return }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 3566fc7f3107f..8189b622d38ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,12 +431,13 @@ private[spark] class TaskSchedulerImpl(
 
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
-  override def waitBackendReady():Unit={
-    if(backend.isReady){
+
+  override def waitBackendReady(): Unit = {
+    if (backend.isReady) {
       return
     }
-    while(!backend.isReady){
-      synchronized{
+    while (!backend.isReady) {
+      synchronized {
         this.wait(100)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ff57372dcb72d..a2a4d9a65729c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -254,14 +254,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   override def isReady(): Boolean = {
-    if (ready){
+    if (ready) {
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
       ready = true
       return true
     }
-    return false
+    false
   }
 }
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1cc9c33cd2d02..c0f0595c94a0c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false /* initialize */ ,
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 1e49a34a0b02f..0110bc857e9bd 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -78,11 +78,6 @@ private[spark] class YarnClientSchedulerBackend(
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExecutors.set(args.numExecutors)
-    // reset default minRegisteredRatio for yarn mode
-    if (minRegisteredRatio == 0) {
-      minRegisteredRatio = 0.9
-      ready = false
-    }
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index f8cc4e8517f02..3165b456c1663 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -47,10 +47,5 @@ private[spark] class YarnClusterSchedulerBackend(
       .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
     val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
     totalExecutors.set(args.numExecutors)
-    // reset default minRegisteredRatio for yarn mode
-    if (minRegisteredRatio == 0) {
-      minRegisteredRatio = 0.9
-      ready = false
-    }
   }
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index bd9eed57301a5..797c7895847df 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -164,7 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
-    System.setProperty("spark.executor.instances",args.numExecutors.toString)
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false,
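Patch 05 is mostly mechanical cleanup, but the `return false` to `false` change at the end of `isReady` reflects a real Scala idiom: a method returns the value of its last expression, so a trailing bare expression is preferred over an explicit `return`. A tiny illustration (hypothetical helper, mirroring the shape of `isReady`):

```scala
object ReadyCheck {
  // Early exits still use an explicit return; the final expression is the result.
  def isReadyIdiomatic(ready: Boolean, elapsedMs: Long, maxWaitMs: Long): Boolean = {
    if (ready) return true
    elapsedMs >= maxWaitMs
  }
}
```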
From ce0868ad90f0f38c620a19d4ed051731686a9578 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Mon, 23 Jun 2014 12:48:34 +0800
Subject: [PATCH 06/13] Code style, rename configuration property name of
 minRegisteredRatio & maxRegisteredWaitingTime

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++--
 .../scheduler/cluster/YarnClusterSchedulerBackend.scala   | 5 ++---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a2a4d9a65729c..b138e8cc56b52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -50,8 +50,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredRatio", 0)
-  val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredWaitingTime", 10000)
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
   var ready = if (minRegisteredRatio <= 0) true else false
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3165b456c1663..0b6ca69a241b3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,20 +17,19 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.deploy.yarn.ApplicationMasterArguments
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
-import scala.collection.mutable.ArrayBuffer
-
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
+  private def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
     if (System.getenv(envVar) != null) {
       arrayBuf += (optionName, System.getenv(envVar))

From 4261454a7c080caa57a36dea6488e7ded9c6b0b9 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Wed, 25 Jun 2014 10:44:41 +0800
Subject: [PATCH 07/13] Add docs for new configs & code style

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b138e8cc56b52..564b4709fc2c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -50,8 +50,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit stage only after (registered executors / total executors) arrived the ratio.
   var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+  // Whatever minRegisteredExecutorsRatio is arrived, submit stage after the time(milliseconds).
+  val maxRegisteredWaitingTime =
+    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
   var ready = if (minRegisteredRatio <= 0) true else false
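Patches 06-07 settle the public names, `spark.scheduler.minRegisteredExecutorsRatio` and `spark.scheduler.maxRegisteredExecutorsWaitingTime`, raise the default wait cap to 30 s, and document both knobs in code comments. The sketch below shows the resulting configuration surface: a ratio defaulting to 0 (gate disabled) and a millisecond cap defaulting to 30000. A plain `Map` stands in for `SparkConf` here; the helper itself is hypothetical.

```scala
object GateConf {
  final case class SchedulerGateConf(minRegisteredRatio: Double, maxWaitMs: Long)

  // Parse the two knobs with the defaults these patches establish.
  def parse(conf: Map[String, String]): SchedulerGateConf =
    SchedulerGateConf(
      minRegisteredRatio = conf.get("spark.scheduler.minRegisteredExecutorsRatio")
        .map(_.toDouble).getOrElse(0.0),
      maxWaitMs = conf.get("spark.scheduler.maxRegisteredExecutorsWaitingTime")
        .map(_.toLong).getOrElse(30000L))
}

// GateConf.parse(Map("spark.scheduler.minRegisteredExecutorsRatio" -> "0.8"))
// ==> SchedulerGateConf(0.8, 30000)
```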
From 0ecee9a08b9de8e3a536fdb48a5838aef8988dfb Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Wed, 25 Jun 2014 13:31:54 +0800
Subject: [PATCH 08/13] Move waitBackendReady from DAGScheduler.submitStage to
 TaskSchedulerImpl.submitTasks

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala  | 1 -
 .../main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 1 -
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 3 ++-
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala     | 4 ++--
 4 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9a42b8f429c8e..b3ebaa547de0d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -710,7 +710,6 @@ class DAGScheduler(
   /** Submits stage, but first recursively submits any missing parents. */
   private def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
-    taskScheduler.waitBackendReady
     if (jobId.isDefined) {
       logDebug("submitStage(" + stage + ")")
       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 324dd0f0acdfd..819c35257b5a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -54,5 +54,4 @@ private[spark] trait TaskScheduler {
   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
   def defaultParallelism(): Int
 
-  def waitBackendReady(): Unit = { return }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8189b622d38ba..1de4907d03cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -147,6 +147,7 @@ private[spark] class TaskSchedulerImpl(
 
   override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
+    waitBackendReady
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
     this.synchronized {
       val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
@@ -432,7 +433,7 @@ private[spark] class TaskSchedulerImpl(
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
 
-  override def waitBackendReady(): Unit = {
+  private def waitBackendReady(): Unit = {
     if (backend.isReady) {
       return
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 564b4709fc2c0..1d64179aba812 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -50,9 +50,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit stage only after (registered executors / total executors) arrived the ratio.
+  // Submit tasks only after (registered executors / total executors) arrived the ratio.
   var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  // Whatever minRegisteredExecutorsRatio is arrived, submit stage after the time(milliseconds).
+  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
   val maxRegisteredWaitingTime =
     conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
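Patch 08 relocates the wait from `DAGScheduler.submitStage`, which is invoked recursively for every missing parent stage (including stages that end up merely waiting), to `TaskSchedulerImpl.submitTasks`, which runs only when a stage's tasks are actually handed to the backend. A schematic of the difference in call shape (stand-in classes, not Spark's):

```scala
final case class Stage(id: Int, parents: List[Stage])

class Gate {
  @volatile private var open = false
  def openGate(): Unit = { open = true }
  def awaitOpen(): Unit = while (!open) Thread.sleep(100)
}

class MiniScheduler(gate: Gate) {
  // Recursive walk: a gate placed here would run on every visit to every
  // stage in the DAG, ready or not.
  def submitStage(stage: Stage): Unit = {
    stage.parents.foreach(submitStage)
    submitTasks(s"taskset-for-stage-${stage.id}")
  }

  // Flat choke point: a gate placed here blocks once per submitted task set.
  private def submitTasks(taskSetId: String): Unit = {
    gate.awaitOpen()
    println(s"submitting $taskSetId")
  }
}
```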
From 4d6d847e63e43676d6bebc7608524b094396d764 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Thu, 26 Jun 2014 17:07:01 +0800
Subject: [PATCH 09/13] Move waitBackendReady to TaskSchedulerImpl.start & some
 code refactor

---
 .../spark/scheduler/TaskSchedulerImpl.scala    |  2 +-
 .../CoarseGrainedSchedulerBackend.scala        |  7 +++--
 .../cluster/SparkDeploySchedulerBackend.scala  |  2 +-
 .../cluster/YarnClientSchedulerBackend.scala   |  2 +-
 .../cluster/YarnClusterSchedulerBackend.scala  | 31 ++++++------------
 5 files changed, 17 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1de4907d03cc9..474664fa37cde 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -143,11 +143,11 @@ private[spark] class TaskSchedulerImpl(
         Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
+    waitBackendReady
   }
 
   override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
-    waitBackendReady
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
     this.synchronized {
       val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 1d64179aba812..95a480bcbaf28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,11 +46,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total executors) arrived the ratio.
+  // Submit tasks only after (registered executors / total expected executors)
+  // is equal to at least this value.
   var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
   // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
   val maxRegisteredWaitingTime =
@@ -91,7 +92,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
-        if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
+        if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio) {
           ready = true
         }
         makeOffers()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 266229f703942..bf2dc88e29048 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -95,7 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
-    totalExecutors.addAndGet(1)
+    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0110bc857e9bd..0ad82a70d0556 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExecutors.set(args.numExecutors)
+    totalExpectedExecutors.set(args.numExecutors)
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 0b6ca69a241b3..87cc3c969c315 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,34 +17,23 @@
 
 package org.apache.spark.scheduler.cluster
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
-  with Logging {
-
-  private def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
   override def start() {
     super.start()
-    val argsArrayBuf = new ArrayBuffer[String]()
-    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
-      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
-    totalExecutors.set(args.numExecutors)
+    var numExecutors = 2
+    if (sc.getConf.contains("spark.executor.instances")) {
+      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
+    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
+    }
+    totalExpectedExecutors.set(numExecutors)
   }
 }
From c6f052211222aa66a772067455193ffab7d83585 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Fri, 27 Jun 2014 16:48:55 +0800
Subject: [PATCH 10/13] Bug fix: numExecutors wasn't set & use constant
 DEFAULT_NUMBER_EXECUTORS

---
 .../cluster/CoarseGrainedSchedulerBackend.scala  |  3 ++-
 .../deploy/yarn/ApplicationMasterArguments.scala |  6 +++++-
 .../cluster/YarnClusterSchedulerBackend.scala    | 11 ++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 95a480bcbaf28..510844ab5b5be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,8 +51,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
   // Submit tasks only after (registered executors / total expected executors)
-  // is equal to at least this value.
+  // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  if (minRegisteredRatio > 1) minRegisteredRatio = 1
   // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
   val maxRegisteredWaitingTime =
     conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 25cc9016b10a6..4c383ab574abe 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
 
@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
     System.exit(exitCode)
   }
 }
+
+object ApplicationMasterArguments {
+  val DEFAULT_NUMBER_EXECUTORS = 2
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 87cc3c969c315..a04b08f43cc5a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.IntParam
 
@@ -28,12 +29,12 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    var numExecutors = 2
-    if (sc.getConf.contains("spark.executor.instances")) {
-      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
-    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
+    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+      numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
     }
+    // System property can override environment variable.
+    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
     totalExpectedExecutors.set(numExecutors)
   }
 }

From 22ead12a5b8472efa1d02bd56b7404f1245ee513 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Mon, 30 Jun 2014 13:28:03 +0800
Subject: [PATCH 11/13] Move waitBackendReady to postStartHook

---
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 5 ++++-
 .../spark/scheduler/cluster/YarnClientClusterScheduler.scala | 2 ++
 .../spark/scheduler/cluster/YarnClusterScheduler.scala       | 2 ++
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 474664fa37cde..7cbb83a15626d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -143,7 +143,10 @@ private[spark] class TaskSchedulerImpl(
         Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
-    waitBackendReady
+  }
+
+  override def postStartHook() {
+    waitBackendReady()
   }
 
   override def submitTasks(taskSet: TaskSet) {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 6b91e6b9eb899..15e8c21aa5906 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
 
   override def postStartHook() {
 
+    super.postStartHook()
    // The yarn application is running, but the executor might not yet ready
    // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+    // TODO It needn't after waitBackendReady
     Thread.sleep(2000L)
     logInfo("YarnClientClusterScheduler.postStartHook done")
   }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 39cdd2e8a522b..9ee53d797c8ea 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
 
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    super.postStartHook()
     if (sparkContextInitialized){
       ApplicationMaster.waitForInitialAllocations()
       // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+      // TODO It needn't after waitBackendReady
       Thread.sleep(3000L)
     }
     logInfo("YarnClusterScheduler.postStartHook done")
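Patch 10 clamps the ratio to 1 and centralizes the default executor count in `ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS`; patch 11 then moves the blocking wait into `postStartHook`, the hook `SparkContext` invokes after the scheduler has started, so startup itself never blocks and the YARN schedulers can combine the wait with their existing bootstrap sleeps via `super.postStartHook()`. A schematic of that lifecycle ordering (stand-in trait, not Spark's `TaskScheduler`):

```scala
// SparkContext's pattern: start the scheduler, then run the post-start hook;
// blocking in the hook delays the first job, not scheduler construction.
trait LifecycleScheduler {
  def start(): Unit = println("backend started")
  def postStartHook(): Unit = () // subclasses may block here
}

class GatedScheduler(isReady: () => Boolean) extends LifecycleScheduler {
  override def postStartHook(): Unit = {
    while (!isReady()) Thread.sleep(100) // same 100 ms poll as waitBackendReady
    println("backend ready; jobs may now be submitted")
  }
}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val deadline = System.currentTimeMillis() + 500
    val scheduler = new GatedScheduler(() => System.currentTimeMillis() >= deadline)
    scheduler.start()
    scheduler.postStartHook() // returns roughly 500 ms later, once "ready"
  }
}
```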
From 1ac08b1ca26d6b42afecd9f6b974935921590d72 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Fri, 11 Jul 2014 17:41:11 +0800
Subject: [PATCH 12/13] Add new configs to user docs

---
 docs/configuration.md | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/docs/configuration.md b/docs/configuration.md
index b84104cc7e653..ee680fa4c4095 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -699,6 +699,22 @@ Apart from these, the following properties are also available, and may be useful
     (in milliseconds)
   </td>
 </tr>
+<tr>
+  <td>spark.scheduler.minRegisteredExecutorsRatio</td>
+  <td>0</td>
+  <td>
+    Submit tasks only after (registered executors / total expected executors)
+    is equal to at least this value, which is double between 0 and 1.
+  </td>
+</tr>
+<tr>
+  <td>spark.scheduler.maxRegisteredExecutorsWaitingTime</td>
+  <td>30000</td>
+  <td>
+    Whatever (registered executors / total expected executors) is reached
+    spark.scheduler.minRegisteredExecutorsRatio, submit tasks after the time(milliseconds).
+  </td>
+</tr>
 </table>
 
 #### Security
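From b9f8326720740949b55441a34f094ba0986c9022 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Mon, 14 Jul 2014 14:30:09 +0800
Subject: [PATCH 13/13] Add logs & edit docs

---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 7 ++++++-
 docs/configuration.md                           | 11 +++++++----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 510844ab5b5be..b8f3b7bb85cb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -93,8 +93,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
-        if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio) {
+        if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
           ready = true
+          logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
+            executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
+            ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
         }
         makeOffers()
       }
@@ -264,6 +267,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
       ready = true
+      logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
+        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
       return true
     }
     false
diff --git a/docs/configuration.md b/docs/configuration.md
index ee680fa4c4095..3f4296a2aa942 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -703,16 +703,19 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td>spark.scheduler.minRegisteredExecutorsRatio</td>
   <td>0</td>
   <td>
-    Submit tasks only after (registered executors / total expected executors)
-    is equal to at least this value, which is double between 0 and 1.
+    The minimum ratio of registered executors (registered executors / total expected executors)
+    to wait for before scheduling begins. Specified as a double between 0 and 1.
+    Regardless of whether the minimum ratio of executors has been reached,
+    the maximum amount of time it will wait before scheduling begins is controlled by config
+    spark.scheduler.maxRegisteredExecutorsWaitingTime
   </td>
 </tr>
 <tr>
   <td>spark.scheduler.maxRegisteredExecutorsWaitingTime</td>
   <td>30000</td>
   <td>
-    Whatever (registered executors / total expected executors) is reached
-    spark.scheduler.minRegisteredExecutorsRatio, submit tasks after the time(milliseconds).
+    Maximum amount of time to wait for executors to register before scheduling begins
+    (in milliseconds).
   </td>
 </tr>

With the series applied, the behavior is entirely opt-in: leave `spark.scheduler.minRegisteredExecutorsRatio` at its default of 0 and nothing changes; set it and the first job blocks until the ratio or the 30 s cap is hit. A hypothetical driver showing how a user would turn the gate on (the two `set` keys are the ones this series adds; the app itself is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GatedStartup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gated-startup")
      // Wait until 80% of requested executors register...
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      // ...but never longer than 30 seconds.
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count() // scheduling begins only once the gate opens
    sc.stop()
  }
}
```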