Skip to content

Commit 1b5439f

Browse files
Marcelo Vanzin authored and Andrew Or committed
[SPARK-8372] Do not show applications that haven't recorded their app ID yet.
Showing these applications may lead to weird behavior in the History Server. For old logs, if the app ID is recorded later, you may end up with a duplicate entry. For new logs, the app might be listed with a ".inprogress" suffix. So ignore those, but still allow old applications that don't record app IDs at all (1.0 and 1.1) to be shown. Author: Marcelo Vanzin <[email protected]> Author: Carson Wang <[email protected]> Closes #7097 from vanzin/SPARK-8372 and squashes the following commits: a24eab2 [Marcelo Vanzin] Feedback. 112ae8f [Marcelo Vanzin] Merge branch 'master' into SPARK-8372 7b91b74 [Marcelo Vanzin] Handle logs generated by 1.0 and 1.1. 1eca3fe [Carson Wang] [SPARK-8372] History server shows incorrect information for application not started Conflicts: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
1 parent bc355e2 commit 1b5439f

File tree

2 files changed

+147
-60
lines changed

2 files changed

+147
-60
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
8080
// List of application logs to be deleted by event log cleaner.
8181
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
8282

83-
// Constants used to parse Spark 1.0.0 log directories.
84-
private[history] val LOG_PREFIX = "EVENT_LOG_"
85-
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
86-
private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
87-
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
88-
8983
/**
9084
* Return a runnable that performs the given operation on the event logs.
9185
* This operation is expected to be executed periodically.
@@ -143,7 +137,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
143137
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
144138
try {
145139
applications.get(appId).flatMap { appInfo =>
146-
appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
140+
appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
147141
val replayBus = new ReplayListenerBus()
148142
val ui = {
149143
val conf = this.conf.clone()
@@ -152,20 +146,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
152146
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
153147
// Do not call ui.bind() to avoid creating a new server for each application
154148
}
155-
156149
val appListener = new ApplicationEventListener()
157150
replayBus.addListener(appListener)
158151
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
159-
160-
ui.setAppName(s"${appInfo.name} ($appId)")
161-
162-
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
163-
ui.getSecurityManager.setAcls(uiAclsEnabled)
164-
// make sure to set admin acls before view acls so they are properly picked up
165-
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
166-
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
167-
appListener.viewAcls.getOrElse(""))
168-
ui
152+
appInfo.map { info =>
153+
ui.setAppName(s"${info.name} ($appId)")
154+
155+
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
156+
ui.getSecurityManager.setAcls(uiAclsEnabled)
157+
// make sure to set admin acls before view acls so they are properly picked up
158+
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
159+
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
160+
appListener.viewAcls.getOrElse(""))
161+
ui
162+
}
169163
}
170164
}
171165
} catch {
@@ -227,8 +221,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
227221
val newAttempts = logs.flatMap { fileStatus =>
228222
try {
229223
val res = replay(fileStatus, bus)
230-
logInfo(s"Application log ${res.logPath} loaded successfully.")
231-
Some(res)
224+
res match {
225+
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
226+
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
227+
"The application may have not started.")
228+
}
229+
res
232230
} catch {
233231
case e: Exception =>
234232
logError(
@@ -374,9 +372,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
374372

375373
/**
376374
* Replays the events in the specified log file and returns information about the associated
377-
* application.
375+
* application. Return `None` if the application ID cannot be located.
378376
*/
379-
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
377+
private def replay(
378+
eventLog: FileStatus,
379+
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
380380
val logPath = eventLog.getPath()
381381
logInfo(s"Replaying log path: $logPath")
382382
val logInput =
@@ -390,16 +390,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
390390
val appCompleted = isApplicationCompleted(eventLog)
391391
bus.addListener(appListener)
392392
bus.replay(logInput, logPath.toString, !appCompleted)
393-
new FsApplicationAttemptInfo(
394-
logPath.getName(),
395-
appListener.appName.getOrElse(NOT_STARTED),
396-
appListener.appId.getOrElse(logPath.getName()),
397-
appListener.appAttemptId,
398-
appListener.startTime.getOrElse(-1L),
399-
appListener.endTime.getOrElse(-1L),
400-
getModificationTime(eventLog).get,
401-
appListener.sparkUser.getOrElse(NOT_STARTED),
402-
appCompleted)
393+
394+
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
395+
// try to show their UI. Some old versions of Spark generate logs without an app ID, so let
396+
// logs generated by those versions go through.
397+
if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
398+
Some(new FsApplicationAttemptInfo(
399+
logPath.getName(),
400+
appListener.appName.getOrElse(NOT_STARTED),
401+
appListener.appId.getOrElse(logPath.getName()),
402+
appListener.appAttemptId,
403+
appListener.startTime.getOrElse(-1L),
404+
appListener.endTime.getOrElse(-1L),
405+
getModificationTime(eventLog).get,
406+
appListener.sparkUser.getOrElse(NOT_STARTED),
407+
appCompleted))
408+
} else {
409+
None
410+
}
403411
} finally {
404412
logInput.close()
405413
}
@@ -474,10 +482,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
474482
}
475483
}
476484

