[SPARK-35436][SS] RocksDBFileManager - save checkpoint to DFS #32582
Conversation
Per #32272 (comment), we only generate the RocksDBLogFile object in the save-checkpoint operation here.
Per #32272 (comment), we only use prettyJson in the log.
Kubernetes integration test starting
cc @viirya
Kubernetes integration test status failure
Thanks @dongjoon-hyun. I'll look at this.
Test build #138685 has finished for PR 32582 at commit
As we merged #32272, after rebasing and addressing the comment, this one is ready for review. cc @viirya and @HeartSaVioR
Thanks @xuanyuanking. I will find some time to review this.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #139041 has finished for PR 32582 at commit
HeartSaVioR left a comment:
First pass.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (4 resolved review threads)
Just to confirm: can I safely assume versionToRocksDBFiles will be loaded when RocksDBFileManager is initialized with an existing checkpoint in a further PR?
That's right. versionToRocksDBFiles is touched in the following 3 places:
- saveCheckpointToDfs (this PR)
- deleteOldVersions
- loadCheckpointFromDfs
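For context, a minimal sketch of how such a mapping could look. The class name, fields, and values below are illustrative, not the exact Spark definitions:

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only: an immutable-file descriptor and the
// version -> files map touched by the three methods listed above.
// A concurrent map is used because the task thread (save/load) and the
// maintenance thread (delete) may access it concurrently.
case class RocksDBImmutableFile(localFileName: String, dfsFileName: String, sizeBytes: Long)

object VersionMapSketch {
  val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]()

  def main(args: Array[String]): Unit = {
    // saveCheckpointToDfs would record the files uploaded for a version...
    versionToRocksDBFiles.put(1L, Seq(RocksDBImmutableFile("001.sst", "uuid-001.sst", 1024L)))
    // ...while deleteOldVersions removes entries and loadCheckpointFromDfs reads them.
    println(versionToRocksDBFiles.get(1L).head.dfsFileName)
  }
}
```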
I'm confused here; if I understand correctly, we use batch ID + 1 as the version for HDFSBackedStateStoreProvider, so version 0 with numKeys > 0 doesn't make sense, at least for now (we don't have an "initial state" feature yet).
How does it work for the RocksDB state store provider? Is the version the same as the batch ID?
We also use batch ID + 1 as the version for the RocksDB state store. Since the batch ID starts from -1, for the corner case of the first empty batch (batch ID 0) we add the safeguard here to make sure the working dir is created.
What about simply making sure the directory has been created beforehand? We can add the following during initialization and not bother handling the specific case:

```scala
// Create the root directory on initialization if necessary.
if (!fm.exists(new Path(dfsRootDir))) {
  fm.mkdirs(new Path(dfsRootDir))
}
```

If we'd like to avoid the cost of calling exists, let's keep the code but change the version to 1, as I saw the version start from 1 during testing.
Yeah, let's change the version to 1 to avoid the extra cost of file operations. Will change it in the next commit. Done in bdd9e8e
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (resolved review thread)
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala (resolved review thread)
With this, def listFiles(file: String) is probably unnecessary, as I expect an implicit conversion to happen when calling listFiles(file: File) with a String.
That's right for this PR. In further PRs, we will have more callers for listFiles(file: String). Maybe let's keep it for now.
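A quick sketch of the overload pair being discussed; the object wrapper and method bodies here are illustrative, only the delegation pattern matters:

```scala
import java.io.File

// Sketch: the String variant simply delegates to the File variant, so keeping
// both overloads is cheap and lets future callers pass a plain path without
// relying on an implicit String -> File conversion being in scope.
object ListFilesSketch {
  def listFiles(file: File): Seq[File] = {
    val files = file.listFiles()
    if (files == null) Seq.empty else files.toSeq
  }

  def listFiles(path: String): Seq[File] = listFiles(new File(path))
}
```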
Shall we have some code to check the other files & metadata as well? We don't seem to cover testing the existence of the zip file and its content.
We'll enhance this test in the next PR, RocksDBFileManager - load checkpoint from DFS, by loading back the metadata and checking the other files. Let me keep this conversation open so it can be referenced in the next PR.
And I see that lots of information about files will be printed at INFO level. I don't know whether this would flood the log file, but I guess we want to keep the information in the first phase for ease of debugging. We can discuss lowering the level if the log size becomes a problem.
Great thanks for the detailed review! All comments addressed. Please take another look when you have time.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #139221 has finished for PR 32582 at commit
HeartSaVioR left a comment:
Thanks for the updates! The new changes look OK.
It would be nice if we can do the same as we did when reviewing the first PR: if remaining review comments are planned to be addressed in a further PR, please raise that PR so we can refer to it and finally approve this one.
Agree, let me submit the 3rd PR today. The failing tests are unrelated; they pass locally.
Sorry for being late. I will take a look at this soon.
viirya left a comment:
Went through RocksDBFileManager, but haven't looked at the tests yet.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (resolved review thread)
The zipped file is dfsZipFile, right? Why filesStr here?
The filesStr contains the dfsZipFile name. The log format here is ${dfsZipFile}, followed by a newline, followed by a listing of all the file names.
Can you say more about where this claim (thread-safety) applies? Where do we delete old files?
Do you mean versionToRocksDBFiles? When we prepare files for a new version, is there another thread (the maintenance thread) deleting old files?
Yes. It refers to deleting the files contained in old versions. Here's the description of the deleteOldVersions method of RocksDBFileManager, which will be called in RocksDBStateStoreProvider.doMaintenance. As we did before, I'll also refer to this comment when the PR for the delete path is submitted.
* Delete old versions by deleting the associated version and SST files.
* At a high level, this method finds which versions to delete, and which SST files were
* last used in those versions. It's safe to delete these SST files because an SST file can
* be reused only in successive versions. Therefore, if an SST file F was last used in version
* V, then it won't be used in version V+1 or later, and if version V can be deleted, then
* F can safely be deleted as well.
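The deletion rule quoted above can be sketched as a small function. The names and the plain Map representation here are illustrative, not the actual implementation:

```scala
// Sketch of the rule from the doc comment: an SST file is safe to delete
// once every version that referenced it is being deleted, because SST files
// are only reused across successive versions.
object DeleteRuleSketch {
  def filesSafeToDelete(
      versionToFiles: Map[Long, Set[String]],
      versionsToDelete: Set[Long]): Set[String] = {
    val keptVersions = versionToFiles.keySet -- versionsToDelete
    val stillUsed = keptVersions.flatMap(versionToFiles)
    versionsToDelete.flatMap(v => versionToFiles.getOrElse(v, Set.empty)) -- stillUsed
  }
}
```

For example, with versions 1 -> {a.sst}, 2 -> {a.sst, b.sst}, 3 -> {b.sst, c.sst}, deleting versions 1 and 2 frees only a.sst, since b.sst is still used by version 3.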
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala (resolved review thread)
Kubernetes integration test starting
Kubernetes integration test status success
version = 1? In practice, will we save checkpoint files for the same version, i.e. for the same micro-batch?
For task failover, it's possible that a task failed during uploading. The new task uses the same version and may be scheduled to the same executor, but with a different checkpoint dir.
Use withTempDir too?
This test will be extended further and more temp dirs will be added, so maybe we should just create the root path using withTempDir? See the demo here: https://github.com/apache/spark/pull/32767/files#diff-dc6f9dfe11e76f890ff2986f866853bcac263027c82562f9a52f4672a5460826R37
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala (resolved review thread)
I'm not sure why this will fail.
This failure is intentional:
[info] java.io.IOException: Copy failed intentionally
[info] at org.apache.spark.sql.execution.streaming.CreateAtomicTestManager$$anon$3.close(CheckpointFileManagerSuite.scala:169)
This test verifies that cancel is called no matter what error happens during close. See the comment and code here: https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR222-R227
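The guarantee under discussion, cancel must run even when close throws, can be sketched like this. The trait and method names are illustrative, not Spark's actual CancellableFSDataOutputStream API:

```scala
import java.io.IOException

// Sketch of the guarantee the test checks: if close() fails mid-upload,
// cancel() must still be invoked so no partial file is left on DFS.
trait CancellableStream {
  def close(): Unit
  def cancel(): Unit
}

object CancelOnErrorSketch {
  def commitOrCancel(out: CancellableStream): Unit = {
    var committed = false
    try {
      out.close()
      committed = true
    } finally {
      if (!committed) out.cancel() // runs no matter what close() threw
    }
  }
}
```

Note that the finally block does not swallow the exception: the original IOException from close() still propagates to the caller after cancel() runs.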
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139340 has finished for PR 32582 at commit
Gentle ping @viirya @HeartSaVioR Do you think this is ready to go?
Thanks @xuanyuanking for the work. Let's move forward to the next related PR.
I'll merge once either build test passes.
retest this, please
Thanks @viirya. Let me retrigger the test.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139516 has finished for PR 32582 at commit
Thanks! Merging to master.
Thanks @xuanyuanking for your contribution! I merged this to the master branch.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #139529 has finished for PR 32582 at commit
Great thanks for the help! @HeartSaVioR
What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.
Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.
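As a rough illustration of the save path: this is a simplified sketch, not the actual implementation, since the real RocksDBFileManager also uploads immutable SST files individually and writes version metadata. Bundling a checkpoint's small files into one archive might look like:

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

// Simplified sketch: bundle a checkpoint's small files (logs, metadata) into
// one zip so a version can be committed to DFS with a single archive upload.
object ZipSketch {
  def zipFiles(files: Seq[File], zipFile: File): Unit = {
    val out = new ZipOutputStream(new FileOutputStream(zipFile))
    try {
      files.foreach { f =>
        out.putNextEntry(new ZipEntry(f.getName))
        val in = new FileInputStream(f)
        try {
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
        } finally in.close()
        out.closeEntry()
      }
    } finally out.close()
  }
}
```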
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT added.