[SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata #32272
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #137727 has finished for PR 32272 at commit
Test build #137731 has finished for PR 32272 at commit
cc @viirya
```scala
// We turn this field into a null to avoid writing an empty logFiles field in the json.
val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
```
Why do we need to avoid it?
It's related to how RocksDB is used: we don't always have log files, but we must have SST files.
I think the point here is excluding the empty field (correct?) vs. leaving an empty field with []. Seems like a small optimization.
Yes, the logFiles field does not always have a value.
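To make the pattern concrete, here is a minimal, hypothetical sketch of the null-ification trick discussed above. The case class fields and the Jackson mapper configuration are simplified assumptions, not the PR's exact code:

```scala
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Simplified stand-in for RocksDBCheckpointMetadata: sstFiles is always
// present, logFiles may be empty.
case class Metadata(sstFiles: Seq[String], logFiles: Seq[String], numKeys: Long) {
  def json: String = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // NON_ABSENT drops null fields from the output, so nullifying an empty
    // logFiles list removes the key entirely instead of writing "logFiles": [].
    mapper.setSerializationInclusion(Include.NON_ABSENT)
    val nullified = if (logFiles.isEmpty) copy(logFiles = null) else this
    mapper.writeValueAsString(nullified)
  }
}
```

With an empty list, the serialized string contains no logFiles key at all; with a non-empty list, the field is written normally.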
```scala
/**
 * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
 * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
 */
```
Does it mean that a DFS copy can be mapped to more than one local file name?
When do we reuse the DFS copies?
Yes, it can be mapped to more than one local file, but for different tasks. The most common scenario is task/stage retry.
```scala
def isSameFile(otherFile: File): Boolean = {
  otherFile.getName == localFileName && otherFile.length() == sizeBytes
}
```
If a DFS copy can be mapped to more than one local file name, shouldn't two local files be considered the same file even when their local names differ, as long as their DFS file names are the same?
The DFS file name contains a UUID, so it shouldn't be the same. Normally we use the local file name to check whether the file already exists locally.
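As a sketch of the naming scheme described here — the UUID placement and the `fromLocalFile` helper are illustrative assumptions; only `isSameFile` mirrors the quoted code:

```scala
import java.io.File
import java.util.UUID

// Hypothetical sketch: each upload gives the DFS copy a UUID-based name, so
// retried tasks uploading the same local file never collide on DFS, while
// local-file identity is checked by name and size only.
case class RocksDBImmutableFile(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  def isSameFile(otherFile: File): Boolean =
    otherFile.getName == localFileName && otherFile.length() == sizeBytes
}

object RocksDBImmutableFile {
  def fromLocalFile(localFile: File): RocksDBImmutableFile =
    RocksDBImmutableFile(
      localFile.getName,
      s"${UUID.randomUUID()}-${localFile.getName}", // unique DFS name per upload
      localFile.length())
}
```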
Sorry to get to this late. I just went through the design doc and left some comments. It would probably be good to resolve the comments on the design doc and reflect them in the current/following PRs. Thanks!
@HeartSaVioR Thanks for the advice. The comments have been resolved, and yes, it makes sense to reflect them in the PRs. The current implementation of RocksDBCheckpointMetadata covers the metadata files in the ${batchId}.zip.
HeartSaVioR left a comment
Thanks for your efforts on this PR!
It looks OK overall, but there is some uncertainty in reviewing, since there's no reference PR. In other words, we are reviewing methods without knowing how they will be used.
It would be nice to have a PR containing everything (it's OK for it to go out of sync during review) so that reviewers could refer to it for the overall view. I'm also OK to review PRs one by one with uncertainty (on faith) and revisit all changes at the last phase.
```scala
  mapper.writeValueAsString(nullified)
}

def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
```
Would it produce the same output as json? This doesn't manipulate the empty logFiles field. Or is it intentional to handle json and prettyJson differently?
The only difference is the logFiles field. The prettyJson field provides a readable string for logging; the json field is for writing files.
OK I see where it is used. Just for logging - got it.
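A sketch of the asymmetry under discussion, assuming json4s for the pretty form (simplified field set; not the PR's exact code). Since prettyJson serializes the object as-is, an empty logFiles still appears as `[]`, which is acceptable for log output:

```scala
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

case class Metadata(sstFiles: Seq[String], logFiles: Seq[String], numKeys: Long) {
  implicit val format: Formats = Serialization.formats(NoTypeHints)
  // Unlike json, this does not nullify the empty list first, so the output
  // keeps an empty "logFiles" : [ ] entry — fine for a human-readable log line.
  def prettyJson: String = Serialization.writePretty(this)
}
```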
```scala
  // scalastyle:on line.size.limit
}

private def withTempDirectory(f: File => Unit): Unit = {
```
If I remember correctly, withTempDir is defined in SparkFunSuite, so you can just leverage that method.
Ah yes. Let me update.
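For reference, a loan-pattern helper in the spirit of SparkFunSuite's withTempDir can be sketched as follows. This is a simplified assumption: the real helper deletes recursively via Spark's Utils, while this version only handles a flat directory:

```scala
import java.io.File
import java.nio.file.Files

// Loan pattern: create a temp directory, pass it to the test body, and
// clean it up afterwards even if the body throws.
def withTempDir(f: File => Unit): Unit = {
  val dir = Files.createTempDirectory("spark-test").toFile
  try f(dir)
  finally {
    // Best-effort cleanup of a flat directory; a real implementation
    // would delete recursively.
    Option(dir.listFiles()).foreach(_.foreach(_.delete()))
    dir.delete()
  }
}
```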
Yes, agreed on both. I propose that we mark the uncertain methods, or the ones without a caller side for now, in the PR. When I submit the reference PR, I can link the comments to the newly created PRs. That should help our review and make sure I don't miss explaining any uncertainty during the review.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138507 has finished for PR 32272 at commit
viirya left a comment
About the file name: RocksDBFileManager.scala doesn't contain any RocksDBFileManager. Shall we rename it?
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
viirya left a comment
Looks okay per this change. But as @HeartSaVioR said, I think we still need to look at how this is going to be used in the full context.
All this checkpointing metadata is for RocksDBFileManager. In my plan, the next PR covers the save path of RocksDBFileManager. Agreed; I plan to use this comment as a demo, and in the next PR I'll reference it to provide the full context.
To provide more context for the functions in this PR, I created the WIP PR (#32582) and referenced this comment there. Please check whether we can ship this for now. Thanks :) @HeartSaVioR @viirya
HeartSaVioR left a comment
Thanks @HeartSaVioR. I will take another look with #32582 tomorrow.
Sorry for the delay. I will find some time over the weekend to look at this.
No worries, thanks for the detailed review! Take your time.
Looks like there's no further comment, so I'm going to merge this once the tests pass.
retest this, please
@xuanyuanking Could you please push an empty commit in case Jenkins doesn't work? Thanks in advance!
@HeartSaVioR Sure, thanks for the reminder.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139018 has finished for PR 32272 at commit
Refer to this link for build results (access rights to CI server needed):
Jenkins passed. Thanks! Merging to master.
Thanks for the review and help!
What changes were proposed in this pull request?
Initial implementation of RocksDBCheckpointMetadata. It persists the metadata for RocksDBFileManager.
Why are the changes needed?
RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The object contains all RocksDB file names and the total number of keys.
The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - Directory Structure and Format for Files stored in DFS.
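For illustration, the per-batch metadata file might look roughly like this. The field names are taken from the code quoted in the review; the exact structure, DFS naming, and values are assumptions, and an empty logFiles field is omitted entirely, per the serialization discussion above:

```json
{
  "sstFiles" : [
    { "localFileName" : "000009.sst", "dfsFileName" : "<uuid>-000009.sst", "sizeBytes" : 92912 }
  ],
  "numKeys" : 54
}
```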
Does this PR introduce any user-facing change?
No. Internal implementation only.
How was this patch tested?
New UT added.