From 88c9c3ef407cecbe46ced9411d1d14ff70752d65 Mon Sep 17 00:00:00 2001 From: hongshen Date: Mon, 5 Oct 2015 10:12:50 +0800 Subject: [PATCH 1/4] Prevent task failed for executor kill by driver --- .../spark/ExecutorAllocationManager.scala | 17 ++++++++++++----- .../apache/spark/scheduler/TaskSetManager.scala | 8 ++++++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e2..aa5ff56e206b5 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -125,10 +125,10 @@ private[spark] class ExecutorAllocationManager( conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) // Executors that have been requested to be removed but have not been killed yet - private val executorsPendingToRemove = new mutable.HashSet[String] + private val executorsPendingToRemove = new mutable.HashSet[String] with mutable.SynchronizedSet[String] // All known executors - private val executorIds = new mutable.HashSet[String] + private val executorIds = new mutable.HashSet[String] with mutable.SynchronizedSet[String] // A timestamp of when an addition should be triggered, or NOT_SET if it is not set // This is set when pending tasks are added but not scheduled yet @@ -136,7 +136,7 @@ private[spark] class ExecutorAllocationManager( // A timestamp for each executor of when the executor should be removed, indexed by the ID // This is set when an executor is no longer running a task, or when it first registers - private val removeTimes = new mutable.HashMap[String, Long] + private val removeTimes = new mutable.HashMap[String, Long] with mutable.SynchronizedMap[String, Long] // Polling loop interval (ms) private val intervalMillis: Long = 100 @@ -501,12 +501,20 @@ private[spark] class ExecutorAllocationManager( logWarning(s"Attempted to mark unknown executor $executorId idle") } } + + def isExecutorAlive(executorId: String): Boolean = { + if (!executorsPendingToRemove.contains(executorId)) { + true + } else { + false + } + } /** * Callback invoked when the specified executor is now running a task. * This resets all variables used for removing this executor. */ - private def onExecutorBusy(executorId: String): Unit = synchronized { + def onExecutorBusy(executorId: String): Unit = synchronized { logDebug(s"Clearing idle timer for $executorId because it is now running a task") removeTimes.remove(executorId) } @@ -605,7 +613,6 @@ private[spark] class ExecutorAllocationManager( // Mark the executor on which this task is scheduled as busy executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId - allocationManager.onExecutorBusy(executorId) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c02597c4365c9..7bd54f48fd9b8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -474,6 +474,14 @@ private[spark] class TaskSetManager( abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } + val executorManager = sched.sc.executorAllocationManager.getOrElse(null) + if(executorManager != null) { + if(!executorManager.isExecutorAlive(execId)) { + return None + } + executorManager.onExecutorBusy(execId) + } + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true From 18cbf73b81023e8adac6326e3225799291cbf8b5 Mon Sep 17 00:00:00 2001 From: hongshen Date: Mon, 5 Oct 2015 12:01:06 +0800 Subject: [PATCH 2/4] Fix code style. --- .../org/apache/spark/ExecutorAllocationManager.scala | 8 +++++--- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index aa5ff56e206b5..eef726294286c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -125,7 +125,8 @@ private[spark] class ExecutorAllocationManager( conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) // Executors that have been requested to be removed but have not been killed yet - private val executorsPendingToRemove = new mutable.HashSet[String] with mutable.SynchronizedSet[String] + private val executorsPendingToRemove = new mutable.HashSet[String] + with mutable.SynchronizedSet[String] // All known executors private val executorIds = new mutable.HashSet[String] with mutable.SynchronizedSet[String] @@ -136,7 +137,8 @@ private[spark] class ExecutorAllocationManager( // A timestamp for each executor of when the executor should be removed, indexed by the ID // This is set when an executor is no longer running a task, or when it first registers - private val removeTimes = new mutable.HashMap[String, Long] with mutable.SynchronizedMap[String, Long] + private val removeTimes = new mutable.HashMap[String, Long] + with mutable.SynchronizedMap[String, Long] // Polling loop interval (ms) private val intervalMillis: Long = 100 @@ -501,7 +503,7 @@ private[spark] class ExecutorAllocationManager( logWarning(s"Attempted to mark unknown executor $executorId idle") } } - + def isExecutorAlive(executorId: String): Boolean = { if (!executorsPendingToRemove.contains(executorId)) { true diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7bd54f48fd9b8..01a7acfbf3b70 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -475,13 +475,13 @@ private[spark] class TaskSetManager( throw new TaskNotSerializableException(e) } val executorManager = sched.sc.executorAllocationManager.getOrElse(null) - if(executorManager != null) { - if(!executorManager.isExecutorAlive(execId)) { + if (executorManager != null) { + if (!executorManager.isExecutorAlive(execId)) { return None } executorManager.onExecutorBusy(execId) } - + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true From 6215f5259fa7ba3fd4e4268c3eb6ab43fb7ed7a3 Mon Sep 17 00:00:00 2001 From: hongshen Date: Mon, 19 Oct 2015 14:31:04 +0800 Subject: [PATCH 3/4] Rename method. --- .../spark/ExecutorAllocationManager.scala | 8 ++------ .../spark/scheduler/TaskSetManager.scala | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index eef726294286c..aef4a13dde031 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -504,12 +504,8 @@ private[spark] class ExecutorAllocationManager( } } - def isExecutorAlive(executorId: String): Boolean = { - if (!executorsPendingToRemove.contains(executorId)) { - true - } else { - false - } + def isExecutorPendingToRemove(executorId: String): Boolean = { + !executorsPendingToRemove.contains(executorId) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 01a7acfbf3b70..0dd97d8fdfbfa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -448,6 +448,16 @@ private[spark] class TaskSetManager( // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() + + val executorManager = sched.sc.executorAllocationManager.getOrElse(null) + if (executorManager != null) { + if (!executorManager.isExecutorPendingToRemove(execId)) { + logWarning(s"Executor $execId is removed before scheduler task.") + return None + } + executorManager.onExecutorBusy(execId) + } + // Do various bookkeeping copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size @@ -474,14 +484,7 @@ private[spark] class TaskSetManager( abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } - val executorManager = sched.sc.executorAllocationManager.getOrElse(null) - if (executorManager != null) { - if (!executorManager.isExecutorAlive(execId)) { - return None - } - executorManager.onExecutorBusy(execId) - } - + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true From de47fb6f51a4483448f29948b3ef43f5059d1386 Mon Sep 17 00:00:00 2001 From: hongshen Date: Mon, 19 Oct 2015 14:50:53 +0800 Subject: [PATCH 4/4] Fix code style. --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 0dd97d8fdfbfa..a133d964ee03c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -448,7 +448,7 @@ private[spark] class TaskSetManager( // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() - + val executorManager = sched.sc.executorAllocationManager.getOrElse(null) if (executorManager != null) { if (!executorManager.isExecutorPendingToRemove(execId)) { @@ -484,7 +484,6 @@ private[spark] class TaskSetManager( abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } - if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true