From a3e69c0234cb97c73ec57e5fbe30fde89a68899b Mon Sep 17 00:00:00 2001 From: Jason Moore Date: Thu, 28 Apr 2016 12:14:15 +1000 Subject: [PATCH 1/2] [SPARK-14915] [CORE] Don't re-queue a task if another attempt has already succeeded --- .../org/apache/spark/scheduler/TaskSetManager.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6e08cdd87a8d1..539878d97c048 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -716,7 +716,15 @@ private[spark] class TaskSetManager( failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). put(info.executorId, clock.getTimeMillis()) sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) - addPendingTask(index) + + if (successful(index)) { + logWarning( + s"Task ${info.id} in stage ${taskSet.id} (TID $tid) will not be re-queued " + + "as another attempt has already succeeded") + } else { + addPendingTask(index) + } + if (!isZombie && state != TaskState.KILLED && reason.isInstanceOf[TaskFailedReason] && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) { From fa6068a339abc539b4f1e0da37208ae74e7ecd0b Mon Sep 17 00:00:00 2001 From: Jason Moore Date: Thu, 5 May 2016 10:38:15 +1000 Subject: [PATCH 2/2] [SPARK-14915] [CORE] Change the log to INFO and slightly re-worded for clarity --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 539878d97c048..eea82bed4254d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -718,9 +718,10 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) if (successful(index)) { - logWarning( - s"Task ${info.id} in stage ${taskSet.id} (TID $tid) will not be re-queued " + - "as another attempt has already succeeded") + logInfo( + s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " + + "but another instance of the task has already succeeded, " + + "so not re-queuing the task to be re-executed.") } else { addPendingTask(index) }