[SPARK-52914][CORE] Support On-Demand Log Loading for rolling logs in History Server #51604
        
Conversation
        
          
Force-pushed from 6a60332 to 600eb8a.
Could you review this PR, @LuciferYang ?

A bit busy today... I'll take a look later.
+1, LGTM
Thank you @dongjoon-hyun
+CC @thejdeep
```scala
addListing(new ApplicationInfoWrapper(
  ApplicationInfo(appId, appId, None, None, None, None, List.empty),
  List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
load(appId)
```
What is the behavior if the application does not exist (a typo in the user's query, for example)?
Will the listing now have an invalid entry?
+1. Do you think it would be better if we checked for the existence of the file at its location before adding an entry? This would keep parity with how checkForLogs works: we only add entries whose event log locations exist.
No, we had better avoid that because it requires the full path including "s3://...", @thejdeep .
```scala
case _: NoSuchElementException if this.conf.get(ON_DEMAND_ENABLED) =>
  val name = Utils.nameForAppAndAttempt(appId, attemptId)
  loadFromFallbackLocation(appId, attemptId,
    RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX + name)
```
We should not assume it will be RollingEventLogFilesWriter; users don't need to be running with the default enabled, right?
Thank you for the review, @LuciferYang and @mridulm. This is a continuation of the Apache Spark 4.0.0 work under SPARK-45869 (Revisit and Improve Spark Standalone Cluster) to support Apache Spark as a subsystem. For your questions,

As a footnote, I want to advertise the default event log (rolling) type more, instead of the legacy single-file event log type.
cc @peter-toth
Use of

The default has flipped, but we still support single event logs ... we have nontrivial usages of this pattern.
I got what you missed. In short, it's not exposed to the users if no file exists there. Please see spark/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala, lines 323 to 350 in 5b622c5.

So, the dummy metadata will be there in a hidden manner, but it is cleaned up at the next periodic scan because there are no matching files.

For the above question: sorry for that, but what I meant is that this PR doesn't aim to support it, as the PR description says, @mridulm. I don't think that's a blocker for this improvement because this feature only chimes in as the replacement of
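To make the described flow concrete, here is a minimal, self-contained sketch in Scala. All names and data structures below are stand-ins for FsHistoryProvider's internals, not the actual API:

```scala
import scala.collection.mutable

// A toy model of the on-demand fallback described above. `listing` plays the
// role of the SHS listing DB and `existingLogs` the event log storage; both
// are assumptions for illustration, not FsHistoryProvider's real structures.
object OnDemandSketch {
  private val listing = mutable.Map[String, String]()          // appId -> logPath
  private val existingLogs = mutable.Set("eventlog_v2_app-1")  // logs on storage

  def getAppUI(appId: String): Option[String] = {
    // Fallback: if the periodic scan hasn't listed the app yet, register a
    // hidden placeholder pointing at the expected rolling log directory name.
    val logPath = listing.getOrElseUpdate(appId, "eventlog_v2_" + appId)
    if (existingLogs.contains(logPath)) {
      Some(s"UI for $appId")  // the log exists: replay it and serve the UI
    } else {
      listing.remove(appId)   // user mistake (e.g. a typo): clean up the
      None                    // placeholder; "Not Found" is shown as before
    }
  }
}
```

Under this model, a placeholder only becomes visible if the log actually exists; in the real provider the periodic scan additionally reconciles the placeholder with the real metadata.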
In that case, please document it in the config name (namespace it appropriately) and make that a precondition to enabling the flag for
Where do you mean this? Did you see the invalid data in the UI somewhere?
If you are worrying about

Or, I'm wondering if you are considering a non-Apache Spark library which depends on this.
Title changed: On-Demand Log Loading in History Server → On-Demand Log Loading for rolling logs in History Server
It has been a while since I looked at the history server :-)

(This is an example; there are other paths to view over ApplicationInfoWrapper.) Will this not result in the added entry getting surfaced?
Easier would be to simply remove the entry if invalid (that is, if the load failed) - that will keep the API and UI in sync ... assuming the example I gave is valid!
Of course, I know you are the expert, but the dummy metadata has

In addition, I agree with you that we had better delete it at
For the next items according to your advice,
Let me tag @thejdeep, who worked on the internal solution (no bandwidth to open source it yet :( ) ... I did not review it, so I am not as aware of the intricacies as he is.
Thank you for the invaluable review, @mridulm. I addressed your comments.
```scala
assert(dir.listFiles().length === 1)
assert(provider.getAppUI("nonexist", None).isEmpty)
assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
```
This new line verifies the cleanup, @mridulm .
```scala
val app = try {
  load(appId)
} catch {
  case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
```
I understand that we are trying to push for usage of RollingEventLogFilesWriter as the new default, but for users who have single event logs, if they try to get the UI for an app, will this functionality not break for them, since EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED is true by default?
Could you elaborate on what we can break here, @thejdeep?

> will this functionality not break for them since EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED is true by default ?
The dummy metadata is added and cleaned up immediately on FileNotFoundException in this function, as @mridulm requested. It works for both a non-existing appId and single-file logs.
```scala
val info = ApplicationAttemptInfo(attemptId, date, date, date, 0, "spark", false, "unknown")
addListing(new ApplicationInfoWrapper(
  ApplicationInfo(appId, appId, None, None, None, None, List.empty),
  List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
```
Shouldn't we rely on the event log for information like startTime, endTime, user, etc.? Will this not lead to incorrect information being displayed on the home page of SHS?
This is only a dummy placeholder to allow SHS to show the application logs before periodic scanning happens. The periodic scanning will keep it in sync.

BTW, I'm wondering how often you think this fallback would be used in production environments, @thejdeep? I'm curious if you are considering turning off the periodic scanning.
Oh, I see that the intention is just to have dummy placeholders until the scanning takes care of it.

If users operate a large Spark cluster, my two cents are that users may tend to access their apps on demand much more frequently, and it might just lead to an incorrect listing page. For example, we noticed that a good fraction of our SHS requests are on demand, since users would like to get their reports as soon as their app finishes and before checkForLogs completes.
Yes, and technically, it's not exposed in the listing page. Could you build this PR and test it yourself?

> an incorrect listing page
It sounds like a limitation of the single-file event log, @thejdeep. If you have rolling event logs, SHS already has the correct partial information while your jobs are running.

> For example, we noticed that a good fraction of our SHS requests are on demand since users would like to get their reports as soon as their app finishes and before checkForLogs completes.
Just questions to understand your use cases:
- How do you handle Spark Streaming jobs with a single-file event log? Does your job still not use rolling event logs?
- Are you assuming only Spark 2.x or 3.x jobs, given that Spark 4 jobs generate rolling event logs by default since SPARK-45771?
Thanks for sharing the context, @dongjoon-hyun.

We currently do not use rolling event logs, since we only serve batch use cases at the moment. All applications are currently on 3.x.

I can build your PR locally and test it on single-file event logs to see how it works with listing and cleanup. I can get back to you by tomorrow at the earliest, if that works.
Thank you so much for the info and your efforts on reviewing this. Take your time.
@dongjoon-hyun wanted to get your thoughts on #51604 (comment)
Thank you!
@dongjoon-hyun These are two scenarios which I wanted to get your thoughts on after testing your change. These are run with rolling event logs:
Is it true that Apache Spark deletes a running app's rolling logs, @thejdeep?
@dongjoon-hyun This is my test bed:

Steps:
Thanks. Let me check the details, @thejdeep.
I confirmed the regression. Thank you for reporting and sharing the setup. Let me fix it.
To @thejdeep: it's fixed, and the test case is updated to provide test coverage for your reported case.
```scala
private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
    : ApplicationInfoWrapper = {
  val date = new Date(0)
  val lastUpdate = new Date()
```
lastUpdate is set to the current time, @thejdeep.
```scala
// The dummy entry should be protected from cleanLogs()
provider.cleanLogs()
assert(dir.listFiles().length === 1)
```
This is the test coverage, @thejdeep .
LGTM
```scala
case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
  if (app.attempts.head.info.appSparkVersion == "unknown") {
    listing.synchronized {
      listing.delete(classOf[ApplicationInfoWrapper], appId)
    }
  }
  return None
```
Hmm, I don't quite understand what this does. It seems loadFromFallbackLocation loads a dummy record from the log path into the listing?
But what does this FileNotFoundException catch? Why does it delete the dummy record immediately?
Thank you for the review, @viirya.

Yes, this adds a dummy record (based on the user request) to proceed to load the actual file. However, if the actual file doesn't exist, a FileNotFoundException will be thrown, which means it was a user mistake. In that case, since we don't need this dummy record, we clean it up.
Thanks for addressing the cleanup regression, @dongjoon-hyun. Another thing to call out, just for posterity's sake, is that the version field will be shown as
As I mentioned before, it will be updated at the next scan, @thejdeep. Previously, the following was the situation until the next scan, and this PR proposes the above until the next scan.
```scala
val info = ApplicationAttemptInfo(
  attemptId, date, date, lastUpdate, 0, "spark", false, "unknown")
addListing(new ApplicationInfoWrapper(
  ApplicationInfo(appId, appId, None, None, None, None, List.empty),
```
So supposedly, the appId should be correct to load the record, but the other info is dummy?
And once periodic scanning happens, it will update the record with correct information?
Yes, correct, @viirya ~
This is a kind of placeholder.
@dongjoon-hyun Thanks for addressing all my comments. LGTM.
Thank you for your thorough reviews, @thejdeep. It helps this feature a lot.
```scala
val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
  Utils.nameForAppAndAttempt(appId, attemptId)
```
RollingEventLogFilesWriter.getAppEventLogDirPath also considers logBaseDir; don't we need to consider it?
Yes, we don't need logBaseDir for this info.
Oh, I see. AttemptInfoWrapper.logPath doesn't contain the base dir. When FsHistoryProvider tries to read the attempt, it will append the base dir.
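A small illustration of this point; the base directory value is hypothetical, and Hadoop's `Path(parent, child)` resolution stands in for what the provider does at read time:

```scala
import org.apache.hadoop.fs.Path

// The listing stores only the relative directory name; FsHistoryProvider
// appends the configured base dir when it actually reads the attempt.
val logDir = new Path("s3a://bucket/spark-events")  // illustrative base dir
val relative = "eventlog_v2_app-20250721-0001"      // what AttemptInfoWrapper.logPath holds
val resolved = new Path(logDir, relative)
// resolved == s3a://bucket/spark-events/eventlog_v2_app-20250721-0001
```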
```scala
load(appId)
} catch {
  case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
    loadFromFallbackLocation(appId, attemptId, logPath)
```
What if EVENT_LOG_ENABLE_ROLLING is disabled? Should we only do this if EVENT_LOG_ENABLE_ROLLING is enabled?
No, EVENT_LOG_ENABLE_ROLLING is a per-application setting. This is SHS, @viirya.
Initially, I proposed the config name spark.history.fs.update.onDemandEnabled in the first commit because this is an SHS setting. However, it was revised during the review to be clear in the context of rolling:

```diff
- spark.history.fs.update.onDemandEnabled
+ spark.history.fs.eventLog.rolling.onDemandLoadEnabled
```
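For reference, a rough sketch of how such a flag is declared in Spark's internal config objects. The doc text and version below are illustrative, not a verbatim copy of the merged entry:

```scala
// Sketch only: ConfigBuilder is private[spark], so this shape compiles
// inside the org.apache.spark package; the actual wording may differ.
import org.apache.spark.internal.config.ConfigBuilder

val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
  ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
    .doc("Whether to look up rolling event log locations on demand when " +
      "an application is requested before the periodic scan lists it.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(true)
```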
Okay, I see. It's a bit of a confusing setup to me. EVENT_LOG_ENABLE_ROLLING takes effect at the EventLoggingListener in the SparkContext application. When FsHistoryProvider creates an EventLogFileReader, it doesn't care about this config but decides whether it is a single log or a rolling log based on the attempt's lastIndex.
I agree that it sounds confusing. Basically, it's the same for the event log compression codec.
Since a Spark job can choose spark.eventLog.compress and spark.eventLog.compression.codec arbitrarily, we need to infer the codec from the file name. It's inevitable because the writers (Spark jobs) and the reader (SHS) are independent.
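The file-name inference can be pictured with a small, hypothetical helper; the codec short names match Spark's built-in ones, but this is not the actual EventLogFileReader code:

```scala
// Hypothetical sketch: the reader cannot see the writer's configuration,
// so the codec has to be inferred from the event log file's extension.
def inferCodec(fileName: String): Option[String] = {
  val knownCodecs = Seq("lz4", "lzf", "snappy", "zstd")
  knownCodecs.find(codec => fileName.endsWith("." + codec))
}

inferCodec("events_1_app-123.zstd") // Some("zstd")
inferCodec("events_1_app-123")      // None: treated as uncompressed
```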
```scala
val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
  Utils.nameForAppAndAttempt(appId, attemptId)
```
It seems better to call the existing method for preparing the log name. Otherwise, they might get out of sync unintentionally.
```diff
  val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
-   Utils.nameForAppAndAttempt(appId, attemptId)
+   EventLogFileWriter.nameForAppAndAttempt(appId, attemptId)
```
Thanks. Yes, I agree. For now, it's a simple alias, but you are right about the future.
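The "simple alias" relationship, as I read the thread, is that both helpers produce the same sanitized name. A sketch under that assumption (the sanitization rules here are representative, not verified against the source):

```scala
object NameSketch {
  // Assumed behavior only: Utils.nameForAppAndAttempt mirrors the original
  // EventLogFileWriter helper, so both produce identical directory names.
  def nameForAppAndAttempt(appId: String, attemptId: Option[String]): String = {
    val base = sanitize(appId)
    attemptId.map(id => base + "_" + sanitize(id)).getOrElse(base)
  }

  // Representative sanitization; the real rules live elsewhere in Spark.
  private def sanitize(str: String): String =
    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
}
```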
…rovider.scala
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Thank you so much, @LuciferYang, @mridulm, @thejdeep, @yaooqinn, @viirya, @peter-toth. Merged to master for Apache Spark 4.1.0.
> What changes were proposed in this pull request?
>
> This PR aims to document newly added `core` module configurations as a part of Apache Spark 4.1.0 preparation.
>
> Why are the changes needed?
>
> To help the users use new features easily.
>
> - #47856
> - #51130
> - #51163
> - #51604
> - #51630
> - #51708
> - #51885
> - #52091
> - #52382
>
> Does this PR introduce _any_ user-facing change?
>
> No behavior change because this is a documentation update.
>
> How was this patch tested?
>
> Manual review.
>
> Was this patch authored or co-authored using generative AI tooling?
>
> No.
>
> Closes #52626 from dongjoon-hyun/SPARK-53926.
>
> Authored-by: Dongjoon Hyun <[email protected]>
> Signed-off-by: Dongjoon Hyun <[email protected]>


What changes were proposed in this pull request?
This PR aims to support On-Demand Log Loading in History Server by looking up the rolling event log locations even when Spark's listing hasn't finished loading the event log files.

Previously, Spark History Server would show an `Application ... Not Found` page if a job was requested before it was scanned, even if the file exists in the correct location. So, this PR doesn't introduce any regressions: it introduces a kind of fallback logic to improve error handling, sketched below.
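A hedged sketch of the fallback lookup: derive the expected rolling event log directory name from the requested IDs. The prefix value mirrors RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX as used in the review thread; treat it as an assumption:

```scala
// Illustrative only: build the candidate rolling event log directory name
// for a requested application, as the fallback path does.
def fallbackLogPath(appId: String, attemptId: Option[String]): String = {
  val prefix = "eventlog_v2_" // assumed EVENT_LOG_DIR_NAME_PREFIX value
  prefix + appId + attemptId.map("_" + _).getOrElse("")
}

fallbackLogPath("app-20250721-0001", None) // "eventlog_v2_app-20250721-0001"
```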
Why are the changes needed?

Since Apache Spark 3.0, we have been using event log rolling not only for long-running jobs but also for some failed jobs, to archive the partial event logs incrementally. Since Apache Spark 4.0, event log rolling is enabled by default:

- Enable `spark.eventLog.rolling.enabled` by default (#43638)

On top of that, this PR aims to reduce storage cost in Apache Spark 4.1. By supporting On-Demand Loading for rolled event logs, we can use larger values for `spark.history.fs.update.interval` instead of the default `10s`; see the hedged example below. Although Spark History logs are consumed in various ways, this has a big benefit because most successful Spark jobs' logs are never visited by the users.
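As an illustration of the tuning this enables (the values below are examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// With on-demand loading, the SHS scan interval can be relaxed well beyond
// the default 10s; 10m here is only an example value.
val shsConf = new SparkConf()
  .set("spark.history.fs.update.interval", "10m")
  .set("spark.history.fs.eventLog.rolling.onDemandLoadEnabled", "true")
```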
Does this PR introduce any user-facing change?

No. This is a new feature.
How was this patch tested?
Pass the CIs with the newly added test case.
Was this patch authored or co-authored using generative AI tooling?
No.