Skip to content

Commit 7b91b74

Browse files
author
Marcelo Vanzin
committed
Handle logs generated by 1.0 and 1.1.
These logs don't have app IDs, so they should not be filtered.
1 parent 1eca3fe commit 7b91b74

File tree

2 files changed

+115
-53
lines changed

2 files changed

+115
-53
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
8383
// List of application logs to be deleted by event log cleaner.
8484
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
8585

86-
// Constants used to parse Spark 1.0.0 log directories.
87-
private[history] val LOG_PREFIX = "EVENT_LOG_"
88-
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
89-
private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
90-
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
91-
9286
/**
9387
* Return a runnable that performs the given operation on the event logs.
9488
* This operation is expected to be executed periodically.
@@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
155149
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
156150
// Do not call ui.bind() to avoid creating a new server for each application
157151
}
158-
159152
val appListener = new ApplicationEventListener()
160153
replayBus.addListener(appListener)
161154
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
162-
163-
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
164-
165-
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
166-
ui.getSecurityManager.setAcls(uiAclsEnabled)
167-
// make sure to set admin acls before view acls so they are properly picked up
168-
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
169-
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
170-
appListener.viewAcls.getOrElse(""))
171-
ui
155+
appInfo.map { info =>
156+
ui.setAppName(s"${info.name} ($appId)")
157+
158+
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
159+
ui.getSecurityManager.setAcls(uiAclsEnabled)
160+
// make sure to set admin acls before view acls so they are properly picked up
161+
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
162+
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
163+
appListener.viewAcls.getOrElse(""))
164+
ui
165+
}.orNull
172166
}
173167
}
174168
} catch {
@@ -451,17 +445,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
451445
val appCompleted = isApplicationCompleted(eventLog)
452446
bus.addListener(appListener)
453447
bus.replay(logInput, logPath.toString, !appCompleted)
454-
appListener.appId.map { appId =>
455-
new FsApplicationAttemptInfo(
448+
449+
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
450+
// try to show their UI. Some old versions of Spark generate logs without an app ID, so let
451+
// logs generated by those versions go through.
452+
if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
453+
Some(new FsApplicationAttemptInfo(
456454
logPath.getName(),
457455
appListener.appName.getOrElse(NOT_STARTED),
458-
appId,
456+
appListener.appId.getOrElse(logPath.getName()),
459457
appListener.appAttemptId,
460458
appListener.startTime.getOrElse(-1L),
461459
appListener.endTime.getOrElse(-1L),
462460
getModificationTime(eventLog).get,
463461
appListener.sparkUser.getOrElse(NOT_STARTED),
464-
appCompleted)
462+
appCompleted))
463+
} else {
464+
None
465465
}
466466
} finally {
467467
logInput.close()
@@ -537,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
537537
}
538538
}
539539

540+
/**
541+
* Returns whether the version of Spark that generated these logs records app IDs. App IDs were
542+
* added in Spark 1.2, so logs from versions 1.0 and 1.1 will not have them.
543+
*/
544+
private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
545+
if (isLegacyLogDirectory(entry)) {
546+
fs.listStatus(entry.getPath())
547+
.find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
548+
.map { status =>
549+
val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
550+
version != "1.0" && version != "1.1"
551+
}
552+
.getOrElse(true)
553+
} else {
554+
true
555+
}
556+
}
557+
540558
}
541559

