Skip to content

Commit 2ea00a5

Browse files
Eric Vandenberg authored and jsoltren committed
[SPARK-21219][CORE] Task retry occurs on same executor due to race condition with blacklisting
There's a race condition in the current TaskSetManager where a failed task is added for retry (addPendingTask), and can asynchronously be assigned to an executor *prior* to the blacklist state update (updateBlacklistForFailedTask); the result is that the task might re-execute on the same executor. This is particularly problematic if the executor is shutting down, since the retry task immediately becomes a lost task (ExecutorLostFailure). Another side effect is that the actual failure reason gets obscured by the retry task, which never actually executed. There are sample logs showing the issue in https://issues.apache.org/jira/browse/SPARK-21219. The fix is to change the ordering of the addPendingTask and updateBlacklistForFailedTask calls in TaskSetManager.handleFailedTask. Implemented a unit test that verifies the task is blacklisted before it is added to the pending tasks. Ran the unit test without the fix and it fails. Ran the unit test with the fix and it passes. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Eric Vandenberg <[email protected]> Closes #18427 from ericvandenbergfb/blacklistFix.
1 parent 399aa01 commit 2ea00a5

File tree

2 files changed

+54
-11
lines changed

2 files changed

+54
-11
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
198198
private[scheduler] var emittedTaskSizeWarning = false
199199

200200
/** Add a task to all the pending-task lists that it should be on. */
201-
private def addPendingTask(index: Int) {
201+
private[spark] def addPendingTask(index: Int) {
202202
for (loc <- tasks(index).preferredLocations) {
203203
loc match {
204204
case e: ExecutorCacheTaskLocation =>
@@ -826,15 +826,6 @@ private[spark] class TaskSetManager(
826826

827827
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
828828

829-
if (successful(index)) {
830-
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
831-
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
832-
s" so the previous stage needs to be re-run, or because a different copy of the task" +
833-
s" has already succeeded).")
834-
} else {
835-
addPendingTask(index)
836-
}
837-
838829
if (!isZombie && reason.countTowardsTaskFailures) {
839830
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
840831
info.host, info.executorId, index))
@@ -848,6 +839,16 @@ private[spark] class TaskSetManager(
848839
return
849840
}
850841
}
842+
843+
if (successful(index)) {
844+
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
845+
s" be re-executed (either because the task failed with a shuffle data fetch failure," +
846+
s" so the previous stage needs to be re-run, or because a different copy of the task" +
847+
s" has already succeeded).")
848+
} else {
849+
addPendingTask(index)
850+
}
851+
851852
maybeFinishTaskSet()
852853
}
853854

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.mutable
2323
import scala.collection.mutable.ArrayBuffer
2424

2525
import org.mockito.Matchers.{any, anyInt, anyString}
26-
import org.mockito.Mockito.{mock, never, spy, verify, when}
26+
import org.mockito.Mockito.{mock, never, spy, times, verify, when}
2727
import org.mockito.invocation.InvocationOnMock
2828
import org.mockito.stubbing.Answer
2929

@@ -1140,6 +1140,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
11401140
.updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
11411141
}
11421142

1143+
test("update blacklist before adding pending task to avoid race condition") {
1144+
// When a task fails, it should apply the blacklist policy prior to
1145+
// retrying the task, otherwise there's a race condition where it could run on
1146+
// the same executor that it was intended to be black listed from.
1147+
val conf = new SparkConf().
1148+
set(config.BLACKLIST_ENABLED, true)
1149+
1150+
// Create a task with two executors.
1151+
sc = new SparkContext("local", "test", conf)
1152+
val exec = "executor1"
1153+
val host = "host1"
1154+
val exec2 = "executor2"
1155+
val host2 = "host2"
1156+
sched = new FakeTaskScheduler(sc, (exec, host), (exec2, host2))
1157+
val taskSet = FakeTask.createTaskSet(1)
1158+
1159+
val clock = new ManualClock
1160+
val mockListenerBus = mock(classOf[LiveListenerBus])
1161+
val blacklistTracker = new BlacklistTracker(mockListenerBus, conf, None, clock)
1162+
val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
1163+
val taskSetManagerSpy = spy(taskSetManager)
1164+
1165+
val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)
1166+
1167+
// Assert the task has been black listed on the executor it was last executed on.
1168+
when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
1169+
new Answer[Unit] {
1170+
override def answer(invocationOnMock: InvocationOnMock): Unit = {
1171+
val task = invocationOnMock.getArgumentAt(0, classOf[Int])
1172+
assert(taskSetManager.taskSetBlacklistHelperOpt.get.
1173+
isExecutorBlacklistedForTask(exec, task))
1174+
}
1175+
}
1176+
)
1177+
1178+
// Simulate a fake exception
1179+
val e = new ExceptionFailure("a", "b", Array(), "c", None)
1180+
taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)
1181+
1182+
verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
1183+
}
1184+
11431185
private def createTaskResult(
11441186
id: Int,
11451187
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

0 commit comments

Comments
 (0)