Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,14 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
for (int partition = 0; partition < numPartitions; partition++) {
for (int i = 0; i < spills.length; i++) {
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
long bytesToTransfer = partitionLengthInSpill;
final FileChannel spillInputChannel = spillInputChannels[i];
final long writeStartTime = System.nanoTime();
while (bytesToTransfer > 0) {
final long actualBytesTransferred = spillInputChannel.transferTo(
spillInputChannelPositions[i],
bytesToTransfer,
mergedFileOutputChannel);
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
Utils.copyFileStreamNIO(
spillInputChannel,
mergedFileOutputChannel,
spillInputChannelPositions[i],
partitionLengthInSpill);
spillInputChannelPositions[i] += partitionLengthInSpill;
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
Expand Down
71 changes: 41 additions & 30 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInf
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.{Channels, FileChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.{Locale, Properties, Random, UUID}
Expand Down Expand Up @@ -60,7 +60,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.util.logging.RollingFileAppender

/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)
Expand Down Expand Up @@ -319,41 +318,22 @@ private[spark] object Utils extends Logging {
* copying is disabled by default unless explicitly set transferToEnabled as true,
* the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
*/
def copyStream(in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long =
{
var count = 0L
def copyStream(
in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long = {
tryWithSafeFinally {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
// When both streams are File stream, use transferTo to improve copy performance.
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val initialPos = outChannel.position()
val size = inChannel.size()

// In case transferTo method transferred less data than we have required.
while (count < size) {
count += inChannel.transferTo(count, size - count, outChannel)
}

// Check the position after transferTo loop to see if it is in the right position and
// give user information if not.
// Position will not be increased to the expected length after calling transferTo in
// kernel version 2.6.32, this issue can be seen in
// https://bugs.openjdk.java.net/browse/JDK-7052359
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
val finalPos = outChannel.position()
assert(finalPos == initialPos + size,
s"""
|Current position $finalPos do not equal to expected position ${initialPos + size}
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
copyFileStreamNIO(inChannel, outChannel, 0, size)
size
} else {
var count = 0L
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
Expand All @@ -363,8 +343,8 @@ private[spark] object Utils extends Logging {
count += n
}
}
count
}
count
} {
if (closeStreams) {
try {
Expand All @@ -376,6 +356,37 @@ private[spark] object Utils extends Logging {
}
}

def copyFileStreamNIO(
input: FileChannel,
output: FileChannel,
startPosition: Long,
bytesToCopy: Long): Unit = {
val initialPos = output.position()
var count = 0L
// In case transferTo method transferred less data than we have required.
while (count < bytesToCopy) {
count += input.transferTo(count + startPosition, bytesToCopy - count, output)
}
assert(count == bytesToCopy,
s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")

// Check the position after transferTo loop to see if it is in the right position and
// give user information if not.
// Position will not be increased to the expected length after calling transferTo in
// kernel version 2.6.32, this issue can be seen in
// https://bugs.openjdk.java.net/browse/JDK-7052359
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
val finalPos = output.position()
val expectedPos = initialPos + bytesToCopy
assert(finalPos == expectedPos,
s"""
|Current position $finalPos do not equal to expected position $expectedPos
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
}

/**
* Construct a URI container information used for authentication.
* This also sets the default authenticator to properly negotiation the
Expand Down