Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public UnsafeShuffleExternalSorter(
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();

// preserve first page to ensure that we have at least one page to work with. Otherwise,
// other operators in the same task may starve this sorter (SPARK-9709).
acquireNewPageIfNecessary(pageSizeBytes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M

// In certain join operations, prepare can be called on the same partition multiple times.
// In this case, we need to ensure that each call to compute gets a separate prepare argument.
private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]

/**
* Prepare a partition for a single call to compute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,9 @@ public void testPeakMemoryUsed() throws Exception {
for (int i = 0; i < numRecordsPerPage * 10; i++) {
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
newPeakMemory = writer.getPeakMemoryUsedBytes();
if (i % numRecordsPerPage == 0) {
// We allocated a new page for this record, so peak memory should change
if (i % numRecordsPerPage == 0 && i != 0) {
// The first page is allocated in constructor, another page will be allocated after
// every numRecordsPerPage records (peak memory should change).
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
assertEquals(previousPeakMemory, newPeakMemory);
Expand Down