
Commit 1496b78

Author: Marcelo Vanzin (authored and committed)
[SPARK-20079][yarn] Fix client AM not allocating executors after restart.
The main goal of this change is to avoid the situation described in the bug, where an AM restart in the middle of a job may cause no new executors to be allocated because of faulty logic in the reset path.

The change does two things:

- fixes the executor allocation manager's reset() so that it does not stop allocation after a reset() in the middle of a job
- re-orders the initialization of the YarnAllocator class so that it fetches the current executor ID before triggering the reset() above

This ensures both that the new allocator gets new requests for executors, and that it starts from the correct executor ID.

Tested with unit tests and by manually causing AM restarts while running jobs using spark-shell in YARN mode.
1 parent ba8bf28 commit 1496b78

File tree: 8 files changed (+74 −100 lines)


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 19 additions & 5 deletions
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.ControlThrowable
+import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
@@ -245,10 +245,15 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Reset the allocation manager to the initial state. Currently this will only be called in
-   * yarn-client mode when AM re-registers after a failure.
+   * Reset the allocation manager when the cluster manager loses track of the driver's state.
+   * This is currently only done in YARN client mode, when the AM is restarted.
+   *
+   * This method forgets about any state about existing executors, and forces the scheduler to
+   * re-evaluate the number of needed executors the next time it's run.
    */
   def reset(): Unit = synchronized {
+    addTime = 0L
+    numExecutorsTarget = initialNumExecutors
     executorsPendingToRemove.clear()
     removeTimes.clear()
   }
@@ -372,8 +377,17 @@
       return 0
     }
 
-    val addRequestAcknowledged = testing ||
-      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    val addRequestAcknowledged = try {
+      testing ||
+        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    } catch {
+      case NonFatal(e) =>
+        // Use INFO level so the error it doesn't show up by default in shells. Errors here are more
+        // commonly caused by YARN AM restarts, which is a recoverable issue, and generate a lot of
+        // noisy output.
+        logInfo("Error reaching cluster manager.", e)
+        false
+    }
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ private[spark] object CoarseGrainedClusterMessages {
       ioEncryptionKey: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
-  case object GetAMInitialState extends CoarseGrainedClusterMessage
+  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
 
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 4 deletions
@@ -93,6 +93,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0
 
+  // The num of current max ExecutorId used to re-register appMaster
+  @volatile protected var currentExecutorIdCounter = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -181,7 +184,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
             executorDataMap.put(executorId, data)
-            setCurrentExecutorIdCounter(executorId.toInt)
+            if (currentExecutorIdCounter < executorId.toInt) {
+              currentExecutorIdCounter = executorId.toInt
+            }
             if (numPendingExecutors > 0) {
               numPendingExecutors -= 1
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -649,9 +654,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     defaultAskTimeout.awaitResult(response)
   }
 
-  // Set the num of current max ExecutorId used to re-register appMaster
-  protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {}
-
   /**
    * Kill the given list of executors through the cluster manager.
    * @return whether the kill request is acknowledged.
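
Note: the new `currentExecutorIdCounter` follows a write-under-lock, volatile-read pattern — it is bumped while executor registration already holds the backend lock, and read lock-free when the YARN endpoint replies to `RetrieveLastAllocatedExecutorId`. A rough standalone illustration, with names (`IdTracker`, `onExecutorRegistered`) invented for the sketch:

```scala
// Stand-alone illustration of the max-executor-id tracking pattern; the real
// field lives in CoarseGrainedSchedulerBackend and is updated on executor registration.
class IdTracker {
  @volatile private var currentExecutorIdCounter = 0

  // Called under the backend's lock when an executor registers.
  def onExecutorRegistered(executorId: String): Unit = synchronized {
    val id = executorId.toInt
    if (currentExecutorIdCounter < id) {
      currentExecutorIdCounter = id
    }
  }

  // Read without the lock (e.g. when replying to RetrieveLastAllocatedExecutorId);
  // @volatile guarantees the latest write is visible to the reader thread.
  def lastAllocatedId: Int = currentExecutorIdCounter
}

object IdTrackerSketch {
  def main(args: Array[String]): Unit = {
    val t = new IdTracker()
    Seq("1", "3", "2").foreach(t.onExecutorRegistered)
    assert(t.lastAllocatedId == 3) // keeps the maximum id seen, not the most recent
  }
}
```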

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 23 additions & 49 deletions
@@ -127,7 +127,6 @@ private[spark] class ApplicationMaster(
   private var nextAllocationInterval = initialAllocationInterval
 
   private var rpcEnv: RpcEnv = null
-  private var amEndpoint: RpcEndpointRef = _
 
   // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
   private val sparkContextPromise = Promise[SparkContext]()
@@ -388,53 +387,35 @@ private[spark] class ApplicationMaster(
       dummyRunner.launchContextDebugInfo()
     }
 
-    /**
-     * (executorIdCounter, requestExecutors) should be the initial state
-     * or the last state AM restart.
-     *
-     * @see SPARK-12864, SPARK-20079
-     */
-    val (executorIdCounter, requestExecutors) =
-      driverRef.askSync[(Int, RequestExecutors)](GetAMInitialState)
     allocator = client.register(driverUrl,
       driverRef,
       yarnConf,
      _sparkConf,
       uiAddress,
       historyAddress,
       securityMgr,
-      localResources,
-      executorIdCounter)
-    if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) {
-      amEndpoint.send(requestExecutors)
-    }
+      localResources)
+
+    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
+    // that when the driver sends an initial executor request (e.g. after an AM restart),
+    // the allocator is ready to service requests.
+    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }
 
   /**
-   * Create an [[RpcEndpoint]] that communicates with the driver.
-   *
-   * In cluster mode, the AM and the driver belong to same process
-   * so the AMEndpoint need not monitor lifecycle of the driver.
-   *
-   * @return A reference to the driver's RPC endpoint.
+   * @return An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
-  private def runAMEndpoint(
-      host: String,
-      port: String,
-      isClusterMode: Boolean): RpcEndpointRef = {
-    val driverEndpoint = rpcEnv.setupEndpointRef(
+  private def createSchedulerRef(host: String, port: String): RpcEndpointRef = {
+    rpcEnv.setupEndpointRef(
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
-    amEndpoint =
-      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
-    driverEndpoint
   }
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
-    addAmIpFilter()
+    addAmIpFilter(None)
     userClassThread = startUserApplication()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -446,10 +427,9 @@ private[spark] class ApplicationMaster(
         Duration(totalWaitTime, TimeUnit.MILLISECONDS))
       if (sc != null) {
         rpcEnv = sc.env.rpcEnv
-        val driverRef = runAMEndpoint(
+        val driverRef = createSchedulerRef(
           sc.getConf.get("spark.driver.host"),
-          sc.getConf.get("spark.driver.port"),
-          isClusterMode = true)
+          sc.getConf.get("spark.driver.port"))
         registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null
@@ -474,7 +454,7 @@ private[spark] class ApplicationMaster(
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
       clientMode = true)
     val driverRef = waitForSparkDriver()
-    addAmIpFilter()
+    addAmIpFilter(Some(driverRef))
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)
 
@@ -622,20 +602,21 @@ private[spark] class ApplicationMaster(
 
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
-
-    runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false)
+    createSchedulerRef(driverHost, driverPort.toString)
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter() = {
+  private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    val params = client.getAmIpFilterParams(yarnConf, proxyBase)
-    if (isClusterMode) {
-      System.setProperty("spark.ui.filters", amFilter)
-      params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
-    } else {
-      amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+    driver match {
+      case Some(d) =>
+        d.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+
+      case None =>
+        System.setProperty("spark.ui.filters", amFilter)
+        params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
     }
   }
 
@@ -706,20 +687,13 @@ private[spark] class ApplicationMaster(
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
-  private class AMEndpoint(
-    override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
+  private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
     extends RpcEndpoint with Logging {
 
     override def onStart(): Unit = {
      driver.send(RegisterClusterManager(self))
    }
 
-    override def receive: PartialFunction[Any, Unit] = {
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver.send(x)
-    }
-
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         Option(allocator) match {
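
Note: the key change in `registerAM` is ordering — the `YarnAM` endpoint is only exposed once the allocator exists, so a `RequestExecutors` message sent by the driver immediately after (re)registration always finds an allocator to act on. A toy sketch of that invariant, with `Allocator` and `Endpoint` invented for illustration rather than taken from Spark:

```scala
// Minimal sketch of the initialization-order fix, using plain Scala in place of
// Spark's RPC layer. `Allocator`, `Endpoint`, and `requestTotal` are hypothetical.
class Allocator {
  @volatile var target: Int = 0
  def requestTotal(n: Int): Unit = { target = n }
}

// The endpoint is constructed with a non-null allocator to forward requests to,
// so a driver request arriving right after (re)registration cannot be dropped.
class Endpoint(allocator: Allocator) {
  def receive(requestedTotal: Int): Boolean = {
    allocator.requestTotal(requestedTotal)
    true
  }
}

object RegistrationOrderSketch {
  def register(): Endpoint = {
    val allocator = new Allocator()        // 1. create the allocator first
    val endpoint = new Endpoint(allocator) // 2. only then expose the endpoint to the driver
    endpoint
  }

  def main(args: Array[String]): Unit = {
    val ep = register()
    assert(ep.receive(8)) // a request arriving immediately after registration is serviced
  }
}
```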

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 18 additions & 4 deletions
@@ -40,6 +40,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
@@ -64,8 +65,7 @@ private[yarn] class YarnAllocator(
     appAttemptId: ApplicationAttemptId,
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
-    resolver: SparkRackResolver,
-    private var executorIdCounter: Int)
+    resolver: SparkRackResolver)
   extends Logging {
 
   import YarnAllocator._
@@ -82,6 +82,22 @@ private[yarn] class YarnAllocator(
 
   @volatile private var numExecutorsRunning = 0
 
+  /**
+   * Used to generate a unique ID per executor
+   *
+   * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
+   * the id of new executor will start from 1, this will conflict with the executor has
+   * already created before. So, we should initialize the `executorIdCounter` by getting
+   * the max executorId from driver.
+   *
+   * And this situation of executorId conflict is just in yarn client mode, so this is an issue
+   * in yarn client mode. For more details, can check in jira.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int =
+    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
+
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()
 
@@ -147,8 +163,6 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }
 
-  def getTargetNumExecutors: Int = targetNumExecutors
-
  def getNumExecutorsRunning: Int = numExecutorsRunning
 
   def getNumExecutorsFailed: Int = synchronized {
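
Note: the `askSync` initializer added above is how a restarted AM avoids reusing executor IDs — instead of starting its counter at 0, the allocator seeds it with the highest ID the driver has already seen. A simplified model of that round trip follows; `Driver` and `askSync` here are stand-ins for Spark's RpcEndpointRef API, not the real classes.

```scala
// A toy ask/reply round-trip modeled on the RetrieveLastAllocatedExecutorId exchange.
case object RetrieveLastAllocatedExecutorId

class Driver {
  // Tracks the highest executor id ever registered (currentExecutorIdCounter in the backend).
  @volatile var currentExecutorIdCounter: Int = 0

  def askSync(msg: Any): Int = msg match {
    case RetrieveLastAllocatedExecutorId => currentExecutorIdCounter
  }
}

object ExecutorIdSketch {
  def main(args: Array[String]): Unit = {
    val driver = new Driver()
    driver.currentExecutorIdCounter = 7 // executors 1..7 registered before the AM died

    // A freshly restarted allocator seeds its counter from the driver,
    // so new executors get ids 8, 9, ... instead of colliding with 1..7.
    var executorIdCounter = driver.askSync(RetrieveLastAllocatedExecutorId)
    executorIdCounter += 1
    assert(executorIdCounter == 8)
  }
}
```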

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 2 additions & 3 deletions
@@ -58,8 +58,7 @@ private[spark] class YarnRMClient extends Logging {
       uiAddress: Option[String],
       uiHistoryAddress: String,
       securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource],
-      executorIdCounter: Int
+      localResources: Map[String, LocalResource]
     ): YarnAllocator = {
     amClient = AMRMClient.createAMRMClient()
     amClient.init(conf)
@@ -76,7 +75,7 @@ private[spark] class YarnRMClient extends Logging {
       registered = true
     }
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
-      localResources, new SparkRackResolver(), executorIdCounter)
+      localResources, new SparkRackResolver())
   }
 
   /**

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 4 additions & 32 deletions
@@ -69,15 +69,6 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Scheduler extension services. */
   private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
 
-  // Flag to specify whether this schedulerBackend should be reset.
-  private var shouldResetOnAmRegister = false
-
-  private val currentState = new CurrentAMState(0,
-    RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty))
-
-  protected class CurrentAMState(
-      var executorIdCounter: Int,
-      var requestExecutors: RequestExecutors)
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -146,20 +137,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    val requestExecutors = prepareRequestExecutors(requestedTotal)
-    val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors)
-    setCurrentRequestExecutors(requestExecutors)
-    future
-  }
-
-  override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized {
-    if (currentState.executorIdCounter < executorId.toInt) {
-      currentState.executorIdCounter = executorId.toInt
-    }
-  }
-
-  def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized {
-    currentState.requestExecutors = requestExecutors
+    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
   }
 
   /**
@@ -281,13 +259,7 @@ private[spark] abstract class YarnSchedulerBackend(
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
-        if (!shouldResetOnAmRegister) {
-          shouldResetOnAmRegister = true
-        } else {
-          // AM is already registered before, this potentially means that AM failed and
-          // a new one registered after the failure. This will only happen in yarn-client mode.
-          reset()
-        }
+        reset()
 
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
@@ -331,8 +303,8 @@ private[spark] abstract class YarnSchedulerBackend(
             context.reply(false)
         }
 
-      case GetAMInitialState =>
-        context.reply((currentState.executorIdCounter, currentState.requestExecutors))
+      case RetrieveLastAllocatedExecutorId =>
+        context.reply(currentExecutorIdCounter)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
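
Note: with ExecutorAllocationManager.reset() made safe to call repeatedly (it re-seeds the target and clears per-executor bookkeeping rather than freezing allocation), the backend can simply call reset() on every RegisterClusterManager and drop the shouldResetOnAmRegister flag. A hedged sketch of why an idempotent reset makes that unconditional call safe; the field names below only loosely mirror the real manager and the Long.MaxValue sentinel is an assumption.

```scala
// Sketch of a reset that is safe to run on every (re)registration: it restores
// the manager to its initial allocation state rather than freezing allocation.
// Not Spark code; fields loosely mirror those touched by reset().
class AllocationState(val initialTarget: Int) {
  var target: Int = initialTarget
  var addTime: Long = Long.MaxValue // stand-in for a "no add scheduled" sentinel (assumed)
  val pendingRemovals = scala.collection.mutable.Set.empty[String]

  def reset(): Unit = synchronized {
    addTime = 0L            // let the next schedule() pass request executors again
    target = initialTarget  // forget the (possibly stale) previous target
    pendingRemovals.clear()
  }
}

object ResetSketch {
  def main(args: Array[String]): Unit = {
    val state = new AllocationState(initialTarget = 2)
    state.target = 10
    state.pendingRemovals += "executor-3"

    // Calling reset() once or many times (one per AM registration) yields the same state.
    state.reset(); state.reset()
    assert(state.target == 2 && state.addTime == 0L && state.pendingRemovals.isEmpty)
  }
}
```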

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -101,8 +101,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       appAttemptId,
       new SecurityManager(sparkConf),
       Map(),
-      new MockResolver(),
-      0)
+      new MockResolver())
   }
 
   def createContainer(host: String): Container = {
