[SPARK-35628][SS] RocksDBFileManager - load checkpoint from DFS #32767
Conversation
Per #32582 (comment)
Kubernetes integration test unable to build dist. exiting with code: 1

Test build #139284 has finished for PR 32767 at commit

Rebased this PR based on #32582. It's ready for review now. cc @viirya and @HeartSaVioR

Thank you @xuanyuanking. I'll find some time to review this.

Test build #139555 has finished for PR 32767 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

@viirya Great thanks for your help!

To make the RocksDB state store implementation quicker and easier to review, I've created all the remaining PRs to give us a global perspective. We can review them one by one, and I'll keep each of them updated. cc @viirya and @HeartSaVioR, thanks for your review.

Thank you @xuanyuanking!
viirya left a comment:
Overall looks good. Just a few questions and suggestions.
Looks like this is not the Java doc style we follow.
We have been using one-liner java doc; e.g. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L50
Oh, I thought it was two lines.
But we'd like to see this as multiple lines, per the review comments below :)
yea :)
Yea, we'll have multiple lines here in the next commit :)
@xuanyuanking Seems you overwrote the previous change? This looks like the previous version.
@xuanyuanking friendly reminder
Sorry, I missed this... Updating.
We don't process directories. Could you also mention that in the method doc?
It'd be ideal to make this clear in the method name, e.g. unzipFilesFromFile. (Ideally I'd like to see this also extract directories, but let's postpone that until necessary.)
In general we expect unzipping to extract directories as well. That said, we need to make the behavior very clear to the caller side. I agree this should be mentioned in the Javadoc, but the method name should also make the actual behavior intuitive.
Makes sense; method name changed and comment added.
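The behavior being discussed — extracting only regular file entries and skipping directory entries — could be sketched roughly as below. This is a hypothetical Java sketch using java.util.zip, not Spark's actual Scala implementation; the name unzipFilesFromFile just follows the suggestion above.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.zip.*;

public class UnzipSketch {
    // Extracts only regular file entries from zipFile into outDir.
    // Directory entries are intentionally skipped; callers must not rely on
    // empty directories being recreated. Any IOException propagates to the
    // caller, which is expected to treat the output directory as unhealthy.
    public static void unzipFilesFromFile(Path zipFile, Path outDir) throws IOException {
        try (ZipInputStream zin = new ZipInputStream(Files.newInputStream(zipFile))) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue; // directories are not processed
                }
                Path target = outDir.resolve(entry.getName());
                // Parent directories are created only as needed for the files themselves.
                if (target.getParent() != null) {
                    Files.createDirectories(target.getParent());
                }
                Files.copy(zin, target, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
}
```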
Hmm, are we sure we don't need to handle any errors during unzipping?
This looks safe; if there's an exception we may see some files partially extracted and one of the output files may be broken, but callers will catch the exception and treat the output directory as unhealthy. If necessary, let's document this in the Javadoc as well.
Yes, we rely on the caller side to handle any exceptions. Javadoc added as well.
When is it possible to have an existing file in the local dir with the same file name but different content from the file in DFS?
Or is it just a safety guard?
It's a safety guard: we check both the file name and the file size.
We can do this logInfo after the file size check.
More specifically, we can do the file size check just after copyToLocalFile, and accumulations can be placed later.
Thanks. The size check is now done right after copyToLocalFile, and the logInfo is placed at the end.
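The copy/reuse flow these comments converge on might look like the following sketch. The names DfsFile and DfsCopier are invented for the example; Spark's real code works against Hadoop's FileSystem.copyToLocalFile and its own metadata classes.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.List;

public class CopyFilesSketch {
    // Minimal stand-ins for the real checkpoint metadata and DFS client.
    public record DfsFile(String name, long sizeBytes) {}

    public interface DfsCopier {
        void copyToLocal(DfsFile src, Path dst) throws IOException;
    }

    // Copies the given files into localDir, reusing any local file whose name
    // AND size already match (the safety guard discussed above). The size check
    // runs right after the copy; the summary log (including filesReused) is last.
    public static long copyFiles(List<DfsFile> files, Path localDir, DfsCopier copier)
            throws IOException {
        long filesCopied = 0, bytesCopied = 0, filesReused = 0;
        for (DfsFile f : files) {
            Path local = localDir.resolve(f.name());
            if (Files.exists(local) && Files.size(local) == f.sizeBytes()) {
                filesReused++; // same name and size: safe to reuse, skip the copy
                continue;
            }
            Files.deleteIfExists(local); // same name but different size: replace it
            copier.copyToLocal(f, local);
            long localSize = Files.size(local);
            // Verify the size immediately after the copy, before accumulating.
            if (localSize != f.sizeBytes()) {
                throw new IOException("Copied size " + localSize + " != expected "
                        + f.sizeBytes() + " for " + f.name());
            }
            filesCopied++;
            bytesCopied += localSize;
        }
        // Summary log goes at the end, after all checks have passed.
        System.out.println("Copied " + filesCopied + " files (" + bytesCopied
                + " bytes), reused " + filesReused + " files");
        return filesReused;
    }
}
```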
Add filesReused into this log message?
Thanks! Done in the next commit.
Is this only used by tests?
Yes.
HeartSaVioR left a comment:
Minor comments.
nit: deleted -> delete
Thanks, done in the next commit
What if we just remove all the files in localDir? I'd just like to know why we don't clear the whole directory and instead remove only specific files. Would we need to leverage some remaining files?
Yes. The consideration here is mainly for immutable files like sst/log files: we can avoid IO for immutable files shared among different versions.
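The trade-off described here — deleting only unreferenced files so that immutable sst/log files can be reused across versions — could be sketched as below. This is a hypothetical helper, not Spark's actual code.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.Set;

public class CleanupSketch {
    // Deletes only the files in localDir that the target version does not
    // reference. Immutable files shared across versions (e.g. RocksDB .sst
    // files) stay in place, so they need no re-download from DFS.
    public static int deleteUnreferenced(Path localDir, Set<String> referenced)
            throws IOException {
        int deleted = 0;
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(localDir)) {
            for (Path p : ds) {
                if (!referenced.contains(p.getFileName().toString())) {
                    Files.delete(p);
                    deleted++;
                }
            }
        }
        return deleted;
    }
}
```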
Great thanks for your detailed review, @viirya @HeartSaVioR. All comments addressed.

Kubernetes integration test starting
HeartSaVioR left a comment:
+1
Let's wait for @viirya to have another round of review and do explicit approval.
Kubernetes integration test status failure

Test build #140201 has finished for PR 32767 at commit

@xuanyuanking Oh, we need to fix new code conflicts as well.
"files with same"? Do you mean "with the same filename"?
Ah thanks! Fix done.
@xuanyuanking Could you resolve the conflict and the minor comment? Then we can move this forward. Thanks!
Force-pushed 7c79a29 to fdd0d61
Thanks for the review!

Kubernetes integration test unable to build dist. exiting with code: 1

Test build #140270 has finished for PR 32767 at commit

Kubernetes integration test starting

Kubernetes integration test status success
Force-pushed be15054 to 7279d43
Test build #140300 has finished for PR 32767 at commit

Kubernetes integration test unable to build dist. exiting with code: 1

Test build #140305 has finished for PR 32767 at commit

Jenkins passed. Thanks! Merging to master.

Thanks @xuanyuanking for the contribution! I merged this into master.

Please also rebase the next PRs onto the master branch; we can continue reviewing the next PR.

Copy that. I'm rebasing now. Thanks for the help!

UPDATE: we found a consistent break in the Scala 2.13 build caused by this change. @xuanyuanking is working on the fix, so please allow us some time to fix it in a follow-up PR instead of reverting this.

Great thanks @HeartSaVioR and @Ngone51. Submitted #33084.
…uild

What changes were proposed in this pull request?
Fix the consistent break on the Scala 2.13 build caused by PR #32767.

Why are the changes needed?
Fix the consistent break.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Existing tests.

Closes #33084 from xuanyuanking/SPARK-35628-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
The implementation for the load operation of RocksDBFileManager.
Why are the changes needed?
Provide the functionality of loading all necessary files for specific checkpoint versions from DFS to the given local directory.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT added.
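Putting the pieces from the review together, the load path could be sketched end to end as below. This is a simplified illustration: a Map stands in for DFS, and the names loadCheckpoint and versionMeta are invented for the sketch; the real RocksDBFileManager works against Hadoop's FileSystem API and its own metadata format.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class LoadSketch {
    // Loads all files needed for `version` into localDir:
    // 1) look up which files the version references,
    // 2) delete local files the version does not reference,
    // 3) copy missing files from "DFS", reusing files already present with the
    //    expected size (immutable sst files shared between versions).
    public static void loadCheckpoint(Map<String, byte[]> dfs,
                                      Map<Long, List<String>> versionMeta,
                                      long version, Path localDir) throws IOException {
        List<String> needed = versionMeta.get(version);
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(localDir)) {
            for (Path p : ds) {
                if (!needed.contains(p.getFileName().toString())) {
                    Files.delete(p); // not part of this version
                }
            }
        }
        for (String name : needed) {
            Path local = localDir.resolve(name);
            byte[] remote = dfs.get(name);
            if (Files.exists(local) && Files.size(local) == remote.length) {
                continue; // reuse the cached immutable file
            }
            Files.write(local, remote);
        }
    }
}
```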