Skip to content

Commit ea88b1a

Browse files
author
Andrew Or
committed
Revert "[SPARK-8372] History server shows incorrect information for application not started"
This reverts commit 2837e06.
1 parent 715f084 commit ea88b1a

File tree

2 files changed

+28
-53
lines changed

2 files changed

+28
-53
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         replayBus.addListener(appListener)
         val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-        appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
+        ui.setAppName(s"${appInfo.name} ($appId)")

         val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
         ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
-          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
+        logInfo(s"Application log ${res.logPath} loaded successfully.")
+        Some(res)
       } catch {
         case e: Exception =>
           logError(
@@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * application.
    */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      appListener.appId.map { appId =>
-        new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appId,
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          getModificationTime(eventLog).get,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted)
-      }
+      new FsApplicationAttemptInfo(
+        logPath.getName(),
+        appListener.appName.getOrElse(NOT_STARTED),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appAttemptId,
+        appListener.startTime.getOrElse(-1L),
+        appListener.endTime.getOrElse(-1L),
+        getModificationTime(eventLog).get,
+        appListener.sparkUser.getOrElse(NOT_STARTED),
+        appCompleted)
     } finally {
       logInput.close()
     }

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -67,33 +67,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-complete", Some("new-app-complete"), 1L, "test", None),
+      SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
       SparkListenerApplicationEnd(5L)
       )

     // Write a new-style application log.
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
       Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
+      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
       SparkListenerApplicationEnd(4L))

     // Write an unfinished app, new-style.
     val newAppIncomplete = newLogFile("new2", None, inProgress = true)
     writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
+      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
     )

     // Write an old-style application log.
     val oldAppComplete = new File(testDir, "old1")
     oldAppComplete.mkdir()
     createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart(
-        "old-app-complete", Some("old-app-complete"), 2L, "test", None),
+      SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
       SparkListenerApplicationEnd(3L)
     )
     createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -107,8 +103,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     oldAppIncomplete.mkdir()
     createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart(
-        "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
+      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
     )

     // Force a reload of data from the log directory, and check that both logs are loaded.
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -129,16 +124,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
         List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
     }

-    list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
+    list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
       newAppComplete.lastModified(), "test", true))
-    list(1) should be (makeAppInfo("new-app-compressed-complete",
+    list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
       "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
       true))
-    list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
+    list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
       oldAppComplete.lastModified(), "test", true))
-    list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
+    list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
       oldAppIncomplete.lastModified(), "test", false))
-    list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
+    list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
       newAppIncomplete.lastModified(), "test", false))

     // Make sure the UI can be rendered.
@@ -162,7 +157,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     logDir.mkdir()
     createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
-      SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
+      SparkListenerApplicationStart("app2", None, 2L, "test", None),
       SparkListenerApplicationEnd(3L)
     )
     createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -185,12 +180,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("SPARK-3697: ignore directories that cannot be read.") {
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
+      SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
       SparkListenerApplicationEnd(2L)
     )
     val logFile2 = newLogFile("new2", None, inProgress = false)
     writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
+      SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
       SparkListenerApplicationEnd(2L)
     )
     logFile2.setReadable(false, false)
@@ -223,18 +218,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       }
     }

-  test("Parse logs that application is not started") {
-    val provider = new FsHistoryProvider((createTestConf()))
-
-    val logFile1 = newLogFile("app1", None, inProgress = true)
-    writeFile(logFile1, true, None,
-      SparkListenerLogStart("1.4")
-    )
-    updateAndCheck(provider) { list =>
-      list.size should be (0)
-    }
-  }
-
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
0 commit comments

Comments
 (0)