Skip to content

Commit 8de080d

Browse files
DjvuLeeMarcelo Vanzin
authored andcommitted
[SPARK-21383][YARN] Fix the YarnAllocator allocates more Resource
When NodeManagers launching Executors, the `missing` value will exceed the real value when the launch is slow, this can lead to YARN allocates more resource. We add the `numExecutorsRunning` when calculate the `missing` to avoid this. Test by experiment. Author: DjvuLee <[email protected]> Closes #18651 from djvulee/YarnAllocate.
1 parent 799e131 commit 8de080d

File tree

1 file changed

+33
-17
lines changed

1 file changed

+33
-17
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
1919

2020
import java.util.Collections
2121
import java.util.concurrent._
22+
import java.util.concurrent.atomic.AtomicInteger
2223
import java.util.regex.Pattern
2324

2425
import scala.collection.mutable
@@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
3031
import org.apache.hadoop.yarn.client.api.AMRMClient
3132
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
3233
import org.apache.hadoop.yarn.conf.YarnConfiguration
33-
import org.apache.log4j.{Level, Logger}
3434

3535
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
3636
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
@@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
8080
private val releasedContainers = Collections.newSetFromMap[ContainerId](
8181
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
8282

83-
@volatile private var numExecutorsRunning = 0
83+
private val numExecutorsRunning = new AtomicInteger(0)
84+
85+
private val numExecutorsStarting = new AtomicInteger(0)
8486

8587
/**
8688
* Used to generate a unique ID per executor
@@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
163165
clock = newClock
164166
}
165167

166-
def getNumExecutorsRunning: Int = numExecutorsRunning
168+
def getNumExecutorsRunning: Int = numExecutorsRunning.get()
167169

168170
def getNumExecutorsFailed: Int = synchronized {
169171
val endTime = clock.getTimeMillis()
@@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
242244
if (executorIdToContainer.contains(executorId)) {
243245
val container = executorIdToContainer.get(executorId).get
244246
internalReleaseContainer(container)
245-
numExecutorsRunning -= 1
247+
numExecutorsRunning.decrementAndGet()
246248
} else {
247249
logWarning(s"Attempted to kill unknown executor $executorId!")
248250
}
@@ -267,10 +269,12 @@ private[yarn] class YarnAllocator(
267269
val allocatedContainers = allocateResponse.getAllocatedContainers()
268270

269271
if (allocatedContainers.size > 0) {
270-
logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
272+
logDebug(("Allocated containers: %d. Current executor count: %d. " +
273+
"Launching executor count: %d. Cluster resources: %s.")
271274
.format(
272275
allocatedContainers.size,
273-
numExecutorsRunning,
276+
numExecutorsRunning.get,
277+
numExecutorsStarting.get,
274278
allocateResponse.getAvailableResources))
275279

276280
handleAllocatedContainers(allocatedContainers.asScala)
@@ -281,7 +285,7 @@ private[yarn] class YarnAllocator(
281285
logDebug("Completed %d containers".format(completedContainers.size))
282286
processCompletedContainers(completedContainers.asScala)
283287
logDebug("Finished processing %d completed containers. Current running executor count: %d."
284-
.format(completedContainers.size, numExecutorsRunning))
288+
.format(completedContainers.size, numExecutorsRunning.get))
285289
}
286290
}
287291

@@ -294,7 +298,11 @@ private[yarn] class YarnAllocator(
294298
def updateResourceRequests(): Unit = {
295299
val pendingAllocate = getPendingAllocate
296300
val numPendingAllocate = pendingAllocate.size
297-
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
301+
val missing = targetNumExecutors - numPendingAllocate -
302+
numExecutorsStarting.get - numExecutorsRunning.get
303+
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
304+
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
305+
s"executorsStarting: ${numExecutorsStarting.get}")
298306

299307
if (missing > 0) {
300308
logInfo(s"Will request $missing executor container(s), each with " +
@@ -493,7 +501,8 @@ private[yarn] class YarnAllocator(
493501
s"for executor with ID $executorId")
494502

495503
def updateInternalState(): Unit = synchronized {
496-
numExecutorsRunning += 1
504+
numExecutorsRunning.incrementAndGet()
505+
numExecutorsStarting.decrementAndGet()
497506
executorIdToContainer(executorId) = container
498507
containerIdToExecutorId(container.getId) = executorId
499508

@@ -503,7 +512,8 @@ private[yarn] class YarnAllocator(
503512
allocatedContainerToHostMap.put(containerId, executorHostname)
504513
}
505514

506-
if (numExecutorsRunning < targetNumExecutors) {
515+
if (numExecutorsRunning.get < targetNumExecutors) {
516+
numExecutorsStarting.incrementAndGet()
507517
if (launchContainers) {
508518
launcherPool.execute(new Runnable {
509519
override def run(): Unit = {
@@ -523,11 +533,16 @@ private[yarn] class YarnAllocator(
523533
).run()
524534
updateInternalState()
525535
} catch {
526-
case NonFatal(e) =>
527-
logError(s"Failed to launch executor $executorId on container $containerId", e)
528-
// Assigned container should be released immediately to avoid unnecessary resource
529-
// occupation.
530-
amClient.releaseAssignedContainer(containerId)
536+
case e: Throwable =>
537+
numExecutorsStarting.decrementAndGet()
538+
if (NonFatal(e)) {
539+
logError(s"Failed to launch executor $executorId on container $containerId", e)
540+
// Assigned container should be released immediately
541+
// to avoid unnecessary resource occupation.
542+
amClient.releaseAssignedContainer(containerId)
543+
} else {
544+
throw e
545+
}
531546
}
532547
}
533548
})
@@ -537,7 +552,8 @@ private[yarn] class YarnAllocator(
537552
}
538553
} else {
539554
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
540-
"reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
555+
"reached target Executors count: %d.").format(
556+
numExecutorsRunning.get, targetNumExecutors))
541557
}
542558
}
543559
}
@@ -552,7 +568,7 @@ private[yarn] class YarnAllocator(
552568
val exitReason = if (!alreadyReleased) {
553569
// Decrement the number of executors running. The next iteration of
554570
// the ApplicationMaster's reporting thread will take care of allocating.
555-
numExecutorsRunning -= 1
571+
numExecutorsRunning.decrementAndGet()
556572
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
557573
containerId,
558574
onHostStr,

0 commit comments

Comments
 (0)