Skip to content

Commit 12a61d8

Browse files
jerryshao authored and JoshRosen committed
[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle
Kernel 2.6.32 bug will lead to unexpected behavior of transferTo in copyStream, and this will corrupt the shuffle output file in sort-based shuffle, which will somehow introduce PARSING_ERROR(2), deserialization error or offset out of range. Here fix this by adding append flag, also add some position checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948). Author: jerryshao <[email protected]> Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits: be0533a [jerryshao] Address the comments a82b184 [jerryshao] add configuration to control the NIO way of copying stream e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo (cherry picked from commit c7aeecd) Signed-off-by: Josh Rosen <[email protected]> Conflicts: core/src/main/scala/org/apache/spark/util/Utils.scala
1 parent 2cd40db commit 12a61d8

File tree

2 files changed: +27 additions, -5 deletions

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,23 +292,44 @@ private[spark] object Utils extends Logging {
292292
dir
293293
}
294294

295-
/** Copy all data from an InputStream to an OutputStream */
295+
/** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream
296+
* copying is disabled by default unless explicitly set transferToEnabled as true,
297+
* the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
298+
*/
296299
def copyStream(in: InputStream,
297300
out: OutputStream,
298-
closeStreams: Boolean = false): Long =
301+
closeStreams: Boolean = false,
302+
transferToEnabled: Boolean = false): Long =
299303
{
300304
var count = 0L
301305
try {
302-
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
306+
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
307+
&& transferToEnabled) {
303308
// When both streams are File stream, use transferTo to improve copy performance.
304309
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
305310
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
311+
val initialPos = outChannel.position()
306312
val size = inChannel.size()
307313

308314
// In case transferTo method transferred less data than we have required.
309315
while (count < size) {
310316
count += inChannel.transferTo(count, size - count, outChannel)
311317
}
318+
319+
// Check the position after transferTo loop to see if it is in the right position and
320+
// give user information if not.
321+
// Position will not be increased to the expected length after calling transferTo in
322+
// kernel version 2.6.32, this issue can be seen in
323+
// https://bugs.openjdk.java.net/browse/JDK-7052359
324+
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
325+
val finalPos = outChannel.position()
326+
assert(finalPos == initialPos + size,
327+
s"""
328+
|Current position $finalPos do not equal to expected position ${initialPos + size}
329+
|after transferTo, please check your kernel version to see if it is 2.6.32,
330+
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
331+
|You can set spark.file.transferTo = false to disable this NIO feature.
332+
""".stripMargin)
312333
} else {
313334
val buf = new Array[Byte](8192)
314335
var n = 0

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
9393
private val conf = SparkEnv.get.conf
9494
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
9595
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
96+
private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
9697

9798
// Size of object batches when reading/writing from serializers.
9899
//
@@ -743,10 +744,10 @@ private[spark] class ExternalSorter[K, V, C](
743744
var out: FileOutputStream = null
744745
var in: FileInputStream = null
745746
try {
746-
out = new FileOutputStream(outputFile)
747+
out = new FileOutputStream(outputFile, true)
747748
for (i <- 0 until numPartitions) {
748749
in = new FileInputStream(partitionWriters(i).fileSegment().file)
749-
val size = org.apache.spark.util.Utils.copyStream(in, out, false)
750+
val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
750751
in.close()
751752
in = null
752753
lengths(i) = size

0 commit comments

Comments (0)