From b6f269e6460ecc441c319b5e92437e47d141c361 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 24 Nov 2014 23:40:00 -0800 Subject: [PATCH 1/5] Avoid duplicate worker registrations The gist is that we only reconnect to the master we've been communicating with instead of making a registration request to all known masters. More details in the code comments. --- .../apache/spark/deploy/worker/Worker.scala | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ca262de832e25..cc1306e14ac22 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -21,7 +21,6 @@ import java.io.File import java.io.IOException import java.text.SimpleDateFormat import java.util.{UUID, Date} -import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap @@ -177,6 +176,9 @@ private[spark] class Worker( throw new SparkException("Invalid spark URL: " + x) } connected = true + // Cancel any outstanding re-registeration attempts because we found a new master + registrationRetryTimer.foreach(_.cancel()) + registrationRetryTimer = None } private def tryRegisterAllMasters() { @@ -187,6 +189,26 @@ private[spark] class Worker( } } + /** + * Re-register with the active master in case the master fails or the network is partitioned. + * During failures, it is important to register with only the active master instead of with + * all known masters. Otherwise, the following race condition may cause a "duplicate worker" + * error detailed in SPARK-4592: + * + * (1) Master A fails and Worker attempts to reconnect to all masters + * (2) Master B takes over and notifies Worker + * (3) Worker responds by registering with Master B + * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B, + * causing the same Worker to register with Master B twice + * + * Instead, if we only register with the known active master, which must be dead because + * another master has taken over, then we can avoid registering with the same master twice. + */ + private def reregisterWithActiveMaster(): Unit = { + assert(master != null, "Attempted to re-register with an active Master that is null") + master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + } + private def retryConnectToMaster() { Utils.tryOrExit { connectionAttemptCount += 1 @@ -195,7 +217,7 @@ private[spark] class Worker( registrationRetryTimer = None } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)") - tryRegisterAllMasters() + reregisterWithActiveMaster() if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { registrationRetryTimer.foreach(_.cancel()) registrationRetryTimer = Some { From 1fce6a9343d6f563dac0c793480420c6511091ac Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 25 Nov 2014 00:06:04 -0800 Subject: [PATCH 2/5] Active master actor could be null in the beginning If a worker cannot initially reach a master, then it will attempt a retry. In this case, the active master actor must be null. This commit removes an assert that falsely assumes the contrary. --- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index cc1306e14ac22..9daca67c2899d 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -205,8 +205,9 @@ private[spark] class Worker( * another master has taken over, then we can avoid registering with the same master twice. */ private def reregisterWithActiveMaster(): Unit = { - assert(master != null, "Attempted to re-register with an active Master that is null") - master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + if (master != null) { + master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + } } private def retryConnectToMaster() { From 83b321cc02e4dfb47541c7dd13f65f98012316ef Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 25 Nov 2014 00:22:28 -0800 Subject: [PATCH 3/5] Tweak wording The Master may not necessarily be dead, as it may have recovered. --- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 9daca67c2899d..fe7a1895e31db 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -201,7 +201,7 @@ private[spark] class Worker( * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B, * causing the same Worker to register with Master B twice * - * Instead, if we only register with the known active master, which must be dead because + * Instead, if we only register with the known active master, which must have died because * another master has taken over, then we can avoid registering with the same master twice. */ private def reregisterWithActiveMaster(): Unit = { From 79286dc3e027d138bf13ef55f190b95844afae0e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 25 Nov 2014 01:14:21 -0800 Subject: [PATCH 4/5] Preserve old behavior for initial retries If this is an initial retry, meaning the active master is not set yet, then do try to contact all masters. Otherwise, we can assume that retry means there is a master failure. --- .../org/apache/spark/deploy/worker/Worker.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index fe7a1895e31db..485505d8a8d16 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -176,7 +176,7 @@ private[spark] class Worker( throw new SparkException("Invalid spark URL: " + x) } connected = true - // Cancel any outstanding re-registeration attempts because we found a new master + // Cancel any outstanding re-registration attempts because we found a new master registrationRetryTimer.foreach(_.cancel()) registrationRetryTimer = None } @@ -190,7 +190,9 @@ private[spark] class Worker( } /** - * Re-register with the active master in case the master fails or the network is partitioned. + * Attempt to re-register with any active master that has been communicating with this worker. + * If there is none, attempt to register with all known masters. + * * During failures, it is important to register with only the active master instead of with * all known masters. Otherwise, the following race condition may cause a "duplicate worker" * error detailed in SPARK-4592: @@ -204,9 +206,12 @@ private[spark] class Worker( * Instead, if we only register with the known active master, which must have died because * another master has taken over, then we can avoid registering with the same master twice. */ - private def reregisterWithActiveMaster(): Unit = { + private def reregisterWithMaster(): Unit = { if (master != null) { master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + } else { + // We are retrying the initial registration + tryRegisterAllMasters() } } @@ -218,7 +223,7 @@ private[spark] class Worker( registrationRetryTimer = None } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)") - reregisterWithActiveMaster() + reregisterWithMaster() if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { registrationRetryTimer.foreach(_.cancel()) registrationRetryTimer = Some { From 0d9716c823bac8d3e76ae093f1f87f1334aa9e2c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 25 Nov 2014 12:53:28 -0800 Subject: [PATCH 5/5] Move re-registration logic to actor for thread-safety Instead of possible sending registration requests to the master in two separate threads (the actor thread and the timer thread), we rely on the actor's single-threaded-ness to provide for thread-safety. --- .../apache/spark/deploy/DeployMessage.scala | 2 + .../apache/spark/deploy/worker/Worker.scala | 68 +++++++++++-------- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index b9dd8557ee904..c46f84de8444a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -92,6 +92,8 @@ private[deploy] object DeployMessages { case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders + case object ReregisterWithMaster // used when a worker attempts to reconnect to a master + // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 485505d8a8d16..eb11163538b20 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -190,32 +190,11 @@ private[spark] class Worker( } /** - * Attempt to re-register with any active master that has been communicating with this worker. - * If there is none, attempt to register with all known masters. - * - * During failures, it is important to register with only the active master instead of with - * all known masters. Otherwise, the following race condition may cause a "duplicate worker" - * error detailed in SPARK-4592: - * - * (1) Master A fails and Worker attempts to reconnect to all masters - * (2) Master B takes over and notifies Worker - * (3) Worker responds by registering with Master B - * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B, - * causing the same Worker to register with Master B twice - * - * Instead, if we only register with the known active master, which must have died because - * another master has taken over, then we can avoid registering with the same master twice. + * Re-register with the master because a network failure or a master failure has occurred. + * If the re-registration attempt threshold is exceeded, the worker exits with error. + * Note that for thread-safety this should only be called from the actor. */ private def reregisterWithMaster(): Unit = { - if (master != null) { - master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) - } else { - // We are retrying the initial registration - tryRegisterAllMasters() - } - } - - private def retryConnectToMaster() { Utils.tryOrExit { connectionAttemptCount += 1 if (registered) { @@ -223,12 +202,40 @@ private[spark] class Worker( registrationRetryTimer = None } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)") - reregisterWithMaster() + /** + * Re-register with the active master this worker has been communicating with. If there + * is none, then it means this worker is still bootstrapping and hasn't established a + * connection with a master yet, in which case we should re-register with all masters. + * + * It is important to re-register only with the active master during failures. Otherwise, + * if the worker unconditionally attempts to re-register with all masters, the following + * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592: + * + * (1) Master A fails and Worker attempts to reconnect to all masters + * (2) Master B takes over and notifies Worker + * (3) Worker responds by registering with Master B + * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B, + * causing the same Worker to register with Master B twice + * + * Instead, if we only register with the known active master, we can assume that the + * old master must have died because another master has taken over. Note that this is + * still not safe if the old master recovers within this interval, but this is a much + * less likely scenario. + */ + if (master != null) { + master ! RegisterWorker( + workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + } else { + // We are retrying the initial registration + tryRegisterAllMasters() + } + // We have exceeded the initial registration retry threshold + // All retries from now on should use a higher interval if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { registrationRetryTimer.foreach(_.cancel()) registrationRetryTimer = Some { context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL, - PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) } } } else { @@ -248,7 +255,7 @@ private[spark] class Worker( connectionAttemptCount = 0 registrationRetryTimer = Some { context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL, - INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) } case Some(_) => logInfo("Not spawning another attempt to register with the master, since there is an" + @@ -428,12 +435,15 @@ private[spark] class Worker( logInfo(s"$x Disassociated !") masterDisconnected() - case RequestWorkerState => { + case RequestWorkerState => sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, finishedExecutors.values.toList, drivers.values.toList, finishedDrivers.values.toList, activeMasterUrl, cores, memory, coresUsed, memoryUsed, activeMasterWebUiUrl) - } + + case ReregisterWithMaster => + reregisterWithMaster() + } private def masterDisconnected() {