Skip to content

Commit a0d7949

Browse files
zsxwinggatorsmile
authored andcommitted
[SPARK-23475][WEBUI] Skipped stages should be evicted before completed stages
## What changes were proposed in this pull request? The root cause of missing completed stages is because `cleanupStages` will never remove skipped stages. This PR changes the logic to always remove skipped stage first. This is safe since the job itself contains enough information to render skipped stages in the UI. ## How was this patch tested? The new unit tests. Author: Shixiong Zhu <[email protected]> Closes apache#20656 from zsxwing/SPARK-23475. (cherry picked from commit 45cf714) Signed-off-by: gatorsmile <[email protected]>
1 parent 23ba441 commit a0d7949

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,10 @@ private[spark] class AppStatusListener(
863863
return
864864
}
865865

866-
val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
866+
// As the completion time of a skipped stage is always -1, we will remove skipped stages first.
867+
// This is safe since the job itself contains enough information to render skipped stages in the
868+
// UI.
869+
val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime")
867870
val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
868871
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
869872
}

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
10251025
}
10261026
}
10271027

1028+
test("skipped stages should be evicted before completed stages") {
1029+
val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
1030+
val listener = new AppStatusListener(store, testConf, true)
1031+
1032+
val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
1033+
val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
1034+
1035+
// Sart job 1
1036+
time += 1
1037+
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
1038+
1039+
// Start and stop stage 1
1040+
time += 1
1041+
stage1.submissionTime = Some(time)
1042+
listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
1043+
1044+
time += 1
1045+
stage1.completionTime = Some(time)
1046+
listener.onStageCompleted(SparkListenerStageCompleted(stage1))
1047+
1048+
// Stop job 1 and stage 2 will become SKIPPED
1049+
time += 1
1050+
listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
1051+
1052+
// Submit stage 3 and verify stage 2 is evicted
1053+
val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
1054+
time += 1
1055+
stage3.submissionTime = Some(time)
1056+
listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
1057+
1058+
assert(store.count(classOf[StageDataWrapper]) === 2)
1059+
intercept[NoSuchElementException] {
1060+
store.read(classOf[StageDataWrapper], Array(2, 0))
1061+
}
1062+
}
1063+
10281064
test("eviction should respect task completion time") {
10291065
val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
10301066
val listener = new AppStatusListener(store, testConf, true)

0 commit comments

Comments
 (0)