From 17e7949b427e89984ed8dee19ea2804503feba66 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 20 Apr 2016 02:37:23 +0530 Subject: [PATCH] fixing SPARK-14736 Deadlock in registering applications while the Master is in the RECOVERING mode --- .../org/apache/spark/deploy/master/Master.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index edc9be2a8a8cb..542178dbdfe7a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -77,6 +77,7 @@ private[deploy] class Master( val idToApp = new HashMap[String, ApplicationInfo] private val waitingApps = new ArrayBuffer[ApplicationInfo] val apps = new HashSet[ApplicationInfo] + private val waitingAppsWhileRecovering = new ArrayBuffer[ApplicationInfo] private val idToWorker = new HashMap[String, WorkerInfo] private val addressToWorker = new HashMap[RpcAddress, WorkerInfo] @@ -571,6 +572,9 @@ private[deploy] class Master( } state = RecoveryState.ALIVE + // Re-register the apps which were omitted during the Recovering phase. + waitingAppsWhileRecovering.foreach(registerApplication) + waitingAppsWhileRecovering.clear() schedule() logInfo("Recovery complete - resuming operations!") } @@ -818,7 +822,13 @@ private[deploy] class Master( private def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.address if (addressToApp.contains(appAddress)) { - logInfo("Attempted to re-register application at same address: " + appAddress) + if (state == RecoveryState.RECOVERING) { + logInfo("Attempted to re-register application at same address: " + appAddress + " in the " + + "Recovering Mode") + waitingAppsWhileRecovering += app + } else { + logInfo("Attempted to re-register application at same address: " + appAddress) + } return }