1717
1818package org .apache .spark .shuffle .unsafe ;
1919
20+ import javax .annotation .Nullable ;
2021import java .io .File ;
2122import java .io .IOException ;
2223import java .util .LinkedList ;
23- import javax .annotation .Nullable ;
2424
2525import scala .Tuple2 ;
2626
3434import org .apache .spark .serializer .DummySerializerInstance ;
3535import org .apache .spark .serializer .SerializerInstance ;
3636import org .apache .spark .shuffle .ShuffleMemoryManager ;
37- import org .apache .spark .storage .*;
37+ import org .apache .spark .storage .BlockManager ;
38+ import org .apache .spark .storage .DiskBlockObjectWriter ;
39+ import org .apache .spark .storage .TempShuffleBlockId ;
3840import org .apache .spark .unsafe .PlatformDependent ;
41+ import org .apache .spark .unsafe .array .ByteArrayMethods ;
3942import org .apache .spark .unsafe .memory .MemoryBlock ;
4043import org .apache .spark .unsafe .memory .TaskMemoryManager ;
4144import org .apache .spark .util .Utils ;
@@ -68,7 +71,7 @@ final class UnsafeShuffleExternalSorter {
6871 private final int pageSizeBytes ;
6972 @ VisibleForTesting
7073 final int maxRecordSizeBytes ;
71- private final TaskMemoryManager memoryManager ;
74+ private final TaskMemoryManager taskMemoryManager ;
7275 private final ShuffleMemoryManager shuffleMemoryManager ;
7376 private final BlockManager blockManager ;
7477 private final TaskContext taskContext ;
@@ -91,7 +94,7 @@ final class UnsafeShuffleExternalSorter {
9194 private long peakMemoryUsedBytes ;
9295
9396 // These variables are reset after spilling:
94- @ Nullable private UnsafeShuffleInMemorySorter sorter ;
97+ @ Nullable private UnsafeShuffleInMemorySorter inMemSorter ;
9598 @ Nullable private MemoryBlock currentPage = null ;
9699 private long currentPagePosition = -1 ;
97100 private long freeSpaceInCurrentPage = 0 ;
@@ -105,7 +108,7 @@ public UnsafeShuffleExternalSorter(
105108 int numPartitions ,
106109 SparkConf conf ,
107110 ShuffleWriteMetrics writeMetrics ) throws IOException {
108- this .memoryManager = memoryManager ;
111+ this .taskMemoryManager = memoryManager ;
109112 this .shuffleMemoryManager = shuffleMemoryManager ;
110113 this .blockManager = blockManager ;
111114 this .taskContext = taskContext ;
@@ -115,8 +118,7 @@ public UnsafeShuffleExternalSorter(
115118 // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
116119 this .fileBufferSizeBytes = (int ) conf .getSizeAsKb ("spark.shuffle.file.buffer" , "32k" ) * 1024 ;
117120 this .pageSizeBytes = (int ) Math .min (
118- PackedRecordPointer .MAXIMUM_PAGE_SIZE_BYTES ,
119- conf .getSizeAsBytes ("spark.buffer.pageSize" , "64m" ));
121+ PackedRecordPointer .MAXIMUM_PAGE_SIZE_BYTES , shuffleMemoryManager .pageSizeBytes ());
120122 this .maxRecordSizeBytes = pageSizeBytes - 4 ;
121123 this .writeMetrics = writeMetrics ;
122124 initializeForWriting ();
@@ -134,7 +136,7 @@ private void initializeForWriting() throws IOException {
134136 throw new IOException ("Could not acquire " + memoryRequested + " bytes of memory" );
135137 }
136138
137- this .sorter = new UnsafeShuffleInMemorySorter (initialSize );
139+ this .inMemSorter = new UnsafeShuffleInMemorySorter (initialSize );
138140 }
139141
140142 /**
@@ -161,7 +163,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
161163
162164 // This call performs the actual sort.
163165 final UnsafeShuffleInMemorySorter .UnsafeShuffleSorterIterator sortedRecords =
164- sorter .getSortedIterator ();
166+ inMemSorter .getSortedIterator ();
165167
166168 // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
167169 // after SPARK-5581 is fixed.
@@ -207,8 +209,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
207209 }
208210
209211 final long recordPointer = sortedRecords .packedRecordPointer .getRecordPointer ();
210- final Object recordPage = memoryManager .getPage (recordPointer );
211- final long recordOffsetInPage = memoryManager .getOffsetInPage (recordPointer );
212+ final Object recordPage = taskMemoryManager .getPage (recordPointer );
213+ final long recordOffsetInPage = taskMemoryManager .getOffsetInPage (recordPointer );
212214 int dataRemaining = PlatformDependent .UNSAFE .getInt (recordPage , recordOffsetInPage );
213215 long recordReadPosition = recordOffsetInPage + 4 ; // skip over record length
214216 while (dataRemaining > 0 ) {
@@ -270,9 +272,9 @@ void spill() throws IOException {
270272 spills .size () > 1 ? " times" : " time" );
271273
272274 writeSortedFile (false );
273- final long sorterMemoryUsage = sorter .getMemoryUsage ();
274- sorter = null ;
275- shuffleMemoryManager .release (sorterMemoryUsage );
275+ final long inMemSorterMemoryUsage = inMemSorter .getMemoryUsage ();
276+ inMemSorter = null ;
277+ shuffleMemoryManager .release (inMemSorterMemoryUsage );
276278 final long spillSize = freeMemory ();
277279 taskContext .taskMetrics ().incMemoryBytesSpilled (spillSize );
278280
@@ -284,7 +286,7 @@ private long getMemoryUsage() {
284286 for (MemoryBlock page : allocatedPages ) {
285287 totalPageSize += page .size ();
286288 }
287- return ((sorter == null ) ? 0 : sorter .getMemoryUsage ()) + totalPageSize ;
289+ return ((inMemSorter == null ) ? 0 : inMemSorter .getMemoryUsage ()) + totalPageSize ;
288290 }
289291
290292 private void updatePeakMemoryUsed () {
@@ -306,7 +308,7 @@ private long freeMemory() {
306308 updatePeakMemoryUsed ();
307309 long memoryFreed = 0 ;
308310 for (MemoryBlock block : allocatedPages ) {
309- memoryManager .freePage (block );
311+ taskMemoryManager .freePage (block );
310312 shuffleMemoryManager .release (block .size ());
311313 memoryFreed += block .size ();
312314 }
@@ -320,54 +322,53 @@ private long freeMemory() {
320322 /**
321323 * Force all memory and spill files to be deleted; called by shuffle error-handling code.
322324 */
323- public void cleanupAfterError () {
325+ public void cleanupResources () {
324326 freeMemory ();
325327 for (SpillInfo spill : spills ) {
326328 if (spill .file .exists () && !spill .file .delete ()) {
327329 logger .error ("Unable to delete spill file {}" , spill .file .getPath ());
328330 }
329331 }
330- if (sorter != null ) {
331- shuffleMemoryManager .release (sorter .getMemoryUsage ());
332- sorter = null ;
332+ if (inMemSorter != null ) {
333+ shuffleMemoryManager .release (inMemSorter .getMemoryUsage ());
334+ inMemSorter = null ;
333335 }
334336 }
335337
336338 /**
337- * Checks whether there is enough space to insert a new record into the sorter.
338- *
339- * @param requiredSpace the required space in the data page, in bytes, including space for storing
340- * the record size.
341-
342- * @return true if the record can be inserted without requiring more allocations, false otherwise.
343- */
344- private boolean haveSpaceForRecord (int requiredSpace ) {
345- assert (requiredSpace > 0 );
346- return (sorter .hasSpaceForAnotherRecord () && (requiredSpace <= freeSpaceInCurrentPage ));
347- }
348-
349- /**
350- * Allocates more memory in order to insert an additional record. This will request additional
351- * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
352- * obtained.
353- *
354- * @param requiredSpace the required space in the data page, in bytes, including space for storing
355- * the record size.
339+ * Checks whether there is enough space to insert an additional record in to the sort pointer
340+ * array and grows the array if additional space is required. If the required space cannot be
341+ * obtained, then the in-memory data will be spilled to disk.
356342 */
357- private void allocateSpaceForRecord (int requiredSpace ) throws IOException {
358- if (!sorter .hasSpaceForAnotherRecord ()) {
343+ private void growPointerArrayIfNecessary () throws IOException {
344+ assert (inMemSorter != null );
345+ if (!inMemSorter .hasSpaceForAnotherRecord ()) {
359346 logger .debug ("Attempting to expand sort pointer array" );
360- final long oldPointerArrayMemoryUsage = sorter .getMemoryUsage ();
347+ final long oldPointerArrayMemoryUsage = inMemSorter .getMemoryUsage ();
361348 final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2 ;
362349 final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryToGrowPointerArray );
363350 if (memoryAcquired < memoryToGrowPointerArray ) {
364351 shuffleMemoryManager .release (memoryAcquired );
365352 spill ();
366353 } else {
367- sorter .expandPointerArray ();
354+ inMemSorter .expandPointerArray ();
368355 shuffleMemoryManager .release (oldPointerArrayMemoryUsage );
369356 }
370357 }
358+ }
359+
360+ /**
361+ * Allocates more memory in order to insert an additional record. This will request additional
362+ * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
363+ * obtained.
364+ *
365+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
366+ * the record size. This must be less than or equal to the page size (records
367+ * that exceed the page size are handled via a different code path which uses
368+ * special overflow pages).
369+ */
370+ private void acquireNewPageIfNecessary (int requiredSpace ) throws IOException {
371+ growPointerArrayIfNecessary ();
371372 if (requiredSpace > freeSpaceInCurrentPage ) {
372373 logger .trace ("Required space {} is less than free space in current page ({})" , requiredSpace ,
373374 freeSpaceInCurrentPage );
@@ -388,7 +389,7 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
388389 throw new IOException ("Unable to acquire " + pageSizeBytes + " bytes of memory" );
389390 }
390391 }
391- currentPage = memoryManager .allocatePage (pageSizeBytes );
392+ currentPage = taskMemoryManager .allocatePage (pageSizeBytes );
392393 currentPagePosition = currentPage .getBaseOffset ();
393394 freeSpaceInCurrentPage = pageSizeBytes ;
394395 allocatedPages .add (currentPage );
@@ -404,27 +405,58 @@ public void insertRecord(
404405 long recordBaseOffset ,
405406 int lengthInBytes ,
406407 int partitionId ) throws IOException {
408+
409+ growPointerArrayIfNecessary ();
407410 // Need 4 bytes to store the record length.
408411 final int totalSpaceRequired = lengthInBytes + 4 ;
409- if (!haveSpaceForRecord (totalSpaceRequired )) {
410- allocateSpaceForRecord (totalSpaceRequired );
412+
413+ // --- Figure out where to insert the new record ----------------------------------------------
414+
415+ final MemoryBlock dataPage ;
416+ long dataPagePosition ;
417+ boolean useOverflowPage = totalSpaceRequired > pageSizeBytes ;
418+ if (useOverflowPage ) {
419+ long overflowPageSize = ByteArrayMethods .roundNumberOfBytesToNearestWord (totalSpaceRequired );
420+ // The record is larger than the page size, so allocate a special overflow page just to hold
421+ // that record.
422+ final long memoryGranted = shuffleMemoryManager .tryToAcquire (overflowPageSize );
423+ if (memoryGranted != overflowPageSize ) {
424+ shuffleMemoryManager .release (memoryGranted );
425+ spill ();
426+ final long memoryGrantedAfterSpill = shuffleMemoryManager .tryToAcquire (overflowPageSize );
427+ if (memoryGrantedAfterSpill != overflowPageSize ) {
428+ shuffleMemoryManager .release (memoryGrantedAfterSpill );
429+ throw new IOException ("Unable to acquire " + overflowPageSize + " bytes of memory" );
430+ }
431+ }
432+ MemoryBlock overflowPage = taskMemoryManager .allocatePage (overflowPageSize );
433+ allocatedPages .add (overflowPage );
434+ dataPage = overflowPage ;
435+ dataPagePosition = overflowPage .getBaseOffset ();
436+ } else {
437+ // The record is small enough to fit in a regular data page, but the current page might not
438+ // have enough space to hold it (or no pages have been allocated yet).
439+ acquireNewPageIfNecessary (totalSpaceRequired );
440+ dataPage = currentPage ;
441+ dataPagePosition = currentPagePosition ;
442+ // Update bookkeeping information
443+ freeSpaceInCurrentPage -= totalSpaceRequired ;
444+ currentPagePosition += totalSpaceRequired ;
411445 }
446+ final Object dataPageBaseObject = dataPage .getBaseObject ();
412447
413448 final long recordAddress =
414- memoryManager .encodePageNumberAndOffset (currentPage , currentPagePosition );
415- final Object dataPageBaseObject = currentPage .getBaseObject ();
416- PlatformDependent .UNSAFE .putInt (dataPageBaseObject , currentPagePosition , lengthInBytes );
417- currentPagePosition += 4 ;
418- freeSpaceInCurrentPage -= 4 ;
449+ taskMemoryManager .encodePageNumberAndOffset (dataPage , dataPagePosition );
450+ PlatformDependent .UNSAFE .putInt (dataPageBaseObject , dataPagePosition , lengthInBytes );
451+ dataPagePosition += 4 ;
419452 PlatformDependent .copyMemory (
420453 recordBaseObject ,
421454 recordBaseOffset ,
422455 dataPageBaseObject ,
423- currentPagePosition ,
456+ dataPagePosition ,
424457 lengthInBytes );
425- currentPagePosition += lengthInBytes ;
426- freeSpaceInCurrentPage -= lengthInBytes ;
427- sorter .insertRecord (recordAddress , partitionId );
458+ assert (inMemSorter != null );
459+ inMemSorter .insertRecord (recordAddress , partitionId );
428460 }
429461
430462 /**
@@ -436,14 +468,14 @@ public void insertRecord(
436468 */
437469 public SpillInfo [] closeAndGetSpills () throws IOException {
438470 try {
439- if (sorter != null ) {
471+ if (inMemSorter != null ) {
440472 // Do not count the final file towards the spill count.
441473 writeSortedFile (true );
442474 freeMemory ();
443475 }
444476 return spills .toArray (new SpillInfo [spills .size ()]);
445477 } catch (IOException e ) {
446- cleanupAfterError ();
478+ cleanupResources ();
447479 throw e ;
448480 }
449481 }
0 commit comments