From a3a0f016e409026805c3960063a708ffbc3e2488 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 12:41:33 -0700 Subject: [PATCH 1/7] Fix SPARK-8297 --- .../spark/deploy/yarn/ApplicationMaster.scala | 8 +++-- .../spark/deploy/yarn/YarnAllocator.scala | 30 +++++++++++++++++++ .../cluster/YarnClusterScheduler.scala | 3 +- .../deploy/yarn/YarnAllocatorSuite.scala | 23 ++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 83dafa4a125d2..78f50a97376d6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -218,11 +218,12 @@ private[spark] class ApplicationMaster( } } - private def sparkContextInitialized(sc: SparkContext) = { + private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = { sparkContextRef.synchronized { sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } + allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { @@ -612,8 +613,9 @@ object ApplicationMaster extends Logging { } } - private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { - master.sparkContextInitialized(sc) + private[spark] def sparkContextInitialized(sc: SparkContext, + backend: CoarseGrainedSchedulerBackend): Unit = { + master.sparkContextInitialized(sc, backend) } private[spark] def sparkContextStopped(sc: SparkContext): Boolean = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 940873fbd046c..2544990dda858 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -88,6 +88,10 @@ private[yarn] class YarnAllocator( // Visible for testing. private[yarn] val executorIdToContainer = new HashMap[String, Container] + private var numUnexpectedContainerRelease = 0L + private var backend: CoarseGrainedSchedulerBackend = _ + private val containerIdToExecutorId = new HashMap[ContainerId, String] + // Executor memory in MB. protected val executorMemory = args.executorMemory // Additional memory overhead. @@ -165,6 +169,7 @@ private[yarn] class YarnAllocator( def killExecutor(executorId: String): Unit = synchronized { if (executorIdToContainer.contains(executorId)) { val container = executorIdToContainer.remove(executorId).get + containerIdToExecutorId.remove(container.getId) internalReleaseContainer(container) numExecutorsRunning -= 1 } else { @@ -353,6 +358,7 @@ private[yarn] class YarnAllocator( logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) executorIdToContainer(executorId) = container + containerIdToExecutorId(container.getId) = executorId val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]) @@ -384,6 +390,7 @@ private[yarn] class YarnAllocator( for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + var needNotify = false if (releasedContainers.contains(containerId)) { // Already marked the container for release, so remove it from // `releasedContainers`. @@ -415,6 +422,7 @@ private[yarn] class YarnAllocator( ". 
Diagnostics: " + completedContainer.getDiagnostics) numExecutorsFailed += 1 } + needNotify = true } if (allocatedContainerToHostMap.containsKey(containerId)) { @@ -430,6 +438,15 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.remove(containerId) } + + val executorIdOpt = containerIdToExecutorId.remove(containerId) + if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get) + + if (needNotify && executorIdOpt.isDefined) { + // The executor could have gone away (like no route to host, node failure, etc) + // Notify backend about the failure of the executor + notifyBackend(executorIdOpt.get, containerId) + } } } @@ -438,6 +455,19 @@ private[yarn] class YarnAllocator( amClient.releaseAssignedContainer(container.getId()) } + private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + if (null != backend) { + backend.removeExecutor(executorId, + "Yarn deallocated the executor (" + executorId + ") container " + containerId) + } + numUnexpectedContainerRelease += 1 + } + + private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease + + private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized { + this.backend = backend + } } private object YarnAllocator { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 72ec4d6b34af6..b1f2daf913430 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -29,7 +29,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule logInfo("Created YarnClusterScheduler") override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) + ApplicationMaster.sparkContextInitialized(sc, + this.backend.asInstanceOf[CoarseGrainedSchedulerBackend]) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 7509000771d94..120109a384d26 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -231,6 +231,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumPendingAllocate should be (1) } + test("lost executor removed from backend") { + val handler = createAllocator(4) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (4) + + val container1 = createContainer("host1") + val container2 = createContainer("host2") + handler.handleAllocatedContainers(Array(container1, container2)) + + handler.requestTotalExecutors(2) + + val statuses = Seq(container1, container2).map { c => + ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1) + } + handler.updateResourceRequests() + handler.processCompletedContainers(statuses.toSeq) + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (2) + handler.getNumExecutorsFailed should be (2) + handler.getNumUnexpectedContainerRelease should be (2) + } + test("memory exceeded diagnostic regexes") { val diagnostics = "Container 
[pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + From 9ee1307ea2c6b498625145d224ce57049acad038 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 12:41:33 -0700 Subject: [PATCH 2/7] Fix SPARK-8297 --- pom.xml | 8 +++++ .../spark/deploy/yarn/ApplicationMaster.scala | 8 +++-- .../spark/deploy/yarn/YarnAllocator.scala | 31 +++++++++++++++++++ .../cluster/YarnClusterScheduler.scala | 3 +- .../deploy/yarn/YarnAllocatorSuite.scala | 23 ++++++++++++++ 5 files changed, 69 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index ffa96128a3d61..5ee6515092914 100644 --- a/pom.xml +++ b/pom.xml @@ -87,18 +87,23 @@ core + tools network/common network/shuffle + unsafe assembly + repl launcher + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 83dafa4a125d2..78f50a97376d6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -218,11 +218,12 @@ private[spark] class ApplicationMaster( } } - private def sparkContextInitialized(sc: SparkContext) = { + private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = { sparkContextRef.synchronized { sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } + allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { @@ -612,8 +613,9 @@ object ApplicationMaster extends Logging { } } - private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { - master.sparkContextInitialized(sc) + private[spark] def sparkContextInitialized(sc: SparkContext, + backend: CoarseGrainedSchedulerBackend): Unit = { + master.sparkContextInitialized(sc, backend) } private[spark] def sparkContextStopped(sc: SparkContext): Boolean = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 940873fbd046c..49932a239953b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -36,6 +36,7 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -88,6 +89,10 @@ private[yarn] class YarnAllocator( // Visible for testing. private[yarn] val executorIdToContainer = new HashMap[String, Container] + private var numUnexpectedContainerRelease = 0L + private var backend: CoarseGrainedSchedulerBackend = _ + private val containerIdToExecutorId = new HashMap[ContainerId, String] + // Executor memory in MB. protected val executorMemory = args.executorMemory // Additional memory overhead. 
@@ -165,6 +170,7 @@ private[yarn] class YarnAllocator( def killExecutor(executorId: String): Unit = synchronized { if (executorIdToContainer.contains(executorId)) { val container = executorIdToContainer.remove(executorId).get + containerIdToExecutorId.remove(container.getId) internalReleaseContainer(container) numExecutorsRunning -= 1 } else { @@ -353,6 +359,7 @@ private[yarn] class YarnAllocator( logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) executorIdToContainer(executorId) = container + containerIdToExecutorId(container.getId) = executorId val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]) @@ -384,6 +391,7 @@ private[yarn] class YarnAllocator( for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + var needNotify = false if (releasedContainers.contains(containerId)) { // Already marked the container for release, so remove it from // `releasedContainers`. @@ -415,6 +423,7 @@ private[yarn] class YarnAllocator( ". Diagnostics: " + completedContainer.getDiagnostics) numExecutorsFailed += 1 } + needNotify = true } if (allocatedContainerToHostMap.containsKey(containerId)) { @@ -430,6 +439,15 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.remove(containerId) } + + val executorIdOpt = containerIdToExecutorId.remove(containerId) + if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get) + + if (needNotify && executorIdOpt.isDefined) { + // The executor could have gone away (like no route to host, node failure, etc) + // Notify backend about the failure of the executor + notifyBackend(executorIdOpt.get, containerId) + } } } @@ -438,6 +456,19 @@ private[yarn] class YarnAllocator( amClient.releaseAssignedContainer(container.getId()) } + private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + if (null != backend) { + backend.removeExecutor(executorId, + "Yarn deallocated the executor (" + executorId + ") container " + containerId) + } + numUnexpectedContainerRelease += 1 + } + + private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease + + private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized { + this.backend = backend + } } private object YarnAllocator { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 72ec4d6b34af6..b1f2daf913430 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -29,7 +29,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule logInfo("Created YarnClusterScheduler") override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) + ApplicationMaster.sparkContextInitialized(sc, + this.backend.asInstanceOf[CoarseGrainedSchedulerBackend]) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 7509000771d94..120109a384d26 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -231,6 +231,29 @@ class 
YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter handler.getNumPendingAllocate should be (1) } + test("lost executor removed from backend") { + val handler = createAllocator(4) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (4) + + val container1 = createContainer("host1") + val container2 = createContainer("host2") + handler.handleAllocatedContainers(Array(container1, container2)) + + handler.requestTotalExecutors(2) + + val statuses = Seq(container1, container2).map { c => + ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1) + } + handler.updateResourceRequests() + handler.processCompletedContainers(statuses.toSeq) + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (2) + handler.getNumExecutorsFailed should be (2) + handler.getNumUnexpectedContainerRelease should be (2) + } + test("memory exceeded diagnostic regexes") { val diagnostics = "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + From 9218fcc6f87c5eb0ea2f3e3f4f3ea60cab7cfaa8 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 19:33:28 -0700 Subject: [PATCH 3/7] Fix failing testcase --- .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 49932a239953b..49f030c81d548 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -457,11 +457,11 @@ private[yarn] class YarnAllocator( } private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + numUnexpectedContainerRelease += 1 if (null != backend) { backend.removeExecutor(executorId, "Yarn deallocated the executor (" + executorId + ") container " + containerId) } - numUnexpectedContainerRelease += 1 } private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 120109a384d26..c71bb37a8f7a9 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -248,6 +248,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } handler.updateResourceRequests() handler.processCompletedContainers(statuses.toSeq) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (2) handler.getNumExecutorsFailed should be (2) From e1b00673876f99217661758c2afba8d5da470308 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 6 Jul 2015 19:33:28 -0700 Subject: [PATCH 4/7] Fix failing testcase, fix merge issue from our 1.3 -> master --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6 +++++- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 78f50a97376d6..70058761becd9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -77,6 +77,8 @@ private[spark] class ApplicationMaster( @volatile private var allocator: YarnAllocator = _ private val allocatorLock = new Object() + @volatile private var backend: CoarseGrainedSchedulerBackend = _ + // Fields used in client mode. private var rpcEnv: RpcEnv = null private var amEndpoint: RpcEndpointRef = _ @@ -223,7 +225,8 @@ private[spark] class ApplicationMaster( sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } - allocator.setScheduler(backend) + this.backend = backend + if (null != allocator) allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { @@ -253,6 +256,7 @@ private[spark] class ApplicationMaster( uiAddress, historyAddress, securityMgr) + if (null != backend) allocator.setScheduler(backend) allocator.allocateResources() reporterThread = launchReporterThread() diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 49932a239953b..49f030c81d548 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -457,11 +457,11 @@ private[yarn] class YarnAllocator( } private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { + numUnexpectedContainerRelease += 1 if (null != backend) { backend.removeExecutor(executorId, "Yarn deallocated the executor (" + executorId + ") container " + containerId) } - numUnexpectedContainerRelease += 1 } private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 120109a384d26..c71bb37a8f7a9 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -248,6 +248,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } handler.updateResourceRequests() handler.processCompletedContainers(statuses.toSeq) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (2) handler.getNumExecutorsFailed should be (2) From 04dc11200b87b87ac53d371319ee494726f87923 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 15 Jul 2015 13:48:12 -0700 Subject: [PATCH 5/7] Use driver <-> AM communication to send "remove executor" request. 
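For review context, a minimal sketch of the flow this patch introduces, using simplified
stand-ins for RpcEndpointRef and the scheduler backend rather than the real Spark RPC
classes: the AM-side allocator no longer needs a reference to the scheduler backend at
all; it only holds the driver endpoint and sends it a RemoveExecutor message, which the
driver-side YarnSchedulerBackend endpoint turns into a removeExecutor() call.

    // Message carried over the existing driver <-> AM RPC channel (stand-in for the
    // real CoarseGrainedClusterMessages.RemoveExecutor).
    case class RemoveExecutor(executorId: String, reason: String)

    // Stand-in for org.apache.spark.rpc.RpcEndpointRef (fire-and-forget send).
    trait DriverEndpointRef {
      def send(message: Any): Unit
    }

    // AM side: on an unexpected container completion, notify the driver.
    class AllocatorSketch(driverRef: DriverEndpointRef) {
      private var numUnexpectedContainerRelease = 0L

      def onUnexpectedContainerCompletion(executorId: String, containerId: String): Unit = {
        numUnexpectedContainerRelease += 1
        driverRef.send(RemoveExecutor(executorId,
          s"Yarn deallocated the executor $executorId (container $containerId)"))
      }
    }

    // Driver side: the scheduler backend endpoint handles the message and drops the
    // executor from its bookkeeping.
    class SchedulerBackendSketch {
      def receive: PartialFunction[Any, Unit] = {
        case RemoveExecutor(executorId, reason) =>
          removeExecutor(executorId, reason)
      }

      private def removeExecutor(executorId: String, reason: String): Unit = {
        println(s"Removing executor $executorId: $reason")
      }
    }

This removes the need to hand a CoarseGrainedSchedulerBackend to the allocator (and the
associated null checks), and it behaves the same in client and cluster mode, since both
paths already obtain a reference to the driver endpoint before registering the AM.
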
--- .../cluster/YarnSchedulerBackend.scala | 2 + .../spark/deploy/yarn/ApplicationMaster.scala | 34 ++++++++-------- .../spark/deploy/yarn/YarnAllocator.scala | 40 +++++++------------ .../spark/deploy/yarn/YarnRMClient.scala | 5 ++- .../cluster/YarnClusterScheduler.scala | 3 +- .../deploy/yarn/YarnAllocatorSuite.scala | 7 +++- 6 files changed, 44 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index bc67abb5df446..a07267caac7d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -108,6 +108,8 @@ private[spark] abstract class YarnSchedulerBackend( case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) + case RemoveExecutor(executorId, reason) => + removeExecutor(executorId, reason) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 70058761becd9..4818bc0aaecfd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -77,8 +77,6 @@ private[spark] class ApplicationMaster( @volatile private var allocator: YarnAllocator = _ private val allocatorLock = new Object() - @volatile private var backend: CoarseGrainedSchedulerBackend = _ - // Fields used in client mode. private var rpcEnv: RpcEnv = null private var amEndpoint: RpcEndpointRef = _ @@ -220,20 +218,22 @@ private[spark] class ApplicationMaster( } } - private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = { + private def sparkContextInitialized(sc: SparkContext) = { sparkContextRef.synchronized { sparkContextRef.compareAndSet(null, sc) sparkContextRef.notifyAll() } - this.backend = backend - if (null != allocator) allocator.setScheduler(backend) } private def sparkContextStopped(sc: SparkContext) = { sparkContextRef.compareAndSet(sc, null) } - private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = { + private def registerAM( + _rpcEnv: RpcEnv, + driverRef: RpcEndpointRef, + uiAddress: String, + securityMgr: SecurityManager) = { val sc = sparkContextRef.get() val appId = client.getAttemptId().getApplicationId().toString() @@ -250,13 +250,13 @@ private[spark] class ApplicationMaster( RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt), CoarseGrainedSchedulerBackend.ENDPOINT_NAME) allocator = client.register(driverUrl, + driverRef, yarnConf, _sparkConf, if (sc != null) sc.preferredNodeLocationData else Map(), uiAddress, historyAddress, securityMgr) - if (null != backend) allocator.setScheduler(backend) allocator.allocateResources() reporterThread = launchReporterThread() @@ -267,17 +267,20 @@ private[spark] class ApplicationMaster( * * In cluster mode, the AM and the driver belong to same process * so the AMEndpoint need not monitor lifecycle of the driver. + * + * @return A reference to the driver's RPC endpoint. 
*/ private def runAMEndpoint( host: String, port: String, - isClusterMode: Boolean): Unit = { + isClusterMode: Boolean): RpcEndpointRef = { val driverEndpoint = rpcEnv.setupEndpointRef( SparkEnv.driverActorSystemName, RpcAddress(host, port.toInt), YarnSchedulerBackend.ENDPOINT_NAME) amEndpoint = rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode)) + driverEndpoint } private def runDriver(securityMgr: SecurityManager): Unit = { @@ -295,11 +298,11 @@ private[spark] class ApplicationMaster( "Timed out waiting for SparkContext.") } else { rpcEnv = sc.env.rpcEnv - runAMEndpoint( + val driverRef = runAMEndpoint( sc.getConf.get("spark.driver.host"), sc.getConf.get("spark.driver.port"), isClusterMode = true) - registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) + registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) userClassThread.join() } } @@ -307,9 +310,9 @@ private[spark] class ApplicationMaster( private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { val port = sparkConf.getInt("spark.yarn.am.port", 0) rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr) - waitForSparkDriver() + val driverRef = waitForSparkDriver() addAmIpFilter() - registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) + registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) // In client mode the actor will stop the reporter thread. reporterThread.join() @@ -433,7 +436,7 @@ private[spark] class ApplicationMaster( } } - private def waitForSparkDriver(): Unit = { + private def waitForSparkDriver(): RpcEndpointRef = { logInfo("Waiting for Spark driver to be reachable.") var driverUp = false val hostport = args.userArgs(0) @@ -617,9 +620,8 @@ object ApplicationMaster extends Logging { } } - private[spark] def sparkContextInitialized(sc: SparkContext, - backend: CoarseGrainedSchedulerBackend): Unit = { - master.sparkContextInitialized(sc, backend) + private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { + master.sparkContextInitialized(sc) } private[spark] def sparkContextStopped(sc: SparkContext): Boolean = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 49f030c81d548..806be33d19256 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -36,7 +36,9 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -53,6 +55,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend */ private[yarn] class YarnAllocator( driverUrl: String, + driverRef: RpcEndpointRef, conf: Configuration, sparkConf: SparkConf, amClient: AMRMClient[ContainerRequest], @@ -90,7 +93,6 @@ private[yarn] class YarnAllocator( private[yarn] val executorIdToContainer = new HashMap[String, Container] private var numUnexpectedContainerRelease = 0L - private var backend: CoarseGrainedSchedulerBackend = _ private val 
containerIdToExecutorId = new HashMap[ContainerId, String] // Executor memory in MB. @@ -390,13 +392,8 @@ private[yarn] class YarnAllocator( private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = { for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId - - var needNotify = false - if (releasedContainers.contains(containerId)) { - // Already marked the container for release, so remove it from - // `releasedContainers`. - releasedContainers.remove(containerId) - } else { + val alreadyReleased = releasedContainers.remove(containerId) + if (!alreadyReleased) { // Decrement the number of executors running. The next iteration of // the ApplicationMaster's reporting thread will take care of allocating. numExecutorsRunning -= 1 @@ -423,7 +420,6 @@ private[yarn] class YarnAllocator( ". Diagnostics: " + completedContainer.getDiagnostics) numExecutorsFailed += 1 } - needNotify = true } if (allocatedContainerToHostMap.containsKey(containerId)) { @@ -440,13 +436,16 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.remove(containerId) } - val executorIdOpt = containerIdToExecutorId.remove(containerId) - if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get) + containerIdToExecutorId.remove(containerId).foreach { eid => + executorIdToContainer.remove(eid) - if (needNotify && executorIdOpt.isDefined) { - // The executor could have gone away (like no route to host, node failure, etc) - // Notify backend about the failure of the executor - notifyBackend(executorIdOpt.get, containerId) + if (!alreadyReleased) { + // The executor could have gone away (like no route to host, node failure, etc) + // Notify backend about the failure of the executor + numUnexpectedContainerRelease += 1 + driverRef.send(RemoveExecutor(eid, + s"Yarn deallocated the executor $eid (container $containerId)")) + } } } } @@ -456,19 +455,8 @@ private[yarn] class YarnAllocator( amClient.releaseAssignedContainer(container.getId()) } - private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = { - numUnexpectedContainerRelease += 1 - if (null != backend) { - backend.removeExecutor(executorId, - "Yarn deallocated the executor (" + executorId + ") container " + containerId) - } - } - private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease - private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized { - this.backend = backend - } } private object YarnAllocator { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 7f533ee55e8bb..4999f9c06210a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.webapp.util.WebAppUtils import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.SplitInfo import org.apache.spark.util.Utils @@ -56,6 +57,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg */ def register( driverUrl: String, + driverRef: RpcEndpointRef, conf: YarnConfiguration, sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], @@ -73,7 +75,8 @@ private[spark] class YarnRMClient(args: 
ApplicationMasterArguments) extends Logg amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) registered = true } - new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr) + new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args, + securityMgr) } /** diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index b1f2daf913430..72ec4d6b34af6 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -29,8 +29,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule logInfo("Created YarnClusterScheduler") override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc, - this.backend.asInstanceOf[CoarseGrainedSchedulerBackend]) + ApplicationMaster.sparkContextInitialized(sc) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index c71bb37a8f7a9..b316ca16a577e 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -26,14 +26,16 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.scalatest.{BeforeAndAfterEach, Matchers} +import org.mockito.Mockito._ + import org.apache.spark.{SecurityManager, SparkFunSuite} import org.apache.spark.SparkConf import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.YarnAllocator._ +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.SplitInfo -import org.scalatest.{BeforeAndAfterEach, Matchers} - class MockResolver extends DNSToSwitchMapping { override def resolve(names: JList[String]): JList[String] = { @@ -91,6 +93,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter "--class", "SomeClass") new YarnAllocator( "not used", + mock(classOf[RpcEndpointRef]), conf, sparkConf, rmClient, From 537da6f65e822327cdc30bf8031e3f8e2fc8c17d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 20 Jul 2015 10:45:13 -0700 Subject: [PATCH 6/7] Make an expected log less scary. 
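Presumably this log is now expected traffic after the previous patch: the AM sends
RemoveExecutor for every container that YARN reclaims unexpectedly, so the driver can be
asked to remove an executor it has already dropped, or one that never registered because
the container died before the executor started. Logging it at info rather than error
keeps a normal race from looking alarming in the driver log.
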
--- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7c7f70d8a193b..1775f2604a061 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -229,7 +229,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.executorLost(executorId, SlaveLost(reason)) listenerBus.post( SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason)) - case None => logError(s"Asked to remove non-existent executor $executorId") + case None => logInfo(s"Asked to remove non-existent executor $executorId") } } From 471e4a0e65e9256d26d87d0ff427565d2b5f12c3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 29 Jul 2015 15:33:13 -0700 Subject: [PATCH 7/7] Fix unit test after merge. --- .../scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 5eee50cdde85d..58318bf9bcc08 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -245,7 +245,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val container2 = createContainer("host2") handler.handleAllocatedContainers(Array(container1, container2)) - handler.requestTotalExecutors(2) + handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map()) val statuses = Seq(container1, container2).map { c => ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)