-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS #17012
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
The fix looks good to me. But the test doesn't test anything. Right? |
|
@zsxwing The test is there just to make sure the new code path is executed during tests. It probably makes more sense to create a fake filesystem type that adheres to the rename semantic of HDFS to test this. |
|
@zsxwing I have rewritten the test code. |
|
@zsxwing Is it possible for you to give some approximate timeframe for when this might be accepted? A week or month? Since we are blocked in our spark streaming adoption (this HDFS error happens not only in restart but also in first run of our more complex aggregating drivers), I don't know if I should wait for this fix to make it into the nightly build soon or build a custom dist ourselves and update our automation. Any input as to the fix timeline would be greatly appreciated! Also this fix handles the same issue as SPARK-19645 which has a different solution. Is this simple solution sufficient or the more complex fix in #16980 required? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I think we should not delete the file. Otherwise, it will break speculation. E.g., there is also a speculation ask running. It wrote successfully and just reported to the driver that the task was successful. At the same time, another task just deleted the final file wrongly because the file exists.
The correct way is just skipping rename. We can depend on the basic assumption of Structured Streaming that the same batch should contain the same data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
7d44680 to
530c027
Compare
|
Just some feedback that I did some initial regression testing with this pull request on a full YARN (v2.7.3) 4 node cluster on GCP and it appears to have fixed the two issues we had- our structured streaming drivers now restart normally and our complex aggregation driver runs for the first time. |
|
What is the proposed semantics from this PR now ?
Is this right ? If yes, the PR title/description should be changed. |
|
ok to test |
+1 |
|
Test build #73548 has finished for PR 17012 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add our discussion to the comment? Such as
// scalastyle:off
// Renaming a file atop an existing one fails on HDFS
// (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
// Hence we should either skip the rename step or delete the target file. Because deleting the
// target file will beak speculation, skipping the rename step is the only choice. It's still
// semantically correct because Structured Streaming requires rerunning a batch should
// generate the same output. (SPARK-19677)
// scalastyle:on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: SPARK-19677: Renaming a file atop an existing one on HDFS
|
Just two minor comments. Otherwise, LGTM. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent
|
Test build #73589 has finished for PR 17012 at commit
|
|
LGTM. Thanks! Merging to master, 2.1 and 2.0. |
… not fail on HDFS ## What changes were proposed in this pull request? HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies: > Destination exists and is a file > Renaming a file atop an existing file is specified as failing, raising an exception. > - Local FileSystem : the rename succeeds; the destination file is replaced by the source file. > - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false. This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output. ## How was this patch tested? This patch was tested by running `StateStoreSuite`. Author: Roberto Agostino Vitillo <[email protected]> Closes #17012 from vitillo/fix_rename.
… not fail on HDFS ## What changes were proposed in this pull request? HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies: > Destination exists and is a file > Renaming a file atop an existing file is specified as failing, raising an exception. > - Local FileSystem : the rename succeeds; the destination file is replaced by the source file. > - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false. This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output. ## How was this patch tested? This patch was tested by running `StateStoreSuite`. Author: Roberto Agostino Vitillo <[email protected]> Closes #17012 from vitillo/fix_rename. (cherry picked from commit 9734a92) Signed-off-by: Shixiong Zhu <[email protected]>
…treaming job ## What changes were proposed in this pull request? [SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779) The PR (#17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future. ## How was this patch tested? unit tests Author: guifeng <[email protected]> Closes #17124 from gf53520/SPARK-19779. (cherry picked from commit e24f21b) Signed-off-by: Shixiong Zhu <[email protected]>
…treaming job ## What changes were proposed in this pull request? [SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779) The PR (#17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future. ## How was this patch tested? unit tests Author: guifeng <[email protected]> Closes #17124 from gf53520/SPARK-19779. (cherry picked from commit e24f21b) Signed-off-by: Shixiong Zhu <[email protected]>
…treaming job ## What changes were proposed in this pull request? [SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779) The PR (#17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future. ## How was this patch tested? unit tests Author: guifeng <[email protected]> Closes #17124 from gf53520/SPARK-19779.
What changes were proposed in this pull request?
HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the implementation notes of
rename(), the behavior of the local filesystem and HDFS varies:This patch ensures that
rename()isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.How was this patch tested?
This patch was tested by running
StateStoreSuite.