diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 8f4e3229976dc..1e924d2aec442 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -61,10 +61,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) { public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { Platform.copyMemory( src.getBaseObject(), - src.getBaseOffset() + srcPos * 8, + src.getBaseOffset() + srcPos * 8L, dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 8, - length * 8 + dst.getBaseOffset() + dstPos * 8L, + length * 8L ); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index d19b71fbc1bcb..7bda76907f4c3 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -75,10 +75,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) { public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { Platform.copyMemory( src.getBaseObject(), - src.getBaseOffset() + srcPos * 16, + src.getBaseOffset() + srcPos * 16L, dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 16, - length * 16); + dst.getBaseOffset() + dstPos * 16L, + length * 16L); } @Override diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 4dd8e31c27351..699f7fa1f2727 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -17,12 +17,17 @@ package org.apache.spark.util.collection +import java.util.Comparator + import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark._ import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.unsafe.array.LongArray +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat} class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { import TestUtils.{assertNotSpilled, assertSpilled} @@ -93,6 +98,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)( sortWithoutBreakingSortingContracts) + // This test is ignored by default as it requires a fairly large heap size (16GB) + ignore("sort without breaking timsort contracts for large arrays") { + val size = 300000000 + // To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of + // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] + // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi() + val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i } + val buf = new LongArray(MemoryBlock.fromLongArray(ref)) + + new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] { + override def compare( + r1: RecordPointerAndKeyPrefix, + r2: RecordPointerAndKeyPrefix): Int = { + PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix) + } + }) + } + test("spilling with hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false)