@@ -71,6 +71,8 @@ private[deploy] object DeployMessages {

case class RegisterWorkerFailed(message: String) extends DeployMessage

case class ReconnectWorker(masterUrl: String) extends DeployMessage

case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage

case class LaunchExecutor(
@@ -341,7 +341,14 @@ private[spark] class Master(
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
logWarning("Got heartbeat from unregistered worker " + workerId)
if (workers.map(_.id).contains(workerId)) {
Contributor: Should this have a not in it?

Contributor: @ash211 What he seems to be doing is allowing the reconnect only before we decide this worker is DEAD.

Contributor (author): The above observation is correct: only workers that have previously registered with the master are allowed to reconnect. Workers connecting for the first time shouldn't be able to trigger a reconnection message from the master just by sending a heartbeat. I've updated the log message in an else case to make this more explicit.

logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
sender ! ReconnectWorker(masterUrl)
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
}

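As an aside, here is a minimal sketch of the heartbeat decision discussed in the comments above. It assumes, as the discussion implies but this diff does not show, that idToWorker maps only currently registered workers while workers also retains entries for workers the master has already seen, including ones it marked DEAD. The object HeartbeatDecision, the Action cases, and knownWorkerIds are hypothetical names used only for illustration:

import scala.collection.mutable

object HeartbeatDecision {
  sealed trait Action
  case object UpdateLastHeartbeat extends Action // known, still-registered worker
  case object AskToReregister extends Action     // seen before, but no longer registered
  case object IgnoreHeartbeat extends Action     // never registered at all

  // Mirrors the branch structure of the handler above: no negation is needed because
  // the None branch is only reached when the worker is absent from idToWorker.
  def decide(
      workerId: String,
      idToWorker: mutable.Map[String, AnyRef],
      knownWorkerIds: Set[String]): Action = {
    idToWorker.get(workerId) match {
      case Some(_) => UpdateLastHeartbeat
      case None if knownWorkerIds.contains(workerId) => AskToReregister
      case None => IgnoreHeartbeat
    }
  }
}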
81 changes: 62 additions & 19 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -20,12 +20,14 @@ package org.apache.spark.deploy.worker
import java.io.File
import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{UUID, Date}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
@@ -64,8 +66,22 @@ private[spark] class Worker(
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4

val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
// Model retries to connect to the master, after Hadoop's model.
// The first six attempts to reconnect are made at shorter intervals (between 5 and 15 seconds);
// afterwards, the next 10 attempts are made at intervals between 30 and 90 seconds.
// A bit of randomness is introduced so that not all of the workers attempt to reconnect at
// the same time.
val INITIAL_REGISTRATION_RETRIES = 6
Contributor: Maybe add a comment above this line saying that this is modeled after Hadoop's design. This will help future maintainers understand this code.

val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
}
val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60 *
REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds

val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
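A small sketch, not part of the patch, of the interval ranges the constants above produce. Since the fuzz multiplier is drawn uniformly from [0.5, 1.5), the initial retry interval works out to roughly 5-15 seconds and the prolonged interval to roughly 30-90 seconds. The object name RetryIntervalSketch is made up for illustration:

import java.util.UUID

import scala.concurrent.duration._
import scala.util.Random

object RetryIntervalSketch extends App {
  // Same construction as the worker: a multiplier uniform in [0.5, 1.5)
  val fuzz = new Random(UUID.randomUUID.getMostSignificantBits).nextDouble + 0.5
  val initialInterval = math.round(10 * fuzz).seconds   // falls in [5 seconds, 15 seconds]
  val prolongedInterval = math.round(60 * fuzz).seconds // falls in [30 seconds, 90 seconds]
  println(s"initial=$initialInterval, prolonged=$prolongedInterval")
}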
@@ -103,6 +119,7 @@ private[spark] class Worker(

var coresUsed = 0
var memoryUsed = 0
var connectionAttemptCount = 0

val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
val workerSource = new WorkerSource(this)
@@ -158,34 +175,55 @@ private[spark] class Worker(
connected = true
}

def tryRegisterAllMasters() {
private def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
}

def registerWithMaster() {
tryRegisterAllMasters()
var retries = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
Utils.tryOrExit {
retries += 1
if (registered) {
registrationRetryTimer.foreach(_.cancel())
} else if (retries >= REGISTRATION_RETRIES) {
logError("All masters are unresponsive! Giving up.")
System.exit(1)
} else {
tryRegisterAllMasters()
private def retryConnectToMaster() {
Utils.tryOrExit {
connectionAttemptCount += 1
logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount")
if (registered) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = None
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
tryRegisterAllMasters()
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = Some {
context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
}
}
} else {
logError("All masters are unresponsive! Giving up.")
System.exit(1)
}
}
}

def registerWithMaster() {
// DisassociatedEvent may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheduled.
registrationRetryTimer match {
case None =>
registered = false
tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
}
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheduled already.")
}
}

override def receiveWithLogging = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
@@ -243,6 +281,10 @@ private[spark] class Worker(
System.exit(1)
}

case ReconnectWorker(masterUrl) =>
logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
registerWithMaster()

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
@@ -362,9 +404,10 @@ private[spark] class Worker(
}
}

def masterDisconnected() {
private def masterDisconnected() {
logError("Connection to master failed! Waiting for master to reconnect...")
connected = false
registerWithMaster()
}

def generateWorkerId(): String = {