core/src/main/scala/org/apache/spark/deploy/DeployMessages.scala
@@ -92,6 +92,8 @@ private[deploy] object DeployMessages {

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

+ case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
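The new message gives the worker's retry timer something to send to the actor itself, so retry logic runs on the actor's mailbox thread rather than inside a closure on a scheduler thread. Below is a minimal, self-contained sketch of that pattern in plain Akka, not the actual Worker; every name except ReregisterWithMaster (the interval, the actor, the demo object) is a hypothetical stand-in:

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

case object ReregisterWithMaster

class RetryingWorker extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  private var registered = false
  private var retryTimer: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Re-deliver ReregisterWithMaster to this actor every 10 seconds
    // until registration succeeds (the interval here is arbitrary).
    retryTimer = Some(context.system.scheduler.schedule(
      10.seconds, 10.seconds, self, ReregisterWithMaster))
  }

  def receive: Receive = {
    case ReregisterWithMaster =>
      // Runs on the actor's own mailbox thread, so reading and mutating
      // actor state is safe without extra synchronization.
      if (registered) {
        retryTimer.foreach(_.cancel())
        retryTimer = None
      } else {
        println("Not registered yet; retrying...")
        // The real Worker would re-send RegisterWorker to the master here.
      }
  }
}

object ReregisterDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props[RetryingWorker], "worker")
}
```

Because the scheduled message is processed like any other message, the handler can safely touch mutable state such as `registered`, which is exactly the thread-safety point the doc comment in Worker.scala below makes.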
52 changes: 45 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.io.IOException
import java.text.SimpleDateFormat
import java.util.{UUID, Date}
- import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
@@ -177,6 +176,9 @@ private[spark] class Worker(
throw new SparkException("Invalid spark URL: " + x)
}
connected = true
+ // Cancel any outstanding re-registration attempts because we found a new master
+ registrationRetryTimer.foreach(_.cancel())
+ registrationRetryTimer = None
}

private def tryRegisterAllMasters() {
@@ -187,20 +189,53 @@
}
}

- private def retryConnectToMaster() {
+ /**
+ * Re-register with the master because a network failure or a master failure has occurred.
+ * If the re-registration attempt threshold is exceeded, the worker exits with error.
+ * Note that for thread-safety this should only be called from the actor.
+ */
+ private def reregisterWithMaster(): Unit = {
Utils.tryOrExit {
connectionAttemptCount += 1
if (registered) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = None
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
- tryRegisterAllMasters()
+ /**
+ * Re-register with the active master this worker has been communicating with. If there
+ * is none, then it means this worker is still bootstrapping and hasn't established a
+ * connection with a master yet, in which case we should re-register with all masters.
+ *
+ * It is important to re-register only with the active master during failures. Otherwise,
+ * if the worker unconditionally attempts to re-register with all masters, the following
+ * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
+ *
+ * (1) Master A fails and Worker attempts to reconnect to all masters
+ * (2) Master B takes over and notifies Worker
+ * (3) Worker responds by registering with Master B
+ * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
+ *     causing the same Worker to register with Master B twice
+ *
+ * Instead, if we only register with the known active master, we can assume that the
+ * old master must have died because another master has taken over. Note that this is
+ * still not safe if the old master recovers within this interval, but this is a much
+ * less likely scenario.
+ */
+ if (master != null) {
+ master ! RegisterWorker(
+ workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+ } else {
+ // We are retrying the initial registration
+ tryRegisterAllMasters()
+ }
// We have exceeded the initial registration retry threshold
// All retries from now on should use a higher interval
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = Some {
context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
- PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+ PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
}
} else {
@@ -220,7 +255,7 @@ private[spark] class Worker(
connectionAttemptCount = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
- INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+ INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
@@ -400,12 +435,15 @@ private[spark] class Worker(
logInfo(s"$x Disassociated !")
masterDisconnected()

- case RequestWorkerState => {
+ case RequestWorkerState =>
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, drivers.values.toList,
finishedDrivers.values.toList, activeMasterUrl, cores, memory,
coresUsed, memoryUsed, activeMasterWebUiUrl)
- }

+ case ReregisterWithMaster =>
+ reregisterWithMaster()

}

private def masterDisconnected() {
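To make the schedule in the diff concrete: each ReregisterWithMaster delivery bumps connectionAttemptCount, attempts are frequent up to INITIAL_REGISTRATION_RETRIES, then the timer is replaced with one firing at PROLONGED_REGISTRATION_RETRY_INTERVAL until TOTAL_REGISTRATION_RETRIES is exceeded, at which point the worker gives up and exits. Here is a standalone sketch of that two-tier policy; the constant names mirror Worker.scala, but the concrete values below are illustrative assumptions, not necessarily the ones Spark ships with:

```scala
// A plain-Scala sketch of the two-tier retry policy reregisterWithMaster
// implements. Values are illustrative assumptions, not Spark's actual ones.
object RetryPolicySketch {
  val INITIAL_REGISTRATION_RETRIES = 6                // assumed value
  val TOTAL_REGISTRATION_RETRIES = 16                 // assumed value
  val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = 10  // assumed value
  val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = 60 // assumed value

  // Delay (in seconds) before the given attempt, or None once the worker
  // should give up and exit with an error.
  def intervalForAttempt(attempt: Int): Option[Int] =
    if (attempt <= INITIAL_REGISTRATION_RETRIES) {
      Some(INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS)
    } else if (attempt <= TOTAL_REGISTRATION_RETRIES) {
      Some(PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS)
    } else {
      None
    }

  def main(args: Array[String]): Unit = {
    (1 to TOTAL_REGISTRATION_RETRIES + 1).foreach { attempt =>
      intervalForAttempt(attempt) match {
        case Some(s) => println(s"attempt $attempt: retry after ${s}s")
        case None    => println(s"attempt $attempt: give up, exit with error")
      }
    }
  }
}
```

On this schedule the worker exits after the attempt count passes TOTAL_REGISTRATION_RETRIES, matching the "exits with error" behavior the method's doc comment describes. The difference after this patch is what each attempt does: once a master is known, the worker re-sends RegisterWorker only to that active master rather than to all masters, which closes the SPARK-4592 duplicate-registration race described in the doc comment.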