From 600eb8ab8b13b269169bc616165471e0da369256 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Sun, 20 Jul 2025 21:55:42 -0700
Subject: [PATCH 1/5] [SPARK-52914][CORE] Support `On-Demand Log Loading` in
 `History Server`

---
 .../deploy/history/FsHistoryProvider.scala    | 14 +++++++++++
 .../spark/internal/config/History.scala       |  6 +++++
 .../history/FsHistoryProviderSuite.scala      | 25 +++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index af6d959011c7b..96e9bf3b76dc4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -324,6 +324,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val app = try {
       load(appId)
     } catch {
+      case _: NoSuchElementException if this.conf.get(ON_DEMAND_ENABLED) =>
+        val name = Utils.nameForAppAndAttempt(appId, attemptId)
+        loadFromFallbackLocation(appId, attemptId,
+          RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX + name)
       case _: NoSuchElementException =>
         return None
     }
@@ -364,6 +368,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     Some(loadedUI)
   }
 
+  private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
+    : ApplicationInfoWrapper = {
+    val date = new Date(0)
+    val info = ApplicationAttemptInfo(attemptId, date, date, date, 0, "spark", false, "unknown")
+    addListing(new ApplicationInfoWrapper(
+      ApplicationInfo(appId, appId, None, None, None, None, List.empty),
+      List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
+    load(appId)
+  }
+
   override def getEmptyListingHtml(): Seq[Node] = {
     <p>
       Did you specify the correct logging directory? Please verify your setting of
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index f30628130862f..b76190b26a567 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -54,6 +54,12 @@ private[spark] object History {
     .checkValue(v => v > 0, "The update batchSize should be a positive integer.")
     .createWithDefault(Int.MaxValue)
 
+  val ON_DEMAND_ENABLED = ConfigBuilder("spark.history.fs.update.onDemandEnabled")
+    .version("4.1.0")
+    .doc("Whether to look up rolling event log locations in an on-demand manner before listing files.")
+    .booleanConf
+    .createWithDefault(true)
+
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .version("1.4.0")
     .doc("Whether the History Server should periodically clean up event logs from storage")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d58c996f23655..36a61326977b7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1640,6 +1640,31 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     }
   }
 
+  test("SPARK-52914: Support spark.history.fs.update.onDemandEnabled") {
+    Seq(true, false).foreach { onDemandEnabled =>
+      withTempDir { dir =>
+        val conf = createTestConf(true)
+        conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+        conf.set(ON_DEMAND_ENABLED, onDemandEnabled)
+        val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+        val provider = new FsHistoryProvider(conf)
+
+        val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
+        writer1.start()
+        writeEventsToRollingWriter(writer1, Seq(
+          SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
+          SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+        writer1.stop()
+
+        assert(provider.getListing().length === 0)
+        assert(dir.listFiles().length === 1)
+        assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
+
+        provider.stop()
+      }
+    }
+  }
+
   test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
     withTempDir { dir =>
       val conf = createTestConf(true)
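The fallback in the first patch works because rolling event log directories have deterministic names, so `getAppUI` can derive where an unlisted application's logs should live without rescanning the whole log directory. The sketch below restates that derivation outside the provider, assuming the `"eventlog_v2_"` prefix from `RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX` and a simplified `Utils.nameForAppAndAttempt` (the real helper also sanitizes the directory name, which is omitted here):

```scala
// Sketch only: deriving the fallback log directory for an unlisted app.
object FallbackPathSketch {
  // Mirrors RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX.
  val EventLogDirPrefix = "eventlog_v2_"

  // Simplified Utils.nameForAppAndAttempt: appId, optionally "_<attemptId>".
  def fallbackLogDir(appId: String, attemptId: Option[String]): String = {
    val name = attemptId.fold(appId)(a => s"${appId}_$a")
    EventLogDirPrefix + name
  }

  def main(args: Array[String]): Unit = {
    println(fallbackLogDir("app1", None))      // eventlog_v2_app1
    println(fallbackLogDir("app1", Some("1"))) // eventlog_v2_app1_1
  }
}
```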
From ef9b833c27b088fdb8a63bf15859877bd69a1610 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Tue, 22 Jul 2025 13:39:26 -0700
Subject: [PATCH 2/5] Address comments

---
 .../spark/deploy/history/FsHistoryProvider.scala  | 15 +++++++++++----
 .../apache/spark/internal/config/History.scala    | 13 +++++++------
 .../deploy/history/FsHistoryProviderSuite.scala   |  9 +++++++--
 3 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 96e9bf3b76dc4..1d027abdb9eb6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -321,13 +321,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def getLastUpdatedTime(): Long = lastScanTime.get()
 
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+    val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
+      Utils.nameForAppAndAttempt(appId, attemptId)
     val app = try {
       load(appId)
     } catch {
-      case _: NoSuchElementException if this.conf.get(ON_DEMAND_ENABLED) =>
-        val name = Utils.nameForAppAndAttempt(appId, attemptId)
-        loadFromFallbackLocation(appId, attemptId,
-          RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX + name)
+      case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        loadFromFallbackLocation(appId, attemptId, logPath)
       case _: NoSuchElementException =>
         return None
     }
@@ -349,6 +349,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           createInMemoryStore(attempt)
       }
     } catch {
+      case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        if (app.attempts.head.info.appSparkVersion == "unknown") {
+          listing.synchronized {
+            listing.delete(classOf[ApplicationInfoWrapper], appId)
+          }
+        }
+        return None
       case _: FileNotFoundException =>
         return None
     }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index b76190b26a567..8eaa37cceee97 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -54,12 +54,6 @@ private[spark] object History {
     .checkValue(v => v > 0, "The update batchSize should be a positive integer.")
     .createWithDefault(Int.MaxValue)
 
-  val ON_DEMAND_ENABLED = ConfigBuilder("spark.history.fs.update.onDemandEnabled")
-    .version("4.1.0")
-    .doc("Whether to look up rolling event log locations in an on-demand manner before listing files.")
-    .booleanConf
-    .createWithDefault(true)
-
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .version("1.4.0")
     .doc("Whether the History Server should periodically clean up event logs from storage")
@@ -165,6 +159,13 @@ private[spark] object History {
     .doubleConf
     .createWithDefault(0.7d)
 
+  val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
+    ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
+      .doc("Whether to look up rolling event log locations in an on-demand manner before listing files.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
     .version("3.0.0")
     .doc("Specifies whether the History Server should periodically clean up driver logs from " +
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 36a61326977b7..4c5205c7c1acc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1645,7 +1645,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
       withTempDir { dir =>
         val conf = createTestConf(true)
         conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
-        conf.set(ON_DEMAND_ENABLED, onDemandEnabled)
+        conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, onDemandEnabled)
         val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
         val provider = new FsHistoryProvider(conf)
 
@@ -1656,9 +1656,14 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
           SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
         writer1.stop()
 
-        assert(provider.getListing().length === 0)
         assert(dir.listFiles().length === 1)
+        assert(provider.getListing().length === 0)
         assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
+        assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
+
+        assert(dir.listFiles().length === 1)
+        assert(provider.getAppUI("nonexist", None).isEmpty)
+        assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
 
         provider.stop()
       }

From db5773bbb3987d123be3e6c0ff55e36186ec8dea Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Tue, 22 Jul 2025 13:43:14 -0700
Subject: [PATCH 3/5] Revise test case name

---
 .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 4c5205c7c1acc..87d461e4feda6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1640,7 +1640,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     }
   }
 
-  test("SPARK-52914: Support spark.history.fs.update.onDemandEnabled") {
+  test("SPARK-52914: Support spark.history.fs.eventLog.rolling.onDemandLoadEnabled") {
     Seq(true, false).foreach { onDemandEnabled =>
       withTempDir { dir =>
         val conf = createTestConf(true)
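A note on the second patch's `FileNotFoundException` handler: attempts synthesized by `loadFromFallbackLocation` are marked with `appSparkVersion = "unknown"`, so when the guessed log directory turns out not to exist, the placeholder can be deleted again and lookups for bogus application IDs leave no trace in the listing. A reduced model of that lifecycle, with a plain map standing in for the KVStore-backed `listing` and simplified, illustrative types:

```scala
import scala.collection.mutable

// Reduced model: placeholder entries are added on demand and dropped again
// when their event logs are found to be missing. Names are illustrative.
final case class AttemptSketch(appId: String, appSparkVersion: String)

object ListingInvalidationSketch {
  private val listing = mutable.Map.empty[String, AttemptSketch]

  def registerPlaceholder(appId: String): Unit =
    listing(appId) = AttemptSketch(appId, appSparkVersion = "unknown")

  def onEventLogMissing(appId: String): Unit = {
    // Only placeholders may be removed; fully scanned apps stay listed.
    listing.get(appId).foreach { attempt =>
      if (attempt.appSparkVersion == "unknown") {
        listing.synchronized { listing.remove(appId) }
      }
    }
  }
}
```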
From 834ba1613ec02cbff0d606c2a1e5ece4b8553955 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Tue, 22 Jul 2025 17:17:17 -0700
Subject: [PATCH 4/5] Address comments

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala  | 4 +++-
 .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 1d027abdb9eb6..bea229f277b52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -378,7 +378,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
     : ApplicationInfoWrapper = {
     val date = new Date(0)
-    val info = ApplicationAttemptInfo(attemptId, date, date, date, 0, "spark", false, "unknown")
+    val lastUpdate = new Date()
+    val info = ApplicationAttemptInfo(
+      attemptId, date, date, lastUpdate, 0, "spark", false, "unknown")
     addListing(new ApplicationInfoWrapper(
       ApplicationInfo(appId, appId, None, None, None, None, List.empty),
       List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 87d461e4feda6..69d531c2fdacc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1661,6 +1661,10 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
         assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
         assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
 
+        // The dummy entry should be protected from cleanLogs()
+        provider.cleanLogs()
+        assert(dir.listFiles().length === 1)
+
         assert(dir.listFiles().length === 1)
         assert(provider.getAppUI("nonexist", None).isEmpty)
         assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))

From 4ee04af0a6c281901c6393f2f9f1f579a911162b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Wed, 23 Jul 2025 09:01:29 -0700
Subject: [PATCH 5/5] Update
 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bea229f277b52..4aa6c7e40c548 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -322,7 +322,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
     val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
-      Utils.nameForAppAndAttempt(appId, attemptId)
+      EventLogFileWriter.nameForAppAndAttempt(appId, attemptId)
     val app = try {
       load(appId)
     } catch {
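Taken together, the series lets the History Server resolve UI requests for applications that the periodic scan has not listed yet, while `cleanLogs()` leaves the synthesized entries alone (the fourth patch sets their `lastUpdated` to the current time for that reason). Below is a hypothetical opt-out for deployments that prefer the strict pre-scan behavior; the log directory is a placeholder, and `FsHistoryProvider` itself is `private[history]`, so it is constructed from that package as the test suite does:

```scala
import org.apache.spark.SparkConf

object DisableOnDemandLoadSketch {
  // Setting the flag introduced in the second patch back to false restores
  // the old behavior: getAppUI() serves only already-listed applications.
  val conf = new SparkConf()
    .set("spark.history.fs.logDirectory", "/tmp/spark-events") // placeholder
    .set("spark.history.fs.eventLog.rolling.onDemandLoadEnabled", "false")
}
```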