485+
/**
486+
* Returns whether the version of Spark that generated logs records app IDs. App IDs were added
487+
* in Spark 1.1.
488+
*/
489+
private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
490+
if (isLegacyLogDirectory(entry)) {
491+
fs.listStatus(entry.getPath())
492+
.find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
493+
.map { status =>
494+
val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
495+
version != "1.0" && version != "1.1"
496+
}
497+
.getOrElse(true)
498+
} else {
499+
true
500+
}
501+
}
502+
477503
}
478504

479-
private object FsHistoryProvider {
505+
private[history] object FsHistoryProvider {
480506
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
507+
508+
// Constants used to parse Spark 1.0.0 log directories.
509+
val LOG_PREFIX = "EVENT_LOG_"
510+
val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
511+
val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
512+
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
481513
}
482514

483515
private class FsApplicationAttemptInfo(

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
3535

3636
class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
3737

38+
import FsHistoryProvider._
39+
3840
private var testDir: File = null
3941

4042
before {
@@ -63,43 +65,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
6365
// Write a new-style application log.
6466
val newAppComplete = newLogFile("new1", None, inProgress = false)
6567
writeFile(newAppComplete, true, None,
66-
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
68+
SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
69+
None),
6770
SparkListenerApplicationEnd(5L)
6871
)
6972

7073
// Write a new-style application log.
7174
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
7275
Some("lzf"))
7376
writeFile(newAppCompressedComplete, true, None,
74-
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
77+
SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
78+
1L, "test", None),
7579
SparkListenerApplicationEnd(4L))
7680

7781
// Write an unfinished app, new-style.
7882
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
7983
writeFile(newAppIncomplete, true, None,
80-
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
84+
SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
85+
None)
8186
)
8287

8388
// Write an old-style application log.
84-
val oldAppComplete = new File(testDir, "old1")
85-
oldAppComplete.mkdir()
86-
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
87-
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
88-
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
89+
val oldAppComplete = writeOldLog("old1", "1.0", None, true,
90+
SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
8991
SparkListenerApplicationEnd(3L)
9092
)
91-
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
9293

9394
// Check for logs so that we force the older unfinished app to be loaded, to make
9495
// sure unfinished apps are also sorted correctly.
9596
provider.checkForLogs()
9697

9798
// Write an unfinished app, old-style.
98-
val oldAppIncomplete = new File(testDir, "old2")
99-
oldAppIncomplete.mkdir()
100-
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
101-
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
102-
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
99+
val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
100+
SparkListenerApplicationStart("old2", None, 2L, "test", None)
103101
)
104102

105103
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -120,16 +118,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
120118
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
121119
}
122120

