From a3a0f016e409026805c3960063a708ffbc3e2488 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 12:41:33 -0700 Subject: [PATCH 1/4] Fix SPARK-8297 --- .../spark/deploy/yarn/ApplicationMaster.scala | 8 +++-- .../spark/deploy/yarn/YarnAllocator.scala | 30 +++++++++++++++++++ .../cluster/YarnClusterScheduler.scala | 3 +- .../deploy/yarn/YarnAllocatorSuite.scala | 23 ++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 83dafa4a125d2..78f50a97376d6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -218,11 +218,12 @@ private[spark] class ApplicationMaster( } } - private def sparkContextInitialized(sc: SparkContext) = { + private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = { sparkContextRef.synchronized { sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } + allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { @@ -612,8 +613,9 @@ object ApplicationMaster extends Logging { } } - private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { - master.sparkContextInitialized(sc) + private[spark] def sparkContextInitialized(sc: SparkContext, + backend: CoarseGrainedSchedulerBackend): Unit = { + master.sparkContextInitialized(sc, backend) } private[spark] def sparkContextStopped(sc: SparkContext): Boolean = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 940873fbd046c..2544990dda858 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -88,6 +88,10 @@ private[yarn] class YarnAllocator( // Visible for testing. private[yarn] val executorIdToContainer = new HashMap[String, Container] + private var numUnexpectedContainerRelease = 0L + private var backend: CoarseGrainedSchedulerBackend = _ + private val containerIdToExecutorId = new HashMap[ContainerId, String] + // Executor memory in MB. protected val executorMemory = args.executorMemory // Additional memory overhead. @@ -165,6 +169,7 @@ private[yarn] class YarnAllocator( def killExecutor(executorId: String): Unit = synchronized { if (executorIdToContainer.contains(executorId)) { val container = executorIdToContainer.remove(executorId).get + containerIdToExecutorId.remove(container.getId) internalReleaseContainer(container) numExecutorsRunning -= 1 } else { @@ -353,6 +358,7 @@ private[yarn] class YarnAllocator( logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) executorIdToContainer(executorId) = container + containerIdToExecutorId(container.getId) = executorId val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]) @@ -384,6 +390,7 @@ private[yarn] class YarnAllocator( for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + var needNotify = false if (releasedContainers.contains(containerId)) { // Already marked the container for release, so remove it from // `releasedContainers`. @@ -415,6 +422,7 @@ private[yarn] class YarnAllocator( ". 
Diagnostics: " + completedContainer.getDiagnostics) numExecutorsFailed += 1 } + needNotify = true } if (allocatedContainerToHostMap.containsKey(containerId)) { @@ -430,6 +438,15 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.remove(containerId) } + + val executorIdOpt = containerIdToExecutorId.remove(containerId) + if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get) + + if (needNotify && executorIdOpt.isDefined) { + // The executor could have gone away (like no route to host, node failure, etc) + // Notify backend about the failure of the executor + notifyBackend(executorIdOpt.get, containerId) + } } } @@ -438,6 +455,19 @@ private[yarn] class YarnAllocator( amClient.releaseAssignedContainer(container.getId()) } + private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + if (null != backend) { + backend.removeExecutor(executorId, + "Yarn deallocated the executor (" + executorId + ") container " + containerId) + } + numUnexpectedContainerRelease += 1 + } + + private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease + + private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized { + this.backend = backend + } } private object YarnAllocator { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 72ec4d6b34af6..b1f2daf913430 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -29,7 +29,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule logInfo("Created YarnClusterScheduler") override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) + ApplicationMaster.sparkContextInitialized(sc, + this.backend.asInstanceOf[CoarseGrainedSchedulerBackend]) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 7509000771d94..120109a384d26 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -231,6 +231,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumPendingAllocate should be (1) } + test("lost executor removed from backend") { + val handler = createAllocator(4) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (4) + + val container1 = createContainer("host1") + val container2 = createContainer("host2") + handler.handleAllocatedContainers(Array(container1, container2)) + + handler.requestTotalExecutors(2) + + val statuses = Seq(container1, container2).map { c => + ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1) + } + handler.updateResourceRequests() + handler.processCompletedContainers(statuses.toSeq) + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (2) + handler.getNumExecutorsFailed should be (2) + handler.getNumUnexpectedContainerRelease should be (2) + } + test("memory exceeded diagnostic regexes") { val diagnostics = "Container 
[pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + From 9ee1307ea2c6b498625145d224ce57049acad038 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 12:41:33 -0700 Subject: [PATCH 2/4] Fix SPARK-8297 --- pom.xml | 8 +++++ .../spark/deploy/yarn/ApplicationMaster.scala | 8 +++-- .../spark/deploy/yarn/YarnAllocator.scala | 31 +++++++++++++++++++ .../cluster/YarnClusterScheduler.scala | 3 +- .../deploy/yarn/YarnAllocatorSuite.scala | 23 ++++++++++++++ 5 files changed, 69 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index ffa96128a3d61..5ee6515092914 100644 --- a/pom.xml +++ b/pom.xml @@ -87,18 +87,23 @@ core + tools network/common network/shuffle + unsafe assembly + repl launcher + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 83dafa4a125d2..78f50a97376d6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -218,11 +218,12 @@ private[spark] class ApplicationMaster( } } - private def sparkContextInitialized(sc: SparkContext) = { + private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = { sparkContextRef.synchronized { sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } + allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { @@ -612,8 +613,9 @@ object ApplicationMaster extends Logging { } } - private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { - master.sparkContextInitialized(sc) + private[spark] def sparkContextInitialized(sc: SparkContext, + backend: CoarseGrainedSchedulerBackend): Unit = { + master.sparkContextInitialized(sc, backend) } private[spark] def sparkContextStopped(sc: SparkContext): Boolean = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 940873fbd046c..49932a239953b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -36,6 +36,7 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -88,6 +89,10 @@ private[yarn] class YarnAllocator( // Visible for testing. private[yarn] val executorIdToContainer = new HashMap[String, Container] + private var numUnexpectedContainerRelease = 0L + private var backend: CoarseGrainedSchedulerBackend = _ + private val containerIdToExecutorId = new HashMap[ContainerId, String] + // Executor memory in MB. protected val executorMemory = args.executorMemory // Additional memory overhead. 
@@ -165,6 +170,7 @@ private[yarn] class YarnAllocator( def killExecutor(executorId: String): Unit = synchronized { if (executorIdToContainer.contains(executorId)) { val container = executorIdToContainer.remove(executorId).get + containerIdToExecutorId.remove(container.getId) internalReleaseContainer(container) numExecutorsRunning -= 1 } else { @@ -353,6 +359,7 @@ private[yarn] class YarnAllocator( logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) executorIdToContainer(executorId) = container + containerIdToExecutorId(container.getId) = executorId val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]) @@ -384,6 +391,7 @@ private[yarn] class YarnAllocator( for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + var needNotify = false if (releasedContainers.contains(containerId)) { // Already marked the container for release, so remove it from // `releasedContainers`. @@ -415,6 +423,7 @@ private[yarn] class YarnAllocator( ". Diagnostics: " + completedContainer.getDiagnostics) numExecutorsFailed += 1 } + needNotify = true } if (allocatedContainerToHostMap.containsKey(containerId)) { @@ -430,6 +439,15 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.remove(containerId) } + + val executorIdOpt = containerIdToExecutorId.remove(containerId) + if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get) + + if (needNotify && executorIdOpt.isDefined) { + // The executor could have gone away (like no route to host, node failure, etc) + // Notify backend about the failure of the executor + notifyBackend(executorIdOpt.get, containerId) + } } } @@ -438,6 +456,19 @@ private[yarn] class YarnAllocator( amClient.releaseAssignedContainer(container.getId()) } + private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + if (null != backend) { + backend.removeExecutor(executorId, + "Yarn deallocated the executor (" + executorId + ") container " + containerId) + } + numUnexpectedContainerRelease += 1 + } + + private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease + + private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized { + this.backend = backend + } } private object YarnAllocator { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 72ec4d6b34af6..b1f2daf913430 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -29,7 +29,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule logInfo("Created YarnClusterScheduler") override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) + ApplicationMaster.sparkContextInitialized(sc, + this.backend.asInstanceOf[CoarseGrainedSchedulerBackend]) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 7509000771d94..120109a384d26 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -231,6 +231,29 @@ class 
YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumPendingAllocate should be (1) } + test("lost executor removed from backend") { + val handler = createAllocator(4) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (4) + + val container1 = createContainer("host1") + val container2 = createContainer("host2") + handler.handleAllocatedContainers(Array(container1, container2)) + + handler.requestTotalExecutors(2) + + val statuses = Seq(container1, container2).map { c => + ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1) + } + handler.updateResourceRequests() + handler.processCompletedContainers(statuses.toSeq) + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (2) + handler.getNumExecutorsFailed should be (2) + handler.getNumUnexpectedContainerRelease should be (2) + } + test("memory exceeded diagnostic regexes") { val diagnostics = "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + From 9218fcc6f87c5eb0ea2f3e3f4f3ea60cab7cfaa8 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 19:33:28 -0700 Subject: [PATCH 3/4] Fix failing testcase --- .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 49932a239953b..49f030c81d548 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -457,11 +457,11 @@ private[yarn] class YarnAllocator( } private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + numUnexpectedContainerRelease += 1 if (null != backend) { backend.removeExecutor(executorId, "Yarn deallocated the executor (" + executorId + ") container " + containerId) } - numUnexpectedContainerRelease += 1 } private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 120109a384d26..c71bb37a8f7a9 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -248,6 +248,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } handler.updateResourceRequests() handler.processCompletedContainers(statuses.toSeq) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (2) handler.getNumExecutorsFailed should be (2) From e1b00673876f99217661758c2afba8d5da470308 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 19:33:28 -0700 Subject: [PATCH 4/4] Fix failing testcase, fix merge issue from our 1.3 -> master --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6 +++++- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 78f50a97376d6..70058761becd9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -77,6 +77,8 @@ private[spark] class ApplicationMaster( @volatile private var allocator: YarnAllocator = _ private val allocatorLock = new Object() + @volatile private var backend: CoarseGrainedSchedulerBackend = _ + // Fields used in client mode. private var rpcEnv: RpcEnv = null private var amEndpoint: RpcEndpointRef = _ @@ -223,7 +225,8 @@ private[spark] class ApplicationMaster( sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } - allocator.setScheduler(backend) + this.backend = backend + if (null != allocator) allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { @@ -253,6 +256,7 @@ private[spark] class ApplicationMaster( uiAddress, historyAddress, securityMgr) + if (null != backend) allocator.setScheduler(backend) allocator.allocateResources() reporterThread = launchReporterThread() diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 49932a239953b..49f030c81d548 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -457,11 +457,11 @@ private[yarn] class YarnAllocator( } private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + numUnexpectedContainerRelease += 1 if (null != backend) { backend.removeExecutor(executorId, "Yarn deallocated the executor (" + executorId + ") container " + containerId) } - numUnexpectedContainerRelease += 1 } private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 120109a384d26..c71bb37a8f7a9 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -248,6 +248,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } handler.updateResourceRequests() handler.processCompletedContainers(statuses.toSeq) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (2) handler.getNumExecutorsFailed should be (2)
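
Editor's note on the mechanism these four patches implement (SPARK-8297): the ApplicationMaster hands the CoarseGrainedSchedulerBackend to the YarnAllocator once the SparkContext exists, the allocator remembers which executor runs in which container, and when YARN reports a completed container that Spark itself did not release, the allocator bumps a counter and asks the backend to remove that executor. The later patches add null guards because the backend may be registered before or after the allocator is created, and they move the counter increment ahead of the backend call so the test metric is updated even without a backend. The sketch below is a minimal, self-contained illustration of that flow only; SchedulerBackendLike, ContainerIdLike, AllocatorSketch, and onUnexpectedCompletion are simplified stand-in names invented for this note, not the real Spark or YARN APIs shown in the diffs above.

  // Illustrative sketch only -- simplified stand-ins, not the actual Spark/YARN classes.
  import scala.collection.mutable

  trait SchedulerBackendLike {
    def removeExecutor(executorId: String, reason: String): Unit
  }

  case class ContainerIdLike(id: Long)

  class AllocatorSketch {
    // The backend is registered lazily, after the SparkContext (and hence the scheduler)
    // exists, so every use must tolerate null -- mirroring the null checks in the patches.
    @volatile private var backend: SchedulerBackendLike = _
    private val containerIdToExecutorId = new mutable.HashMap[ContainerIdLike, String]
    private var numUnexpectedContainerRelease = 0L

    def setScheduler(b: SchedulerBackendLike): Unit = synchronized { backend = b }

    // Recorded when a container is launched for an executor.
    def registerContainer(containerId: ContainerIdLike, executorId: String): Unit =
      synchronized { containerIdToExecutorId(containerId) = executorId }

    // Called when YARN reports a completed container that Spark did not release itself.
    def onUnexpectedCompletion(containerId: ContainerIdLike): Unit = synchronized {
      // Count first, so the metric is visible to tests even when no backend is registered.
      numUnexpectedContainerRelease += 1
      containerIdToExecutorId.remove(containerId).foreach { executorId =>
        if (backend != null) {
          backend.removeExecutor(executorId,
            s"YARN deallocated executor $executorId (container $containerId)")
        }
      }
    }

    def unexpectedReleases: Long = numUnexpectedContainerRelease
  }

In the real patches the same ordering concern explains the follow-up fixes: patch 3 moves the counter increment before the backend notification so YarnAllocatorSuite's getNumUnexpectedContainerRelease assertion passes without a backend, and patch 4 caches the backend on the ApplicationMaster so whichever of setScheduler or allocator creation happens first, the allocator still ends up with a backend reference.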