Review discussion for apache/spark#51604 (SPARK-52914: Support spark.history.fs.eventLog.rolling.onDemandLoadEnabled), status: Closed.

@@ -321,9 +321,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getLastUpdatedTime(): Long = lastScanTime.get()

override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
EventLogFileWriter.nameForAppAndAttempt(appId, attemptId)
val app = try {
load(appId)
} catch {
case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
Contributor:

I understand that we are trying to push for usage of RollingEventLogFilesWriter as the new default, but for users who have single event logs, if they try to get the UI for an app, won't this functionality break for them, since EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED is true by default?

Member Author:

> will this functionality not break for them since EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED is true by default ?

Could you elaborate on what we can break here, @thejdeep?

Member Author:

The dummy metadata is added and then cleaned up immediately on FileNotFoundException in this function, as @mridulm requested. It works for both non-existent appIds and single-file logs.

loadFromFallbackLocation(appId, attemptId, logPath)
Member:

What if EVENT_LOG_ENABLE_ROLLING is disabled? Should we only do this if EVENT_LOG_ENABLE_ROLLING is enabled?

Member Author (@dongjoon-hyun), Jul 23, 2025:

No, EVENT_LOG_ENABLE_ROLLING is a per-application setting. This is the SHS side, @viirya.
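
For context, the two settings sit on opposite sides of the write/read boundary. A minimal spark-defaults.conf sketch (spark.eventLog.rolling.enabled is the application-side key behind EVENT_LOG_ENABLE_ROLLING; the values are only illustrative):

    # Application side: controls how each job writes its event log
    spark.eventLog.enabled                                 true
    spark.eventLog.rolling.enabled                         true

    # History Server side: controls how the SHS reads logs on demand
    spark.history.fs.eventLog.rolling.onDemandLoadEnabled  true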

Member Author (@dongjoon-hyun), Jul 23, 2025:

Initially, I proposed the config name spark.history.fs.update.onDemandEnabled in the first commit because this is an SHS setting. However, it was revised during review to be clearer in the context of rolling.

- spark.history.fs.update.onDemandEnabled
+ spark.history.fs.eventLog.rolling.onDemandLoadEnabled

Member:

Okay, I see. The setup is a bit confusing to me. EVENT_LOG_ENABLE_ROLLING determines behavior at the EventLoggingListener in the SparkContext application. When FsHistoryProvider creates an EventLogFileReader, it doesn't consult this config; it decides whether a log is a single log or a rolling log based on the attempt's lastIndex.
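
A rough Scala sketch of that dispatch, assuming the reader classes named in this thread (the helper readerFor and its signature are made up for illustration; the real dispatch lives in EventLogFileReader):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Simplified sketch: the SHS picks a reader based on the attempt's
    // lastIndex, not on EVENT_LOG_ENABLE_ROLLING (which only the writing
    // application sees).
    def readerFor(fs: FileSystem, path: Path, lastIndex: Option[Long]): EventLogFileReader =
      lastIndex match {
        // rolling logs record the index of their last event file
        case Some(_) => new RollingEventLogFilesFileReader(fs, path)
        // no index means a classic single-file event log
        case None => new SingleFileEventLogFileReader(fs, path)
      }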

Member Author (@dongjoon-hyun), Jul 23, 2025:

I agree that it sounds confusing. Basically, it's the same situation as with the event log compression codec.

Since a Spark job can choose spark.eventLog.compress and spark.eventLog.compression.codec arbitrarily, we need to infer the codec from the file name. That's inevitable because the writers (Spark jobs) and the reader (SHS) are independent.
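
To illustrate, a hypothetical helper that recovers the codec from the file name alone (inferCodecFromPath is invented for this sketch; only the .inprogress suffix and codec extensions are Spark conventions):

    import org.apache.hadoop.fs.Path

    // Hypothetical helper: recover the compression codec from the log file's
    // extension, because the reader cannot see the writer's configuration.
    def inferCodecFromPath(path: Path): Option[String] = {
      // in-flight logs carry an ".inprogress" suffix; strip it first
      val name = path.getName.stripSuffix(".inprogress")
      val dot = name.lastIndexOf('.')
      if (dot > 0) Some(name.substring(dot + 1)) // e.g. "lz4", "zstd", "snappy"
      else None // no extension: an uncompressed log
    }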

case _: NoSuchElementException =>
return None
}
@@ -345,6 +349,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
createInMemoryStore(attempt)
}
} catch {
case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
if (app.attempts.head.info.appSparkVersion == "unknown") {
listing.synchronized {
listing.delete(classOf[ApplicationInfoWrapper], appId)
}
}
return None
Comment on lines +352 to +358
Member:

Hmm, I don't quite understand what this does. It seems loadFromFallbackLocation loads a dummy record from the log path into listing?

But what does this FileNotFoundException catch? Why does it delete the dummy record immediately?

Member Author:

Thank you for the review, @viirya.

Yes, this adds a dummy record (based on the user request) so we can proceed to load the actual file. However, if the actual file doesn't exist, a FileNotFoundException will be thrown, which means it was a user mistake. In that case, since we don't need the dummy record, we clean it up.
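
Putting the two halves together, a condensed sketch of the on-demand flow (the helper names here are simplifications, not the real method names; the authoritative code is the diff above):

    // All names below (getAppUIOnDemand, buildUI, rollingLogPath,
    // deleteListing, onDemandEnabled) are simplifications of the real code.
    def getAppUIOnDemand(appId: String): Option[LoadedAppUI] = {
      val app =
        try load(appId) // normal path: the listing already knows this app
        catch {
          case _: NoSuchElementException if onDemandEnabled =>
            // Register a dummy entry pointing at the conventional
            // rolling-log path so the read below has something to load.
            loadFromFallbackLocation(appId, None, rollingLogPath(appId))
          case _: NoSuchElementException => return None
        }
      try {
        Some(buildUI(app)) // reads the actual event log files
      } catch {
        case _: FileNotFoundException =>
          // The log never existed (e.g. a mistyped appId). The dummy entry
          // is recognizable by its "unknown" Spark version; remove it again.
          if (app.attempts.head.info.appSparkVersion == "unknown") {
            deleteListing(appId)
          }
          None
      }
    }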

case _: FileNotFoundException =>
return None
}
@@ -364,6 +375,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
Some(loadedUI)
}

private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
: ApplicationInfoWrapper = {
val date = new Date(0)
val lastUpdate = new Date()
Member Author:

lastUpdate is set to the current time, @thejdeep.

val info = ApplicationAttemptInfo(
attemptId, date, date, lastUpdate, 0, "spark", false, "unknown")
addListing(new ApplicationInfoWrapper(
ApplicationInfo(appId, appId, None, None, None, None, List.empty),
Member:

So supposedly, the appId should be correct to load the record, but the other info is dummy?

Member:

And once periodic scanning happens, it will update the record with correct information?

Member Author:

Yes, correct, @viirya ~

Member Author:

This is a kind of placeholder.

List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
Contributor:

Shouldn't we rely on the event log for information like startTime, endTime, user, etc.? Will this not lead to incorrect information being displayed on the home page of SHS?

Member Author:

This is only a dummy placeholder that allows SHS to show the application logs before periodic scanning happens. The periodic scan will keep it in sync.

BTW, how often do you think this fallback is used in production environments, @thejdeep? I'm curious whether you are thinking about turning off the periodic scanning.

Contributor:

Oh, I see that the intention is just to have dummy placeholders until the scanning takes care of it.

If users operate a large Spark cluster, my two cents are that users may tend to access their apps on demand much more frequently, and it might just lead to an incorrect listing page. For example, we noticed that a good fraction of our SHS requests are on-demand, since users would like to get their reports as soon as their app finishes and before checkForLogs completes.

Member Author (@dongjoon-hyun), Jul 22, 2025:

> an incorrect listing page

Yes, and technically, it's not exposed in the listing page. Could you build this PR and test it yourself?

Member Author (@dongjoon-hyun), Jul 22, 2025:

> For example, we noticed that a good fraction of our SHS requests are on-demand, since users would like to get their reports as soon as their app finishes and before checkForLogs completes.

It sounds like a limitation of single-file event logs, @thejdeep. If you have rolling event logs, SHS already has the correct partial information while your jobs are running.

Member Author (@dongjoon-hyun), Jul 22, 2025:

Just a couple of questions to understand your use cases:

  • How do you handle Spark Streaming jobs with a single-file event log? Do your jobs still not use rolling event logs?
  • Are you assuming only Spark 2.x or 3.x jobs, given that Spark 4 jobs generate rolling event logs by default since SPARK-45771?

Contributor:

Thanks for sharing the context, @dongjoon-hyun.

We currently do not use rolling event logs, since we only serve batch use cases. All applications are currently on 3.x.

I can build your PR locally and test it on single-file event logs to see how it works with listing and cleanup. I can get back to you by tomorrow at the earliest, if that works.

Member Author (@dongjoon-hyun), Jul 22, 2025:

Thank you so much for the info and your efforts on reviewing this. Take your time.

Contributor:

@dongjoon-hyun, I wanted to get your thoughts on #51604 (comment)

Thank you!

load(appId)
Contributor:

What is the behavior if the application does not exist (a typo in the user query, for example)?
Will the listing now have an invalid entry?

Contributor:

+1. Do you think it would be better if we checked for the existence of the file at its location before adding an entry? This would keep parity with how checkForLogs works - we only add entries whose event log locations exist.

Member Author:

No, we had better avoid that, because it would require the full path, including "s3://...", @thejdeep.

}

override def getEmptyListingHtml(): Seq[Node] = {
<p>
Did you specify the correct logging directory? Please verify your setting of
@@ -159,6 +159,13 @@ private[spark] object History {
.doubleConf
.createWithDefault(0.7d)

val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
.doc("Whether to look up rolling event log locations on demand manner before listing files.")
.version("4.1.0")
.booleanConf
.createWithDefault(true)
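
For reference, an operator who prefers the previous strict behavior would turn the fallback off in spark-defaults.conf, e.g.:

    spark.history.fs.eventLog.rolling.onDemandLoadEnabled  false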

val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
.version("3.0.0")
.doc("Specifies whether the History Server should periodically clean up driver logs from " +
@@ -1640,6 +1640,40 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
}
}

test("SPARK-52914: Support spark.history.fs.eventLog.rolling.onDemandLoadEnabled") {
Seq(true, false).foreach { onDemandEnabled =>
withTempDir { dir =>
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, onDemandEnabled)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
val provider = new FsHistoryProvider(conf)

val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
writer1.start()
writeEventsToRollingWriter(writer1, Seq(
SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
writer1.stop()

assert(dir.listFiles().length === 1)
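// No scan has run yet: the listing is empty, and getAppUI populates it
// only when on-demand loading is enabled.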
assert(provider.getListing().length === 0)
assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))

// The dummy entry should be protected from cleanLogs()
provider.cleanLogs()
assert(dir.listFiles().length === 1)
Member Author:

This is the test coverage, @thejdeep.


assert(dir.listFiles().length === 1)
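// A nonexistent appId must not leave a dummy entry behind.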
assert(provider.getAppUI("nonexist", None).isEmpty)
assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
Member Author:

This new line verifies the cleanup, @mridulm.


provider.stop()
}
}
}

test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
withTempDir { dir =>
val conf = createTestConf(true)