123-
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
121+
list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
124122
newAppComplete.lastModified(), "test", true))
125-
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
126-
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
127-
true))
128-
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
123+
list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
124+
1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
125+
list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
129126
oldAppComplete.lastModified(), "test", true))
130-
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
131-
oldAppIncomplete.lastModified(), "test", false))
132-
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
127+
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
128+
-1L, oldAppIncomplete.lastModified(), "test", false))
129+
list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
133130
newAppIncomplete.lastModified(), "test", false))
134131

135132
// Make sure the UI can be rendered.
@@ -151,12 +148,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
151148
val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
152149
val logDir = new File(testDir, codecName)
153150
logDir.mkdir()
154-
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
155-
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
151+
createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
152+
writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
156153
SparkListenerApplicationStart("app2", None, 2L, "test", None),
157154
SparkListenerApplicationEnd(3L)
158155
)
159-
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
156+
createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
160157

161158
val logPath = new Path(logDir.getAbsolutePath())
162159
try {
@@ -176,12 +173,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
176173
test("SPARK-3697: ignore directories that cannot be read.") {
177174
val logFile1 = newLogFile("new1", None, inProgress = false)
178175
writeFile(logFile1, true, None,
179-
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
176+
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
180177
SparkListenerApplicationEnd(2L)
181178
)
182179
val logFile2 = newLogFile("new2", None, inProgress = false)
183180
writeFile(logFile2, true, None,
184-
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
181+
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
185182
SparkListenerApplicationEnd(2L)
186183
)
187184
logFile2.setReadable(false, false)
@@ -214,6 +211,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
214211
}
215212
}
216213

214+
test("Parse logs that application is not started") {
215+
val provider = new FsHistoryProvider((createTestConf()))
216+
217+
val logFile1 = newLogFile("app1", None, inProgress = true)
218+
writeFile(logFile1, true, None,
219+
SparkListenerLogStart("1.4")
220+
)
221+
updateAndCheck(provider) { list =>
222+
list.size should be (0)
223+
}
224+
}
225+
217226
test("SPARK-5582: empty log directory") {
218227
val provider = new FsHistoryProvider(createTestConf())
219228

@@ -335,6 +344,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
335344
assert(!log2.exists())
336345
}
337346

347+
test("SPARK-8372: new logs with no app ID are ignored") {
348+
val provider = new FsHistoryProvider(createTestConf())
349+
350+
// Write a new log file without an app id, to make sure it's ignored.
351+
val logFile1 = newLogFile("app1", None, inProgress = true)
352+
writeFile(logFile1, true, None,
353+
SparkListenerLogStart("1.4")
354+
)
355+
356+
// Write a 1.2 log file with no start event (= no app id), it should be ignored.
357+
writeOldLog("v12Log", "1.2", None, false)
358+
359+
// Write 1.0 and 1.1 logs, which don't have app ids.
360+
writeOldLog("v11Log", "1.1", None, true,
361+
SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
362+
SparkListenerApplicationEnd(3L))
363+
writeOldLog("v10Log", "1.0", None, true,
364+
SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
365+
SparkListenerApplicationEnd(4L))
366+
367+
updateAndCheck(provider) { list =>
368+
list.size should be (2)
369+
list(0).id should be ("v10Log")
370+
list(1).id should be ("v11Log")
371+
}
372+
}
373+
338374
/**
339375
* Asks the provider to check for logs and calls a function to perform checks on the updated
340376
* app list. Example:
@@ -374,4 +410,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
374410
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
375411
}
376412

413+
private def writeOldLog(
414+
fname: String,
415+
sparkVersion: String,
416+
codec: Option[CompressionCodec],
417+
completed: Boolean,
418+
events: SparkListenerEvent*): File = {
419+
val log = new File(testDir, fname)
420+
log.mkdir()
421+
422+
val oldEventLog = new File(log, LOG_PREFIX + "1")
423+
createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
424+
writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
425+
if (completed) {
426+
createEmptyFile(new File(log, APPLICATION_COMPLETE))
427+
}
428+
429+
log
430+
}
431+
377432
}

0 commit comments

Comments (0)