@@ -322,10 +322,10 @@ public Location next() {
         return loc;
       } else {
         assert(reader != null);
-        if (!reader.hasNext()) {
-          advanceToNextPage();
-        }
         try {
+          if (!reader.hasNext()) {
+            advanceToNextPage();
+          }
           reader.loadNext();
         } catch (IOException e) {
@@ -506,7 +506,7 @@ class SpillableIterator extends UnsafeSorterIterator {
     private boolean loaded = false;
     private int numRecords = 0;
 
-    SpillableIterator(UnsafeSorterIterator inMemIterator) {
+    SpillableIterator(UnsafeSorterIterator inMemIterator) throws IOException {
       this.upstream = inMemIterator;
       this.numRecords = inMemIterator.getNumRecords();
     }
@@ -681,31 +681,24 @@ static class ChainedIterator extends UnsafeSorterIterator {
     ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
       assert iterators.size() > 0;
       this.numRecords = 0;
-      for (UnsafeSorterIterator iter: iterators) {
-        this.numRecords += iter.getNumRecords();
-      }
       this.iterators = iterators;
-      this.current = iterators.remove();
     }
 
     @Override
-    public int getNumRecords() {
+    public int getNumRecords() throws IOException {
+      initializeNumRecords();
       return numRecords;
     }
 
     @Override
-    public boolean hasNext() {
-      while (!current.hasNext() && !iterators.isEmpty()) {
-        current = iterators.remove();
-      }
+    public boolean hasNext() throws IOException {
+      nextIterator();
       return current.hasNext();
     }
 
     @Override
     public void loadNext() throws IOException {
-      while (!current.hasNext() && !iterators.isEmpty()) {
-        current = iterators.remove();
-      }
+      nextIterator();
       current.loadNext();
     }

@@ -720,5 +713,21 @@ public void loadNext() throws IOException {
 
     @Override
     public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+    private void initializeNumRecords() throws IOException {
+      if (numRecords == 0) {
+        for (UnsafeSorterIterator iter: iterators) {
+          numRecords += iter.getNumRecords();
+        }
+        this.current = iterators.remove();
+      }
+    }
+
+    private void nextIterator() throws IOException {
+      initializeNumRecords();
+      while (!current.hasNext() && !iterators.isEmpty()) {
+        current = iterators.remove();
+      }
+    }
   }
 }
@@ -21,7 +21,7 @@
 
 public abstract class UnsafeSorterIterator {
 
-  public abstract boolean hasNext();
+  public abstract boolean hasNext() throws IOException;
 
   public abstract void loadNext() throws IOException;

@@ -33,5 +33,5 @@ public abstract class UnsafeSorterIterator {
 
   public abstract long getKeyPrefix();
 
-  public abstract int getNumRecords();
+  public abstract int getNumRecords() throws IOException;
 }
@@ -71,7 +71,7 @@ public int getNumRecords() {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
     return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
   }
 
@@ -47,55 +47,49 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
   private int numRecords;
   private int numRecordsRemaining;
 
-  private byte[] arr = new byte[1024 * 1024];
+  private byte[] arr = new byte[1024];
Review comment (Member):
If this number is performance-sensitive, could we parameterize it?

Review comment (Author):
Yes, I am looking into this. Thank you.

Review comment (Author):
Does this look good? Perhaps you have some suggestions.

  private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_RATIO =
    ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size.ratio")
      .doc("The multiplication ratio is the parameter that controls the initial read buffer " +
        "size. The multiplication ratio value range is from 1 to 1024. This parameter increases " +
        "the initial read buffer size in 1KB increments. It will result in the initial buffer " +
        "size in the range from 1KB to 1MB. The read buffer size is dynamically adjusted " +
        "afterward based on data length read from the spilled file.")
      .intConf
      .checkValue(v => 1 <= v && v <= DEFAULT_BUFFER_SIZE_RATIO,
        s"The value must be in allowed range [1, ${DEFAULT_BUFFER_SIZE_RATIO}].")
      .createWithDefault(DEFAULT_BUFFER_SIZE_RATIO)
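A minimal sketch of the arithmetic this ratio implies (the class and constant names below are hypothetical and not part of the PR): the configured ratio is multiplied by 1KB to obtain the initial read buffer, so a ratio of 64 would give a 64KB buffer, and the bounds check keeps the result between 1KB and 1MB.

// Hypothetical helper, for illustration only: maps the proposed ratio to an
// initial read buffer size. Not part of the PR or of Spark's API.
final class SpillReaderBufferSizing {
  private static final int ONE_KB = 1024;        // buffer size grows in 1KB increments
  private static final int MAX_RATIO = 1024;     // caps the initial buffer at 1MB

  static int initialBufferSizeBytes(int ratio) {
    if (ratio < 1 || ratio > MAX_RATIO) {
      throw new IllegalArgumentException("ratio must be in [1, " + MAX_RATIO + "]");
    }
    return ratio * ONE_KB;                       // e.g. ratio 64 -> 65536 bytes (64KB)
  }

  public static void main(String[] args) {
    System.out.println(initialBufferSizeBytes(1));     // 1024    bytes (1KB)
    System.out.println(initialBufferSizeBytes(1024));  // 1048576 bytes (1MB, the previous fixed size)
  }
}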

Review comment (Member):
Could you apply the change to this PR first? Otherwise, it's hard to leave comments line by line.

Review comment (Author):
No problem. I will push an update soon. I am testing the changes with 10TB benchmarks.

   private Object baseObject = arr;
   private final TaskContext taskContext = TaskContext.get();
+  private final SerializerManager serManager;
+  private final File dataFile;
+  private final BlockId blkId;
+  private boolean initialized;
 
   public UnsafeSorterSpillReader(
       SerializerManager serializerManager,
       File file,
       BlockId blockId) throws IOException {
-    assert (file.length() > 0);
-    final ConfigEntry<Object> bufferSizeConfigEntry =
-        package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
-    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
-    final int DEFAULT_BUFFER_SIZE_BYTES =
-        ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
-    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
-        ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
-
-    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
-        package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
-
-    final InputStream bs =
-        new NioBufferedFileInputStream(file, bufferSizeBytes);
-    try {
-      if (readAheadEnabled) {
-        this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
-            bufferSizeBytes);
-      } else {
-        this.in = serializerManager.wrapStream(blockId, bs);
-      }
-      this.din = new DataInputStream(this.in);
-      numRecords = numRecordsRemaining = din.readInt();
-    } catch (IOException e) {
-      Closeables.close(bs, /* swallowIOException = */ true);
-      throw e;
-    }
+    serManager = serializerManager;
+    dataFile = file;
+    blkId = blockId;
+    initialized = false;
   }

   @Override
-  public int getNumRecords() {
+  public int getNumRecords() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     return numRecords;
   }

   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     return (numRecordsRemaining > 0);
   }

   @Override
   public void loadNext() throws IOException {
+    if (!initialized) {
+      readSpilledFile();
+      initialized = true;
+    }
     // Kill the task in case it has been marked as killed. This logic is from
     // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
     // to avoid performance overhead. This check is added here in `loadNext()` instead of in
@@ -148,4 +142,34 @@ public void close() throws IOException {
       }
     }
   }
+
+  private void readSpilledFile() throws IOException {
+    assert (dataFile.length() > 0);
+    final ConfigEntry<Object> bufferSizeConfigEntry =
+        package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
+    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
+    final int DEFAULT_BUFFER_SIZE_BYTES =
+        ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
+    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
+        ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
+
+    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
+        package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
+
+    final InputStream bs =
+        new NioBufferedFileInputStream(dataFile, bufferSizeBytes);
+    try {
+      if (readAheadEnabled) {
+        this.in = new ReadAheadInputStream(serManager.wrapStream(blkId, bs),
+            bufferSizeBytes);
+      } else {
+        this.in = serManager.wrapStream(blkId, bs);
+      }
+      this.din = new DataInputStream(this.in);
+      numRecords = numRecordsRemaining = din.readInt();
+    } catch (IOException e) {
+      Closeables.close(bs, /* swallowIOException = */ true);
+      throw e;
+    }
+  }
 }
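To make the intent of the rewrite above easier to follow, here is a minimal, self-contained sketch of the lazy-open pattern, under the simplifying assumption of a toy format with one int per record; the class below is illustrative and is not the actual UnsafeSorterSpillReader. The constructor does no I/O, and the spill file is only opened and its record count read on the first call that needs it, which is why hasNext() and getNumRecords() now declare throws IOException throughout the iterator hierarchy.

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Illustrative only: a simplified lazily-initialized spill reader, not the Spark class.
final class LazySpillReader implements AutoCloseable {
  private final File file;
  private DataInputStream in;               // opened on first use
  private int numRecordsRemaining;

  LazySpillReader(File file) {
    this.file = file;                       // cheap: no I/O in the constructor
  }

  private void ensureOpen() throws IOException {
    if (in == null) {
      in = new DataInputStream(new FileInputStream(file));
      numRecordsRemaining = in.readInt();   // header: record count
    }
  }

  boolean hasNext() throws IOException {    // may open the file, hence the checked exception
    ensureOpen();
    return numRecordsRemaining > 0;
  }

  int loadNext() throws IOException {
    ensureOpen();
    numRecordsRemaining--;
    return in.readInt();                    // toy format: one int per record
  }

  @Override
  public void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}

A reader whose spill is never consumed therefore never opens the underlying stream or pays for its read buffer.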
@@ -17,6 +17,7 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -51,7 +52,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,
   }
 
   @Test
-  public void testSortingEmptyInput() {
+  public void testSortingEmptyInput() throws IOException {
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
       new TestMemoryManager(
         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
@@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m
UnsafeExternalSorter 12 12 0 13.8 72.7 1.0X
ExternalAppendOnlyUnsafeRowArray 8 8 0 19.8 50.6 1.4X


Java HotSpot(TM) 64-Bit Server VM 11.0.7+8-LTS on Linux 4.4.0-178-generic
Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
Spilling SpillReader with 16000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UnsafeSorterSpillReader_bufferSize1024 231 342 82 1.1 901.6 1.0X
@@ -42,4 +42,8 @@ Spilling with 10000 rows: Best Time(ms) Avg Time(ms) Stdev(m
UnsafeExternalSorter 11 11 1 14.7 68.0 1.0X
ExternalAppendOnlyUnsafeRowArray 9 10 1 17.1 58.5 1.2X


OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~16.04-b09 on Linux 4.4.0-178-generic
Intel(R) Xeon(R) CPU E5-2687W v3 @ 3.10GHz
Spilling SpillReader with 16000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UnsafeSorterSpillReader_bufferSize1024 411 426 13 0.6 1607.2 1.0X
@@ -95,7 +95,8 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       // inside `UnsafeExternalSorter`
       spillableArray.cleanupResources()
       spillableArray = null
-    } else if (inMemoryBuffer != null) {
+    }
+    if (inMemoryBuffer != null) {
       inMemoryBuffer.clear()
     }
     numFieldsPerRow = 0
@@ -124,18 +125,6 @@
         numRowsSpillThreshold,
         false)
 
-      // populate with existing in-memory buffered rows
-      if (inMemoryBuffer != null) {
-        inMemoryBuffer.foreach(existingUnsafeRow =>
-          spillableArray.insertRecord(
-            existingUnsafeRow.getBaseObject,
-            existingUnsafeRow.getBaseOffset,
-            existingUnsafeRow.getSizeInBytes,
-            0,
-            false)
-        )
-        inMemoryBuffer.clear()
-      }
       numFieldsPerRow = unsafeRow.numFields()
     }

@@ -168,7 +157,15 @@
     if (spillableArray == null) {
       new InMemoryBufferIterator(startIndex)
     } else {
-      new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
+      val offsetIndex = if (inMemoryBuffer != null && startIndex > inMemoryBuffer.length) {
+        startIndex - inMemoryBuffer.length
+      } else {
+        0
+      }
+      new SpilledArrayMergeIterator(
+        spillableArray.getIterator(offsetIndex),
+        numFieldsPerRow,
+        startIndex)
     }
   }

@@ -204,20 +201,37 @@
     }
   }
 
-  private[this] class SpillableArrayIterator(
+  private[this] class SpilledArrayMergeIterator(
       iterator: UnsafeSorterIterator,
-      numFieldPerRow: Int)
+      numFieldPerRow: Int,
+      startIndex: Int)
     extends ExternalAppendOnlyUnsafeRowArrayIterator {
 
-    private val currentRow = new UnsafeRow(numFieldPerRow)
+    private var currentIndex = startIndex
 
-    override def hasNext(): Boolean = !isModified() && iterator.hasNext
+    private val currentSorterRow = new UnsafeRow(numFieldPerRow)
+
+    override def hasNext(): Boolean = {
+      if (inMemoryBuffer != null && currentIndex < inMemoryBuffer.length) {
+        !isModified()
+      } else {
+        !isModified() && iterator.hasNext
+      }
+    }
 
     override def next(): UnsafeRow = {
       throwExceptionIfModified()
-      iterator.loadNext()
-      currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
-      currentRow
+      if (inMemoryBuffer != null && currentIndex < inMemoryBuffer.length) {
+        val result = inMemoryBuffer(currentIndex)
+        currentIndex += 1
+        result
+      } else {
+        iterator.loadNext()
+        currentSorterRow.pointTo(iterator.getBaseObject,
+          iterator.getBaseOffset,
+          iterator.getRecordLength)
+        currentSorterRow
+      }
     }
   }
 }
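A minimal sketch of the merge pattern the new iterator follows, using simplified generic types and hypothetical names rather than the actual UnsafeRow-based Scala class: rows are served from the in-memory prefix first, and only after it is exhausted does iteration fall through to the spilled source; the caller is assumed to have positioned the spilled iterator at offsetIndex when startIndex lies beyond the in-memory prefix, as generateIterator does above.

import java.util.Iterator;
import java.util.List;

// Illustrative only: serve rows from an in-memory prefix first, then from a spilled source.
final class MergedRowIterator<T> implements Iterator<T> {
  private final List<T> inMemory;      // rows buffered before spilling started
  private final Iterator<T> spilled;   // rows that went to the external sorter, pre-positioned
  private int index;                   // logical position within the in-memory prefix

  MergedRowIterator(List<T> inMemory, Iterator<T> spilled, int startIndex) {
    this.inMemory = inMemory;
    this.spilled = spilled;
    this.index = startIndex;
  }

  @Override
  public boolean hasNext() {
    return index < inMemory.size() || spilled.hasNext();
  }

  @Override
  public T next() {
    if (index < inMemory.size()) {
      return inMemory.get(index++);    // in-memory prefix first
    }
    return spilled.next();             // then the spilled rows
  }
}

This avoids the old behaviour of copying every already-buffered row into the sorter at spill time, which is the block removed in the @@ -124,18 +125,6 hunk above.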
@@ -614,7 +614,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   }
 
   test("Window spill with more than the inMemoryThreshold and spillThreshold") {
-    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4"), (1, "5"), (2, "6")).toDF("key", "value")
     val window = Window.partitionBy($"key").orderBy($"value")
 
     withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
@@ -628,7 +628,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   test("SPARK-21258: complex object in combination with spilling") {
     // Make sure we trigger the spilling path.
     withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
-      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "16") {
       val sampleSchema = new StructType().
         add("f0", StringType).
         add("f1", LongType).