diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b93536e6536e2..aef4a13dde031 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -126,9 +126,10 @@ private[spark] class ExecutorAllocationManager(
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
+    with mutable.SynchronizedSet[String]
 
   // All known executors
-  private val executorIds = new mutable.HashSet[String]
+  private val executorIds = new mutable.HashSet[String] with mutable.SynchronizedSet[String]
 
   // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
   // This is set when pending tasks are added but not scheduled yet
@@ -137,6 +138,7 @@ private[spark] class ExecutorAllocationManager(
   // A timestamp for each executor of when the executor should be removed, indexed by the ID
   // This is set when an executor is no longer running a task, or when it first registers
   private val removeTimes = new mutable.HashMap[String, Long]
+    with mutable.SynchronizedMap[String, Long]
 
   // Polling loop interval (ms)
   private val intervalMillis: Long = 100
@@ -502,11 +504,19 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
+  /**
+   * Whether the specified executor has been requested to be removed but has not been
+   * killed yet, i.e. whether it is currently tracked in `executorsPendingToRemove`.
+   */
+  def isExecutorPendingToRemove(executorId: String): Boolean = {
+    executorsPendingToRemove.contains(executorId)
+  }
+
   /**
    * Callback invoked when the specified executor is now running a task.
    * This resets all variables used for removing this executor.
    */
-  private def onExecutorBusy(executorId: String): Unit = synchronized {
+  def onExecutorBusy(executorId: String): Unit = synchronized {
     logDebug(s"Clearing idle timer for $executorId because it is now running a task")
     removeTimes.remove(executorId)
   }
@@ -605,7 +615,6 @@ private[spark] class ExecutorAllocationManager(
 
         // Mark the executor on which this task is scheduled as busy
         executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
-        allocationManager.onExecutorBusy(executorId)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c02597c4365c9..a133d964ee03c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -448,6 +448,16 @@ private[spark] class TaskSetManager(
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
+
+          // Do not launch a task on an executor that the allocation manager has already asked
+          // to remove; otherwise clear the executor's idle timer since it is about to be busy.
+          val executorManager = sched.sc.executorAllocationManager
+          if (executorManager.exists(_.isExecutorPendingToRemove(execId))) {
+            logWarning(s"Executor $execId is removed before scheduler task.")
+            return None
+          }
+          executorManager.foreach(_.onExecutorBusy(execId))
+
           // Do various bookkeeping
           copiesRunning(index) += 1
           val attemptNum = taskAttempts(index).size