Skip to content

Commit 41cf47e

Browse files
committed
Fix race condition at SchedulerBackend.isReady in standalone mode
1 parent 9632719 commit 41cf47e

File tree

5 files changed

+37
-28
lines changed

(summary repeated by page layout: 5 files changed, +37 −28 lines)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
4747
{
4848
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
4949
var totalCoreCount = new AtomicInteger(0)
50-
var totalExpectedExecutors = new AtomicInteger(0)
50+
var totalExecutors = new AtomicInteger(0)
51+
var totalExpectedResources = new AtomicInteger(0)
5152
val conf = scheduler.sc.conf
5253
private val timeout = AkkaUtils.askTimeout(conf)
5354
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
54-
// Submit tasks only after (registered executors / total expected executors)
55+
// Submit tasks only after (registered resources / total expected resources)
5556
// is equal to at least this value, that is double between 0 and 1.
56-
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
57+
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
5758
if (minRegisteredRatio > 1) minRegisteredRatio = 1
58-
// Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
59+
// Whatever minRegisteredRatio is arrived, submit tasks after the time(milliseconds).
5960
val maxRegisteredWaitingTime =
60-
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
61+
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
6162
val createTime = System.currentTimeMillis()
62-
var ready = if (minRegisteredRatio <= 0) true else false
6363

6464
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
6565
private val executorActor = new HashMap[String, ActorRef]
@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
9494
executorAddress(executorId) = sender.path.address
9595
addressToExecutorId(sender.path.address) = executorId
9696
totalCoreCount.addAndGet(cores)
97-
if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
98-
ready = true
99-
logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
100-
executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
101-
", minRegisteredExecutorsRatio: " + minRegisteredRatio)
102-
}
97+
totalExecutors.addAndGet(1)
10398
makeOffers()
10499
}
105100

@@ -268,14 +263,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
268263
}
269264
}
270265

266+
def checkRegisteredResources(): Boolean = true
267+
271268
override def isReady(): Boolean = {
272-
if (ready) {
269+
if (checkRegisteredResources) {
270+
logInfo("SchedulerBackend is ready for scheduling beginning" +
271+
", total expected resources: " + totalExpectedResources.get() +
272+
", minRegisteredResourcesRatio: " + minRegisteredRatio)
273273
return true
274274
}
275275
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
276-
ready = true
277276
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
278-
"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
277+
"maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime)
279278
return true
280279
}
281280
false

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
3636
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
3737

3838
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
39+
totalExpectedResources.getAndSet(maxCores.getOrElse(0))
3940

4041
override def start() {
4142
super.start()
@@ -98,7 +99,6 @@ private[spark] class SparkDeploySchedulerBackend(
9899

99100
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
100101
memory: Int) {
101-
totalExpectedExecutors.addAndGet(1)
102102
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
103103
fullId, hostPort, cores, Utils.megabytesToString(memory)))
104104
}
@@ -111,4 +111,8 @@ private[spark] class SparkDeploySchedulerBackend(
111111
logInfo("Executor %s removed: %s".format(fullId, message))
112112
removeExecutor(fullId.split("/")(1), reason.toString)
113113
}
114+
115+
override def checkRegisteredResources(): Boolean = {
116+
totalCoreCount.get() >= totalExpectedResources.get() * minRegisteredRatio
117+
}
114118
}

docs/configuration.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -746,21 +746,22 @@ Apart from these, the following properties are also available, and may be useful
746746
</td>
747747
</tr>
748748
</tr>
749-
<td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
749+
<td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
750750
<td>0</td>
751751
<td>
752-
The minimum ratio of registered executors (registered executors / total expected executors)
752+
The minimum ratio of registered resources (registered resources / total expected resources)
753+
(resources are executors in yarn mode, CPU cores in standalone and mesos mode)
753754
to wait for before scheduling begins. Specified as a double between 0 and 1.
754-
Regardless of whether the minimum ratio of executors has been reached,
755+
Regardless of whether the minimum ratio of resources has been reached,
755756
the maximum amount of time it will wait before scheduling begins is controlled by config
756-
<code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
757+
<code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>
757758
</td>
758759
</tr>
759760
<tr>
760-
<td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
761+
<td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
761762
<td>30000</td>
762763
<td>
763-
Maximum amount of time to wait for executors to register before scheduling begins
764+
Maximum amount of time to wait for resources to register before scheduling begins
764765
(in milliseconds).
765766
</td>
766767
</tr>

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ private[spark] class YarnClientSchedulerBackend(
3030
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
3131
with Logging {
3232

33-
if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
33+
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
3434
minRegisteredRatio = 0.8
35-
ready = false
3635
}
3736

3837
var client: Client = null
@@ -84,7 +83,7 @@ private[spark] class YarnClientSchedulerBackend(
8483

8584
logDebug("ClientArguments called with: " + argsArrayBuf)
8685
val args = new ClientArguments(argsArrayBuf.toArray, conf)
87-
totalExpectedExecutors.set(args.numExecutors)
86+
totalExpectedResources.set(args.numExecutors)
8887
client = new Client(args, conf)
8988
appId = client.runApp()
9089
waitForApp()
@@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
150149
logInfo("Stopped")
151150
}
152151

152+
override def checkRegisteredResources(): Boolean = {
153+
totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio
154+
}
153155
}

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ private[spark] class YarnClusterSchedulerBackend(
2727
sc: SparkContext)
2828
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
2929

30-
if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
30+
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
3131
minRegisteredRatio = 0.8
32-
ready = false
3332
}
3433

3534
override def start() {
@@ -40,6 +39,10 @@ private[spark] class YarnClusterSchedulerBackend(
4039
}
4140
// System property can override environment variable.
4241
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
43-
totalExpectedExecutors.set(numExecutors)
42+
totalExpectedResources.set(numExecutors)
43+
}
44+
45+
override def checkRegisteredResources(): Boolean = {
46+
totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio
4447
}
4548
}

Comments (0)