Commit 6735433

Authored and committed by Marcelo Vanzin
[SPARK-20079][YARN] Fix client AM not allocating executors after restart.
The main goal of this change is to avoid the situation described in the bug, where an AM restart in the middle of a job may cause no new executors to be allocated because of faulty logic in the reset path. The change does two things:

- fixes the executor alloc manager's reset() so that it does not stop allocation after a reset() in the middle of a job
- re-orders the initialization of the YarnAllocator class so that it fetches the current executor ID before triggering the reset() above. This ensures both that the new allocator gets new requests for executors, and that it starts from the correct executor id.

Tested with unit tests and by manually causing AM restarts while running jobs using spark-shell in YARN mode.

Closes #17882

Author: Marcelo Vanzin <[email protected]>
Author: Guoqiang Li <[email protected]>

Closes #18663 from vanzin/SPARK-20079.
1 parent b133501 commit 6735433

File tree: 3 files changed (+41, -54 lines)
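
Before the per-file diffs, a minimal self-contained Scala sketch of the behavior described in the commit message: the old reset() put the manager back into its "initializing" state, so a reset in the middle of a running job silently stopped executor requests until a new stage arrived, while the fixed reset() only clears timers and targets so the next schedule pass requests executors again. All names below (SimplifiedAllocationManager, resetOld, resetNew, schedule) are illustrative stand-ins, not Spark's actual ExecutorAllocationManager API.

// Toy model of the reset() fix; not Spark code.
object ResetBehaviorSketch {

  /** A toy allocation manager: it asks a callback for executors when tasks are backlogged. */
  final class SimplifiedAllocationManager(initialTarget: Int, requestTotal: Int => Boolean) {

    private var initializing = true        // old-style gate: no requests until a stage event
    private var addTime = 0L               // earliest time the next add is allowed
    private var numExecutorsTarget = initialTarget

    /** Called when a stage is submitted; ends the "initializing" phase. */
    def onStageSubmitted(): Unit = { initializing = false }

    /** Old behavior: flip back to initializing, so a running job stops getting executors. */
    def resetOld(): Unit = { initializing = true; numExecutorsTarget = initialTarget }

    /** New behavior: keep allocating; just forget the timer and the target. */
    def resetNew(): Unit = { addTime = 0L; numExecutorsTarget = initialTarget }

    /** One pass of the schedule loop: request executors if there is a backlog. */
    def schedule(now: Long, backlog: Int): Boolean = {
      if (initializing || backlog <= 0 || now < addTime) {
        false
      } else {
        numExecutorsTarget = math.max(numExecutorsTarget, backlog)
        addTime = now + 1000L
        requestTotal(numExecutorsTarget)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val mgr = new SimplifiedAllocationManager(2, total => { println(s"requesting $total executors"); true })
    mgr.onStageSubmitted()
    println(mgr.schedule(now = 0L, backlog = 4))    // true: job running, executors requested

    // AM restarts mid-job and the backend resets the manager.
    mgr.resetOld()
    println(mgr.schedule(now = 2000L, backlog = 4)) // false: stuck until another stage is submitted

    mgr.onStageSubmitted()                          // recover, then compare with the fixed reset
    mgr.resetNew()
    println(mgr.schedule(now = 4000L, backlog = 4)) // true: allocation resumes immediately
  }
}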

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 18 additions & 8 deletions
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.ControlThrowable
+import scala.util.control.{ControlThrowable, NonFatal}

 import com.codahale.metrics.{Gauge, MetricRegistry}

@@ -245,14 +245,15 @@ private[spark] class ExecutorAllocationManager(
   }

   /**
-   * Reset the allocation manager to the initial state. Currently this will only be called in
-   * yarn-client mode when AM re-registers after a failure.
+   * Reset the allocation manager when the cluster manager loses track of the driver's state.
+   * This is currently only done in YARN client mode, when the AM is restarted.
+   *
+   * This method forgets about any state about existing executors, and forces the scheduler to
+   * re-evaluate the number of needed executors the next time it's run.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    addTime = 0L
     numExecutorsTarget = initialNumExecutors
-    numExecutorsToAdd = 1
-
     executorsPendingToRemove.clear()
     removeTimes.clear()
   }
@@ -376,8 +377,17 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }

-    val addRequestAcknowledged = testing ||
-      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    val addRequestAcknowledged = try {
+      testing ||
+        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    } catch {
+      case NonFatal(e) =>
+        // Use INFO level so the error it doesn't show up by default in shells. Errors here are more
+        // commonly caused by YARN AM restarts, which is a recoverable issue, and generate a lot of
+        // noisy output.
+        logInfo("Error reaching cluster manager.", e)
+        false
+    }
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +

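The try/NonFatal guard added above keeps a transient RPC failure (typically during an AM restart) from escaping the allocation loop: the request is simply reported as unacknowledged and retried on a later schedule pass. A small stand-alone sketch of that pattern follows; flakyRequest and requestWithGuard are hypothetical stand-ins for client.requestTotalExecutors and its caller, not Spark APIs.

import scala.util.control.NonFatal

object NonFatalGuardSketch {
  // Wrap an unreliable cluster-manager call: swallow non-fatal errors, log quietly, report "false".
  def requestWithGuard(flakyRequest: () => Boolean): Boolean = {
    try {
      flakyRequest()
    } catch {
      case NonFatal(e) =>
        // Keep the noise down: an AM restart makes this call fail transiently,
        // so log at a low level and treat the request as not acknowledged.
        println(s"INFO Error reaching cluster manager: ${e.getMessage}")
        false
    }
  }

  def main(args: Array[String]): Unit = {
    println(requestWithGuard(() => true))                                   // true: normal case
    println(requestWithGuard(() => throw new RuntimeException("AM down")))  // false: logged, not thrown
  }
}
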
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 22 additions & 36 deletions
@@ -130,7 +130,6 @@ private[spark] class ApplicationMaster(
   private var nextAllocationInterval = initialAllocationInterval

   private var rpcEnv: RpcEnv = null
-  private var amEndpoint: RpcEndpointRef = _

   // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
   private val sparkContextPromise = Promise[SparkContext]()
@@ -405,32 +404,26 @@ private[spark] class ApplicationMaster(
       securityMgr,
       localResources)

+    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
+    // that when the driver sends an initial executor request (e.g. after an AM restart),
+    // the allocator is ready to service requests.
+    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
+
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }

   /**
-   * Create an [[RpcEndpoint]] that communicates with the driver.
-   *
-   * In cluster mode, the AM and the driver belong to same process
-   * so the AMEndpoint need not monitor lifecycle of the driver.
-   *
-   * @return A reference to the driver's RPC endpoint.
+   * @return An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
-  private def runAMEndpoint(
-      host: String,
-      port: String,
-      isClusterMode: Boolean): RpcEndpointRef = {
-    val driverEndpoint = rpcEnv.setupEndpointRef(
+  private def createSchedulerRef(host: String, port: String): RpcEndpointRef = {
+    rpcEnv.setupEndpointRef(
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
-    amEndpoint =
-      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
-    driverEndpoint
   }

   private def runDriver(securityMgr: SecurityManager): Unit = {
-    addAmIpFilter()
+    addAmIpFilter(None)
     userClassThread = startUserApplication()

     // This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -442,10 +435,9 @@
       Duration(totalWaitTime, TimeUnit.MILLISECONDS))
     if (sc != null) {
       rpcEnv = sc.env.rpcEnv
-      val driverRef = runAMEndpoint(
+      val driverRef = createSchedulerRef(
         sc.getConf.get("spark.driver.host"),
-        sc.getConf.get("spark.driver.port"),
-        isClusterMode = true)
+        sc.getConf.get("spark.driver.port"))
       registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
       registered = true
     } else {
@@ -471,7 +463,7 @@
     rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
       amCores, true)
     val driverRef = waitForSparkDriver()
-    addAmIpFilter()
+    addAmIpFilter(Some(driverRef))
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)
     registered = true
@@ -620,20 +612,21 @@

     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
-
-    runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false)
+    createSchedulerRef(driverHost, driverPort.toString)
   }

   /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter() = {
+  private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     val params = client.getAmIpFilterParams(yarnConf, proxyBase)
-    if (isClusterMode) {
-      System.setProperty("spark.ui.filters", amFilter)
-      params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
-    } else {
-      amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+    driver match {
+      case Some(d) =>
+        d.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+
+      case None =>
+        System.setProperty("spark.ui.filters", amFilter)
+        params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
     }
   }


@@ -704,20 +697,13 @@
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
-  private class AMEndpoint(
-      override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
+  private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
     extends RpcEndpoint with Logging {

     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
     }

-    override def receive: PartialFunction[Any, Unit] = {
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver.send(x)
-    }
-
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         Option(allocator) match {

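The comment added in registerAM() above is the ordering half of the fix: the "YarnAM" endpoint must not start serving RequestExecutors before the allocator exists, or the driver's first request after an AM restart is dropped by the Option(allocator) guard shown in the last context lines. A minimal sketch of that failure mode and the fixed ordering, using toy Allocator and AmEndpoint classes rather than Spark's real RPC machinery:

object EndpointOrderingSketch {
  final class Allocator { var requestedTotal = 0 }

  // The endpoint guards against a missing allocator, mirroring Option(allocator) match { ... }.
  final class AmEndpoint(allocator: () => Allocator) {
    def receiveRequestExecutors(total: Int): Boolean =
      Option(allocator()) match {
        case Some(a) => a.requestedTotal = total; true
        case None    => println("Allocator is not ready to request executors yet."); false
      }
  }

  def main(args: Array[String]): Unit = {
    // Wrong order: the endpoint is live while the allocator is still null, so the request is lost.
    var allocator: Allocator = null
    val early = new AmEndpoint(() => allocator)
    println(early.receiveRequestExecutors(4))   // false

    // Fixed order: create the allocator first, then expose the endpoint.
    allocator = new Allocator()
    val late = new AmEndpoint(() => allocator)
    println(late.receiveRequestExecutors(4))    // true
    println(allocator.requestedTotal)           // 4
  }
}
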
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 1 addition & 10 deletions
@@ -69,9 +69,6 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Scheduler extension services. */
   private val services: SchedulerExtensionServices = new SchedulerExtensionServices()

-  // Flag to specify whether this schedulerBackend should be reset.
-  private var shouldResetOnAmRegister = false
-
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -262,13 +259,7 @@
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
-        if (!shouldResetOnAmRegister) {
-          shouldResetOnAmRegister = true
-        } else {
-          // AM is already registered before, this potentially means that AM failed and
-          // a new one registered after the failure. This will only happen in yarn-client mode.
-          reset()
-        }
+        reset()

       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)

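With the shouldResetOnAmRegister flag removed, reset() now runs on every RegisterClusterManager message, including the very first one; that is safe only because the new ExecutorAllocationManager.reset() above no longer halts allocation. A toy sketch of the unconditional call; ToyBackend and its members are invented for illustration and are not Spark's classes.

object RegisterResetSketch {
  final class ToyBackend {
    var amEndpoint: Option[String] = None
    var resets = 0

    def reset(): Unit = { resets += 1 } // forget stale executor-request state

    def onRegisterClusterManager(am: String): Unit = {
      println(s"ApplicationMaster registered as $am")
      amEndpoint = Some(am)
      reset() // unconditional: first registration and post-failure re-registration look the same
    }
  }

  def main(args: Array[String]): Unit = {
    val backend = new ToyBackend
    backend.onRegisterClusterManager("am-attempt-1") // initial registration
    backend.onRegisterClusterManager("am-attempt-2") // AM restarted; state is reset again
    println(backend.resets)                          // 2
  }
}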