From b5b34af964199af296e12490413225f55d93a6cd Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 15 Oct 2014 15:27:21 -0700 Subject: [PATCH 1/7] [SPARK-3736] Workers reconnect when disassociated from the master. Before, if the master node is killed and restarted, the worker nodes would not attempt to reconnect to the Master. Therefore, when the Master node was restarted, the worker nodes needed to be restarted as well. Now, when the Master node is disconnected, the worker nodes will continuously ping the master node in attempts to reconnect to it. Once the master node restarts, it will detect one of the registration requests from its former workers. The result is that the cluster re-enters a healthy state. In addition, when the master does not receive a heartbeat from the worker, the worker was removed; however, when the worker sent a heartbeat to the master, the master used to ignore the heartbeat. Now, a master that receives a heartbeat from a worker that had been disconnected will request the worker to re-attempt the registration process, at which point the worker will send a RegisterWorker request and be re-connected accordingly. Re-connection attempts per worker are submitted every N seconds, where N is configured by the property spark.worker.reconnect.interval - this has a default of 60 seconds right now. --- .../apache/spark/deploy/DeployMessage.scala | 2 ++ .../apache/spark/deploy/master/Master.scala | 6 +++++- .../apache/spark/deploy/worker/Worker.scala | 20 +++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index a7368f9f3dfb..b9dd8557ee90 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -71,6 +71,8 @@ private[deploy] object DeployMessages { case class RegisterWorkerFailed(message: String) extends DeployMessage + case class ReconnectWorker(masterUrl: String) extends DeployMessage + case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage case class LaunchExecutor( diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f98b531316a3..fb00a18cdcf6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -341,7 +341,11 @@ private[spark] class Master( case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis() case None => - logWarning("Got heartbeat from unregistered worker " + workerId) + if (workers.map(_.id).contains(workerId)) { + logWarning(s"Got heartbeat from unregistered worker $workerId." + + " Asking it to re-register.") + sender ! 
ReconnectWorker(masterUrl) + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 9b52cb06fb6f..eee5d91d7a79 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -67,6 +67,8 @@ private[spark] class Worker( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 + val RECONNECT_ATTEMPT_INTERVAL_MILLIS = conf.getLong("spark.worker.reconnect.interval", 60) * 1000 + val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false) // How often worker will clean up old app folders val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000 @@ -94,6 +96,7 @@ private[spark] class Worker( val finishedExecutors = new HashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner] + var scheduledReconnectMessage: Option[Cancellable] = None val publicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") @@ -197,6 +200,8 @@ private[spark] class Worker( context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) } + scheduledReconnectMessage.foreach(_.cancel()) + scheduledReconnectMessage = None case SendHeartbeat => if (connected) { master ! Heartbeat(workerId) } @@ -243,6 +248,10 @@ private[spark] class Worker( System.exit(1) } + case ReconnectWorker(masterUrl) => + logWarning(s"Master with url $masterUrl requested this worker to reconnect.") + scheduleAttemptsToReconnectToMaster() + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") @@ -365,6 +374,16 @@ private[spark] class Worker( def masterDisconnected() { logError("Connection to master failed! Waiting for master to reconnect...") connected = false + scheduleAttemptsToReconnectToMaster() + } + + def scheduleAttemptsToReconnectToMaster() { + if (!scheduledReconnectMessage.isDefined) { + scheduledReconnectMessage = Some(context.system.scheduler.schedule( + Duration Zero, RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) { + tryRegisterAllMasters() + }) + } } def generateWorkerId(): String = { @@ -374,6 +393,7 @@ private[spark] class Worker( override def postStop() { metricsSystem.report() registrationRetryTimer.foreach(_.cancel()) + scheduledReconnectMessage.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop() From 2ad5ed5b3c6c0b5d914548fe63db6d9e902ac618 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Oct 2014 11:34:08 -0700 Subject: [PATCH 2/7] [SPARK-3736] Cancel attempts to reconnect if the master changes. 
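To make the handshake from the two patches above concrete, the following is a minimal, self-contained Scala sketch of the protocol, written with plain classes rather than the Akka actors the patches actually modify. ToyMaster, ToyWorker, everSeen and dropWorker are names invented for this sketch; only the message case classes mirror DeployMessages.

    // Sketch only: models the heartbeat -> ReconnectWorker -> RegisterWorker
    // round trip. "registered" stands in for the master's map of live workers
    // and "everSeen" for the set that also keeps timed-out workers.
    object ReconnectHandshakeSketch {
      import scala.collection.mutable

      sealed trait Msg
      case class Heartbeat(workerId: String) extends Msg
      case class ReconnectWorker(masterUrl: String) extends Msg
      case class RegisterWorker(workerId: String) extends Msg

      class ToyMaster(masterUrl: String) {
        private val registered = mutable.Set[String]()
        private val everSeen = mutable.Set[String]()

        def receive(msg: Msg): Option[Msg] = msg match {
          case RegisterWorker(id) => registered += id; everSeen += id; None
          case Heartbeat(id) if registered(id) => None // normal case: refresh lastHeartbeat
          case Heartbeat(id) if everSeen(id) => Some(ReconnectWorker(masterUrl)) // ask it to re-register
          case _ => None // never-registered worker: only log and ignore
        }

        def dropWorker(id: String): Unit = registered -= id // simulates a heartbeat timeout
      }

      class ToyWorker(id: String) {
        def receive(msg: Msg): Option[Msg] = msg match {
          case ReconnectWorker(_) => Some(RegisterWorker(id)) // re-run registration
          case _ => None
        }
      }

      def main(args: Array[String]): Unit = {
        val master = new ToyMaster("spark://master:7077")
        val worker = new ToyWorker("worker-1")
        master.receive(RegisterWorker("worker-1"))
        master.dropWorker("worker-1")                     // master times the worker out
        val reply = master.receive(Heartbeat("worker-1")) // stale heartbeat arrives
        println(reply)                                    // Some(ReconnectWorker(spark://master:7077))
        reply.flatMap(worker.receive).foreach(master.receive) // worker registers again
      }
    }

Patch 2 then closes the loop on the worker side: once registration succeeds, the scheduled reconnect task is cancelled so the worker stops pinging a master it is already connected to.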
--- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index eee5d91d7a79..3bb19c568400 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -159,6 +159,7 @@ private[spark] class Worker( throw new SparkException("Invalid spark URL: " + x) } connected = true + scheduledReconnectMessage.foreach(_.cancel()) } def tryRegisterAllMasters() { From b9a307772bb08ea8c2588c560e689bcc901b260b Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Oct 2014 11:39:09 -0700 Subject: [PATCH 3/7] [SPARK-3736] Addressing PR comments related to reconnection. - scheduledReconnectMessage --> scheduledReconnectTask - A log statement in the master is printed if a worker that was unregistered and not in its worker set sends a heartbeat. --- .../org/apache/spark/deploy/master/Master.scala | 3 +++ .../org/apache/spark/deploy/worker/Worker.scala | 14 +++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fb00a18cdcf6..3b6bb9fe128a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -345,6 +345,9 @@ private[spark] class Master( logWarning(s"Got heartbeat from unregistered worker $workerId." + " Asking it to re-register.") sender ! ReconnectWorker(masterUrl) + } else { + logWarning(s"Got heartbeat from unregistered worker $workerId." + + " This worker was never registered, so ignoring the heartbeat.") } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 3bb19c568400..08cf06b577fe 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -96,7 +96,7 @@ private[spark] class Worker( val finishedExecutors = new HashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner] - var scheduledReconnectMessage: Option[Cancellable] = None + var scheduledReconnectTask: Option[Cancellable] = None val publicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") @@ -159,7 +159,7 @@ private[spark] class Worker( throw new SparkException("Invalid spark URL: " + x) } connected = true - scheduledReconnectMessage.foreach(_.cancel()) + scheduledReconnectTask.foreach(_.cancel()) } def tryRegisterAllMasters() { @@ -201,8 +201,8 @@ private[spark] class Worker( context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) } - scheduledReconnectMessage.foreach(_.cancel()) - scheduledReconnectMessage = None + scheduledReconnectTask.foreach(_.cancel()) + scheduledReconnectTask = None case SendHeartbeat => if (connected) { master ! 
Heartbeat(workerId) } @@ -379,8 +379,8 @@ private[spark] class Worker( } def scheduleAttemptsToReconnectToMaster() { - if (!scheduledReconnectMessage.isDefined) { - scheduledReconnectMessage = Some(context.system.scheduler.schedule( + if (!scheduledReconnectTask.isDefined) { + scheduledReconnectTask = Some(context.system.scheduler.schedule( Duration Zero, RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) { tryRegisterAllMasters() }) @@ -394,7 +394,7 @@ private[spark] class Worker( override def postStop() { metricsSystem.report() registrationRetryTimer.foreach(_.cancel()) - scheduledReconnectMessage.foreach(_.cancel()) + scheduledReconnectTask.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop() From a698e356b05129ef2e7b9fadd73a1f2d9184c5a0 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Oct 2014 11:43:26 -0700 Subject: [PATCH 4/7] [SPARK-3736] Addressing PR comment to make some defs private. --- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 08cf06b577fe..068d9db34f85 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -372,13 +372,13 @@ private[spark] class Worker( } } - def masterDisconnected() { + private def masterDisconnected() { logError("Connection to master failed! Waiting for master to reconnect...") connected = false scheduleAttemptsToReconnectToMaster() } - def scheduleAttemptsToReconnectToMaster() { + private def scheduleAttemptsToReconnectToMaster() { if (!scheduledReconnectTask.isDefined) { scheduledReconnectTask = Some(context.system.scheduler.schedule( Duration Zero, RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) { From 94ddeca5390c5746b767b99ab8086d651e474978 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Oct 2014 14:01:25 -0700 Subject: [PATCH 5/7] [SPARK-3736] Changing a log warning to a log info. --- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 068d9db34f85..9693a1661cfa 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -250,7 +250,7 @@ private[spark] class Worker( } case ReconnectWorker(masterUrl) => - logWarning(s"Master with url $masterUrl requested this worker to reconnect.") + logInfo(s"Master with url $masterUrl requested this worker to reconnect.") scheduleAttemptsToReconnectToMaster() case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => From fe0e02feaa8ac3e01ea7e90240e46a3d5a276864 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 17 Oct 2014 19:31:24 -0700 Subject: [PATCH 6/7] [SPARK-3736] Moving reconnection logic to registerWithMaster(). The logic of the worker reconnecting to the master is now shared with the logic of attempting to connect to the master on the worker's startup. Connection is attempted in certain intervals of time. - The first six attempts are in 5 to 15 second intervals, and - The ten attempts after that are in 30 to 90 second intervals. 
The exact intervals between attempts are randomized in that range, in order to introduce some jitter and prevent the master from being hit with giant bursts of registration requests. This model is the same as Hadoop's reconnection model. --- .../apache/spark/deploy/worker/Worker.scala | 89 +++++++++++-------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 9693a1661cfa..bb83ac45cb04 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -20,12 +20,14 @@ package org.apache.spark.deploy.worker import java.io.File import java.io.IOException import java.text.SimpleDateFormat -import java.util.Date +import java.util.{UUID, Date} +import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.Random import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} @@ -64,10 +66,17 @@ private[spark] class Worker( // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 - val REGISTRATION_TIMEOUT = 20.seconds - val REGISTRATION_RETRIES = 3 - - val RECONNECT_ATTEMPT_INTERVAL_MILLIS = conf.getLong("spark.worker.reconnect.interval", 60) * 1000 + val INITIAL_REGISTRATION_RETRIES = 6 + val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10 + val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 + val REGISTRATION_RETRY_FUZZ_MULTIPLIER = { + val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits) + randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND + } + val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 * + REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds + val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60 + * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false) // How often worker will clean up old app folders @@ -96,7 +105,6 @@ private[spark] class Worker( val finishedExecutors = new HashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner] - var scheduledReconnectTask: Option[Cancellable] = None val publicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") @@ -106,6 +114,7 @@ private[spark] class Worker( var coresUsed = 0 var memoryUsed = 0 + var connectionAttemptCount = 0 val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) val workerSource = new WorkerSource(this) @@ -159,10 +168,9 @@ private[spark] class Worker( throw new SparkException("Invalid spark URL: " + x) } connected = true - scheduledReconnectTask.foreach(_.cancel()) } - def tryRegisterAllMasters() { + private def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) @@ -170,26 +178,47 @@ private[spark] class Worker( } } - def registerWithMaster() { - tryRegisterAllMasters() - var retries = 0 - registrationRetryTimer = Some { - context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { - Utils.tryOrExit { - retries += 1 - if (registered) { - 
registrationRetryTimer.foreach(_.cancel()) - } else if (retries >= REGISTRATION_RETRIES) { - logError("All masters are unresponsive! Giving up.") - System.exit(1) - } else { - tryRegisterAllMasters() + private def retryConnectToMaster() { + logInfo("ping") + Utils.tryOrExit { + connectionAttemptCount += 1 + if (registered) { + registrationRetryTimer.foreach(_.cancel()) + registrationRetryTimer = None + } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { + tryRegisterAllMasters() + if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { + registrationRetryTimer.foreach(_.cancel()) + registrationRetryTimer = Some { + context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL, + PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) } } + } else { + logError("All masters are unresponsive! Giving up.") + System.exit(1) } } } + def registerWithMaster() { + // DisassociatedEvent may be triggered multiple times, so don't attempt registration + // if there are outstanding registration attempts scheduled. + registrationRetryTimer match { + case None => + registered = false + tryRegisterAllMasters() + connectionAttemptCount = 0 + registrationRetryTimer = Some { + context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL, + INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + } + case Some(_) => + logInfo("Not spawning another attempt to register with the master, since there is an" + + " attempt scheduled already.") + } + } + override def receiveWithLogging = { case RegisteredWorker(masterUrl, masterWebUiUrl) => logInfo("Successfully registered with master " + masterUrl) @@ -201,8 +230,6 @@ private[spark] class Worker( context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) } - scheduledReconnectTask.foreach(_.cancel()) - scheduledReconnectTask = None case SendHeartbeat => if (connected) { master ! Heartbeat(workerId) } @@ -251,7 +278,7 @@ private[spark] class Worker( case ReconnectWorker(masterUrl) => logInfo(s"Master with url $masterUrl requested this worker to reconnect.") - scheduleAttemptsToReconnectToMaster() + registerWithMaster() case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { @@ -375,16 +402,7 @@ private[spark] class Worker( private def masterDisconnected() { logError("Connection to master failed! Waiting for master to reconnect...") connected = false - scheduleAttemptsToReconnectToMaster() - } - - private def scheduleAttemptsToReconnectToMaster() { - if (!scheduledReconnectTask.isDefined) { - scheduledReconnectTask = Some(context.system.scheduler.schedule( - Duration Zero, RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) { - tryRegisterAllMasters() - }) - } + registerWithMaster() } def generateWorkerId(): String = { @@ -394,7 +412,6 @@ private[spark] class Worker( override def postStop() { metricsSystem.report() registrationRetryTimer.foreach(_.cancel()) - scheduledReconnectTask.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop() From 83f8bc9a18c9d2663127f4f8142ae3f5273db2d2 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 20 Oct 2014 11:15:09 -0700 Subject: [PATCH 7/7] [SPARK-3736] More informative log message, and fixing some indentation. 
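Before the diff, it may help to spell out the interval arithmetic that the comment block added below describes. This is an editorial sketch, not code from the patch: the constants mirror INITIAL_REGISTRATION_RETRIES, TOTAL_REGISTRATION_RETRIES and FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND, while the object name RetryScheduleSketch and the use of an unseeded Random (the patch seeds one from a UUID) are simplifications.

    object RetryScheduleSketch {
      import scala.util.Random

      val InitialRetries = 6                 // first block of attempts
      val TotalRetries   = InitialRetries + 10
      val FuzzLowerBound = 0.500             // multiplier drawn uniformly from [0.5, 1.5)

      def main(args: Array[String]): Unit = {
        // One multiplier per worker, so each worker keeps a fixed but distinct interval.
        val fuzz = new Random().nextDouble() + FuzzLowerBound
        val initialIntervalSec   = math.round(10 * fuzz) // lands in roughly 5..15 seconds
        val prolongedIntervalSec = math.round(60 * fuzz) // lands in roughly 30..90 seconds

        // Attempts 1..6 use the short interval, attempts 7..16 the long one,
        // after which the worker gives up and exits.
        val schedule = (1 to TotalRetries).map { attempt =>
          if (attempt <= InitialRetries) initialIntervalSec else prolongedIntervalSec
        }
        println(s"fuzz = $fuzz, schedule (seconds) = ${schedule.mkString(", ")}")
      }
    }

Because every worker draws its own multiplier, a restarted master sees the re-registration attempts spread over the 5-15 and 30-90 second windows instead of arriving in lockstep.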
--- .../scala/org/apache/spark/deploy/worker/Worker.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index bb83ac45cb04..c4a8ec2e5e7b 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -66,6 +66,11 @@ private[spark] class Worker( // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 + // Model retries to connect to the master, after Hadoop's model. + // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds) + // Afterwards, the next 10 attempts are between 30 and 90 seconds. + // A bit of randomness is introduced so that not all of the workers attempt to reconnect at + // the same time. val INITIAL_REGISTRATION_RETRIES = 6 val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10 val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 @@ -179,9 +184,9 @@ private[spark] class Worker( } private def retryConnectToMaster() { - logInfo("ping") Utils.tryOrExit { connectionAttemptCount += 1 + logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount") if (registered) { registrationRetryTimer.foreach(_.cancel()) registrationRetryTimer = None @@ -211,7 +216,7 @@ private[spark] class Worker( connectionAttemptCount = 0 registrationRetryTimer = Some { context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL, - INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) } case Some(_) => logInfo("Not spawning another attempt to register with the master, since there is an" +
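The log message above reports the guard in registerWithMaster() against scheduling a second retry timer when DisassociatedEvent is delivered more than once. A self-contained way to model that guard, outside Akka, is sketched here; GuardedRegistration, attemptRegistration and stop are invented names, and a ScheduledExecutorService stands in for context.system.scheduler.

    import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

    // Sketch only: Option[ScheduledFuture[_]] plays the role of registrationRetryTimer.
    class GuardedRegistration(attemptRegistration: () => Unit, intervalSeconds: Long) {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()
      private var retryTask: Option[ScheduledFuture[_]] = None

      // Disconnect path; may be called several times in a row without
      // stacking up duplicate timers.
      def registerWithMaster(): Unit = synchronized {
        retryTask match {
          case None =>
            attemptRegistration()
            retryTask = Some(scheduler.scheduleAtFixedRate(
              new Runnable { def run(): Unit = attemptRegistration() },
              intervalSeconds, intervalSeconds, TimeUnit.SECONDS))
          case Some(_) =>
            println("Not spawning another attempt; one is already scheduled.")
        }
      }

      // Mirrors the cancel-and-clear done in retryConnectToMaster() once registered.
      def onRegistered(): Unit = synchronized {
        retryTask.foreach(_.cancel(false))
        retryTask = None
      }

      def stop(): Unit = scheduler.shutdownNow()
    }

In the patch itself the same Option match on registrationRetryTimer provides the guard, and retryConnectToMaster() additionally swaps the initial interval for the prolonged one after the sixth attempt.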