@@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager(
     listenerBus: LiveListenerBus,
     conf: SparkConf)
   extends Logging {
+
+  allocationManager =>
+
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors. These are required.
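The `allocationManager =>` line added above is a Scala self-alias: it gives the enclosing instance an explicit name so that inner classes (such as the listener defined later in this file) can refer to it, for example to synchronize on its monitor. A minimal standalone sketch of the pattern, with hypothetical names:

    // Hypothetical names; not Spark code. `outer =>` aliases the enclosing instance.
    class Manager {
      outer =>

      private var counter = 0

      class Listener {
        def record(): Unit = outer.synchronized {
          // Locks the Manager instance rather than the Listener, so inner and
          // outer code share one monitor.
          counter += 1
        }
      }
    }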
@@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new RealClock
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener(this)
+  private val listener = new ExecutorAllocationListener
 
   /**
    * Verify that the settings specified through the config are valid.
@@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager(
       addTime += sustainedSchedulerBacklogTimeout * 1000
     }
 
-    removeTimes.foreach { case (executorId, expireTime) =>
-      if (now >= expireTime) {
+    removeTimes.retain { case (executorId, expireTime) =>
+      val expired = now >= expireTime
+      if (expired) {
         removeExecutor(executorId)
-        removeTimes.remove(executorId)
       }
+      !expired
     }
   }
 
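The change above replaces a foreach that removed entries from `removeTimes` while iterating over it with `retain`, which visits each entry once and keeps only those for which the predicate returns true. A small self-contained sketch of the idiom (hypothetical data; `retain` is the mutable-map method available in Scala 2.x):

    import scala.collection.mutable

    val removeTimes = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 5000L)
    val now = 2000L

    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        println(s"removing $executorId")  // stand-in for removeExecutor(executorId)
      }
      !expired                            // keep only entries that have not expired
    }
    // removeTimes now contains only "exec-2"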
@@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager(
     // Do not kill the executor if we have already reached the lower bound
     val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
     if (numExistingExecutors - 1 < minNumExecutors) {
-      logInfo(s"Not removing idle executor $executorId because there are only " +
+      logDebug(s"Not removing idle executor $executorId because there are only " +
         s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
       return false
     }
@@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorAdded(executorId: String): Unit = synchronized {
     if (!executorIds.contains(executorId)) {
       executorIds.add(executorId)
-      executorIds.foreach(onExecutorIdle)
+      // If an executor (call this executor X) is not removed because the lower bound
+      // has been reached, it will no longer be marked as idle. When new executors join,
+      // however, we are no longer at the lower bound, and so we must mark executor X
+      // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
+      executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
       if (numExecutorsPending > 0) {
         numExecutorsPending -= 1
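The new comment explains why every idle executor is re-marked when a new one registers. A toy sketch of that re-marking step, using hypothetical stand-ins for the manager's state:

    import scala.collection.mutable

    val executorIds = mutable.Set("exec-1", "exec-2")
    val busyExecutors = mutable.Set("exec-2")            // stand-in for the listener's task map
    def isExecutorIdle(id: String): Boolean = !busyExecutors.contains(id)
    def onExecutorIdle(id: String): Unit = println(s"starting idle timer for $id")

    executorIds += "exec-3"                              // a new executor registers
    executorIds.filter(isExecutorIdle).foreach(onExecutorIdle)
    // exec-1 and exec-3 get idle timers; busy exec-2 does not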
@@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager(
    * the executor is not already marked as idle.
    */
   private def onExecutorIdle(executorId: String): Unit = synchronized {
-    if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
-      logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-        s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
-      removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+    if (executorIds.contains(executorId)) {
+      if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+        logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+          s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+      }
+    } else {
+      logWarning(s"Attempted to mark unknown executor $executorId idle")
     }
   }
 
@@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager(
    * and consistency of events returned by the listener. For simplicity, it does not account
    * for speculated tasks.
    */
-  private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
-    extends SparkListener {
+  private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
-      synchronized {
-        val stageId = stageSubmitted.stageInfo.stageId
-        val numTasks = stageSubmitted.stageInfo.numTasks
+      val stageId = stageSubmitted.stageInfo.stageId
+      val numTasks = stageSubmitted.stageInfo.numTasks
+      allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
         allocationManager.onSchedulerBacklogged()
       }
     }
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
-      synchronized {
-        val stageId = stageCompleted.stageInfo.stageId
+      val stageId = stageCompleted.stageInfo.stageId
+      allocationManager.synchronized {
         stageIdToNumTasks -= stageId
         stageIdToTaskIndices -= stageId
 
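These listener callbacks now read the event fields first and then synchronize on `allocationManager` rather than on the listener itself, so the listener-bus thread and the manager's own scheduling thread contend for the same monitor. A rough sketch of the idea, with hypothetical names:

    import scala.collection.mutable

    // Hypothetical names; not Spark code.
    class Manager {
      manager =>

      private val pendingStages = mutable.HashMap[Int, Int]()

      // Called periodically from the scheduling thread.
      def schedule(): Unit = synchronized {
        val totalPending = pendingStages.values.sum
        println(s"pending tasks: $totalPending")
      }

      class Listener {
        // Called from the listener-bus thread; locks the manager's monitor,
        // not the listener's, so both threads serialize on the same lock.
        def onStageSubmitted(stageId: Int, numTasks: Int): Unit = manager.synchronized {
          pendingStages(stageId) = numTasks
        }
      }
    }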
@@ -426,47 +437,62 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
       val taskId = taskStart.taskInfo.taskId
       val taskIndex = taskStart.taskInfo.index
       val executorId = taskStart.taskInfo.executorId
 
-      // If this is the last pending task, mark the scheduler queue as empty
-      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
-      val numTasksScheduled = stageIdToTaskIndices(stageId).size
-      val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
-      if (numTasksScheduled == numTasksTotal) {
-        // No more pending tasks for this stage
-        stageIdToNumTasks -= stageId
-        if (stageIdToNumTasks.isEmpty) {
-          allocationManager.onSchedulerQueueEmpty()
+      allocationManager.synchronized {
+        // This guards against the race condition in which the `SparkListenerTaskStart`
+        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+        // possible because these events are posted in different threads. (see SPARK-4951)
+        if (!allocationManager.executorIds.contains(executorId)) {
+          allocationManager.onExecutorAdded(executorId)
+        }
+
+        // If this is the last pending task, mark the scheduler queue as empty
+        stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+        val numTasksScheduled = stageIdToTaskIndices(stageId).size
+        val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+        if (numTasksScheduled == numTasksTotal) {
+          // No more pending tasks for this stage
+          stageIdToNumTasks -= stageId
+          if (stageIdToNumTasks.isEmpty) {
+            allocationManager.onSchedulerQueueEmpty()
+          }
         }
-      }
 
-      // Mark the executor on which this task is scheduled as busy
-      executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
-      allocationManager.onExecutorBusy(executorId)
+        // Mark the executor on which this task is scheduled as busy
+        executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+        allocationManager.onExecutorBusy(executorId)
+      }
     }
 
-    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
-
-      // If the executor is no longer running any scheduled tasks, mark it as idle
-      if (executorIdToTaskIds.contains(executorId)) {
-        executorIdToTaskIds(executorId) -= taskId
-        if (executorIdToTaskIds(executorId).isEmpty) {
-          executorIdToTaskIds -= executorId
-          allocationManager.onExecutorIdle(executorId)
+      allocationManager.synchronized {
+        // If the executor is no longer running any scheduled tasks, mark it as idle
+        if (executorIdToTaskIds.contains(executorId)) {
+          executorIdToTaskIds(executorId) -= taskId
+          if (executorIdToTaskIds(executorId).isEmpty) {
+            executorIdToTaskIds -= executorId
+            allocationManager.onExecutorIdle(executorId)
+          }
         }
       }
     }
 
     override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
       val executorId = blockManagerAdded.blockManagerId.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
-        allocationManager.onExecutorAdded(executorId)
+        // This guards against the race condition in which the `SparkListenerTaskStart`
+        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+        // possible because these events are posted in different threads. (see SPARK-4951)
+        if (!allocationManager.executorIds.contains(executorId)) {
+          allocationManager.onExecutorAdded(executorId)
+        }
       }
     }
 
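The two new guards make executor registration idempotent, so it no longer matters whether the task-start event or the block-manager-added event for a new executor arrives first. A toy sketch of that out-of-order handling, with hypothetical stand-ins:

    import scala.collection.mutable

    val executorIds = mutable.Set[String]()

    def onExecutorAdded(id: String): Unit = {
      if (!executorIds.contains(id)) {
        executorIds += id
        println(s"registered $id")
      }
    }

    def onTaskStart(id: String): Unit = {
      if (!executorIds.contains(id)) onExecutorAdded(id)  // guard against the race
      println(s"task started on $id")
    }

    // Events from different threads may arrive in either order:
    onTaskStart("exec-1")       // task start seen before the block-manager event
    onExecutorAdded("exec-1")   // registration happens only once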
@@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager(
     /**
      * An estimate of the total number of pending tasks remaining for currently running stages. Does
      * not account for tasks which may have failed and been resubmitted.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
     def totalPendingTasks(): Int = {
       stageIdToNumTasks.map { case (stageId, numTasks) =>
         numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
       }.sum
     }
+
+    /**
+     * Return true if an executor is not currently running a task, and false otherwise.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
+     */
+    def isExecutorIdle(executorId: String): Boolean = {
+      !executorIdToTaskIds.contains(executorId)
+    }
   }
 
 }
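For reference, the pending-task estimate in `totalPendingTasks` is simply the per-stage task count minus the number of task indices already seen for that stage. A standalone sketch with made-up numbers:

    import scala.collection.mutable

    val stageIdToNumTasks = mutable.HashMap(1 -> 10, 2 -> 4)
    val stageIdToTaskIndices = mutable.HashMap(1 -> mutable.HashSet(0, 1, 2))

    val totalPending = stageIdToNumTasks.map { case (stageId, numTasks) =>
      numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
    }.sum
    // totalPending == (10 - 3) + (4 - 0) == 11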