
Commit 8855b97

Author: Marcelo Vanzin (committed)

Merge remote-tracking branch 'mridul/fix_yarn_scheduler_bug' into SPARK-8297

2 parents: 9716a72 + 687790f, commit 8855b97

File tree

4 files changed: +66 -4 lines changed


yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 9 additions & 3 deletions
@@ -77,6 +77,8 @@ private[spark] class ApplicationMaster(
   @volatile private var allocator: YarnAllocator = _
   private val allocatorLock = new Object()
 
+  @volatile private var backend: CoarseGrainedSchedulerBackend = _
+
   // Fields used in client mode.
   private var rpcEnv: RpcEnv = null
   private var amEndpoint: RpcEndpointRef = _
@@ -218,11 +220,13 @@ private[spark] class ApplicationMaster(
     }
   }
 
-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = {
     sparkContextRef.synchronized {
       sparkContextRef.compareAndSet(null, sc)
       sparkContextRef.notifyAll()
     }
+    this.backend = backend
+    if (null != allocator) allocator.setScheduler(backend)
   }
 
   private def sparkContextStopped(sc: SparkContext) = {
@@ -252,6 +256,7 @@ private[spark] class ApplicationMaster(
       uiAddress,
       historyAddress,
       securityMgr)
+    if (null != backend) allocator.setScheduler(backend)
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
@@ -612,8 +617,9 @@ object ApplicationMaster extends Logging {
     }
   }
 
-  private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
-    master.sparkContextInitialized(sc)
+  private[spark] def sparkContextInitialized(sc: SparkContext,
+      backend: CoarseGrainedSchedulerBackend): Unit = {
+    master.sparkContextInitialized(sc, backend)
   }
 
   private[spark] def sparkContextStopped(sc: SparkContext): Boolean = {
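The change above threads the scheduler backend into the ApplicationMaster so that whichever piece becomes ready last, the allocator (created when the AM registers with the ResourceManager) or the backend (created when the driver's SparkContext starts), connects the two. Below is a minimal, self-contained sketch of that "whoever arrives second wires up the other" pattern; Master, Allocator, and Backend here are illustrative stand-ins, not the actual Spark classes.

// Minimal sketch (illustrative names, not the real Spark classes) of the
// ordering-independent wiring used above: the allocator and the backend can
// become available in either order, so each arrival checks whether the other
// side already exists and connects the two if so.
trait Backend {
  def removeExecutor(executorId: String, reason: String): Unit
}

class Allocator {
  @volatile private var backend: Backend = _
  def setScheduler(b: Backend): Unit = synchronized { backend = b }
}

class Master {
  @volatile private var backend: Backend = _
  @volatile private var allocator: Allocator = _

  // Called once the driver's SparkContext (and hence the backend) is up.
  def sparkContextInitialized(b: Backend): Unit = {
    backend = b
    if (null != allocator) allocator.setScheduler(b)      // allocator was ready first
  }

  // Called when the AM registers with the ResourceManager and creates the allocator.
  def registerAM(): Unit = {
    allocator = new Allocator()
    if (null != backend) allocator.setScheduler(backend)  // backend was ready first
  }
}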

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 31 additions & 0 deletions
@@ -36,6 +36,7 @@ import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -88,6 +89,10 @@ private[yarn] class YarnAllocator(
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
 
+  private var numUnexpectedContainerRelease = 0L
+  private var backend: CoarseGrainedSchedulerBackend = _
+  private val containerIdToExecutorId = new HashMap[ContainerId, String]
+
   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
   // Additional memory overhead.
@@ -165,6 +170,7 @@
   def killExecutor(executorId: String): Unit = synchronized {
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.remove(executorId).get
+      containerIdToExecutorId.remove(container.getId)
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
     } else {
@@ -353,6 +359,7 @@
 
       logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
       executorIdToContainer(executorId) = container
+      containerIdToExecutorId(container.getId) = executorId
 
       val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
         new HashSet[ContainerId])
@@ -384,6 +391,7 @@
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
 
+      var needNotify = false
       if (releasedContainers.contains(containerId)) {
         // Already marked the container for release, so remove it from
         // `releasedContainers`.
@@ -415,6 +423,7 @@
             ". Diagnostics: " + completedContainer.getDiagnostics)
           numExecutorsFailed += 1
         }
+        needNotify = true
       }
 
       if (allocatedContainerToHostMap.containsKey(containerId)) {
@@ -430,6 +439,15 @@
 
         allocatedContainerToHostMap.remove(containerId)
       }
+
+      val executorIdOpt = containerIdToExecutorId.remove(containerId)
+      if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get)
+
+      if (needNotify && executorIdOpt.isDefined) {
+        // The executor could have gone away (like no route to host, node failure, etc)
+        // Notify backend about the failure of the executor
+        notifyBackend(executorIdOpt.get, containerId)
+      }
     }
   }
 
@@ -438,6 +456,19 @@
     amClient.releaseAssignedContainer(container.getId())
   }
 
+  private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = {
+    numUnexpectedContainerRelease += 1
+    if (null != backend) {
+      backend.removeExecutor(executorId,
+        "Yarn deallocated the executor (" + executorId + ") container " + containerId)
+    }
+  }
+
+  private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+
+  private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized {
+    this.backend = backend
+  }
 }
 
 private object YarnAllocator {
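The allocator change keeps a reverse containerId-to-executorId map so that, when YARN reports a completed container that Spark did not release itself, the corresponding executor can be looked up and the backend told to drop it. A hedged, self-contained sketch of that bookkeeping follows; it uses a plain String container id and a stub backend trait in place of Hadoop's ContainerId and Spark's CoarseGrainedSchedulerBackend, so the names are illustrative only.

import scala.collection.mutable.HashMap

// Self-contained sketch of the completed-container bookkeeping added above.
// `ContainerId` is a plain String and `SchedulerBackend` is a stub; the real
// code uses Hadoop's ContainerId and Spark's CoarseGrainedSchedulerBackend.
object AllocatorSketch {
  type ContainerId = String

  trait SchedulerBackend {
    def removeExecutor(executorId: String, reason: String): Unit
  }

  class Allocator(backend: SchedulerBackend) {
    private val executorIdToContainer = new HashMap[String, ContainerId]
    private val containerIdToExecutorId = new HashMap[ContainerId, String]
    private var numUnexpectedContainerRelease = 0L

    // Record both directions of the mapping when an executor's container starts.
    def containerLaunched(executorId: String, containerId: ContainerId): Unit = {
      executorIdToContainer(executorId) = containerId
      containerIdToExecutorId(containerId) = executorId
    }

    // Called for containers YARN reports as completed. `releasedByUs` is true
    // when Spark itself asked for the container to be released (e.g. killExecutor).
    def containerCompleted(containerId: ContainerId, releasedByUs: Boolean): Unit = {
      val executorIdOpt = containerIdToExecutorId.remove(containerId)
      executorIdOpt.foreach(executorIdToContainer.remove)
      if (!releasedByUs && executorIdOpt.isDefined) {
        // Unexpected loss (node failure, no route to host, preemption, ...):
        // tell the backend so it stops scheduling tasks on the dead executor.
        numUnexpectedContainerRelease += 1
        backend.removeExecutor(executorIdOpt.get,
          "Yarn deallocated the executor (" + executorIdOpt.get + ") container " + containerId)
      }
    }

    def getNumUnexpectedContainerRelease: Long = numUnexpectedContainerRelease
  }
}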

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala

Lines changed: 2 additions & 1 deletion
@@ -29,7 +29,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule
   logInfo("Created YarnClusterScheduler")
 
   override def postStartHook() {
-    ApplicationMaster.sparkContextInitialized(sc)
+    ApplicationMaster.sparkContextInitialized(sc,
+      this.backend.asInstanceOf[CoarseGrainedSchedulerBackend])
     super.postStartHook()
     logInfo("YarnClusterScheduler.postStartHook done")
   }

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -231,6 +231,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumPendingAllocate should be (1)
   }
 
+  test("lost executor removed from backend") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.requestTotalExecutors(2)
+
+    val statuses = Seq(container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
+    }
+    handler.updateResourceRequests()
+    handler.processCompletedContainers(statuses.toSeq)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (2)
+    handler.getNumExecutorsFailed should be (2)
+    handler.getNumUnexpectedContainerRelease should be (2)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
