@@ -104,6 +104,8 @@ private[spark] class Master(
 
   var leaderElectionAgent: ActorRef = _
 
+  private var recoverCallable: Cancellable = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -152,6 +154,10 @@ private[spark] class Master(
   }
 
   override def postStop() {
+    // prevent the CompleteRecovery message from being sent to a restarted master
+    if (recoverCallable != null) {
+      recoverCallable.cancel()
+    }
     webUi.stop()
     fileSystemsUsed.foreach(_.close())
     masterMetricsSystem.stop()
@@ -171,11 +177,12 @@ private[spark] class Master(
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedDrivers, storedWorkers)
-        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+        recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis)
+          { self ! CompleteRecovery }
       }
     }
 
-    case TriggerSchedule => schedule()
+    case CompleteRecovery => completeRecovery()
 
     case RevokedLeadership => {
       logError("Leadership has been revoked -- master shutting down.")
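For context, a minimal standalone sketch (assuming Akka 2.x; the actor, field, and timeout names are illustrative, not Spark's) of the pattern this hunk introduces: keep the Cancellable returned by scheduler.scheduleOnce and cancel it in postStop, so a restarted actor never receives the stale self-message.

import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

case object CompleteRecovery

class RecoveringActor extends Actor {
  import context.dispatcher // ExecutionContext required by scheduleOnce

  private var recoveryTask: Cancellable = _

  override def preStart(): Unit = {
    // Deliver CompleteRecovery to this actor after the timeout, unless cancelled first.
    recoveryTask = context.system.scheduler.scheduleOnce(60.seconds, self, CompleteRecovery)
  }

  override def postStop(): Unit = {
    // Cancel the pending delivery so a restarted incarnation does not process it.
    if (recoveryTask != null) {
      recoveryTask.cancel()
    }
  }

  def receive = {
    case CompleteRecovery => // finish recovery here
  }
}

object RecoveryDemo extends App {
  ActorSystem("demo").actorOf(Props[RecoveringActor], "master")
}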
@@ -450,7 +457,7 @@ private[spark] class Master(
     }
 
     state = RecoveryState.ALIVE
-    self ! TriggerSchedule
+    schedule()
     logInfo("Recovery complete - resuming operations!")
   }
 
@@ -467,7 +474,7 @@ private[spark] class Master(
    * Schedule the currently available resources among waiting apps. This method will be called
    * every time a new app joins or resource availability changes.
    */
-  def schedule() {
+  private def schedule() {
     if (state != RecoveryState.ALIVE) { return }
 
     // First schedule drivers, they take strict precedence over applications
@@ -487,7 +494,7 @@ private[spark] class Master(
       // Try to spread out each app among all the nodes, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
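For context, a minimal standalone sketch (the Worker case class and the sample numbers are made up, not Spark's data structures) of the "spread out" policy this loop prepares: hand out one core at a time, round robin across the usable workers, until the app's remaining cores are exhausted.

object SpreadOutSketch extends App {
  case class Worker(id: String, coresFree: Int)

  val usableWorkers = Array(Worker("a", 4), Worker("b", 2), Worker("c", 3))
  val coresLeft = 6 // cores the app still needs

  val numUsable = usableWorkers.length
  val assigned = new Array[Int](numUsable) // number of cores to give on each node
  var toAssign = math.min(coresLeft, usableWorkers.map(_.coresFree).sum)

  var pos = 0
  while (toAssign > 0) {
    // Only give a core to a worker that still has free cores beyond what is already assigned.
    if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % numUsable // move on to the next worker, wrapping around
  }

  // Prints (a,2), (b,2), (c,2): the six cores are spread evenly instead of filling "a" first.
  println(usableWorkers.map(_.id).zip(assigned).mkString(", "))
}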