[SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API. #25304
Conversation
Test build #108423 has finished for PR 25304 at commit
Test build #108425 has finished for PR 25304 at commit

Test build #108427 has finished for PR 25304 at commit
```java
private void initStream() throws IOException {
  if (outputFileStream == null) {
    // This file needs to opened in append mode in order to work around a Linux kernel bug that
    // affects transferTo; see SPARK-3948 for more details.
```
The comment is actually more appropriate in `initChannel()` (transferTo is only used with the channel). Btw, why does `initChannel` set the member variable `outputFileStream`? Couldn't it just set `outputChannel = new FileOutputStream(outputTempFile, true).getChannel();`?
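A minimal sketch of the simplification being suggested, assuming `outputTempFile` and `outputFileChannel` are fields of the writer (a hypothetical shape; the real class carries more state):

```java
// Hypothetical initChannel(): open the channel directly instead of also
// storing the intermediate FileOutputStream in a member variable.
private void initChannel() throws IOException {
  if (outputFileChannel == null) {
    // Append mode works around a Linux kernel bug affecting transferTo (SPARK-3948).
    outputFileChannel = new FileOutputStream(outputTempFile, true).getChannel();
  }
}
```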
Test build #108487 has finished for PR 25304 at commit

Test build #108587 has finished for PR 25304 at commit

@squito addressed the comments here.
squito left a comment:
Minor things, but I think it mostly looks good.
```java
public interface SingleFileShuffleMapOutputWriter {

  /**
   * Transfer a file that contains the bytes of all the splits written by this map task.
```
splits -> partitions
```java
    shuffleId, mapId, taskContext.taskAttemptId());
if (maybeSingleFileWriter.isPresent()) {
  // Here, we don't need to perform any metrics updates because the bytes written to this
  // output file would have already been counted as shuffle bytes written.
```
This comment is only true for the local disk implementation. E.g., if some other implementation did take advantage of the single merged file somehow and wrote it all to a remote store, it would be doing another write. But I am not really worried about this, as I don't think any other store will actually use this...
Where could we move this?
@squito I think this SingleSpillShuffleMapOutputWriter can be pretty useful; it may avoid some byte-by-byte read/write, and instead the custom store can provide a more performant implementation.
I dunno if there is a great alternative. We could say it's the job of individual implementations to increment the metrics, and then move this comment into LocalDiskSingleSpillMapOutputWriter to explain why the metrics aren't incremented. But we're specifically trying to avoid exposing metrics to the API. You could also have transferMapSpillFile() return the number of bytes written, and then the existing implementation would return 0.
It all kinda feels like overkill to me. @gczsjdy I agree it's possible for another store to take advantage of this, but do you have a specific case in mind? I'd like to avoid adding too many things to the API (with odd cases just to support one possible implementation) and keep things simple.
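A sketch of the alternative floated here, assuming the interface were changed so implementations report their own byte counts (hypothetical; the merged PR kept the no-metrics design):

```java
import java.io.File;
import java.io.IOException;

// Hypothetical variant (not what was merged): the transfer reports how many
// bytes it actually wrote so the caller can update metrics. The local-disk
// rename would return 0, since those bytes were already counted as shuffle
// bytes written when the spill file was produced.
public interface SingleSpillShuffleMapOutputWriter {
  long transferMapSpillFile(File mapSpillFile, long[] partitionLengths) throws IOException;
}
```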
```java
//
// We allow the individual merge methods to report their own IO times since different merge
// strategies use different IO techniques. We count IO during merge towards the shuffle
// shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
```
typo: shuffle shuffle (it was there before, might as well fix it now)
```java
import org.apache.spark.util.Utils;

public class LocalDiskSingleFileMapOutputWriter
    implements SingleFileShuffleMapOutputWriter {
```
Sorry, I know this was my naming suggestion earlier, but after more thought, how about File -> Spill? I.e. `LocalDiskSingleSpillMapOutputWriter` and `SingleSpillShuffleMapOutputWriter`.
```java
public void transferMapOutputFile(
    File mapOutputFile,
    long[] partitionLengths) throws IOException {
  File outputFile = blockResolver.getDataFile(shuffleId, mapId);
```
I think a brief comment here would help, e.g. "we've only got one spill file, which is already in the right format of the final data file, so there's no merging to do; just move it to the right location".
```java
@Override
public void transferMapOutputFile(
    File mapOutputFile,
```
and maybe rename this to mapSpillFile
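Putting both suggestions together (the explanatory comment plus the mapSpillFile rename), the method might read as below. This is a sketch only: the `Utils.tempFileWith` / `writeIndexFileAndCommit` steps are assumptions about how the local-disk implementation commits its output.

```java
@Override
public void transferMapSpillFile(
    File mapSpillFile,
    long[] partitionLengths) throws IOException {
  // We've only got one spill file, and it's already in the right format of the
  // final data file; there's no merging to do, so just move it into place.
  File outputFile = blockResolver.getDataFile(shuffleId, mapId);
  File tempFile = Utils.tempFileWith(outputFile);
  Files.move(mapSpillFile.toPath(), tempFile.toPath());
  blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tempFile);
}
```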
```java
import java.io.File;

import java.io.IOException;
import org.apache.spark.annotation.Private;
```
import grouping
```java
try {
  for (int i = 0; i < spills.length; i++) {
    long partitionLengthInSpill = 0L;
    partitionLengthInSpill += spills[i].partitionLengths[partition];
```
```java
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
```
I think this comment got overlooked (probably because GitHub hides long diffs, I hate that...)
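Rendered as a suggested change against the quoted loop (a fragment, matching the diff excerpt above):

```java
try {
  for (int i = 0; i < spills.length; i++) {
    // Declare and initialize in one step instead of accumulating into 0L.
    final long partitionLengthInSpill = spills[i].partitionLengths[partition];
```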
```java
 * with the same (shuffleId, mapId) pair can be distinguished by the
 * different values of mapTaskAttemptId.
 */
default Optional<SingleFileShuffleMapOutputWriter> createSingleFileMapOutputWriter(
```
This feels like a lot of indirection to implement one method... what about returning a boolean if the transfer is supported, or throwing UnsupportedOperationException (although that's a bit slower)? (If the transfer is supported but fails, you'd still throw an IOException.)
I prefer Optional to both of these, specifically because throwing an exception by default forces a try...catch on the caller, and a boolean requires a separate method call outside of the plugin tree, which splices the logic between the plugin tree and the UnsafeShuffleWriter.
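For reference, a sketch of the Optional-based default being defended here, assuming it lives on the plugin's writer-factory interface (parameter names follow the quoted Javadoc; the enclosing interface is elided):

```java
// Plugins that cannot accept a pre-merged single spill file simply inherit
// this default and return Optional.empty(); the UnsafeShuffleWriter then
// falls back to the regular per-partition merge path, so callers need no
// try...catch and no separate capability-check method.
default Optional<SingleFileShuffleMapOutputWriter> createSingleFileMapOutputWriter(
    int shuffleId,
    int mapId,
    long mapTaskAttemptId) throws IOException {
  return Optional.empty();
}
```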
Addressed most things, except #25304 (comment) - not sure how important it is or where to move the comment if at all.
Test build #109243 has finished for PR 25304 at commit
```java
 * single partition file, as the entire result of a map task, to the backing store.
 * <p>
 * Most implementations should return the default {@link Optional#empty()} to indicate that
 * they do not support this optimization. This primarily is for backwards-compatibility in
```
Probably it's better to indicate what kinds of implementations may support this optimization? Otherwise it's confusing. I think storage that has an API like move supports this optimization; is this right?
Truth be told, even plugins that support a remote FS move would be unlikely to support this well - one would still have to transfer the whole file up to the remote storage layer, but that could just as easily be done by writing the data from the file through an output stream.
I think only implementations that stage the files locally could support this in any meaningful way at all.
I'm ok with leaving out the docs, if only because very, very few implementations should even care about this API.
Yeah, I don't think it's necessary to go into more detail. I would say the "casual" plugin developer wouldn't bother with this, and if they're really serious, they can look at what the existing implementation does. The comment is sufficient for that.
Test build #109255 has finished for PR 25304 at commit

retest this please

Test build #109763 has finished for PR 25304 at commit

Test build #109972 has finished for PR 25304 at commit
vanzin left a comment:
Just minor things.
```java
    mapId,
    taskContext.taskAttemptId(),
    partitioner.numPartitions());
mapWriter.commitAllPartitions();
```
Can you just return the value returned here instead of creating a new array?
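A sketch of what is being asked for, assuming `commitAllPartitions()` returns the partition-length array (a hypothetical reading of the surrounding method):

```java
// Hand the array produced by the writer straight back to the caller instead
// of copying it into a freshly allocated long[].
return mapWriter.commitAllPartitions();
```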
```java
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
boolean copyThrewExecption = true;
```
typo
```java
boolean copyThrewExecption = true;
ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
WritableByteChannelWrapper resolvedChannel = writer.openChannelWrapper()
    .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer)));
```
I commented on another PR about this, but StreamFallbackChannelWrapper is probably going to be slower than calling mergeSpillsWithFileStream in this case, because of the extra buffering the Channels.newChannel wrapper adds.
Probably not a huge deal, but it seems easy to call mergeSpillsWithFileStream if the implementation does not return a channel wrapper.
The tricky part here is that you only know whether the channel wrapper is supported once we've started looping through the partitions - which would mean we'd have to abort iteration early and switch the merge strategy. Can we leave it as-is?
Argh, another reason why I prefer explicit capability checks over using optionals. Ok to leave like this for now, I guess.
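The pattern under discussion, sketched: the partition writer's OutputStream is adapted to the channel API only when the plugin offers no native channel. The class body below is an assumption about how the wrapper is implemented:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Fallback adapter from the stream API to the channel API. Channels.newChannel
// introduces the extra buffering layer flagged above as potentially slower
// than taking the stream-based merge path (mergeSpillsWithFileStream) outright.
final class StreamFallbackChannelWrapper implements WritableByteChannelWrapper {
  private final WritableByteChannel channel;

  StreamFallbackChannelWrapper(OutputStream fallbackStream) {
    this.channel = Channels.newChannel(fallbackStream);
  }

  @Override
  public WritableByteChannel channel() {
    return channel;
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }
}
```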
```java
final FileChannel spillInputChannel = spillInputChannels[i];
final long writeStartTime = System.nanoTime();
Utils.copyFileStreamNIO(
      spillInputChannel,
```
This looks indented too far.
```java
// after calling transferTo in kernel version 2.6.32. This issue is described at
// https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
if (outputFileChannel != null
  && outputFileChannel.position() != bytesWrittenToMergedFile) {
```
nit: && goes in previous line
```java
when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
doAnswer(invocationOnMock -> {
Answer renameTempAnswer = invocationOnMock -> {
```
This doesn't give you a "raw type" warning? (Answer has a type parameter.)
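A hedged sketch of the parameterized form that avoids the raw type (Mockito's `Answer<T>`; the body here is illustrative):

```java
// Answer is generic in the stubbed method's return type; the mocked call
// returns void, so Void is the natural type argument and null the only result.
Answer<Void> renameTempAnswer = invocationOnMock -> {
  // ...simulate renaming the temp file to the final merged output file...
  return null;
};
```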
Test build #110273 has finished for PR 25304 at commit

Test build #110298 has finished for PR 25304 at commit

retest this please

Test build #110335 has finished for PR 25304 at commit
```java
import java.io.IOException;

import java.util.Optional;
```
nit: import grouping
```java
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
    (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE());
```
You didn't really need to touch these lines, but since you did, there's a typo in that constant's name.
```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
```
nit: add empty line before
retest this please

lgtm aside from marcelo's comments

Test build #110365 has finished for PR 25304 at commit

Test build #110385 has finished for PR 25304 at commit

Merging to master.
Closes apache#25304 from mccheah/shuffle-writer-refactor-unsafe-writer. Lead-authored-by: mcheah <[email protected]> Co-authored-by: mccheah <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
What changes were proposed in this pull request?
Uses the APIs introduced in SPARK-28209 in the UnsafeShuffleWriter.
How was this patch tested?
Since this is just a refactor, existing unit tests should cover the relevant code paths. Micro-benchmarks from the original fork where this code was built show no degradation in performance.
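At a high level, the refactor has the writer consult the plugin for a single-spill fast path before falling back to the per-partition merge. A sketch of that control flow under the names discussed above (`mergeSpillsUsingStandardWriter` and the surrounding fields are assumptions):

```java
// Fast path: a plugin that stages local files can take the single spill file
// as-is; otherwise merge spill-by-spill through the ShuffleMapOutputWriter.
Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
    shuffleExecutorComponents.createSingleFileMapOutputWriter(
        shuffleId, mapId, taskContext.taskAttemptId());
if (maybeSingleFileWriter.isPresent()) {
  // No metrics updates needed here: the local-disk implementation already
  // counted these bytes as shuffle bytes written when the spill was produced.
  partitionLengths = spills[0].partitionLengths;
  maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
} else {
  partitionLengths = mergeSpillsUsingStandardWriter(spills);
}
```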