[SPARK-20656][CORE]Support Incremental parsing of event logs in SHS #26821
Conversation
Test build #115049 has finished for PR 26821 at commit
Test build #115051 has finished for PR 26821 at commit
cc @HeartSaVioR
Thanks for cc'ing me, @dongjoon-hyun. I'll take a look. Btw, I think we have another JIRA issue for supporting incremental parsing, SPARK-28870, which has a broader goal - run with any implementation of KVStore. At first glance, this patch could cover SPARK-29261, and with SPARK-29111 it may resolve SPARK-28870 altogether - though we struggled on the details previously, so I need some time to go through it deeply. @shahidki31
Test build #115052 has finished for PR 26821 at commit
Thanks @HeartSaVioR , I will go through it.
This patch is simpler because it doesn't take "restarting SHS" into account: restarting SHS loses the information. And for now we may not want to track line offsets in SHS's KV store. When you consider restoring the KV store and the state of listeners across an SHS restart, you have to store a snapshot of the KV store somewhere (that's why SPARK-29111 came in), and then you have to worry about compatibility of the snapshot (entities in the KV store, including live entities on listeners) across Spark versions. That's why I had to change the design and introduce SPARK-29779 instead of snapshotting. We've already gone through a bunch of discussions because it is not as simple in reality as it seems, so please go through these PRs as well as the design docs. I guess the patch can be reviewed right now if the community prefers to have a solution which works within a single SHS run first (though this may conflict with compaction #26416, which needs some arrangement), but if the community wants a solution which covers more cases, SPARK-28870 seems to be the way to go. (It doesn't mean this patch will not be valid - this patch could cover SPARK-29261 with some modification.)
I was also working on a PR for incremental parsing in SHS, and there are 2 ways to skip the parsed content: filtering by lines (the approach in this PR) and skipping by bytes (see
Yeah, if possible we should go with the latter approach. I guess that brings more changes, as ReplayListener just relies on the Scala API which provides lines (no offset information), so we'd have to get our hands dirty, but it's definitely worth doing.
Hi @oopDaniel , actually I tried both approaches, and skipping bytes seems more complicated as we need to handle more edge cases. Also, I tested this PR with a 2GB event log file, and the time to load the UI was around 2 seconds (?), including filtering and replaying. It is not difficult to add byte skipping either, as all we need to do is add a bytes-read parameter instead of a lines-read parameter and handle the edge cases. @HeartSaVioR I think the approach you guys are taking is great, as it handles restarting SHS. But if we can review this PR for incremental parsing, extending it to snapshotting would be easier, I guess. If there is a working PR for that, I can close this.
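To make the trade-off being discussed concrete, here is a minimal, hypothetical sketch in plain Scala (not the PR's actual code) of the two resume strategies: dropping already-parsed lines versus seeking to a saved byte offset.

```scala
import java.io.{File, InputStream, RandomAccessFile}
import scala.io.Source

// 1) Line-based skipping (roughly the approach in this PR): re-read the
//    stream from the start and drop the lines that were already parsed.
def replayFromLine(in: InputStream, linesAlreadyRead: Long): Iterator[String] =
  Source.fromInputStream(in, "UTF-8").getLines().drop(linesAlreadyRead.toInt)

// 2) Byte-based skipping: seek straight to a saved byte offset. Cheaper for
//    huge files, but the offset bookkeeping brings edge cases (partial last
//    line, compressed logs, rolled files). Caller is responsible for closing.
def replayFromOffset(file: File, bytesAlreadyRead: Long): Iterator[String] = {
  val raf = new RandomAccessFile(file, "r")
  raf.seek(bytesAlreadyRead)
  Iterator.continually(raf.readLine()).takeWhile(_ != null)
}
```

The first variant still pays the I/O cost of reading (and discarding) the skipped prefix; the second avoids that cost entirely, which is the reason it matters for multi-GB logs.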
|
@shahidki31 I can take a look at the current solution, but you still need to persuade at least one committer to push this forward. Btw, we'd better clarify the performance test in detail. It should include at least...
(And sure, it would be nicer if you can experiment with a matrix of configurations, at least a couple of tests around the size of the event log file - as you said, a huge event log file doesn't take just a couple of GBs; it's tens of GBs.) For me, your statement in the PR description reads as: skipping (via read-and-drop) 2GB takes around 2 secs, which is still not ideal (as we know how to do it better), though I agree that's still a huge improvement.
Thanks @HeartSaVioR , I'll try more performance testing based on size and initial load. |
Test coverage seems really low. I'd expect at least tests that each listener can store its information via flush and reload it from initialize, or at least that these entities can be serialized/deserialized well, because I encountered some issues when I just tried to serialize entities in events.
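As a rough illustration of the round-trip check being suggested - with a made-up `ListenerData` stand-in, since the PR's actual entity classes aren't reproduced here - such a test could look like:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a listener-state entity; the PR's real entities
// (live entities held by listeners, KVStore records) would go here instead.
case class ListenerData(appId: String, lastLineRead: Long)

// Serialize then deserialize via Java serialization, returning the copy so a
// test can assert it equals the original.
def roundTrip[T](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}
```

A test would then assert `roundTrip(entity) == entity` for each entity type the listeners need to persist.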
// during invalidate UI or detached UI. Metadata of the event read will store in the
// `IncrmentalInfo`. Whenever a new event come, parsing will happen from the line it
// read last time. Currently it supports inmemory store. TODO: Support for disk store.
private val isIncrementalParsingEnabled = storePath.isEmpty &&
We don't seem to add is/are to value names even when they're expected to return a Boolean. The nearest example is fastInProgressParsing.
// If incremental parsing support configuration is enabled, underlying store will not close
// during invalidate UI or detached UI. Metadata of the event read will store in the
// `IncrmentalInfo`. Whenever a new event come, parsing will happen from the line it
// read last time. Currently it supports inmemory store. TODO: Support for disk store.
Please place the TODO on a separate comment line so that the IDE can highlight it.
private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
// If incremental parsing support configuration is enabled, underlying store will not close
- nit: `incremental parsing support configuration` seems odd. IMHO, just `incremental parsing` would work.
- there're two kinds of stores in FsHistoryProvider, so maybe better to clarify here: `underlying APP kvstore` or some better words?
private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
// If incremental parsing support configuration is enabled, underlying store will not close
// during invalidate UI or detached UI. Metadata of the event read will store in the
nit: invalidating / detaching
nit2: Metadata of reading event log (maybe there should be better words...) will be stored
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
// If incremental parsing support configuration is enabled, underlying store will not close
// during invalidate UI or detached UI. Metadata of the event read will store in the
// `IncrmentalInfo`. Whenever a new event come, parsing will happen from the line it
I guess "incremental parsing" already explains this sentence, so it looks redundant. Moreover, technically, parsing doesn't start immediately when a new event comes.
accumulatorId: Long,
metricType: String)

class SQLAppStatusListenerData(
Maybe better to place it in SQLAppStatusListener, as the other KVStore entities are there.
kvstore.onFlush {
  if (!live) {
    flush((entity: LiveEntity) => updateStoreWithTriggerEnabled(entity))
    if (appId != null) {
place it before flush()
def initialize(appId: String, attemptId: Option[String]): Unit = {
  if (!live) {
    this.appId = appId
same here
provider.mergeApplicationListingCall should be (1)
}

test("support incremental parsing of the event logs") {
I know most of tests in FsHistoryProviderSuite don't deal with rolling event log, but this should really deal with it, as it's not just simple sequential reading of files.
list.size should be (1)
provider.getAttempt("app1", None).logPath should endWith(EventLogFileWriter.IN_PROGRESS)
val appUi = provider.getAppUI("app1", None)
appUi should not be null
I think it should really check whether store reflects the events correctly. It's not enough to just check it's loaded or not. That should cover these cases - initial read / new addition of events in same file / new file.
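The three cases called out here can be sketched with a simplified stand-in for the provider's parse state (plain Scala, not the real FsHistoryProvider), just to show what each case asserts:

```scala
import scala.collection.mutable

// Simplified model of per-file incremental parse state: remember how many
// lines were already replayed for each log file and return only the new ones.
class IncrementalState {
  private val linesRead = mutable.Map.empty[String, Long].withDefaultValue(0L)

  def parse(file: String, lines: Seq[String]): Seq[String] = {
    val fresh = lines.drop(linesRead(file).toInt)
    linesRead(file) = lines.size.toLong
    fresh
  }
}
```

A real test against FsHistoryProvider would replace the returned-lines check with assertions that the KV store's entities reflect exactly the replayed events after each step.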
Thanks @HeartSaVioR for the review. I will update the PR as well as post the performance reports.
Test build #119604 has finished for PR 26821 at commit
Test build #120468 has finished for PR 26821 at commit
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Currently, loading the application page from the history server is time consuming if the event log is large (e.g., ~47 minutes for an 18GB event log file). Today, when there is any change in the event log, the history server parses the entire event log, even for small changes. In this PR, we support incremental parsing of the event log by keeping the in-memory store in a hash map and not closing the store while it remains valid.
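A minimal sketch of that caching idea, under assumed names (`AppState`, `StoreCache` are illustrative stand-ins, not the PR's real types): the per-app in-memory store and its parse metadata live in a map keyed by app/attempt, and are only dropped when explicitly invalidated.

```scala
import scala.collection.mutable

// Hypothetical parse metadata plus a stand-in for the in-memory KV store.
final case class AppState(var linesRead: Long, store: mutable.Map[String, Any])

object StoreCache {
  private val cache = mutable.Map.empty[(String, Option[String]), AppState]

  // Reuse the cached store (and line offset) instead of reparsing from scratch
  // when the UI for the same app/attempt is loaded again.
  def getOrCreate(appId: String, attemptId: Option[String]): AppState =
    cache.getOrElseUpdate((appId, attemptId), AppState(0L, mutable.Map.empty))

  // Drop the cached state once the log is deleted or the entry goes stale.
  def invalidate(appId: String, attemptId: Option[String]): Unit =
    cache.remove((appId, attemptId))
}
```

On the next UI load, the provider would replay only the lines after `linesRead` into the cached store, which is where the reported speedup comes from.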
Why are the changes needed?
To speed up loading history server page
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added UT, Manual tests and existing tests.
Created an event log of size ~2GB, then added a few more events. Loading time without the PR is ~1 minute; with the PR, it is around 2 secs.