Skip to content

Commit 234acbd

Browse files
author
Andrew Or
committed
Reserve a page in sorter when preparing each partition
1 parent b889e08 commit 234acbd

File tree

2 files changed

+52
-19
lines changed

2 files changed

+52
-19
lines changed

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ private UnsafeExternalSorter(
138138
this.inMemSorter = existingInMemorySorter;
139139
}
140140

141+
// Acquire a new page as soon as we construct the sorter to ensure that we have at
142+
// least one page to work with. Otherwise, other operators in the same task may starve
143+
// this sorter (SPARK-9709).
144+
acquireNewPage();
145+
141146
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
142147
// the end of the task. This is necessary to avoid memory leaks when the downstream operator
143148
// does not fully consume the sorter's output (e.g. sort followed by limit).
@@ -343,22 +348,32 @@ private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
343348
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
344349
pageSizeBytes + ")");
345350
} else {
346-
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
347-
if (memoryAcquired < pageSizeBytes) {
348-
shuffleMemoryManager.release(memoryAcquired);
349-
spill();
350-
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
351-
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
352-
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
353-
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
354-
}
355-
}
356-
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
357-
currentPagePosition = currentPage.getBaseOffset();
358-
freeSpaceInCurrentPage = pageSizeBytes;
359-
allocatedPages.add(currentPage);
351+
acquireNewPage();
352+
}
353+
}
354+
}
355+
356+
/**
357+
* Acquire a new page from the {@link ShuffleMemoryManager}.
358+
*
359+
* If there is not enough space to allocate the new page, spill all existing ones
360+
* and try again. If there is still not enough space, report an error to the caller.
361+
*/
362+
private void acquireNewPage() throws IOException {
363+
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
364+
if (memoryAcquired < pageSizeBytes) {
365+
shuffleMemoryManager.release(memoryAcquired);
366+
spill();
367+
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
368+
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
369+
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
370+
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
360371
}
361372
}
373+
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
374+
currentPagePosition = currentPage.getBaseOffset();
375+
freeSpaceInCurrentPage = pageSizeBytes;
376+
allocatedPages.add(currentPage);
362377
}
363378

364379
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.{InternalAccumulator, TaskContext}
21-
import org.apache.spark.rdd.RDD
21+
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
2222
import org.apache.spark.sql.catalyst.InternalRow
2323
import org.apache.spark.sql.catalyst.errors._
2424
import org.apache.spark.sql.catalyst.expressions._
@@ -123,7 +123,12 @@ case class TungstenSort(
123123
val schema = child.schema
124124
val childOutput = child.output
125125
val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
126-
child.execute().mapPartitions({ iter =>
126+
127+
/**
128+
* Set up the sorter in each partition before computing the parent partition.
129+
* This makes sure our sorter is not starved by other sorters used in the same task.
130+
*/
131+
def preparePartition(): UnsafeExternalRowSorter = {
127132
val ordering = newOrdering(sortOrder, childOutput)
128133

129134
// The comparator for comparing prefix
@@ -143,12 +148,25 @@ case class TungstenSort(
143148
if (testSpillFrequency > 0) {
144149
sorter.setTestSpillFrequency(testSpillFrequency)
145150
}
146-
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
147-
val taskContext = TaskContext.get()
151+
sorter
152+
}
153+
154+
/** Compute a partition using the sorter already set up previously. */
155+
def executePartition(
156+
taskContext: TaskContext,
157+
partitionIndex: Int,
158+
sorter: UnsafeExternalRowSorter,
159+
parentIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
160+
val sortedIterator = sorter.sort(parentIterator.asInstanceOf[Iterator[UnsafeRow]])
148161
taskContext.internalMetricsToAccumulators(
149162
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
150163
sortedIterator
151-
}, preservesPartitioning = true)
164+
}
165+
166+
// Note: we need to set up the external sorter in each partition before computing
167+
// the parent partition, so we cannot simply use `mapPartitions` here.
168+
new MapPartitionsWithPreparationRDD[InternalRow, InternalRow, UnsafeExternalRowSorter](
169+
child.execute(), preparePartition, executePartition, preservesPartitioning = true)
152170
}
153171

154172
}

0 commit comments

Comments
 (0)