Skip to content

Commit 5983828

Browse files
rekhajoshmsrowen
authored andcommitted
[SPARK-8593] [CORE] Sort app attempts by start time.
This makes sure attempts are listed in the order they were executed, and that the app's state matches the state of the most current attempt. Author: Joshi <[email protected]> Author: Rekha Joshi <[email protected]> Closes apache#7253 from rekhajoshm/SPARK-8593 and squashes the following commits: 874dd80 [Joshi] History Server: updated order for multiple attempts(logcleaner) 716e0b1 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime) 548c753 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime) 83306a8 [Joshi] History Server: updated order for multiple attempts(descending start time) b0fc922 [Joshi] History Server: updated order for multiple attempts(updated comment) cc0fda7 [Joshi] History Server: updated order for multiple attempts(updated test) 304cb0b [Joshi] History Server: updated order for multiple attempts(reverted HistoryPage) 85024e8 [Joshi] History Server: updated order for multiple attempts a41ac4b [Joshi] History Server: updated order for multiple attempts ab65fa1 [Joshi] History Server: some attempt completed to work with showIncomplete 0be142d [Rekha Joshi] Merge pull request #3 from apache/master 106fd8e [Rekha Joshi] Merge pull request #2 from apache/master e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master (cherry picked from commit 42d8a01) Signed-off-by: Sean Owen <[email protected]>
1 parent f34f3d7 commit 5983828

File tree

2 files changed

+14
-20
lines changed

2 files changed

+14
-20
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
352352

353353
/**
354354
* Comparison function that defines the sort order for application attempts within the same
355-
* application. Order is: running attempts before complete attempts, running attempts sorted
356-
* by start time, completed attempts sorted by end time.
355+
* application. Order is: attempts are sorted by descending start time.
356+
* Most recent attempt state matches with current state of the app.
357357
*
358358
* Normally applications should have a single running attempt; but failure to call sc.stop()
359359
* may cause multiple running attempts to show up.
@@ -363,11 +363,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
363363
private def compareAttemptInfo(
364364
a1: FsApplicationAttemptInfo,
365365
a2: FsApplicationAttemptInfo): Boolean = {
366-
if (a1.completed == a2.completed) {
367-
if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
368-
} else {
369-
!a1.completed
370-
}
366+
a1.startTime >= a2.startTime
371367
}
372368

373369
/**

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
239239
appListAfterRename.size should be (1)
240240
}
241241

242-
test("apps with multiple attempts") {
242+
test("apps with multiple attempts with order") {
243243
val provider = new FsHistoryProvider(createTestConf())
244244

245-
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
245+
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
246246
writeFile(attempt1, true, None,
247-
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
248-
SparkListenerApplicationEnd(2L)
247+
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
249248
)
250249

251250
updateAndCheck(provider) { list =>
@@ -255,7 +254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
255254

256255
val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
257256
writeFile(attempt2, true, None,
258-
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
257+
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
259258
)
260259

261260
updateAndCheck(provider) { list =>
@@ -264,30 +263,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
264263
list.head.attempts.head.attemptId should be (Some("attempt2"))
265264
}
266265

267-
val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
268-
attempt2.delete()
269-
writeFile(attempt2, true, None,
270-
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
266+
val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
267+
writeFile(attempt3, true, None,
268+
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
271269
SparkListenerApplicationEnd(4L)
272270
)
273271

274272
updateAndCheck(provider) { list =>
275273
list should not be (null)
276274
list.size should be (1)
277-
list.head.attempts.size should be (2)
278-
list.head.attempts.head.attemptId should be (Some("attempt2"))
275+
list.head.attempts.size should be (3)
276+
list.head.attempts.head.attemptId should be (Some("attempt3"))
279277
}
280278

281279
val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
282-
writeFile(attempt2, true, None,
280+
writeFile(attempt1, true, None,
283281
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
284282
SparkListenerApplicationEnd(6L)
285283
)
286284

287285
updateAndCheck(provider) { list =>
288286
list.size should be (2)
289287
list.head.attempts.size should be (1)
290-
list.last.attempts.size should be (2)
288+
list.last.attempts.size should be (3)
291289
list.head.attempts.head.attemptId should be (Some("attempt1"))
292290

293291
list.foreach { case app =>

0 commit comments

Comments
 (0)