diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index af6d959011c7b..4aa6c7e40c548 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -321,9 +321,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def getLastUpdatedTime(): Long = lastScanTime.get()
 
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+    val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
+      EventLogFileWriter.nameForAppAndAttempt(appId, attemptId)
     val app = try {
       load(appId)
     } catch {
+      case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        loadFromFallbackLocation(appId, attemptId, logPath)
       case _: NoSuchElementException =>
         return None
     }
@@ -345,6 +349,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         createInMemoryStore(attempt)
       }
     } catch {
+      case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        if (app.attempts.head.info.appSparkVersion == "unknown") {
+          listing.synchronized {
+            listing.delete(classOf[ApplicationInfoWrapper], appId)
+          }
+        }
+        return None
       case _: FileNotFoundException =>
         return None
     }
@@ -364,6 +375,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     Some(loadedUI)
   }
 
+  private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
+      : ApplicationInfoWrapper = {
+    val date = new Date(0)
+    val lastUpdate = new Date()
+    val info = ApplicationAttemptInfo(
+      attemptId, date, date, lastUpdate, 0, "spark", false, "unknown")
+    addListing(new ApplicationInfoWrapper(
+      ApplicationInfo(appId, appId, None, None, None, None, List.empty),
+      List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
+    load(appId)
+  }
+
   override def getEmptyListingHtml(): Seq[Node] = {
     <p>
       Did you specify the correct logging directory? Please verify your setting of
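The fallback above works because a rolling event log directory name is derivable from the app id alone. A minimal, self-contained sketch of how that `logPath` is composed: the `"eventlog_v2_"` prefix mirrors `RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX`, and `sanitize` is a simplified stand-in for the name sanitization `EventLogFileWriter.nameForAppAndAttempt` applies; treat both as assumptions about internals rather than code from this diff.

```scala
import java.util.Locale

// Hedged sketch of the fallback directory-name composition used above.
// The "eventlog_v2_" prefix and sanitize rules are assumptions mirroring
// Spark internals, not part of this patch.
object FallbackPathSketch {
  private def sanitize(str: String): String =
    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase(Locale.ROOT)

  def fallbackDirName(appId: String, attemptId: Option[String]): String = {
    val base = sanitize(appId)
    val name = attemptId.map(a => s"${base}_${sanitize(a)}").getOrElse(base)
    s"eventlog_v2_$name"
  }

  def main(args: Array[String]): Unit = {
    println(fallbackDirName("app1", None))      // eventlog_v2_app1
    println(fallbackDirName("app1", Some("1"))) // eventlog_v2_app1_1
  }
}
```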

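Operationally, the new behavior is gated by a single boolean flag, defined next in `History.scala` and enabled by default. A usage sketch for turning the fallback off on a History Server; the log-directory URI is an illustrative placeholder, not from this diff.

```scala
import org.apache.spark.SparkConf

// Illustrative SHS configuration; only the onDemandLoadEnabled key is new here.
object DisableOnDemandLoad {
  val shsConf: SparkConf = new SparkConf()
    .set("spark.history.fs.logDirectory", "hdfs:///spark-events") // placeholder path
    .set("spark.history.fs.eventLog.rolling.onDemandLoadEnabled", "false") // defaults to true
}
```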
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index f30628130862f..8eaa37cceee97 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -159,6 +159,13 @@ private[spark] object History {
     .doubleConf
     .createWithDefault(0.7d)
 
+  val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
+    ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
+      .doc("Whether to look up rolling event log locations on demand before listing files.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
     .version("3.0.0")
     .doc("Specifies whether the History Server should periodically clean up driver logs from " +
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d58c996f23655..69d531c2fdacc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1640,6 +1640,40 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     }
   }
 
+  test("SPARK-52914: Support spark.history.fs.eventLog.rolling.onDemandLoadEnabled") {
+    Seq(true, false).foreach { onDemandEnabled =>
+      withTempDir { dir =>
+        val conf = createTestConf(true)
+        conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+        conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, onDemandEnabled)
+        val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+        val provider = new FsHistoryProvider(conf)
+
+        val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
+        writer1.start()
+        writeEventsToRollingWriter(writer1, Seq(
+          SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
+          SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+        writer1.stop()
+
+        assert(dir.listFiles().length === 1)
+        assert(provider.getListing().length === 0)
+        assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
+        assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
+
+        // The dummy entry should be protected from cleanLogs()
+        provider.cleanLogs()
+        assert(dir.listFiles().length === 1)
+
+        assert(dir.listFiles().length === 1)
+        assert(provider.getAppUI("nonexist", None).isEmpty)
+        assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
+
+        provider.stop()
+      }
+    }
+  }
+
   test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
     withTempDir { dir =>
       val conf = createTestConf(true)
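One subtlety in the provider change: `loadFromFallbackLocation` stamps the synthesized attempt with `appSparkVersion = "unknown"`, so a later `FileNotFoundException` can evict only placeholder entries while leaving applications found by the regular scan intact. A condensed, hypothetical model of that lifecycle; `Entry`, `realLogExists`, and the mutable map stand in for the KVStore-backed listing and the event-log parse, and none of these names come from the patch.

```scala
import scala.collection.mutable

// Hypothetical condensation of the on-demand lifecycle sketched above; not Spark code.
object OnDemandLifecycleSketch {
  final case class Entry(appId: String, appSparkVersion: String)
  private val listing = mutable.Map[String, Entry]()

  def getAppUI(appId: String, onDemand: Boolean, realLogExists: Boolean): Option[Entry] = {
    val maybeEntry = listing.get(appId).orElse {
      if (onDemand) {
        val placeholder = Entry(appId, "unknown") // loadFromFallbackLocation analogue
        listing(appId) = placeholder
        Some(placeholder)
      } else None
    }
    maybeEntry.flatMap { entry =>
      if (realLogExists) Some(entry)
      else {
        // FileNotFoundException analogue: evict only synthesized placeholders,
        // leaving entries produced by the regular checkForLogs() scan intact.
        if (entry.appSparkVersion == "unknown") listing.remove(appId)
        None
      }
    }
  }
}
```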