[SPARK-27562][Shuffle] Complete the verification mechanism for shuffle transmitted data #28525
Conversation
cc @jerryshao

ok to test

Test build #122618 has finished for PR 28525 at commit
Sounds like a great and very useful feature.
FYI, the dependency failure will be fixed by the following.

Test build #122640 has finished for PR 28525 at commit

Test build #122641 has finished for PR 28525 at commit

Test build #122659 has finished for PR 28525 at commit

Test build #122664 has finished for PR 28525 at commit

Test build #122667 has finished for PR 28525 at commit
common/network-common/src/main/java/org/apache/spark/network/util/DigestUtils.java
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
I would suggest describing the spec of the shuffle index file somewhere in the code, and reducing the magic hard-coded numbers scattered everywhere.
...ork-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
The current implementation uses the shuffle index file to store partition digests. I think this: 1) couples two things together and makes them hard to evolve; 2) makes the logic unintuitive. I would suggest separating the index file from the CRC file, and using a new file to store the shuffle digests.
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Thanks for the review. I have modified the solution and now save the digests in an independent file.
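A standalone digest file of the kind suggested here could be as simple as one fixed-width CRC32 value per reduce partition, in partition order. The following is a hypothetical sketch of such a layout, not the PR's actual code; the class and method names are made up for illustration.

```java
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class DigestFileWriter {
    // Hypothetical layout for a standalone digest file: one 8-byte
    // CRC32 value per reduce partition, in partition order, kept
    // separate from the offsets stored in the existing index file.
    static void writeDigests(String path, long[] digests) throws IOException {
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(path))) {
            for (long digest : digests) {
                out.writeLong(digest);
            }
        }
    }
}
```

With this layout, the digest for partition i can be located by seeking to offset 8 * i, without touching the index file at all.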
Test build #122924 has finished for PR 28525 at commit

Test build #122925 has finished for PR 28525 at commit

Test build #122931 has finished for PR 28525 at commit
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
We've seen some shuffle data corruption during the shuffle read phase.
As described in SPARK-26089, Spark only checked small shuffle blocks before PR #23453, which was proposed by ankuriitg. There are two changes/improvements made in PR #23453:
1. The beginning of large blocks is also checked, so if a large block is corrupt at the start, that block will be re-fetched, and if that also fails, a FetchFailureException will be thrown.
2. IOExceptions thrown while reading the stream will be converted to FetchFailureException. This is slightly more aggressive than originally intended, but since the consumer of the stream may have already read and processed some records, we can't just re-fetch the block; we need to fail the whole task. Additionally, we also considered adding a new type of TaskEndReason that would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.
However, I think some problems still exist with the current verification mechanism for shuffle transmitted data.
This PR completes the verification mechanism for shuffle transmitted data:
First, CRC32 is chosen as the checksum for verifying shuffle data. CRC is also used for checksum verification in Hadoop; it is simple and fast.
During the shuffle write phase, after completing the partitioned file, we compute
the CRC32 value for each partition and then write these digests, together with the offsets, into the shuffle index file.
For SortShuffleWriter and the unsafe shuffle writer, there is only one partitioned file per shuffle map task, so the digest computation (one digest per partition, derived from the offsets of this partitioned file) is cheap.
For the bypass shuffle writer, the number of reduce partitions is smaller than byPassMergeThreshold, so the cost of digest computation is acceptable.
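The write-side step described above can be sketched as follows. This is a simplified illustration, not the PR's actual code; it assumes the partitioned data is available as a byte array and that the per-partition lengths (the same ones the index file records) are known.

```java
import java.util.zip.CRC32;

public class PartitionDigests {
    // Sketch: given the bytes of a partitioned shuffle data file and
    // the per-partition lengths already tracked for the index file,
    // compute one CRC32 digest per partition. The data is scanned
    // exactly once, which is why the computation is cheap.
    static long[] digestsFor(byte[] data, long[] partitionLengths) {
        long[] digests = new long[partitionLengths.length];
        int offset = 0;
        for (int i = 0; i < partitionLengths.length; i++) {
            int len = (int) partitionLengths[i];
            CRC32 crc = new CRC32();
            crc.update(data, offset, len);
            digests[i] = crc.getValue();
            offset += len;
        }
        return digests;
    }
}
```

Each digest covers exactly the byte range [offset, offset + length) of its partition, so a reader holding the index offsets can verify any single partition independently.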
During the shuffle read phase, the digest value is passed along with the block data,
and we recompute the digest of the received data to compare it against the original digest value.
Recomputing the digest of the received data only needs an additional 2048-byte buffer for computing the CRC32 value.
After recomputing, we reset the input stream of the received data: if the stream supports mark/reset, we only need to reset it; otherwise the data comes from a FileSegmentManagedBuffer and we need to recreate it.
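The read-side check can be sketched as below. This is a hypothetical helper, not the PR's actual code; it covers only the mark/reset case (the FileSegmentManagedBuffer case recreates the stream instead), and it uses the 2048-byte scratch buffer mentioned above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;

public class DigestVerifier {
    // Sketch of the read-side check: recompute the CRC32 of a fetched
    // block using only a 2048-byte scratch buffer, then reset the
    // stream so the consumer can read the records from the beginning.
    // Assumes in.markSupported() is true for the given stream.
    static boolean verify(InputStream in, long expectedDigest, int blockSize) throws IOException {
        in.mark(blockSize);               // remember the start of the block
        byte[] buf = new byte[2048];      // the only extra memory needed
        CRC32 crc = new CRC32();
        int n;
        while ((n = in.read(buf)) != -1) {
            crc.update(buf, 0, n);
        }
        in.reset();                       // rewind for the real consumer
        return crc.getValue() == expectedDigest;
    }
}
```

A mismatch here would indicate corruption in transit, and the block could be re-fetched or the task failed, as described above.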
So, I think this verification mechanism proposed for shuffle transmitted data is efficient and complete.
How was this patch tested?
Unit test.