Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
Platform.copyMemory(
src.getBaseObject(),
src.getBaseOffset() + srcPos * 8,
src.getBaseOffset() + srcPos * 8L,
dst.getBaseObject(),
dst.getBaseOffset() + dstPos * 8,
length * 8
dst.getBaseOffset() + dstPos * 8L,
length * 8L
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
Platform.copyMemory(
src.getBaseObject(),
src.getBaseOffset() + srcPos * 16,
src.getBaseOffset() + srcPos * 16L,
dst.getBaseObject(),
dst.getBaseOffset() + dstPos * 16,
length * 16);
dst.getBaseOffset() + dstPos * 16L,
length * 16L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package org.apache.spark.util.collection

import java.util.Comparator

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark._
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.unsafe.array.LongArray
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}

class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
import TestUtils.{assertNotSpilled, assertSpilled}
Expand Down Expand Up @@ -93,6 +98,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
sortWithoutBreakingSortingContracts)

// Regression test for the int-overflow in the unsafe sorter's copyRange() (SPARK-13850).
// Ignored by default: it allocates a 300M-element Long array and so requires a fairly
// large heap size (16GB) to run.
ignore("sort without breaking timsort contracts for large arrays") {
  val size = 300000000
  // To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of
  // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
  // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
  // NOTE(review): the tabulate expression below actually yields [size/2 .. size-1] followed by
  // [size/2 .. size-1] again (the else-branch returns i, not i - size/2), which does not match
  // the shape described above — that shape would require `else i - size / 2`. Confirm which is
  // intended; both variants contain two long ascending runs, which is what forces TimSort's
  // merge (and hence copyRange) at record offsets large enough to overflow int arithmetic.
  val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
  // Wrap the on-heap long[] so the unsafe sorter can address it via base object + offset.
  val buf = new LongArray(MemoryBlock.fromLongArray(ref))

  // Sort by key prefix only. The test's success criterion is that TimSort completes without
  // throwing (pre-fix, the overflowed copy offsets broke its internal merge invariants).
  new Sorter(UnsafeSortDataFormat.INSTANCE).sort(
    buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {
      override def compare(
        r1: RecordPointerAndKeyPrefix,
        r2: RecordPointerAndKeyPrefix): Int = {
        PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix)
      }
    })
}

test("spilling with hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true, kryo = false)
Expand Down