@@ -20,12 +20,14 @@ package org.apache.spark.deploy.worker
2020import java .io .File
2121import java .io .IOException
2222import java .text .SimpleDateFormat
23- import java .util .Date
23+ import java .util .{UUID , Date }
24+ import java .util .concurrent .TimeUnit
2425
2526import scala .collection .JavaConversions ._
2627import scala .collection .mutable .HashMap
2728import scala .concurrent .duration ._
2829import scala .language .postfixOps
30+ import scala .util .Random
2931
3032import akka .actor ._
3133import akka .remote .{DisassociatedEvent , RemotingLifecycleEvent }
@@ -64,10 +66,17 @@ private[spark] class Worker(
6466 // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
6567 val HEARTBEAT_MILLIS = conf.getLong(" spark.worker.timeout" , 60 ) * 1000 / 4
6668
67- val REGISTRATION_TIMEOUT = 20 .seconds
68- val REGISTRATION_RETRIES = 3
69-
70- val RECONNECT_ATTEMPT_INTERVAL_MILLIS = conf.getLong(" spark.worker.reconnect.interval" , 60 ) * 1000
69+ val INITIAL_REGISTRATION_RETRIES = 6
70+ val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
71+ val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
72+ val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
73+ val randomNumberGenerator = new Random (UUID .randomUUID.getMostSignificantBits)
74+ randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
75+ }
76+ val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
77+ REGISTRATION_RETRY_FUZZ_MULTIPLIER )).seconds
78+ val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60
79+ * REGISTRATION_RETRY_FUZZ_MULTIPLIER )).seconds
7180
7281 val CLEANUP_ENABLED = conf.getBoolean(" spark.worker.cleanup.enabled" , false )
7382 // How often worker will clean up old app folders
@@ -96,7 +105,6 @@ private[spark] class Worker(
96105 val finishedExecutors = new HashMap [String , ExecutorRunner ]
97106 val drivers = new HashMap [String , DriverRunner ]
98107 val finishedDrivers = new HashMap [String , DriverRunner ]
99- var scheduledReconnectTask : Option [Cancellable ] = None
100108
101109 val publicAddress = {
102110 val envVar = System .getenv(" SPARK_PUBLIC_DNS" )
@@ -106,6 +114,7 @@ private[spark] class Worker(
106114
107115 var coresUsed = 0
108116 var memoryUsed = 0
117+ var connectionAttemptCount = 0
109118
110119 val metricsSystem = MetricsSystem .createMetricsSystem(" worker" , conf, securityMgr)
111120 val workerSource = new WorkerSource (this )
@@ -159,37 +168,57 @@ private[spark] class Worker(
159168 throw new SparkException (" Invalid spark URL: " + x)
160169 }
161170 connected = true
162- scheduledReconnectTask.foreach(_.cancel())
163171 }
164172
165- def tryRegisterAllMasters () {
173+ private def tryRegisterAllMasters () {
166174 for (masterUrl <- masterUrls) {
167175 logInfo(" Connecting to master " + masterUrl + " ..." )
168176 val actor = context.actorSelection(Master .toAkkaUrl(masterUrl))
169177 actor ! RegisterWorker (workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
170178 }
171179 }
172180
173- def registerWithMaster () {
174- tryRegisterAllMasters( )
175- var retries = 0
176- registrationRetryTimer = Some {
177- context.system.scheduler.schedule( REGISTRATION_TIMEOUT , REGISTRATION_TIMEOUT ) {
178- Utils .tryOrExit {
179- retries += 1
180- if (registered ) {
181- registrationRetryTimer.foreach(_.cancel() )
182- } else if (retries >= REGISTRATION_RETRIES ) {
183- logError( " All masters are unresponsive! Giving up. " )
184- System .exit( 1 )
185- } else {
186- tryRegisterAllMasters( )
181+ private def retryConnectToMaster () {
182+ logInfo( " ping " )
183+ Utils .tryOrExit {
184+ connectionAttemptCount += 1
185+ if (registered ) {
186+ registrationRetryTimer.foreach(_.cancel())
187+ registrationRetryTimer = None
188+ } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES ) {
189+ tryRegisterAllMasters( )
190+ if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES ) {
191+ registrationRetryTimer.foreach(_.cancel() )
192+ registrationRetryTimer = Some {
193+ context.system.scheduler.schedule( PROLONGED_REGISTRATION_RETRY_INTERVAL ,
194+ PROLONGED_REGISTRATION_RETRY_INTERVAL )(retryConnectToMaster )
187195 }
188196 }
197+ } else {
198+ logError(" All masters are unresponsive! Giving up." )
199+ System .exit(1 )
189200 }
190201 }
191202 }
192203
204+ def registerWithMaster () {
205+ // DisassociatedEvent may be triggered multiple times, so don't attempt registration
206+ // if there are outstanding registration attempts scheduled.
207+ registrationRetryTimer match {
208+ case None =>
209+ registered = false
210+ tryRegisterAllMasters()
211+ connectionAttemptCount = 0
212+ registrationRetryTimer = Some {
213+ context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL ,
214+ INITIAL_REGISTRATION_RETRY_INTERVAL )(retryConnectToMaster)
215+ }
216+ case Some (_) =>
217+ logInfo(" Not spawning another attempt to register with the master, since there is an" +
218+ " attempt scheduled already." )
219+ }
220+ }
221+
193222 override def receiveWithLogging = {
194223 case RegisteredWorker (masterUrl, masterWebUiUrl) =>
195224 logInfo(" Successfully registered with master " + masterUrl)
@@ -201,8 +230,6 @@ private[spark] class Worker(
201230 context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
202231 CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup )
203232 }
204- scheduledReconnectTask.foreach(_.cancel())
205- scheduledReconnectTask = None
206233
207234 case SendHeartbeat =>
208235 if (connected) { master ! Heartbeat (workerId) }
@@ -251,7 +278,7 @@ private[spark] class Worker(
251278
252279 case ReconnectWorker (masterUrl) =>
253280 logInfo(s " Master with url $masterUrl requested this worker to reconnect. " )
254- scheduleAttemptsToReconnectToMaster ()
281+ registerWithMaster ()
255282
256283 case LaunchExecutor (masterUrl, appId, execId, appDesc, cores_, memory_) =>
257284 if (masterUrl != activeMasterUrl) {
@@ -375,16 +402,7 @@ private[spark] class Worker(
375402 private def masterDisconnected () {
376403 logError(" Connection to master failed! Waiting for master to reconnect..." )
377404 connected = false
378- scheduleAttemptsToReconnectToMaster()
379- }
380-
381- private def scheduleAttemptsToReconnectToMaster () {
382- if (! scheduledReconnectTask.isDefined) {
383- scheduledReconnectTask = Some (context.system.scheduler.schedule(
384- Duration Zero , RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) {
385- tryRegisterAllMasters()
386- })
387- }
405+ registerWithMaster()
388406 }
389407
390408 def generateWorkerId (): String = {
@@ -394,7 +412,6 @@ private[spark] class Worker(
394412 override def postStop () {
395413 metricsSystem.report()
396414 registrationRetryTimer.foreach(_.cancel())
397- scheduledReconnectTask.foreach(_.cancel())
398415 executors.values.foreach(_.kill())
399416 drivers.values.foreach(_.kill())
400417 webUi.stop()
0 commit comments