
Commit 2e91826

Sort-merge join operator spilling performance improvements
What changes were proposed in this pull request?

The following changes improve SQL execution performance when data is spilled to disk:

1) Lazy initialization of UnsafeSorterSpillReader, the iterator over spilled rows.
During SortMergeJoin (Left Semi Join) execution, the iterator over the spilled data is created but the data is never actually iterated. Initializing UnsafeSorterSpillReader lazily lets SortMergeJoin run efficiently even when data is spilled to disk, because the unnecessary I/O is avoided. (An illustrative sketch of this pattern follows the commit header.)

2) Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 1 MB to 1 KB.
The UnsafeSorterSpillReader constructor takes a lot of time because of the default 1 MB read buffer it allocates. The code already has logic to grow the read buffer whenever a record does not fit, so decreasing the initial size to 1 KB is safe and has a positive performance impact.

3) Improve memory utilization when spilling is enabled in ExternalAppendOnlyUnsafeRowArray.
In the current implementation, when spilling is enabled, an UnsafeExternalSorter is created, the data is moved from the ExternalAppendOnlyUnsafeRowArray into the UnsafeExternalSorter, and only then is the ExternalAppendOnlyUnsafeRowArray emptied. Just before it is emptied, both objects hold the same data, which doubles the memory footprint and duplicates the rows. This can be avoided.
In the proposed solution, once spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, no more rows are added to the ExternalAppendOnlyUnsafeRowArray; an UnsafeExternalSorter is created and new rows are added to it, while the ExternalAppendOnlyUnsafeRowArray keeps the rows it already holds. This gives better memory utilization and avoids the unnecessary movement of data from one object to the other.

Why are the changes needed?

Testing with the TPC-DS 100 TB benchmark data set showed that some queries (for example query 14) cannot run even with extremely large Spark executor memory; spilling has to be enabled to process them, but processing becomes extremely slow once spilling is enabled. Testing this solution on query 14 with spilling to disk enabled showed a 500x performance improvement and did not degrade the performance of the other TPC-DS queries.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

By running the TPC-DS queries against the 10 TB and 100 TB data sets, and by running all Spark tests.
1 parent 8a926e4 commit 2e91826
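
Before the per-file diffs, a minimal sketch of the lazy-initialization pattern behind change 1 may help orientation. It is only illustrative: LazySpillReader and its members are hypothetical simplifications, not code from this commit; the real implementation is the UnsafeSorterSpillReader diff further down.

```java
// Hedged sketch of the lazy-initialization idea in change 1 (hypothetical names).
// The constructor only remembers where the spill file lives; the file is opened the
// first time the iterator is actually consulted, so a caller that creates the
// iterator but never consumes it (as happens in some SortMergeJoin plans) does no I/O.
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

final class LazySpillReader {
  private final File spillFile;
  private DataInputStream din;             // opened lazily
  private int numRecordsRemaining;
  private boolean initialized = false;

  LazySpillReader(File spillFile) {
    this.spillFile = spillFile;            // no I/O here
  }

  private void initIfNeeded() throws IOException {
    if (!initialized) {
      din = new DataInputStream(new BufferedInputStream(new FileInputStream(spillFile)));
      numRecordsRemaining = din.readInt(); // assumes a record count at the head of the file
      initialized = true;
    }
  }

  boolean hasNext() throws IOException {
    initIfNeeded();
    return numRecordsRemaining > 0;
  }
}
```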

File tree

12 files changed (+175, -77 lines)


core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 3 additions & 3 deletions
@@ -323,10 +323,10 @@ public Location next() {
         return loc;
       } else {
         assert(reader != null);
-        if (!reader.hasNext()) {
-          advanceToNextPage();
-        }
         try {
+          if (!reader.hasNext()) {
+            advanceToNextPage();
+          }
           reader.loadNext();
         } catch (IOException e) {
           try {

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 22 additions & 13 deletions
@@ -506,7 +506,7 @@ class SpillableIterator extends UnsafeSorterIterator {
     private boolean loaded = false;
     private int numRecords = 0;

-    SpillableIterator(UnsafeSorterIterator inMemIterator) {
+    SpillableIterator(UnsafeSorterIterator inMemIterator) throws IOException {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
     }
@@ -681,31 +681,24 @@ static class ChainedIterator extends UnsafeSorterIterator {
     ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
       assert iterators.size() > 0;
       this.numRecords = 0;
-      for (UnsafeSorterIterator iter: iterators) {
-        this.numRecords += iter.getNumRecords();
-      }
       this.iterators = iterators;
-      this.current = iterators.remove();
     }

     @Override
-    public int getNumRecords() {
+    public int getNumRecords() throws IOException {
+      initializeNumRecords();
       return numRecords;
     }

     @Override
-    public boolean hasNext() {
-      while (!current.hasNext() && !iterators.isEmpty()) {
-        current = iterators.remove();
-      }
+    public boolean hasNext() throws IOException {
+      nextIterator();
       return current.hasNext();
     }

     @Override
     public void loadNext() throws IOException {
-      while (!current.hasNext() && !iterators.isEmpty()) {
-        current = iterators.remove();
-      }
+      nextIterator();
       current.loadNext();
     }
@@ -720,5 +713,21 @@ public void loadNext() throws IOException {

     @Override
     public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+    private void initializeNumRecords() throws IOException {
+      if (numRecords == 0) {
+        for (UnsafeSorterIterator iter: iterators) {
+          numRecords += iter.getNumRecords();
+        }
+        this.current = iterators.remove();
+      }
+    }
+
+    private void nextIterator() throws IOException {
+      initializeNumRecords();
+      while (!current.hasNext() && !iterators.isEmpty()) {
+        current = iterators.remove();
+      }
+    }
   }
 }

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@

 public abstract class UnsafeSorterIterator {

-  public abstract boolean hasNext();
+  public abstract boolean hasNext() throws IOException;

   public abstract void loadNext() throws IOException;

@@ -33,5 +33,5 @@ public abstract class UnsafeSorterIterator {

   public abstract long getKeyPrefix();

-  public abstract int getNumRecords();
+  public abstract int getNumRecords() throws IOException;
 }

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ public int getNumRecords() {
   }

   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
     return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
   }


core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java

Lines changed: 53 additions & 29 deletions
@@ -47,55 +47,49 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
   private int numRecords;
   private int numRecordsRemaining;

-  private byte[] arr = new byte[1024 * 1024];
+  private byte[] arr = new byte[1024];
   private Object baseObject = arr;
   private final TaskContext taskContext = TaskContext.get();
+  private final SerializerManager serManager;
+  private final File dataFile;
+  private final BlockId blkId;
+  private boolean initialized;

   public UnsafeSorterSpillReader(
       SerializerManager serializerManager,
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    final ConfigEntry<Object> bufferSizeConfigEntry =
-      package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
-    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
-    final int DEFAULT_BUFFER_SIZE_BYTES =
-      ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
-    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
-      ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
-
-    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
-      package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
-
-    final InputStream bs =
-      new NioBufferedFileInputStream(file, bufferSizeBytes);
-    try {
-      if (readAheadEnabled) {
-        this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
-          bufferSizeBytes);
-      } else {
-        this.in = serializerManager.wrapStream(blockId, bs);
-      }
-      this.din = new DataInputStream(this.in);
-      numRecords = numRecordsRemaining = din.readInt();
-    } catch (IOException e) {
-      Closeables.close(bs, /* swallowIOException = */ true);
-      throw e;
-    }
+    serManager = serializerManager;
+    dataFile = file;
+    blkId = blockId;
+    initialized = false;
   }

   @Override
-  public int getNumRecords() {
+  public int getNumRecords() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     return numRecords;
   }

   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     return (numRecordsRemaining > 0);
   }

   @Override
   public void loadNext() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     // Kill the task in case it has been marked as killed. This logic is from
     // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
     // to avoid performance overhead. This check is added here in `loadNext()` instead of in
@@ -148,4 +142,34 @@ public void close() throws IOException {
       }
     }
   }
+
+  private void readSpilledFile() throws IOException {
+    assert (dataFile.length() > 0);
+    final ConfigEntry<Object> bufferSizeConfigEntry =
+      package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
+    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
+    final int DEFAULT_BUFFER_SIZE_BYTES =
+      ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
+    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
+      ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
+
+    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
+      package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
+
+    final InputStream bs =
+      new NioBufferedFileInputStream(dataFile, bufferSizeBytes);
+    try {
+      if (readAheadEnabled) {
+        this.in = new ReadAheadInputStream(serManager.wrapStream(blkId, bs),
+          bufferSizeBytes);
+      } else {
+        this.in = serManager.wrapStream(blkId, bs);
+      }
+      this.din = new DataInputStream(this.in);
+      numRecords = numRecordsRemaining = din.readInt();
+    } catch (IOException e) {
+      Closeables.close(bs, /* swallowIOException = */ true);
+      throw e;
+    }
+  }
 }
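
Change 2 (the 1 MB to 1 KB initial buffer above) is safe only because the reader already grows its read buffer whenever a record does not fit, as the commit message notes. That growth logic is outside this diff; the following is a hedged sketch of the idea under that assumption, with a hypothetical class name rather than the actual UnsafeSorterSpillReader code.

```java
import java.io.DataInputStream;
import java.io.IOException;

// Hedged sketch of a grow-on-demand record buffer (hypothetical class); it mirrors the
// assumption stated in the commit message that the existing reader enlarges its buffer
// when a spilled record does not fit, so a 1 KB starting size costs little and is safe.
final class GrowableRecordBuffer {
  private byte[] arr = new byte[1024];      // small initial buffer, as in change 2

  byte[] readRecord(DataInputStream din) throws IOException {
    int recordLength = din.readInt();       // assume each spilled record is length-prefixed
    if (recordLength > arr.length) {
      arr = new byte[recordLength];         // grow only when the record does not fit
    }
    din.readFully(arr, 0, recordLength);
    return arr;
  }
}
```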

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.util.collection.unsafe.sort;

+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;

@@ -51,7 +52,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,
   }

   @Test
-  public void testSortingEmptyInput() {
+  public void testSortingEmptyInput() throws IOException {
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
       new TestMemoryManager(
         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);

sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk11-results.txt

Lines changed: 5 additions & 1 deletion
@@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m
 UnsafeExternalSorter                                 12             12           0         13.8          72.7       1.0X
 ExternalAppendOnlyUnsafeRowArray                      8              8           0         19.8          50.6       1.4X

-
+Java HotSpot(TM) 64-Bit Server VM 11.0.7+8-LTS on Linux 4.4.0-178-generic
+Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
+Spilling SpillReader with 16000 rows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+UnsafeSorterSpillReader_bufferSize1024              231            342          82          1.1         901.6       1.0X

sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt

Lines changed: 5 additions & 1 deletion
@@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m
 UnsafeExternalSorter                                 11             11           1         14.7          68.0       1.0X
 ExternalAppendOnlyUnsafeRowArray                      9             10           1         17.1          58.5       1.2X

-
+OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
+Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
+Spilling SpillReader with 16000 rows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+UnsafeSorterSpillReader_bufferSize1024              411            426          13          0.6        1607.2       1.0X

sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala

Lines changed: 35 additions & 21 deletions
@@ -95,7 +95,8 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       // inside `UnsafeExternalSorter`
       spillableArray.cleanupResources()
       spillableArray = null
-    } else if (inMemoryBuffer != null) {
+    }
+    if (inMemoryBuffer != null) {
       inMemoryBuffer.clear()
     }
     numFieldsPerRow = 0
@@ -124,18 +125,6 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
         numRowsSpillThreshold,
         false)

-      // populate with existing in-memory buffered rows
-      if (inMemoryBuffer != null) {
-        inMemoryBuffer.foreach(existingUnsafeRow =>
-          spillableArray.insertRecord(
-            existingUnsafeRow.getBaseObject,
-            existingUnsafeRow.getBaseOffset,
-            existingUnsafeRow.getSizeInBytes,
-            0,
-            false)
-        )
-        inMemoryBuffer.clear()
-      }
       numFieldsPerRow = unsafeRow.numFields()
     }

@@ -168,7 +157,15 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     if (spillableArray == null) {
       new InMemoryBufferIterator(startIndex)
     } else {
-      new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
+      val offsetIndex = if (startIndex > inMemoryBuffer.length) {
+        startIndex - inMemoryBuffer.length
+      } else {
+        0
+      }
+      new SpilledArrayMergeIterator(
+        spillableArray.getIterator(offsetIndex),
+        numFieldsPerRow,
+        startIndex)
     }
   }

@@ -204,20 +201,37 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     }
   }

-  private[this] class SpillableArrayIterator(
+  private[this] class SpilledArrayMergeIterator(
       iterator: UnsafeSorterIterator,
-      numFieldPerRow: Int)
+      numFieldPerRow: Int,
+      startIndex: Int)
     extends ExternalAppendOnlyUnsafeRowArrayIterator {

-    private val currentRow = new UnsafeRow(numFieldPerRow)
+    private var currentIndex = startIndex

-    override def hasNext(): Boolean = !isModified() && iterator.hasNext
+    private val currentSorterRow = new UnsafeRow(numFieldPerRow)
+
+    override def hasNext(): Boolean = {
+      if (currentIndex < inMemoryBuffer.length) {
+        !isModified()
+      } else {
+        !isModified() && iterator.hasNext
+      }
+    }

     override def next(): UnsafeRow = {
       throwExceptionIfModified()
-      iterator.loadNext()
-      currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
-      currentRow
+      if (currentIndex < inMemoryBuffer.length) {
+        val result = inMemoryBuffer(currentIndex)
+        currentIndex += 1
+        result
+      } else {
+        iterator.loadNext()
+        currentSorterRow.pointTo(iterator.getBaseObject,
+          iterator.getBaseOffset,
+          iterator.getRecordLength)
+        currentSorterRow
+      }
     }
   }
 }
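
The new SpilledArrayMergeIterator above serves the rows retained in inMemoryBuffer first and only then falls through to the rows that were pushed to the spill-backed sorter. As a hedged restatement of that ordering, here is a generic Java sketch with hypothetical names; the authoritative logic is the Scala diff above.

```java
import java.util.Iterator;
import java.util.List;

// Hedged sketch of the ordering in change 3: buffered rows first, then spilled rows.
// Generic names and types are hypothetical simplifications of the Scala iterator above.
final class BufferedThenSpilledIterator<T> implements Iterator<T> {
  private final List<T> inMemoryRows;   // rows kept in the in-memory buffer
  private final Iterator<T> spilled;    // iterator over rows handed to the external sorter
  private int currentIndex;

  BufferedThenSpilledIterator(List<T> inMemoryRows, Iterator<T> spilled, int startIndex) {
    this.inMemoryRows = inMemoryRows;
    this.spilled = spilled;
    this.currentIndex = startIndex;
  }

  @Override
  public boolean hasNext() {
    return currentIndex < inMemoryRows.size() || spilled.hasNext();
  }

  @Override
  public T next() {
    if (currentIndex < inMemoryRows.size()) {
      return inMemoryRows.get(currentIndex++);  // serve buffered rows first
    }
    return spilled.next();                      // then fall through to spilled rows
  }
}
```

Because rows are only ever appended, reading the retained in-memory rows first and the sorter's rows second preserves insertion order while avoiding the copy, and the temporary doubling of memory, that the old code incurred when it moved every buffered row into the sorter.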

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -606,7 +606,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   }

   test("Window spill with more than the inMemoryThreshold and spillThreshold") {
-    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4"), (1, "5"), (2, "6")).toDF("key", "value")
     val window = Window.partitionBy($"key").orderBy($"value")

     withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
@@ -620,7 +620,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   test("SPARK-21258: complex object in combination with spilling") {
     // Make sure we trigger the spilling path.
     withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
-      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "16") {
       val sampleSchema = new StructType().
         add("f0", StringType).
         add("f1", LongType).

0 commit comments
