
Commit 97f0461

[SPARK-52914][CORE] Support On-Demand Log Loading for rolling logs in History Server
### What changes were proposed in this pull request?

This PR aims to support `On-Demand Log Loading` in `History Server` by looking up the **rolling event log locations** even if Spark's listing has not yet loaded the event log files.

```scala
val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
  ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
    .doc("Whether to look up rolling event log locations on demand manner before listing files.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(true)
```

Previously, the Spark History Server would show an `Application ... Not Found` page if a job was requested before it had been scanned, even when the file existed in the correct location. So this PR does not introduce any regressions, because it introduces fallback logic that improves error handling.

![Application Not Found page (screenshot, 2025-07-22)](https://github.com/user-attachments/assets/fccb413c-5a57-4918-86c0-28ae81d54873)

### Why are the changes needed?

Since Apache Spark 3.0, we have been using event log rolling not only for **long-running jobs**, but also for **some failed jobs**, to archive the partial event logs incrementally.
- #25670

Since Apache Spark 4.0, event log rolling has been enabled by default.
- #43638

On top of that, this PR aims to reduce storage cost in Apache Spark 4.1. By supporting `On-Demand Loading for rolled event logs`, we can use larger values for `spark.history.fs.update.interval` instead of the default `10s`. Although Spark History logs are consumed in various ways, this has a big benefit because most successful Spark jobs' logs are never visited by users.

### Does this PR introduce _any_ user-facing change?

No. This is a new feature.

### How was this patch tested?

Pass the CIs with the newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51604 from dongjoon-hyun/SPARK-52914.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
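A hedged sketch of how a deployment might combine the new flag with a longer listing interval (illustrative `SparkConf` usage; the `10m` value is an example, not a recommendation from this commit):

```scala
import org.apache.spark.SparkConf

// With on-demand loading enabled (the 4.1.0 default), a direct UI request can
// find a rolling event log before the periodic scan does, so the scan interval
// can be raised well above the default 10s.
val conf = new SparkConf()
  .set("spark.history.fs.eventLog.rolling.onDemandLoadEnabled", "true")
  .set("spark.history.fs.update.interval", "10m") // illustrative larger value
```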
1 parent a81d792 · commit 97f0461

File tree

3 files changed: +64 −0 lines

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 23 additions & 0 deletions
```diff
@@ -321,9 +321,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def getLastUpdatedTime(): Long = lastScanTime.get()
 
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+    val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
+      EventLogFileWriter.nameForAppAndAttempt(appId, attemptId)
     val app = try {
       load(appId)
     } catch {
+      case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        loadFromFallbackLocation(appId, attemptId, logPath)
       case _: NoSuchElementException =>
         return None
     }
@@ -345,6 +349,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         createInMemoryStore(attempt)
       }
     } catch {
+      case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        if (app.attempts.head.info.appSparkVersion == "unknown") {
+          listing.synchronized {
+            listing.delete(classOf[ApplicationInfoWrapper], appId)
+          }
+        }
+        return None
       case _: FileNotFoundException =>
         return None
     }
@@ -364,6 +375,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     Some(loadedUI)
   }
 
+  private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
+      : ApplicationInfoWrapper = {
+    val date = new Date(0)
+    val lastUpdate = new Date()
+    val info = ApplicationAttemptInfo(
+      attemptId, date, date, lastUpdate, 0, "spark", false, "unknown")
+    addListing(new ApplicationInfoWrapper(
+      ApplicationInfo(appId, appId, None, None, None, None, List.empty),
+      List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
+    load(appId)
+  }
+
   override def getEmptyListingHtml(): Seq[Node] = {
     <p>
       Did you specify the correct logging directory? Please verify your setting of
```
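The fallback works because a rolling event log directory name is derivable from the application and attempt ids alone. A minimal sketch of that derivation, assuming the `eventlog_v2_` value of `RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX` and a plain `appId[_attemptId]` result from `EventLogFileWriter.nameForAppAndAttempt` (not the exact Spark code):

```scala
// Sketch of the path derivation behind the on-demand lookup (assumed naming).
def fallbackLogDirName(appId: String, attemptId: Option[String]): String = {
  val prefix = "eventlog_v2_" // assumed value of EVENT_LOG_DIR_NAME_PREFIX
  prefix + attemptId.fold(appId)(id => s"${appId}_$id")
}

// fallbackLogDirName("app-20250722140821-0001", None)
// => "eventlog_v2_app-20250722140821-0001"
```

Note how `loadFromFallbackLocation` stamps the placeholder attempt with `appSparkVersion = "unknown"`: the `FileNotFoundException` handler above deletes a listing entry only when it carries that marker, so a speculative entry whose log directory turns out not to exist is rolled back, while genuinely scanned entries are left untouched.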

core/src/main/scala/org/apache/spark/internal/config/History.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -159,6 +159,13 @@ private[spark] object History {
     .doubleConf
     .createWithDefault(0.7d)
 
+  val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
+    ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
+      .doc("Whether to look up rolling event log locations on demand manner before listing files.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
     .version("3.0.0")
     .doc("Specifies whether the History Server should periodically clean up driver logs from " +
```

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 34 additions & 0 deletions
```diff
@@ -1640,6 +1640,40 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     }
   }
 
+  test("SPARK-52914: Support spark.history.fs.eventLog.rolling.onDemandLoadEnabled") {
+    Seq(true, false).foreach { onDemandEnabled =>
+      withTempDir { dir =>
+        val conf = createTestConf(true)
+        conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+        conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, onDemandEnabled)
+        val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+        val provider = new FsHistoryProvider(conf)
+
+        val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
+        writer1.start()
+        writeEventsToRollingWriter(writer1, Seq(
+          SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
+          SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+        writer1.stop()
+
+        assert(dir.listFiles().length === 1)
+        assert(provider.getListing().length === 0)
+        assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
+        assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
+
+        // The dummy entry should be protected from cleanLogs()
+        provider.cleanLogs()
+        assert(dir.listFiles().length === 1)
+
+        assert(dir.listFiles().length === 1)
+        assert(provider.getAppUI("nonexist", None).isEmpty)
+        assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
+
+        provider.stop()
+      }
+    }
+  }
+
   test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
     withTempDir { dir =>
       val conf = createTestConf(true)
```
