From e17ada2629b7ac0cc6e888cd1a0da6827159ea3b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 16 Oct 2014 16:45:51 +0800 Subject: [PATCH 1/3] Fix kernel 2.6.32 bug led unexpected behavior of transferTo --- .../scala/org/apache/spark/util/Utils.scala | 19 ++++++++++++++++++- .../util/collection/ExternalSorter.scala | 2 +- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index cbc4095065a19..aec4651e4f3b2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -280,12 +280,29 @@ private[spark] object Utils extends Logging { // When both streams are File stream, use transferTo to improve copy performance. val inChannel = in.asInstanceOf[FileInputStream].getChannel() val outChannel = out.asInstanceOf[FileOutputStream].getChannel() + val initialPos = outChannel.position() val size = inChannel.size() // In case transferTo method transferred less data than we have required. while (count < size) { count += inChannel.transferTo(count, size - count, outChannel) } + + // Check the position after transferTo loop to see if it is in the right position and + // give user information if not. + // Position will not be increased to the expected length after calling transferTo in + // kernel version 2.6.32, this issue can be seen in + // scalastyle:off + // https://bugs.openjdk.java.net/browse/JDK-7052359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel) + // scalastyle:on + // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). + val finalPos = outChannel.position() + assert(finalPos == initialPos + size, + s""" + |Current position $finalPos do not equal to expected position ${initialPos + count} + |after transferTo, please check your kernel version to see if it is 2.6.32, + |this is a kernel bug which will lead to unexpected behavior when using transferTo. + """.stripMargin) } else { val buf = new Array[Byte](8192) var n = 0 @@ -727,7 +744,7 @@ private[spark] object Utils extends Logging { /** * Determines if a directory contains any files newer than cutoff seconds. - * + * * @param dir must be the path to a directory, or IllegalArgumentException is thrown * @param cutoff measured in seconds. Returns true if there are any files or directories in the * given directory whose last modified time is later than this many seconds ago diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 644fa36818647..7e17c15c3f0d4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -705,7 +705,7 @@ private[spark] class ExternalSorter[K, V, C]( var out: FileOutputStream = null var in: FileInputStream = null try { - out = new FileOutputStream(outputFile) + out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) val size = org.apache.spark.util.Utils.copyStream(in, out, false) From a82b18423f57c5f584d93d2702d710d1cde843c2 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 17 Oct 2014 15:21:15 +0800 Subject: [PATCH 2/3] add configuration to control the NIO way of copying stream --- core/src/main/scala/org/apache/spark/util/Utils.scala | 11 ++++++----- .../apache/spark/util/collection/ExternalSorter.scala | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index aec4651e4f3b2..e166411f675d2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -272,11 +272,13 @@ private[spark] object Utils extends Logging { /** Copy all data from an InputStream to an OutputStream */ def copyStream(in: InputStream, out: OutputStream, - closeStreams: Boolean = false): Long = + closeStreams: Boolean = false, + transferToEnabled: Boolean = true): Long = { var count = 0L try { - if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) { + if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream] + && transferToEnabled) { // When both streams are File stream, use transferTo to improve copy performance. val inChannel = in.asInstanceOf[FileInputStream].getChannel() val outChannel = out.asInstanceOf[FileOutputStream].getChannel() @@ -292,9 +294,7 @@ private[spark] object Utils extends Logging { // give user information if not. // Position will not be increased to the expected length after calling transferTo in // kernel version 2.6.32, this issue can be seen in - // scalastyle:off - // https://bugs.openjdk.java.net/browse/JDK-7052359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel) - // scalastyle:on + // https://bugs.openjdk.java.net/browse/JDK-7052359 // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). val finalPos = outChannel.position() assert(finalPos == initialPos + size, @@ -302,6 +302,7 @@ private[spark] object Utils extends Logging { |Current position $finalPos do not equal to expected position ${initialPos + count} |after transferTo, please check your kernel version to see if it is 2.6.32, |this is a kernel bug which will lead to unexpected behavior when using transferTo. + |You can set spark.file.transferTo = false to disable this NIO feature. """.stripMargin) } else { val buf = new Array[Byte](8192) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 7e17c15c3f0d4..d1b06d14acbd2 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C]( private val conf = SparkEnv.get.conf private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 + private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true) // Size of object batches when reading/writing from serializers. // @@ -708,7 +709,7 @@ private[spark] class ExternalSorter[K, V, C]( out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) - val size = org.apache.spark.util.Utils.copyStream(in, out, false) + val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) in.close() in = null lengths(i) = size From be0533a88f6b624629ac66cfeb9989337c002cfd Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 20 Oct 2014 13:56:34 +0800 Subject: [PATCH 3/3] Address the comments --- core/src/main/scala/org/apache/spark/util/Utils.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index e166411f675d2..13d6a7ffbcf8c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -269,11 +269,14 @@ private[spark] object Utils extends Logging { dir } - /** Copy all data from an InputStream to an OutputStream */ + /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream + * copying is disabled by default unless explicitly set transferToEnabled as true, + * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. + */ def copyStream(in: InputStream, out: OutputStream, closeStreams: Boolean = false, - transferToEnabled: Boolean = true): Long = + transferToEnabled: Boolean = false): Long = { var count = 0L try { @@ -299,7 +302,7 @@ private[spark] object Utils extends Logging { val finalPos = outChannel.position() assert(finalPos == initialPos + size, s""" - |Current position $finalPos do not equal to expected position ${initialPos + count} + |Current position $finalPos do not equal to expected position ${initialPos + size} |after transferTo, please check your kernel version to see if it is 2.6.32, |this is a kernel bug which will lead to unexpected behavior when using transferTo. |You can set spark.file.transferTo = false to disable this NIO feature.