Commit 4d6d847

Move waitBackendReady to TaskSchedulerImpl.start & some code refactor

1 parent 0ecee9a

5 files changed: +17 −27 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -143,11 +143,11 @@ private[spark] class TaskSchedulerImpl(
         Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
+    waitBackendReady
   }
 
   override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
-    waitBackendReady
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
     this.synchronized {
      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
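With this change the scheduler blocks once, at startup, instead of re-checking on every submitTasks call. The body of waitBackendReady is not part of this diff; a minimal sketch of what such a wait might look like, assuming the backend exposes an isReady flag backed by the ready field toggled in the CoarseGrainedSchedulerBackend hunk below:

  // Sketch only, not the committed implementation.
  // Assumes `backend: SchedulerBackend` exposes `isReady: Boolean`.
  private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }
    while (!backend.isReady) {
      synchronized {
        this.wait(100) // re-check roughly every 100 ms
      }
    }
  }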

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 3 deletions

@@ -46,11 +46,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total executors) arrived the ratio.
+  // Submit tasks only after (registered executors / total expected executors)
+  // is equal to at least this value.
   var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
   // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
   val maxRegisteredWaitingTime =
@@ -91,7 +92,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
-        if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
+        if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio) {
          ready = true
        }
        makeOffers()
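Both thresholds above are ordinary SparkConf entries. A usage sketch (the ratio key appears verbatim in the hunk; the wait-time key is cut off by the hunk boundary, so spark.scheduler.maxRegisteredExecutorsWaitingTime is an assumption here, and the values are illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("min-registered-demo")
    // Hold back the first task submission until at least 80% of the
    // expected executors have registered.
    .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
    // Assumed key: cap the wait at 30 seconds (milliseconds), even if
    // the ratio has not been reached.
    .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")

With the default ratio of 0, ready flips to true as soon as the first executor registers, which preserves the old submit-immediately behavior.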

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -95,7 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
-    totalExecutors.addAndGet(1)
+    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -77,7 +77,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExecutors.set(args.numExecutors)
+    totalExpectedExecutors.set(args.numExecutors)
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 10 additions & 21 deletions

@@ -17,34 +17,23 @@
 
 package org.apache.spark.scheduler.cluster
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
-  with Logging {
-
-  private def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
   override def start() {
     super.start()
-    val argsArrayBuf = new ArrayBuffer[String]()
-    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
-      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
-    totalExecutors.set(args.numExecutors)
+    var numExecutors = 2
+    if (sc.getConf.contains("spark.executor.instances")) {
+      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
+    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
+    }
+    totalExpectedExecutors.set(numExecutors)
   }
 }
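IntParam, newly imported above, is a small extractor in org.apache.spark.util whose unapply returns Option[Int] when the string parses as an integer. Note that, as committed, the result of the else-if branch is never assigned to numExecutors, so that path falls through to the default of 2. An assignment-based sketch of the intended lookup (hypothetical helper; IntParam is private[spark], so this assumes code living under the org.apache.spark package):

  import org.apache.spark.SparkContext
  import org.apache.spark.util.IntParam

  // Hypothetical helper: resolve the expected executor count from the
  // Spark conf first, then the environment, then a default of 2.
  def expectedExecutors(sc: SparkContext, default: Int = 2): Int =
    if (sc.getConf.contains("spark.executor.instances")) {
      sc.getConf.getInt("spark.executor.instances", default)
    } else {
      Option(System.getenv("SPARK_EXECUTOR_INSTANCES")) match {
        case Some(IntParam(n)) => n // IntParam.unapply parsed the string
        case _ => default
      }
    }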
