diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 6e028886f231..ee1789552871 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -322,10 +322,10 @@ public Location next() {
         return loc;
       } else {
         assert(reader != null);
-        if (!reader.hasNext()) {
-          advanceToNextPage();
-        }
         try {
+          if (!reader.hasNext()) {
+            advanceToNextPage();
+          }
           reader.loadNext();
         } catch (IOException e) {
           try {
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 55e4e609c3c7..d79a02c76d86 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -506,7 +506,7 @@ class SpillableIterator extends UnsafeSorterIterator {
     private boolean loaded = false;
     private int numRecords = 0;
 
-    SpillableIterator(UnsafeSorterIterator inMemIterator) {
+    SpillableIterator(UnsafeSorterIterator inMemIterator) throws IOException {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
     }
@@ -681,31 +681,24 @@ static class ChainedIterator extends UnsafeSorterIterator {
 
     ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
       assert iterators.size() > 0;
       this.numRecords = 0;
-      for (UnsafeSorterIterator iter: iterators) {
-        this.numRecords += iter.getNumRecords();
-      }
       this.iterators = iterators;
-      this.current = iterators.remove();
     }
 
     @Override
-    public int getNumRecords() {
+    public int getNumRecords() throws IOException {
+      initializeNumRecords();
       return numRecords;
     }
 
     @Override
-    public boolean hasNext() {
-      while (!current.hasNext() && !iterators.isEmpty()) {
-        current = iterators.remove();
-      }
+    public boolean hasNext() throws IOException {
+      nextIterator();
       return current.hasNext();
     }
 
     @Override
     public void loadNext() throws IOException {
-      while (!current.hasNext() && !iterators.isEmpty()) {
-        current = iterators.remove();
-      }
+      nextIterator();
      current.loadNext();
     }
@@ -720,5 +713,21 @@ public void loadNext() throws IOException {
 
     @Override
     public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+    private void initializeNumRecords() throws IOException {
+      if (numRecords == 0) {
+        for (UnsafeSorterIterator iter: iterators) {
+          numRecords += iter.getNumRecords();
+        }
+        this.current = iterators.remove();
+      }
+    }
+
+    private void nextIterator() throws IOException {
+      initializeNumRecords();
+      while (!current.hasNext() && !iterators.isEmpty()) {
+        current = iterators.remove();
+      }
+    }
   }
 }
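
Reviewer note (not part of the patch): the ChainedIterator change above defers summing the per-iterator record counts until the first call to getNumRecords()/hasNext()/loadNext(), because getNumRecords() on a spill-backed child iterator can now perform I/O and throw IOException. Below is a minimal, self-contained sketch of that lazy-initialization pattern; the names (LazyChainSketch, SpillLike, LazyChain) are hypothetical and only illustrate the idea, they are not Spark APIs.

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Queue;

    class LazyChainSketch {
      // Stand-in for an iterator whose record count may require opening a spill file.
      interface SpillLike {
        int numRecords() throws IOException;
      }

      static class LazyChain {
        private final Queue<SpillLike> parts;
        private int numRecords = 0;   // 0 doubles as "not computed yet", as in the patch

        LazyChain(Queue<SpillLike> parts) {
          this.parts = parts;         // constructor performs no I/O
        }

        int getNumRecords() throws IOException {
          if (numRecords == 0) {      // compute once, on first use
            for (SpillLike p : parts) {
              numRecords += p.numRecords();
            }
          }
          return numRecords;
        }
      }

      public static void main(String[] args) throws IOException {
        Queue<SpillLike> q = new ArrayDeque<>();
        q.add(() -> 3);
        q.add(() -> 4);
        System.out.println(new LazyChain(q).getNumRecords());  // prints 7
      }
    }
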
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
index 1b3167fcc250..c9b15ba3270e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
@@ -21,7 +21,7 @@
 
 public abstract class UnsafeSorterIterator {
 
-  public abstract boolean hasNext();
+  public abstract boolean hasNext() throws IOException;
 
   public abstract void loadNext() throws IOException;
 
@@ -33,5 +33,5 @@ public abstract class UnsafeSorterIterator {
 
   public abstract long getKeyPrefix();
 
-  public abstract int getNumRecords();
+  public abstract int getNumRecords() throws IOException;
 }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index ab800288dcb4..20bb963c6084 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -71,7 +71,7 @@ public int getNumRecords() {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
     return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
   }
 
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index a524c4790407..03db1a74e111 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -47,55 +47,49 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   private int numRecords;
   private int numRecordsRemaining;
 
-  private byte[] arr = new byte[1024 * 1024];
+  private byte[] arr = new byte[1024];
   private Object baseObject = arr;
   private final TaskContext taskContext = TaskContext.get();
+  private final SerializerManager serManager;
+  private final File dataFile;
+  private final BlockId blkId;
+  private boolean initialized;
 
   public UnsafeSorterSpillReader(
       SerializerManager serializerManager,
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    final ConfigEntry bufferSizeConfigEntry =
-      package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
-    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
-    final int DEFAULT_BUFFER_SIZE_BYTES =
-      ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
-    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
-      ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
-
-    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
-      package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
-
-    final InputStream bs =
-      new NioBufferedFileInputStream(file, bufferSizeBytes);
-    try {
-      if (readAheadEnabled) {
-        this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
-          bufferSizeBytes);
-      } else {
-        this.in = serializerManager.wrapStream(blockId, bs);
-      }
-      this.din = new DataInputStream(this.in);
-      numRecords = numRecordsRemaining = din.readInt();
-    } catch (IOException e) {
-      Closeables.close(bs, /* swallowIOException = */ true);
-      throw e;
-    }
+    serManager = serializerManager;
+    dataFile = file;
+    blkId = blockId;
+    initialized = false;
   }
 
   @Override
-  public int getNumRecords() {
+  public int getNumRecords() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     return numRecords;
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     return (numRecordsRemaining > 0);
   }
 
   @Override
   public void loadNext() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     // Kill the task in case it has been marked as killed. This logic is from
     // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
     // to avoid performance overhead. This check is added here in `loadNext()` instead of in
@@ -148,4 +142,34 @@ public void close() throws IOException {
       }
     }
   }
+
+  private void readSpilledFile() throws IOException {
+    assert (dataFile.length() > 0);
+    final ConfigEntry bufferSizeConfigEntry =
+      package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
+    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
+    final int DEFAULT_BUFFER_SIZE_BYTES =
+      ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
+    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
+      ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
+
+    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
+      package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
+
+    final InputStream bs =
+      new NioBufferedFileInputStream(dataFile, bufferSizeBytes);
+    try {
+      if (readAheadEnabled) {
+        this.in = new ReadAheadInputStream(serManager.wrapStream(blkId, bs),
+          bufferSizeBytes);
+      } else {
+        this.in = serManager.wrapStream(blkId, bs);
+      }
+      this.din = new DataInputStream(this.in);
+      numRecords = numRecordsRemaining = din.readInt();
+    } catch (IOException e) {
+      Closeables.close(bs, /* swallowIOException = */ true);
+      throw e;
+    }
+  }
 }
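
Reviewer note (not part of the patch): UnsafeSorterSpillReader now defers all file access to readSpilledFile(), which runs on the first getNumRecords()/hasNext()/loadNext() call; constructing a reader no longer opens the spill file or reads its header, which is why hasNext() and getNumRecords() must now declare IOException. Below is a rough, self-contained sketch of that lazy-open pattern; LazySpillReaderSketch and its file layout (a single int header holding the record count) are illustrative assumptions, not the real Spark spill format.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    class LazySpillReaderSketch {
      private final File file;
      private DataInputStream in;        // null until the iterator is first consumed
      private int numRecordsRemaining;

      LazySpillReaderSketch(File file) {
        this.file = file;                // constructor performs no I/O
      }

      private void openIfNeeded() throws IOException {
        if (in == null) {
          in = new DataInputStream(new FileInputStream(file));
          numRecordsRemaining = in.readInt();   // header: number of records
        }
      }

      boolean hasNext() throws IOException {    // may now throw, mirroring the patch
        openIfNeeded();
        return numRecordsRemaining > 0;
      }

      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("spill", ".bin");
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) {
          out.writeInt(2);                      // pretend two records follow
        }
        System.out.println(new LazySpillReaderSketch(f).hasNext());  // prints true
      }
    }
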
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 2b8a0602730e..e7945f99a926 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -51,7 +52,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,
   }
 
   @Test
-  public void testSortingEmptyInput() {
+  public void testSortingEmptyInput() throws IOException {
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
       new TestMemoryManager(
         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt
index 4f50a894e5c0..881b3e4e73f2 100644
--- a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt
@@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m
 UnsafeExternalSorter                                 12             12           0         13.8          72.7       1.0X
 ExternalAppendOnlyUnsafeRowArray                      8              8           0         19.8          50.6       1.4X
 
-
+Java HotSpot(TM) 64-Bit Server VM 11.0.7+8-LTS on Linux 4.4.0-178-generic
+Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
+Spilling SpillReader with 16000 rows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+UnsafeSorterSpillReader_bufferSize1024              231            342          82          1.1         901.6       1.0X
\ No newline at end of file
diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
index c4be80af1334..989380214e49 100644
--- a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
@@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m
 UnsafeExternalSorter                                 11             11           1         14.7          68.0       1.0X
 ExternalAppendOnlyUnsafeRowArray                      9             10           1         17.1          58.5       1.2X
 
-
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
+Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
+Spilling SpillReader with 16000 rows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+UnsafeSorterSpillReader_bufferSize1024              411            426          13          0.6        1607.2       1.0X
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index ac282ea2e94f..eb63e5f44440 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -95,7 +95,8 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       // inside `UnsafeExternalSorter`
       spillableArray.cleanupResources()
       spillableArray = null
-    } else if (inMemoryBuffer != null) {
+    }
+    if (inMemoryBuffer != null) {
       inMemoryBuffer.clear()
     }
     numFieldsPerRow = 0
@@ -124,18 +125,6 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
         numRowsSpillThreshold,
         false)
 
-      // populate with existing in-memory buffered rows
-      if (inMemoryBuffer != null) {
-        inMemoryBuffer.foreach(existingUnsafeRow =>
-          spillableArray.insertRecord(
-            existingUnsafeRow.getBaseObject,
-            existingUnsafeRow.getBaseOffset,
-            existingUnsafeRow.getSizeInBytes,
-            0,
-            false)
-        )
-        inMemoryBuffer.clear()
-      }
       numFieldsPerRow = unsafeRow.numFields()
     }
 
@@ -168,7 +157,15 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     if (spillableArray == null) {
       new InMemoryBufferIterator(startIndex)
     } else {
-      new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
+      val offsetIndex = if (inMemoryBuffer != null && startIndex > inMemoryBuffer.length) {
+        startIndex - inMemoryBuffer.length
+      } else {
+        0
+      }
+      new SpilledArrayMergeIterator(
+        spillableArray.getIterator(offsetIndex),
+        numFieldsPerRow,
+        startIndex)
     }
   }
 
@@ -204,20 +201,37 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     }
   }
 
-  private[this] class SpillableArrayIterator(
+  private[this] class SpilledArrayMergeIterator(
       iterator: UnsafeSorterIterator,
-      numFieldPerRow: Int)
+      numFieldPerRow: Int,
+      startIndex: Int)
     extends ExternalAppendOnlyUnsafeRowArrayIterator {
 
-    private val currentRow = new UnsafeRow(numFieldPerRow)
+    private var currentIndex = startIndex
 
-    override def hasNext(): Boolean = !isModified() && iterator.hasNext
+    private val currentSorterRow = new UnsafeRow(numFieldPerRow)
+
+    override def hasNext(): Boolean = {
+      if (inMemoryBuffer != null && currentIndex < inMemoryBuffer.length) {
+        !isModified()
+      } else {
+        !isModified() && iterator.hasNext
+      }
+    }
 
     override def next(): UnsafeRow = {
       throwExceptionIfModified()
-      iterator.loadNext()
-      currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
-      currentRow
+      if (inMemoryBuffer != null && currentIndex < inMemoryBuffer.length) {
+        val result = inMemoryBuffer(currentIndex)
+        currentIndex += 1
+        result
+      } else {
+        iterator.loadNext()
+        currentSorterRow.pointTo(iterator.getBaseObject,
+          iterator.getBaseOffset,
+          iterator.getRecordLength)
+        currentSorterRow
+      }
     }
   }
 }
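
Reviewer note (not part of the patch): with the copy-on-spill loop removed, rows that were already buffered stay in inMemoryBuffer, and the renamed SpilledArrayMergeIterator serves those rows first before falling through to the sorter's spill iterator (startIndex is translated into an offset into the spilled part when it points past the buffer). A simplified Java model of that two-phase read follows; MergeSketch and its fields are made up for illustration and are much simpler than the Scala class above.

    import java.util.Iterator;
    import java.util.List;

    class MergeSketch<T> implements Iterator<T> {
      private final List<T> inMemory;     // rows kept in memory before the spill happened
      private final Iterator<T> spilled;  // rows that went to the external sorter
      private int currentIndex;

      MergeSketch(List<T> inMemory, Iterator<T> spilled, int startIndex) {
        this.inMemory = inMemory;
        this.spilled = spilled;           // assumed to already be positioned past startIndex
        this.currentIndex = startIndex;
      }

      @Override
      public boolean hasNext() {
        return currentIndex < inMemory.size() || spilled.hasNext();
      }

      @Override
      public T next() {
        if (currentIndex < inMemory.size()) {
          return inMemory.get(currentIndex++);  // phase 1: serve buffered rows
        }
        return spilled.next();                  // phase 2: fall through to the spill
      }

      public static void main(String[] args) {
        MergeSketch<String> it =
            new MergeSketch<>(List.of("a", "b"), List.of("c", "d").iterator(), 0);
        while (it.hasNext()) {
          System.out.print(it.next());          // prints abcd
        }
        System.out.println();
      }
    }
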
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index bc6adfb857b0..2d27c4340c92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -614,7 +614,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   }
 
   test("Window spill with more than the inMemoryThreshold and spillThreshold") {
-    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4"), (1, "5"), (2, "6")).toDF("key", "value")
     val window = Window.partitionBy($"key").orderBy($"value")
 
     withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
@@ -628,7 +628,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   test("SPARK-21258: complex object in combination with spilling") {
     // Make sure we trigger the spilling path.
     withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
-      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "16") {
       val sampleSchema = new StructType().
         add("f0", StringType).
         add("f1", LongType).
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 0869e25674e6..be24dd3ca32c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -182,6 +182,47 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
     }
   }
 
+  def testAgainstUnsafeSorterSpillReader(
+      numSpillThreshold: Int,
+      numRows: Int,
+      numIterators: Int,
+      iterations: Int): Unit = {
+    val rows = testRows(numRows)
+    val benchmark = new Benchmark(s"Spilling SpillReader with $numRows rows", iterations * numRows,
+      output = output)
+
+    benchmark.addCase("UnsafeSorterSpillReader_bufferSize1024") { _: Int =>
+      val array = UnsafeExternalSorter.create(
+        TaskContext.get().taskMemoryManager(),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        TaskContext.get(),
+        null,
+        null,
+        1024,
+        SparkEnv.get.memoryManager.pageSizeBytes,
+        numSpillThreshold,
+        false)
+
+      rows.foreach(x =>
+        array.insertRecord(
+          x.getBaseObject,
+          x.getBaseOffset,
+          x.getSizeInBytes,
+          0,
+          false))
+
+      for (_ <- 0L until numIterators) {
+        array.getIterator(0)
+      }
+      array.cleanupResources()
+    }
+
+    withFakeTaskContext {
+      benchmark.run()
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("WITHOUT SPILL") {
       val spillThreshold = 100 * 1000
@@ -194,6 +235,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
       testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
       testAgainstRawUnsafeExternalSorter(
         config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+      testAgainstUnsafeSorterSpillReader(5 * 1000, 16 * 1000, 100 * 1000, 1 << 4)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index 98aba3ba25f1..735368533f1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -169,8 +169,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
       // Add more rows to trigger switch to [[UnsafeExternalSorter]] and cause a spill to happen.
      // Verify that spill has happened
-      populateRows(array, 2, expectedValues)
-      assert(array.length == inMemoryThreshold + 1)
+      populateRows(array, 12, expectedValues)
+      assert(array.length == inMemoryThreshold + 11)
       assertSpill()
 
       val iterator2 = validateData(array, expectedValues)
@@ -202,7 +202,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with start index exceeding array's size (without spill)") {
-    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    val (inMemoryThreshold, spillThreshold) = (60, 100)
     withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       populateRows(array, spillThreshold / 2)