From 1eca3fe8d2b6c56c22c01ec8bde1d86563258e58 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Wed, 17 Jun 2015 13:41:36 -0700 Subject: [PATCH 1/3] [SPARK-8372] History server shows incorrect information for application not started The history server may show an incorrect App ID for an incomplete application like .inprogress. This app info will never disappear even after the app is completed. ![incorrectappinfo](https://cloud.githubusercontent.com/assets/9278199/8156147/2a10fdbe-137d-11e5-9620-c5b61d93e3c1.png) The cause of the issue is that a log path name is used as the app id when app id cannot be got during replay. Author: Carson Wang Closes #6827 from carsonwang/SPARK-8372 and squashes the following commits: cdbb089 [Carson Wang] Fix code style 3e46b35 [Carson Wang] Update code style 90f5dde [Carson Wang] Add a unit test d8c9cd0 [Carson Wang] Replaying events only return information when app is started --- .../deploy/history/FsHistoryProvider.scala | 38 +++++++++------- .../history/FsHistoryProviderSuite.scala | 43 +++++++++++++------ 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5427a88f32ffd..db383b9823d3c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) replayBus.addListener(appListener) val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) - ui.setAppName(s"${appInfo.name} ($appId)") + appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") } val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) ui.getSecurityManager.setAcls(uiAclsEnabled) @@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newAttempts = logs.flatMap { fileStatus => try { val res = replay(fileStatus, bus) - logInfo(s"Application log ${res.logPath} loaded successfully.") - Some(res) + res match { + case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") + case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " + + "The application may have not started.") + } + res } catch { case e: Exception => logError( @@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Replays the events in the specified log file and returns information about the associated - * application. + * application. Return `None` if the application ID cannot be located. */ - private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = { + private def replay( + eventLog: FileStatus, + bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") val logInput = @@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appCompleted = isApplicationCompleted(eventLog) bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted) - new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - getModificationTime(eventLog).get, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted) + appListener.appId.map { appId => + new FsApplicationAttemptInfo( + logPath.getName(), + appListener.appName.getOrElse(NOT_STARTED), + appId, + appListener.appAttemptId, + appListener.startTime.getOrElse(-1L), + appListener.endTime.getOrElse(-1L), + getModificationTime(eventLog).get, + appListener.sparkUser.getOrElse(NOT_STARTED), + appCompleted) + } } finally { logInput.close() } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 09075eeb539aa..d3a6db5f260d6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -67,7 +67,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) writeFile(newAppComplete, true, None, - SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None), + SparkListenerApplicationStart( + "new-app-complete", Some("new-app-complete"), 1L, "test", None), SparkListenerApplicationEnd(5L) ) @@ -75,13 +76,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false, Some("lzf")) writeFile(newAppCompressedComplete, true, None, - SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None), + SparkListenerApplicationStart( + "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None), SparkListenerApplicationEnd(4L)) // Write an unfinished app, new-style. val newAppIncomplete = newLogFile("new2", None, inProgress = true) writeFile(newAppIncomplete, true, None, - SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None) + SparkListenerApplicationStart( + "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None) ) // Write an old-style application log. @@ -89,7 +92,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc oldAppComplete.mkdir() createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0")) writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None, - SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None), + SparkListenerApplicationStart( + "old-app-complete", Some("old-app-complete"), 2L, "test", None), SparkListenerApplicationEnd(3L) ) createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE)) @@ -103,7 +107,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc oldAppIncomplete.mkdir() createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0")) writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None, - SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None) + SparkListenerApplicationStart( + "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None) ) // Force a reload of data from the log directory, and check that both logs are loaded. @@ -124,16 +129,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) } - list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L, + list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L, newAppComplete.lastModified(), "test", true)) - list(1) should be (makeAppInfo(newAppCompressedComplete.getName(), + list(1) should be (makeAppInfo("new-app-compressed-complete", "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) - list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, + list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L, oldAppComplete.lastModified(), "test", true)) - list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L, + list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L, oldAppIncomplete.lastModified(), "test", false)) - list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L, + list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L, newAppIncomplete.lastModified(), "test", false)) // Make sure the UI can be rendered. @@ -157,7 +162,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc logDir.mkdir() createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0")) writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec), - SparkListenerApplicationStart("app2", None, 2L, "test", None), + SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None), SparkListenerApplicationEnd(3L) ) createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName)) @@ -180,12 +185,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("SPARK-3697: ignore directories that cannot be read.") { val logFile1 = newLogFile("new1", None, inProgress = false) writeFile(logFile1, true, None, - SparkListenerApplicationStart("app1-1", None, 1L, "test", None), + SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None), SparkListenerApplicationEnd(2L) ) val logFile2 = newLogFile("new2", None, inProgress = false) writeFile(logFile2, true, None, - SparkListenerApplicationStart("app1-2", None, 1L, "test", None), + SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None), SparkListenerApplicationEnd(2L) ) logFile2.setReadable(false, false) @@ -218,6 +223,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("Parse logs that application is not started") { + val provider = new FsHistoryProvider((createTestConf())) + + val logFile1 = newLogFile("app1", None, inProgress = true) + writeFile(logFile1, true, None, + SparkListenerLogStart("1.4") + ) + updateAndCheck(provider) { list => + list.size should be (0) + } + } + test("SPARK-5582: empty log directory") { val provider = new FsHistoryProvider(createTestConf()) From 7b91b74f366bf44b913112040e6419e06b042a07 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 29 Jun 2015 13:25:08 -0700 Subject: [PATCH 2/3] Handle logs generated by 1.0 and 1.1. These logs don't have app IDs, so they should not be filtered. --- .../deploy/history/FsHistoryProvider.scala | 68 ++++++++---- .../history/FsHistoryProviderSuite.scala | 100 ++++++++++++------ 2 files changed, 115 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index db383b9823d3c..fd2cf5a2a53b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] - // Constants used to parse Spark 1.0.0 log directories. - private[history] val LOG_PREFIX = "EVENT_LOG_" - private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_" - private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_" - private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" - /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } - val appListener = new ApplicationEventListener() replayBus.addListener(appListener) val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) - - appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") } - - val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - ui.getSecurityManager.setAcls(uiAclsEnabled) - // make sure to set admin acls before view acls so they are properly picked up - ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) - ui.getSecurityManager.setViewAcls(attempt.sparkUser, - appListener.viewAcls.getOrElse("")) - ui + appInfo.map { info => + ui.setAppName(s"${info.name} ($appId)") + + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setAcls(uiAclsEnabled) + // make sure to set admin acls before view acls so they are properly picked up + ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) + ui.getSecurityManager.setViewAcls(attempt.sparkUser, + appListener.viewAcls.getOrElse("")) + ui + }.orNull } } } catch { @@ -451,17 +445,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appCompleted = isApplicationCompleted(eventLog) bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted) - appListener.appId.map { appId => - new FsApplicationAttemptInfo( + + // Without an app ID, new logs will render incorrectly in the listing page, so do not list or + // try to show their UI. Some old versions of Spark generate logs without an app ID, so let + // logs generated by those versions go through. + if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) { + Some(new FsApplicationAttemptInfo( logPath.getName(), appListener.appName.getOrElse(NOT_STARTED), - appId, + appListener.appId.getOrElse(logPath.getName()), appListener.appAttemptId, appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), getModificationTime(eventLog).get, appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted) + appCompleted)) + } else { + None } } finally { logInput.close() @@ -537,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** + * Returns whether the version of Spark that generated logs records app IDs. App IDs were added + * in Spark 1.1. + */ + private def sparkVersionHasAppId(entry: FileStatus): Boolean = { + if (isLegacyLogDirectory(entry)) { + fs.listStatus(entry.getPath()) + .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) } + .map { status => + val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length()) + version != "1.0" && version != "1.1" + } + .getOrElse(true) + } else { + true + } + } + } -private object FsHistoryProvider { +private[history] object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + // Constants used to parse Spark 1.0.0 log directories. + val LOG_PREFIX = "EVENT_LOG_" + val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_" + val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_" + val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" } private class FsApplicationAttemptInfo( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d3a6db5f260d6..2a62450bcdbad 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -39,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { + import FsHistoryProvider._ + private var testDir: File = null before { @@ -67,8 +69,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) writeFile(newAppComplete, true, None, - SparkListenerApplicationStart( - "new-app-complete", Some("new-app-complete"), 1L, "test", None), + SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test", + None), SparkListenerApplicationEnd(5L) ) @@ -76,39 +78,30 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false, Some("lzf")) writeFile(newAppCompressedComplete, true, None, - SparkListenerApplicationStart( - "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None), + SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"), + 1L, "test", None), SparkListenerApplicationEnd(4L)) // Write an unfinished app, new-style. val newAppIncomplete = newLogFile("new2", None, inProgress = true) writeFile(newAppIncomplete, true, None, - SparkListenerApplicationStart( - "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None) + SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test", + None) ) // Write an old-style application log. - val oldAppComplete = new File(testDir, "old1") - oldAppComplete.mkdir() - createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0")) - writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None, - SparkListenerApplicationStart( - "old-app-complete", Some("old-app-complete"), 2L, "test", None), + val oldAppComplete = writeOldLog("old1", "1.0", None, true, + SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None), SparkListenerApplicationEnd(3L) ) - createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE)) // Check for logs so that we force the older unfinished app to be loaded, to make // sure unfinished apps are also sorted correctly. provider.checkForLogs() // Write an unfinished app, old-style. - val oldAppIncomplete = new File(testDir, "old2") - oldAppIncomplete.mkdir() - createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0")) - writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None, - SparkListenerApplicationStart( - "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None) + val oldAppIncomplete = writeOldLog("old2", "1.0", None, false, + SparkListenerApplicationStart("old2", None, 2L, "test", None) ) // Force a reload of data from the log directory, and check that both logs are loaded. @@ -129,16 +122,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) } - list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L, + list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L, newAppComplete.lastModified(), "test", true)) - list(1) should be (makeAppInfo("new-app-compressed-complete", - "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", - true)) - list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L, + list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(), + 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) + list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L, oldAppComplete.lastModified(), "test", true)) - list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L, - oldAppIncomplete.lastModified(), "test", false)) - list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L, + list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L, + -1L, oldAppIncomplete.lastModified(), "test", false)) + list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L, newAppIncomplete.lastModified(), "test", false)) // Make sure the UI can be rendered. @@ -160,12 +152,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null val logDir = new File(testDir, codecName) logDir.mkdir() - createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0")) - writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec), - SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None), + createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0")) + writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec), + SparkListenerApplicationStart("app2", None, 2L, "test", None), SparkListenerApplicationEnd(3L) ) - createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName)) + createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName)) val logPath = new Path(logDir.getAbsolutePath()) try { @@ -390,6 +382,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("SPARK-8372: new logs with no app ID are ignored") { + val provider = new FsHistoryProvider(createTestConf()) + + // Write a new log file without an app id, to make sure it's ignored. + val logFile1 = newLogFile("app1", None, inProgress = true) + writeFile(logFile1, true, None, + SparkListenerLogStart("1.4") + ) + + // Write a 1.2 log file with no start event (= no app id), it should be ignored. + writeOldLog("v12Log", "1.2", None, false) + + // Write 1.0 and 1.1 logs, which don't have app ids. + writeOldLog("v11Log", "1.1", None, true, + SparkListenerApplicationStart("v11Log", None, 2L, "test", None), + SparkListenerApplicationEnd(3L)) + writeOldLog("v10Log", "1.0", None, true, + SparkListenerApplicationStart("v10Log", None, 2L, "test", None), + SparkListenerApplicationEnd(4L)) + + updateAndCheck(provider) { list => + list.size should be (2) + list(0).id should be ("v10Log") + list(1).id should be ("v11Log") + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: @@ -429,4 +448,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) } + private def writeOldLog( + fname: String, + sparkVersion: String, + codec: Option[CompressionCodec], + completed: Boolean, + events: SparkListenerEvent*): File = { + val log = new File(testDir, fname) + log.mkdir() + + val oldEventLog = new File(log, LOG_PREFIX + "1") + createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion)) + writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*) + if (completed) { + createEmptyFile(new File(log, APPLICATION_COMPLETE)) + } + + log + } + } From a24eab2fa3f30f640e783493cc694d74fdef3fab Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 30 Jun 2015 09:48:51 -0700 Subject: [PATCH 3/3] Feedback. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index fd2cf5a2a53b5..2cc465e55fceb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -140,7 +140,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = { try { applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId).map { attempt => + appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() @@ -162,7 +162,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) ui - }.orNull + } } } } catch {