542-
private object FsHistoryProvider {
560+
private[history] object FsHistoryProvider {
543561
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
562+
563+
// Constants used to parse Spark 1.0.0 log directories.
564+
val LOG_PREFIX = "EVENT_LOG_"
565+
val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
566+
val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
567+
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
544568
}
545569

546570
private class FsApplicationAttemptInfo(

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 69 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
3939

4040
class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
4141

42+
import FsHistoryProvider._
43+
4244
private var testDir: File = null
4345

4446
before {
@@ -67,48 +69,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
6769
// Write a new-style application log.
6870
val newAppComplete = newLogFile("new1", None, inProgress = false)
6971
writeFile(newAppComplete, true, None,
70-
SparkListenerApplicationStart(
71-
"new-app-complete", Some("new-app-complete"), 1L, "test", None),
72+
SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
73+
None),
7274
SparkListenerApplicationEnd(5L)
7375
)
7476

7577
// Write a new-style application log.
7678
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
7779
Some("lzf"))
7880
writeFile(newAppCompressedComplete, true, None,
79-
SparkListenerApplicationStart(
80-
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
81+
SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
82+
1L, "test", None),
8183
SparkListenerApplicationEnd(4L))
8284

8385
// Write an unfinished app, new-style.
8486
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
8587
writeFile(newAppIncomplete, true, None,
86-
SparkListenerApplicationStart(
87-
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
88+
SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
89+
None)
8890
)
8991

9092
// Write an old-style application log.
91-
val oldAppComplete = new File(testDir, "old1")
92-
oldAppComplete.mkdir()
93-
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
94-
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
95-
SparkListenerApplicationStart(
96-
"old-app-complete", Some("old-app-complete"), 2L, "test", None),
93+
val oldAppComplete = writeOldLog("old1", "1.0", None, true,
94+
SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
9795
SparkListenerApplicationEnd(3L)
9896
)
99-
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
10097

10198
// Check for logs so that we force the older unfinished app to be loaded, to make
10299
// sure unfinished apps are also sorted correctly.
103100
provider.checkForLogs()
104101

105102
// Write an unfinished app, old-style.
106-
val oldAppIncomplete = new File(testDir, "old2")
107-
oldAppIncomplete.mkdir()
108-
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
109-
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
110-
SparkListenerApplicationStart(
111-
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
103+
val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
104+
SparkListenerApplicationStart("old2", None, 2L, "test", None)
112105
)
113106

114107
// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -129,16 +122,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
129122
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
130123
}
131124

132-
list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
125+
list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
133126
newAppComplete.lastModified(), "test", true))
134-
list(1) should be (makeAppInfo("new-app-compressed-complete",
135-
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
136-
true))
137-
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
127+
list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
128+
1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
129+
list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
138130
oldAppComplete.lastModified(), "test", true))
139-
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
140-
oldAppIncomplete.lastModified(), "test", false))
141-
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
131+
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
132+
-1L, oldAppIncomplete.lastModified(), "test", false))
133+
list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
142134
newAppIncomplete.lastModified(), "test", false))
143135

144136
// Make sure the UI can be rendered.
@@ -160,12 +152,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
160152
val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
161153
val logDir = new File(testDir, codecName)
162154
logDir.mkdir()
163-
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
164-
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
165-
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
155+
createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
156+
writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
157+
SparkListenerApplicationStart("app2", None, 2L, "test", None),
166158
SparkListenerApplicationEnd(3L)
167159
)
168-
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
160+
createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
169161

170162
val logPath = new Path(logDir.getAbsolutePath())
171163
try {
@@ -390,6 +382,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
390382
}
391383
}
392384

385+
test("SPARK-8372: new logs with no app ID are ignored") {
386+
val provider = new FsHistoryProvider(createTestConf())
387+
388+
// Write a new log file without an app id, to make sure it's ignored.
389+
val logFile1 = newLogFile("app1", None, inProgress = true)
390+
writeFile(logFile1, true, None,
391+
SparkListenerLogStart("1.4")
392+
)
393+
394+
// Write a 1.2 log file with no start event (= no app id), it should be ignored.
395+
writeOldLog("v12Log", "1.2", None, false)
396+
397+
// Write 1.0 and 1.1 logs, which don't have app ids.
398+
writeOldLog("v11Log", "1.1", None, true,
399+
SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
400+
SparkListenerApplicationEnd(3L))
401+
writeOldLog("v10Log", "1.0", None, true,
402+
SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
403+
SparkListenerApplicationEnd(4L))
404+
405+
updateAndCheck(provider) { list =>
406+
list.size should be (2)
407+
list(0).id should be ("v10Log")
408+
list(1).id should be ("v11Log")
409+
}
410+
}
411+
393412
/**
394413
* Asks the provider to check for logs and calls a function to perform checks on the updated
395414
* app list. Example:
@@ -429,4 +448,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
429448
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
430449
}
431450

451+
private def writeOldLog(
452+
fname: String,
453+
sparkVersion: String,
454+
codec: Option[CompressionCodec],
455+
completed: Boolean,
456+
events: SparkListenerEvent*): File = {
457+
val log = new File(testDir, fname)
458+
log.mkdir()
459+
460+
val oldEventLog = new File(log, LOG_PREFIX + "1")
461+
createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
462+
writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
463+
if (completed) {
464+
createEmptyFile(new File(log, APPLICATION_COMPLETE))
465+
}
466+
467+
log
468+
}
469+
432470
}

0 commit comments

Comments
 (0)