29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -269,23 +269,44 @@ private[spark] object Utils extends Logging {
dir
}

/** Copy all data from an InputStream to an OutputStream */
/** Copy all data from an InputStream to an OutputStream. NIO-based transferTo copying
* of file streams is disabled by default and is used only when transferToEnabled is
* explicitly set to true; callers should derive that parameter from the configuration
* spark.file.transferTo = [true|false].
*/
def copyStream(in: InputStream,
out: OutputStream,
closeStreams: Boolean = false): Long =
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long =
{
var count = 0L
try {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
// When both streams are file streams, use transferTo to improve copy performance.
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val initialPos = outChannel.position()
val size = inChannel.size()

// transferTo may transfer less data than requested, so loop until the full size is copied.
while (count < size) {
count += inChannel.transferTo(count, size - count, outChannel)
}

// Check the position after the transferTo loop to verify that it advanced to the
// expected offset, and inform the user if it did not. On kernel version 2.6.32 the
// position is not advanced to the expected length after calling transferTo; see
// https://bugs.openjdk.java.net/browse/JDK-7052359
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
val finalPos = outChannel.position()
assert(finalPos == initialPos + size,
s"""
|Current position $finalPos does not equal the expected position ${initialPos + size}
|after transferTo. Please check whether your kernel version is 2.6.32; it has a
|kernel bug that leads to unexpected behavior when using transferTo. You can set
|spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
} else {
val buf = new Array[Byte](8192)
var n = 0
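
For reference, here is a self-contained sketch of the transferTo strategy used in the branch above, with the same copy loop and position check. The object and method names (NioCopySketch, copyFileNio) are illustrative only, not part of Utils.scala:

import java.io.{FileInputStream, FileOutputStream}

object NioCopySketch {
  // Copy src to dst through FileChannel.transferTo, mirroring the branch above.
  def copyFileNio(src: String, dst: String): Long = {
    val in = new FileInputStream(src)
    val out = new FileOutputStream(dst)
    try {
      val inChannel = in.getChannel
      val outChannel = out.getChannel
      val initialPos = outChannel.position()
      val size = inChannel.size()
      var count = 0L
      // transferTo may copy fewer bytes than requested, so loop to completion.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
      // On affected kernels (JDK-7052359) the position may not advance as
      // expected, which would silently corrupt concatenated output downstream.
      assert(outChannel.position() == initialPos + size,
        s"transferTo left position ${outChannel.position()}, expected ${initialPos + size}")
      count
    } finally {
      in.close()
      out.close()
    }
  }
}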
@@ -727,7 +748,7 @@ private[spark] object Utils extends Logging {

/**
* Determines if a directory contains any files newer than cutoff seconds.
*
*
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Returns true if there are any files or directories in the
* given directory whose last modified time is later than this many seconds ago
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)

// Size of object batches when reading/writing from serializers.
//
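
Since spark.file.transferTo defaults to true here, users who hit the kernel bug above can disable the NIO path and fall back to buffered copying. A minimal sketch of the user-side setting:

import org.apache.spark.SparkConf

// Disable the transferTo path; copyStream then uses the plain buffered copy.
val conf = new SparkConf().set("spark.file.transferTo", "false")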
@@ -705,10 +706,10 @@ private[spark] class ExternalSorter[K, V, C](
var out: FileOutputStream = null
var in: FileInputStream = null
try {
out = new FileOutputStream(outputFile)
out = new FileOutputStream(outputFile, true)
for (i <- 0 until numPartitions) {
in = new FileInputStream(partitionWriters(i).fileSegment().file)
val size = org.apache.spark.util.Utils.copyStream(in, out, false)
val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
in.close()
in = null
lengths(i) = size
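
Taken together, the merge step amounts to the pattern below: open the output file once in append mode, then copy every partition file onto its end, via transferTo when the flag is enabled. This is a hedged sketch — concatenatePartitions and its parameters are illustrative names, not Spark APIs:

import java.io.{File, FileInputStream, FileOutputStream}

object ConcatSketch {
  def concatenatePartitions(
      parts: Seq[File],
      output: File,
      transferToEnabled: Boolean): Array[Long] = {
    val out = new FileOutputStream(output, true) // append mode, as in the diff above
    try {
      parts.map { part =>
        val in = new FileInputStream(part)
        try {
          if (transferToEnabled) {
            val size = in.getChannel.size()
            var count = 0L
            // transferTo may copy fewer bytes than asked for; loop to completion.
            while (count < size) {
              count += in.getChannel.transferTo(count, size - count, out.getChannel)
            }
            count
          } else {
            // Default path: plain buffered copy.
            val buf = new Array[Byte](8192)
            var count = 0L
            var n = in.read(buf)
            while (n != -1) {
              out.write(buf, 0, n)
              count += n
              n = in.read(buf)
            }
            count
          }
        } finally {
          in.close()
        }
      }.toArray
    } finally {
      out.close()
    }
  }
}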