Skip to content

Commit cd6d240

Browse files
Comments are addressed.
1 parent 08526a8 commit cd6d240

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.spark.TaskContext
2222
class FakeTask(
2323
stageId: Int,
2424
partitionId: Int,
25-
prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) {
25+
prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, stageAttemptId = 0, partitionId) {
2626
override def runTask(context: TaskContext): Int = 0
2727
override def preferredLocations: Seq[TaskLocation] = prefLocs
2828
}
@@ -33,11 +33,11 @@ object FakeTask {
3333
* locations for each task (given as varargs) if this sequence is not empty.
3434
*/
3535
def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
36-
createTaskSet(numTasks, 0, prefLocs: _*)
36+
createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
3737
}
3838

3939
def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
40-
createTaskSet(numTasks, 0, stageAttemptId, prefLocs: _*)
40+
createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
4141
}
4242

4343
def createTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*):
@@ -48,6 +48,6 @@ object FakeTask {
4848
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
4949
new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
5050
}
51-
new TaskSet(tasks, stageId, stageAttemptId, 0, null)
51+
new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
5252
}
5353
}

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -974,18 +974,20 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
974974
assert(manager.isZombie)
975975
}
976976

977-
test("check uniqueness of TaskSetManager name") {
977+
test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
978978
sc = new SparkContext("local", "test")
979979
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
980-
val taskSet = FakeTask.createTaskSet(1, 0, 0)
980+
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
981981
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
982982
assert(manager.name === "TaskSet_0.0")
983983

984-
val taskSet2 = FakeTask.createTaskSet(1, 0, 1)
984+
// Make sure a task set with the same stage ID but different attempt ID also has a unique name
985+
val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
985986
val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
986987
assert(manager2.name === "TaskSet_0.1")
987988

988-
val taskSet3 = FakeTask.createTaskSet(1, 1, 1)
989+
// Make sure a task set with the same attempt ID but different stage ID also has a unique name
990+
val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
989991
val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
990992
assert(manager3.name === "TaskSet_1.1")
991993
}

0 commit comments

Comments
 (0)