Skip to content

Commit 1eca3fe

Browse files
carsonwang authored and Marcelo Vanzin committed
[SPARK-8372] History server shows incorrect information for application not started
The history server may show an incorrect App ID for an incomplete application like <App ID>.inprogress. This app info will never disappear, even after the app is completed. ![incorrectappinfo](https://cloud.githubusercontent.com/assets/9278199/8156147/2a10fdbe-137d-11e5-9620-c5b61d93e3c1.png) The cause of the issue is that a log path name is used as the app ID when the app ID cannot be obtained during replay. Author: Carson Wang <[email protected]> Closes #6827 from carsonwang/SPARK-8372 and squashes the following commits: cdbb089 [Carson Wang] Fix code style 3e46b35 [Carson Wang] Update code style 90f5dde [Carson Wang] Add a unit test d8c9cd0 [Carson Wang] Replaying events only return information when app is started
1 parent ea88b1a commit 1eca3fe

File tree

2 files changed

+53
-28
lines changed

2 files changed

+53
-28
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
160160
replayBus.addListener(appListener)
161161
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
162162

163-
ui.setAppName(s"${appInfo.name} ($appId)")
163+
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
164164

165165
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
166166
ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
282282
val newAttempts = logs.flatMap { fileStatus =>
283283
try {
284284
val res = replay(fileStatus, bus)
285-
logInfo(s"Application log ${res.logPath} loaded successfully.")
286-
Some(res)
285+
res match {
286+
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
287+
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
288+
"The application may have not started.")
289+
}
290+
res
287291
} catch {
288292
case e: Exception =>
289293
logError(
@@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
429433

430434
/**
431435
* Replays the events in the specified log file and returns information about the associated
432-
* application.
436+
* application. Return `None` if the application ID cannot be located.
433437
*/
434-
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
438+
private def replay(
439+
eventLog: FileStatus,
440+
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
435441
val logPath = eventLog.getPath()
436442
logInfo(s"Replaying log path: $logPath")
437443
val logInput =
@@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
445451
val appCompleted = isApplicationCompleted(eventLog)
446452
bus.addListener(appListener)
447453
bus.replay(logInput, logPath.toString, !appCompleted)
448-
new FsApplicationAttemptInfo(
449-
logPath.getName(),
450-
appListener.appName.getOrElse(NOT_STARTED),
451-
appListener.appId.getOrElse(logPath.getName()),
452-
appListener.appAttemptId,
453-
appListener.startTime.getOrElse(-1L),
454-
appListener.endTime.getOrElse(-1L),
455-
getModificationTime(eventLog).get,
456-
appListener.sparkUser.getOrElse(NOT_STARTED),
457-
appCompleted)
454+
appListener.appId.map { appId =>
455+
new FsApplicationAttemptInfo(
456+
logPath.getName(),
457+
appListener.appName.getOrElse(NOT_STARTED),
458+
appId,
459+
appListener.appAttemptId,
460+
appListener.startTime.getOrElse(-1L),
461+
appListener.endTime.getOrElse(-1L),
462+
getModificationTime(eventLog).get,
463+
appListener.sparkUser.getOrElse(NOT_STARTED),
464+
appCompleted)
465+
}
458466
} finally {
459467
logInput.close()
460468
}

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,29 +67,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
6767
// Write a new-style application log.
6868
val newAppComplete = newLogFile("new1", None, inProgress = false)
6969
writeFile(newAppComplete, true, None,
70-
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
70+
SparkListenerApplicationStart(
71+
"new-app-complete", Some("new-app-complete"), 1L, "test", None),
7172
SparkListenerApplicationEnd(5L)
7273
)
7374

7475
// Write a new-style application log.
7576
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
7677
Some("lzf"))
7778
writeFile(newAppCompressedComplete, true, None,
78-
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
79+
SparkListenerApplicationStart(
80+
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
7981
SparkListenerApplicationEnd(4L))
8082

8183
// Write an unfinished app, new-style.
8284
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
8385
writeFile(newAppIncomplete, true, None,
84-
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
86+
SparkListenerApplicationStart(
87+
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
8588
)
8689

8790
// Write an old-style application log.
8891
val oldAppComplete = new File(testDir, "old1")
8992
oldAppComplete.mkdir()
9093
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
9194
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
92-
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
95+
SparkListenerApplicationStart(
96+
"old-app-complete", Some("old-app-complete"), 2L, "test", None),
9397
SparkListenerApplicationEnd(3L)
9498
)
9599
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -103,7 +107,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
103107
oldAppIncomplete.mkdir()
104108
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
105109
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
106-
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
110+
SparkListenerApplicationStart(
111+
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
107112
)
108113

109114
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -124,16 +129,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
124129
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
125130
}
126131

127-
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
132+
list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
128133
newAppComplete.lastModified(), "test", true))
129-
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
134+
list(1) should be (makeAppInfo("new-app-compressed-complete",
130135
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
131136
true))
132-
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
137+
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
133138
oldAppComplete.lastModified(), "test", true))
134-
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
139+
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
135140
oldAppIncomplete.lastModified(), "test", false))
136-
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
141+
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
137142
newAppIncomplete.lastModified(), "test", false))
138143

139144
// Make sure the UI can be rendered.
@@ -157,7 +162,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
157162
logDir.mkdir()
158163
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
159164
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
160-
SparkListenerApplicationStart("app2", None, 2L, "test", None),
165+
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
161166
SparkListenerApplicationEnd(3L)
162167
)
163168
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -180,12 +185,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
180185
test("SPARK-3697: ignore directories that cannot be read.") {
181186
val logFile1 = newLogFile("new1", None, inProgress = false)
182187
writeFile(logFile1, true, None,
183-
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
188+
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
184189
SparkListenerApplicationEnd(2L)
185190
)
186191
val logFile2 = newLogFile("new2", None, inProgress = false)
187192
writeFile(logFile2, true, None,
188-
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
193+
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
189194
SparkListenerApplicationEnd(2L)
190195
)
191196
logFile2.setReadable(false, false)
@@ -218,6 +223,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
218223
}
219224
}
220225

226+
test("Parse logs that application is not started") {
227+
val provider = new FsHistoryProvider((createTestConf()))
228+
229+
val logFile1 = newLogFile("app1", None, inProgress = true)
230+
writeFile(logFile1, true, None,
231+
SparkListenerLogStart("1.4")
232+
)
233+
updateAndCheck(provider) { list =>
234+
list.size should be (0)
235+
}
236+
}
237+
221238
test("SPARK-5582: empty log directory") {
222239
val provider = new FsHistoryProvider(createTestConf())
223240

0 commit comments

Comments (0)