
Commit abf4860

Push down variable totalExpectedResources to children classes
1 parent ca54bd9 commit abf4860

4 files changed (11 additions, 9 deletions)


core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 3 deletions
@@ -48,7 +48,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
   var totalExecutors = new AtomicInteger(0)
-  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
@@ -268,8 +267,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 
   override def isReady(): Boolean = {
     if (sufficientResourcesRegistered) {
-      logInfo("SchedulerBackend is ready for scheduling beginning, total expected resources: " +
-        s"$totalExpectedResources, minRegisteredResourcesRatio: $minRegisteredRatio")
+      logInfo("SchedulerBackend is ready for scheduling beginning after " +
+        s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
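
For context, after this change the base class only tracks what has actually registered; deciding whether that is "enough" is left entirely to sufficientResourcesRegistered(), which each backend overrides. Below is a minimal sketch of that readiness pattern, not the real CoarseGrainedSchedulerBackend; the name SketchSchedulerBackend is made up, and maxRegisteredWaitingTime is hard-coded here rather than read from configuration as Spark does.

import java.util.concurrent.atomic.AtomicInteger

// Simplified sketch of the readiness pattern, not the actual Spark class.
abstract class SketchSchedulerBackend {
  val totalCoreCount = new AtomicInteger(0)   // incremented as executors register
  val totalExecutors = new AtomicInteger(0)
  var minRegisteredRatio = 0.0
  private val maxRegisteredWaitingTime = 30000L  // ms; Spark reads this from its config
  private val createTime = System.currentTimeMillis()

  // Each backend decides what "enough resources" means for itself.
  def sufficientResourcesRegistered(): Boolean

  def isReady(): Boolean = {
    if (sufficientResourcesRegistered()) {
      return true
    }
    // Never block scheduling forever: fall back to a registration timeout.
    (System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime
  }
}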

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
-  totalExpectedResources.getAndSet(maxCores.getOrElse(0))
+  val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
     super.start()
@@ -113,6 +113,6 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
-    totalCoreCount.get() >= totalExpectedResources.get() * minRegisteredRatio
+    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
 }
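
With the expected total pushed down, the standalone backend only has to compare its own expectation against the core count the base class maintains. A hypothetical subclass mirroring the SparkDeploySchedulerBackend change above, written against the SketchSchedulerBackend sketch shown earlier (illustration only, not Spark code):

// Hypothetical standalone-mode subclass mirroring the pattern introduced in this commit.
class SketchStandaloneBackend(maxCores: Option[Int]) extends SketchSchedulerBackend {
  // The expected total now lives in the subclass as a plain val,
  // instead of the shared AtomicInteger removed from the base class.
  val totalExpectedCores = maxCores.getOrElse(0)

  override def sufficientResourcesRegistered(): Boolean =
    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}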

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -38,6 +38,7 @@ private[spark] class YarnClientSchedulerBackend(
   var appId: ApplicationId = null
   var checkerThread: Thread = null
   var stopping: Boolean = false
+  var totalExpectedExecutors = 0
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
@@ -83,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExpectedResources.set(args.numExecutors)
+    totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
@@ -150,6 +151,6 @@ private[spark] class YarnClientSchedulerBackend(
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
-    totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio
+    totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 }

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 4 additions & 2 deletions
@@ -27,6 +27,8 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
+  var totalExpectedExecutors = 0
+
   if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
   }
@@ -39,10 +41,10 @@ private[spark] class YarnClusterSchedulerBackend(
     }
     // System property can override environment variable.
     numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
-    totalExpectedResources.set(numExecutors)
+    totalExpectedExecutors = numExecutors
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
-    totalExecutors.get() >= totalExpectedResources.get() * minRegisteredRatio
+    totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 }
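
The two YARN backends follow the same shape but measure readiness in registered executors rather than cores. A hypothetical sketch against the same SketchSchedulerBackend base (illustration only; the real classes also wire up appId, ClientArguments, and the rest of the YARN plumbing):

// Hypothetical YARN-style subclass: readiness is measured in registered executors.
class SketchYarnBackend(numExecutors: Int) extends SketchSchedulerBackend {
  // Set from ClientArguments / spark.executor.instances in the real backends.
  var totalExpectedExecutors = numExecutors

  override def sufficientResourcesRegistered(): Boolean =
    totalExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}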
