
Commit 6e6adcc

[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position after FileChannel.transferTo
## What changes were proposed in this pull request?

A long time ago we fixed a [bug](https://issues.apache.org/jira/browse/SPARK-3948) in the shuffle writer related to `FileChannel.transferTo`. We were not very confident about that fix, so we added a position check after the write to try to discover the bug earlier. However, this check is missing in the new `UnsafeShuffleWriter`; this PR adds it.

https://issues.apache.org/jira/browse/SPARK-18105 may be related to that `FileChannel.transferTo` bug; hopefully we can find the root cause after adding this position check.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #18091 from cloud-fan/shuffle.

(cherry picked from commit d9ad789)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4f6fccf commit 6e6adcc
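
To make the intent concrete, below is a minimal, self-contained Scala sketch of the pattern this commit centralizes (the object and method names are illustrative only; the commit's actual helper is `Utils.copyFileStreamNIO`, shown in the diff further down). `FileChannel.transferTo` may move fewer bytes than requested in a single call, so the copy loops until everything is transferred and then verifies that the destination position advanced by exactly the requested amount.

import java.nio.channels.FileChannel

// Illustrative sketch only; the real helper added by this commit is Utils.copyFileStreamNIO.
object TransferToSketch {
  def copyWithPositionCheck(in: FileChannel, out: FileChannel, start: Long, length: Long): Unit = {
    val initialPos = out.position()
    var copied = 0L
    // transferTo may copy fewer bytes than requested, so retry from the updated offset.
    while (copied < length) {
      copied += in.transferTo(start + copied, length - copied, out)
    }
    // On buggy kernels (e.g. 2.6.32, see JDK-7052359) the position may not advance as expected,
    // which would silently corrupt the merged output; fail fast instead.
    val expectedPos = initialPos + length
    assert(out.position() == expectedPos,
      s"position ${out.position()} does not match expected $expectedPos after transferTo")
  }
}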

File tree

2 files changed: +47 -39 lines changed

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 6 additions & 9 deletions
@@ -422,17 +422,14 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
       for (int partition = 0; partition < numPartitions; partition++) {
         for (int i = 0; i < spills.length; i++) {
           final long partitionLengthInSpill = spills[i].partitionLengths[partition];
-          long bytesToTransfer = partitionLengthInSpill;
           final FileChannel spillInputChannel = spillInputChannels[i];
           final long writeStartTime = System.nanoTime();
-          while (bytesToTransfer > 0) {
-            final long actualBytesTransferred = spillInputChannel.transferTo(
-              spillInputChannelPositions[i],
-              bytesToTransfer,
-              mergedFileOutputChannel);
-            spillInputChannelPositions[i] += actualBytesTransferred;
-            bytesToTransfer -= actualBytesTransferred;
-          }
+          Utils.copyFileStreamNIO(
+            spillInputChannel,
+            mergedFileOutputChannel,
+            spillInputChannelPositions[i],
+            partitionLengthInSpill);
+          spillInputChannelPositions[i] += partitionLengthInSpill;
           writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
           bytesWrittenToMergedFile += partitionLengthInSpill;
           partitionLengths[partition] += partitionLengthInSpill;

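The change above replaces the writer's hand-rolled `transferTo` retry loop with a single call to a shared helper, so the retry logic lives in one place and the unsafe shuffle path gets the same position verification that the regular copy path has carried since SPARK-3948. Advancing `spillInputChannelPositions[i]` stays at the call site because the helper reads from an explicit start position and does not move the source channel's position; the helper itself is defined in `Utils.scala` below.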
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 41 additions & 30 deletions
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo}
 import java.net._
 import java.nio.ByteBuffer
-import java.nio.channels.Channels
+import java.nio.channels.{Channels, FileChannel}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util.{Locale, Properties, Random, UUID}

@@ -58,7 +58,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import org.apache.spark.util.logging.RollingFileAppender

 /** CallSite represents a place in user code. It can have a short and a long form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)

@@ -313,41 +312,22 @@ private[spark] object Utils extends Logging {
    * copying is disabled by default unless explicitly set transferToEnabled as true,
    * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
    */
-  def copyStream(in: InputStream,
-                 out: OutputStream,
-                 closeStreams: Boolean = false,
-                 transferToEnabled: Boolean = false): Long =
-  {
-    var count = 0L
+  def copyStream(
+      in: InputStream,
+      out: OutputStream,
+      closeStreams: Boolean = false,
+      transferToEnabled: Boolean = false): Long = {
     tryWithSafeFinally {
       if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
         && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
         val inChannel = in.asInstanceOf[FileInputStream].getChannel()
         val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
-        val initialPos = outChannel.position()
         val size = inChannel.size()
-
-        // In case transferTo method transferred less data than we have required.
-        while (count < size) {
-          count += inChannel.transferTo(count, size - count, outChannel)
-        }
-
-        // Check the position after transferTo loop to see if it is in the right position and
-        // give user information if not.
-        // Position will not be increased to the expected length after calling transferTo in
-        // kernel version 2.6.32, this issue can be seen in
-        // https://bugs.openjdk.java.net/browse/JDK-7052359
-        // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
-        val finalPos = outChannel.position()
-        assert(finalPos == initialPos + size,
-          s"""
-             |Current position $finalPos do not equal to expected position ${initialPos + size}
-             |after transferTo, please check your kernel version to see if it is 2.6.32,
-             |this is a kernel bug which will lead to unexpected behavior when using transferTo.
-             |You can set spark.file.transferTo = false to disable this NIO feature.
-           """.stripMargin)
+        copyFileStreamNIO(inChannel, outChannel, 0, size)
+        size
       } else {
+        var count = 0L
         val buf = new Array[Byte](8192)
         var n = 0
         while (n != -1) {

@@ -357,8 +337,8 @@
             count += n
           }
         }
+        count
       }
-      count
     } {
       if (closeStreams) {
         try {

@@ -370,6 +350,37 @@
     }
   }

+  def copyFileStreamNIO(
+      input: FileChannel,
+      output: FileChannel,
+      startPosition: Long,
+      bytesToCopy: Long): Unit = {
+    val initialPos = output.position()
+    var count = 0L
+    // In case transferTo method transferred less data than we have required.
+    while (count < bytesToCopy) {
+      count += input.transferTo(count + startPosition, bytesToCopy - count, output)
+    }
+    assert(count == bytesToCopy,
+      s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")
+
+    // Check the position after transferTo loop to see if it is in the right position and
+    // give user information if not.
+    // Position will not be increased to the expected length after calling transferTo in
+    // kernel version 2.6.32, this issue can be seen in
+    // https://bugs.openjdk.java.net/browse/JDK-7052359
+    // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
+    val finalPos = output.position()
+    val expectedPos = initialPos + bytesToCopy
+    assert(finalPos == expectedPos,
+      s"""
+         |Current position $finalPos do not equal to expected position $expectedPos
+         |after transferTo, please check your kernel version to see if it is 2.6.32,
+         |this is a kernel bug which will lead to unexpected behavior when using transferTo.
+         |You can set spark.file.transferTo = false to disable this NIO feature.
+       """.stripMargin)
+  }
+
   /**
    * Construct a URI container information used for authentication.
    * This also sets the default authenticator to properly negotiation the

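For reference, here is a hedged usage sketch of the new helper (not part of the patch: `Utils` is `private[spark]`, so this assumes code compiled under the `org.apache.spark` package, and the file paths are made up).

package org.apache.spark

import java.io.{FileInputStream, FileOutputStream}

import org.apache.spark.util.Utils

// Hypothetical example: copy an entire spill file into a merged output file via the new
// helper, which loops over transferTo and asserts the destination position afterwards.
object CopyFileStreamNIOExample {
  def main(args: Array[String]): Unit = {
    val in = new FileInputStream("/tmp/spill-0.data")   // hypothetical input path
    val out = new FileOutputStream("/tmp/merged.data")  // hypothetical output path
    try {
      val inChannel = in.getChannel
      Utils.copyFileStreamNIO(inChannel, out.getChannel, 0L, inChannel.size())
    } finally {
      in.close()
      out.close()
    }
  }
}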