Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
}
}

inMemSorter.reset();

if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
Expand Down Expand Up @@ -255,6 +253,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {

writeSortedFile(false);
final long spillSize = freeMemory();
inMemSorter.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to move this call?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to reset the pointer array only after freeing up the memory pages holding records. Otherwise it might happen that the task might not get memory for the pointer array if it is already holding a lot of memory.

// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move this comment into reset()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, comment in ShuffleExternalSorter makes it easier to get the context and understand. Also in future, if some one tries to move this call, he will not do so, seeing the comment. If the comment is in the reset() function, someone might inadvertently move this call without seeing the comment in reset() function. However, if you have a strong opinion about it, I would gladly move the comment into reset(). Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I don't have strong opinion on it.

// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
// we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
*/
private int pos = 0;

private int initialSize;

ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
this.consumer = consumer;
assert (initialSize > 0);
this.initialSize = initialSize;
this.array = consumer.allocateArray(initialSize);
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
}
Expand All @@ -70,6 +73,10 @@ public int numRecords() {
}

public void reset() {
if (consumer != null) {
consumer.freeArray(array);
this.array = consumer.allocateArray(initialSize);
}
pos = 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();

inMemSorter.reset();
}

final long spillSize = freeMemory();
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
// we might not be able to get memory for the pointer array.

taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

return spillSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
*/
private int pos = 0;

private long initialSize;

public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
Expand All @@ -102,6 +104,7 @@ public UnsafeInMemorySorter(
LongArray array) {
this.consumer = consumer;
this.memoryManager = memoryManager;
this.initialSize = array.size();
if (recordComparator != null) {
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
Expand All @@ -123,6 +126,10 @@ public void free() {
}

public void reset() {
if (consumer != null) {
consumer.freeArray(array);
this.array = consumer.allocateArray(initialSize);
}
pos = 0;
}

Expand Down