From d0ada7b5c6f8be9a91624e4eb5a6ca2242edf16b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 26 Oct 2015 15:29:49 -0700
Subject: [PATCH 01/17] force spilling
---
.../apache/spark/memory/MemoryConsumer.java | 100 +++++
.../spark/memory/TaskMemoryManager.java | 106 ++++-
.../shuffle/sort/ShuffleExternalSorter.java | 171 ++------
.../shuffle/sort/ShuffleInMemorySorter.java | 6 +-
.../shuffle/sort/UnsafeShuffleWriter.java | 2 +-
.../spark/unsafe/map/BytesToBytesMap.java | 371 +++++++++--------
.../unsafe/sort/UnsafeExternalSorter.java | 391 +++++++++---------
.../unsafe/sort/UnsafeInMemorySorter.java | 14 +-
.../unsafe/sort/UnsafeSorterSpillReader.java | 6 +-
.../unsafe/sort/UnsafeSorterSpillWriter.java | 2 +-
.../sort/PackedRecordPointerSuite.java | 16 +-
.../sort/UnsafeShuffleWriterSuite.java | 11 +-
.../map/AbstractBytesToBytesMapSuite.java | 144 ++++++-
.../sort/UnsafeExternalSorterSuite.java | 49 +--
.../memory/GrantEverythingMemoryManager.scala | 21 +-
.../execution/UnsafeExternalRowSorter.java | 2 +-
.../UnsafeFixedWidthAggregationMap.java | 2 +-
.../sql/execution/UnsafeKVExternalSorter.java | 9 +-
.../UnsafeFixedWidthAggregationMapSuite.scala | 10 +-
.../UnsafeKVExternalSorterSuite.scala | 2 +-
.../TungstenAggregationIteratorSuite.scala | 54 ---
.../unsafe/memory/HeapMemoryAllocator.java | 9 +-
.../unsafe/memory/UnsafeMemoryAllocator.java | 3 -
23 files changed, 861 insertions(+), 640 deletions(-)
create mode 100644 core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
new file mode 100644
index 0000000000000..e308c4b74a579
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+
+import java.io.IOException;
+
+import org.apache.spark.unsafe.memory.MemoryBlock;
+
+
+/**
+ * A memory consumer of TaskMemoryManager, which supports spilling.
+ */
+public class MemoryConsumer {
+
+ private TaskMemoryManager memoryManager;
+ private long pageSize;
+
+ protected MemoryConsumer(TaskMemoryManager memoryManager, long pageSize) {
+ this.memoryManager = memoryManager;
+ this.pageSize = pageSize;
+ }
+
+ protected MemoryConsumer(TaskMemoryManager memoryManager) {
+ this(memoryManager, memoryManager.pageSizeBytes());
+ }
+
+ /**
+   * Spill some data to disk to release memory. This will be called by TaskMemoryManager
+   * when there is not enough memory for the task.
+   *
+   * @param size the amount of memory to be released, in bytes
+   * @return the amount of memory released, in bytes
+ * @throws IOException
+ */
+ public long spill(long size) throws IOException {
+ return 0L;
+ }
+
+ /**
+   * Acquire `size` bytes of memory.
+ *
+ * If there is not enough memory, throws IOException.
+ *
+ * @throws IOException
+ */
+ protected void acquireMemory(long size) throws IOException {
+ long got = memoryManager.acquireExecutionMemory(size, this);
+ if (got < size) {
+ throw new IOException("Could not acquire " + size + " bytes of memory " + got);
+ }
+ }
+
+ /**
+   * Release `size` bytes of memory.
+ */
+ protected void releaseMemory(long size) {
+ memoryManager.releaseExecutionMemory(size, this);
+ }
+
+ /**
+ * Allocate a memory block with at least `required` bytes.
+ *
+ * Throws IOException if there is not enough memory.
+ *
+ * @throws IOException
+ */
+ protected MemoryBlock allocatePage(long required) throws IOException {
+ MemoryBlock page = memoryManager.allocatePage(Math.max(pageSize, required), this);
+ if (page == null || page.size() < required) {
+ if (page != null) {
+ freePage(page);
+ }
+ throw new IOException("Unable to acquire " + required + " bytes of memory");
+ }
+ return page;
+ }
+
+ /**
+ * Free a memory block.
+ */
+ protected void freePage(MemoryBlock page) {
+ memoryManager.freePage(page, this);
+ }
+}
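As a reading aid, here is a minimal sketch of how an operator can plug into this new API; the subclass below is hypothetical and not part of this patch. It buffers data in pages obtained through allocatePage() and hands them back when TaskMemoryManager calls spill():

import java.io.IOException;
import java.util.LinkedList;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;

// Hypothetical consumer: buffers records in pages, frees them under memory pressure.
class BufferingConsumer extends MemoryConsumer {
  private final LinkedList<MemoryBlock> pages = new LinkedList<>();

  BufferingConsumer(TaskMemoryManager memoryManager) {
    super(memoryManager);
  }

  void addPage(long bytes) throws IOException {
    // may trigger spill() on this or other consumers when memory is tight
    pages.add(allocatePage(bytes));
  }

  @Override
  public long spill(long size) throws IOException {
    long released = 0L;
    while (!pages.isEmpty() && released < size) {
      MemoryBlock page = pages.removeLast();
      released += page.size();
      // a real consumer would persist the page contents before freeing it
      freePage(page);
    }
    return released;
  }
}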
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 7b31c90dac666..4f7e98ace60d1 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -17,7 +17,9 @@
package org.apache.spark.memory;
-import java.util.*;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -100,6 +102,11 @@ public class TaskMemoryManager {
*/
private final boolean inHeap;
+ /**
+ * The size of memory granted to each consumer.
+ */
+  private HashMap<MemoryConsumer, Long> consumers;
+
/**
* Construct a new TaskMemoryManager.
*/
@@ -107,6 +114,7 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
+ this.consumers = new HashMap<>();
}
/**
@@ -117,6 +125,46 @@ public long acquireExecutionMemory(long size) {
return memoryManager.acquireExecutionMemory(size, taskAttemptId);
}
+ /**
+   * Acquire N bytes of memory for a consumer. If there is not enough memory, it will call
+   * spill() on consumers to release more memory.
+ *
+ * @return number of bytes successfully granted (<= N).
+ */
+ public long acquireExecutionMemory(long size, MemoryConsumer consumer) throws IOException {
+ synchronized (this) {
+ long got = acquireExecutionMemory(size);
+
+ if (got < size && consumer != null) {
+ // call spill() on itself to release some memory
+ consumer.spill(size - got);
+ got += acquireExecutionMemory(size - got);
+
+ if (got < size) {
+ long needed = size - got;
+ // call spill() on other consumers to release memory
+ for (MemoryConsumer c: consumers.keySet()) {
+ if (c != consumer) {
+ needed -= c.spill(size - got);
+ if (needed < 0) {
+ break;
+ }
+ }
+ }
+ got += acquireExecutionMemory(size - got);
+ }
+ }
+
+ long old = 0L;
+ if (consumers.containsKey(consumer)) {
+ old = consumers.get(consumer);
+ }
+ consumers.put(consumer, got + old);
+
+ return got;
+ }
+ }
+
/**
* Release N bytes of execution memory.
*/
@@ -124,6 +172,28 @@ public void releaseExecutionMemory(long size) {
memoryManager.releaseExecutionMemory(size, taskAttemptId);
}
+ /**
+ * Release N bytes of execution memory for a MemoryConsumer.
+ */
+ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+ synchronized (this) {
+ if (consumer != null && consumers.containsKey(consumer)) {
+ long old = consumers.get(consumer);
+ if (old > size) {
+ consumers.put(consumer, old - size);
+ } else {
+ if (old < size) {
+ // TODO
+ }
+ consumers.remove(consumer);
+ }
+ } else {
+ // TODO
+ }
+ memoryManager.releaseExecutionMemory(size, taskAttemptId);
+ }
+ }
+
public long pageSizeBytes() {
return memoryManager.pageSizeBytes();
}
@@ -134,12 +204,27 @@ public long pageSizeBytes() {
*
* Returns `null` if there was not enough memory to allocate the page.
*/
- public MemoryBlock allocatePage(long size) {
+ public MemoryBlock allocatePage(long size) throws IOException {
+ return allocatePage(size, null);
+ }
+
+ /**
+ * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
+ * intended for allocating large blocks of Tungsten memory that will be shared between operators.
+ *
+ * Returns `null` if there was not enough memory to allocate the page.
+ */
+ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) throws IOException {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
+ long acquired = acquireExecutionMemory(size, consumer);
+ if (acquired <= 0) {
+ return null;
+ }
+
final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
@@ -149,14 +234,6 @@ public MemoryBlock allocatePage(long size) {
}
allocatedPages.set(pageNumber);
}
- final long acquiredExecutionMemory = acquireExecutionMemory(size);
- if (acquiredExecutionMemory != size) {
- releaseExecutionMemory(acquiredExecutionMemory);
- synchronized (this) {
- allocatedPages.clear(pageNumber);
- }
- return null;
- }
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
@@ -170,6 +247,13 @@ public MemoryBlock allocatePage(long size) {
* Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}.
*/
public void freePage(MemoryBlock page) {
+ freePage(page, null);
+ }
+
+ /**
+   * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long, MemoryConsumer)}.
+ */
+ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
assert (page.pageNumber != -1) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
assert(allocatedPages.get(page.pageNumber));
@@ -182,7 +266,7 @@ public void freePage(MemoryBlock page) {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
- releaseExecutionMemory(pageSize);
+ releaseExecutionMemory(pageSize, consumer);
}
/**
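The acquisition path above is cooperative: a shortfall first triggers spill() on the requesting consumer, then on the task's other consumers, and the request is retried after each round. The grant may still be partial, so callers must check the returned value. A sketch of the resulting calling convention (the helper class is illustrative, not part of the patch):

import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;

final class AcquireHelper {
  // Acquire exactly `size` bytes for `consumer` or fail, returning any partial
  // grant; a stricter variant of what MemoryConsumer.acquireMemory() does.
  static void acquireOrThrow(TaskMemoryManager tmm, MemoryConsumer consumer, long size)
      throws IOException {
    long got = tmm.acquireExecutionMemory(size, consumer);
    if (got < size) {
      // hand back the partial grant before reporting failure
      tmm.releaseExecutionMemory(got, consumer);
      throw new IOException("Could not acquire " + size + " bytes of memory, got " + got);
    }
  }
}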
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index f43236f41ae7b..69ac697a03511 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -31,15 +31,15 @@
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
/**
@@ -58,14 +58,13 @@
* spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
* specialized merge procedure that avoids extra serialization/deserialization.
*/
-final class ShuffleExternalSorter {
+final class ShuffleExternalSorter extends MemoryConsumer {
private final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);
@VisibleForTesting
static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
- private final int initialSize;
private final int numPartitions;
private final int pageSizeBytes;
@VisibleForTesting
@@ -98,8 +97,7 @@ final class ShuffleExternalSorter {
// These variables are reset after spilling:
@Nullable private ShuffleInMemorySorter inMemSorter;
@Nullable private MemoryBlock currentPage = null;
- private long currentPagePosition = -1;
- private long freeSpaceInCurrentPage = 0;
+ private long pageCursor = -1;
public ShuffleExternalSorter(
TaskMemoryManager memoryManager,
@@ -109,10 +107,10 @@ public ShuffleExternalSorter(
int numPartitions,
SparkConf conf,
ShuffleWriteMetrics writeMetrics) throws IOException {
+ super(memoryManager);
this.taskMemoryManager = memoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
- this.initialSize = initialSize;
this.peakMemoryUsedBytes = initialSize;
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
@@ -123,25 +121,7 @@ public ShuffleExternalSorter(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
- initializeForWriting();
-
- // preserve first page to ensure that we have at least one page to work with. Otherwise,
- // other operators in the same task may starve this sorter (SPARK-9709).
- acquireNewPageIfNecessary(pageSizeBytes);
- }
-
- /**
- * Allocates new sort data structures. Called when creating the sorter and after each spill.
- */
- private void initializeForWriting() throws IOException {
- // TODO: move this sizing calculation logic into a static method of sorter:
- final long memoryRequested = initialSize * 8L;
- final long memoryAcquired = taskMemoryManager.acquireExecutionMemory(memoryRequested);
- if (memoryAcquired != memoryRequested) {
- taskMemoryManager.releaseExecutionMemory(memoryAcquired);
- throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
- }
-
+ acquireMemory(initialSize * 8L);
this.inMemSorter = new ShuffleInMemorySorter(initialSize);
numRecordsInsertedSinceLastSpill = 0;
}
@@ -242,6 +222,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
}
}
+ inMemSorter.reset();
+
if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -267,8 +249,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
* Sort and spill the current records in response to memory pressure.
*/
@VisibleForTesting
- void spill() throws IOException {
- assert(inMemSorter != null);
+ @Override
+ public long spill(long size) throws IOException {
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
@@ -276,13 +258,9 @@ void spill() throws IOException {
spills.size() > 1 ? " times" : " time");
writeSortedFile(false);
- final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
- inMemSorter = null;
- taskMemoryManager.releaseExecutionMemory(inMemSorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-
- initializeForWriting();
+ return spillSize;
}
private long getMemoryUsage() {
@@ -312,18 +290,12 @@ private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
- taskMemoryManager.freePage(block);
+ freePage(block);
memoryFreed += block.size();
}
- if (inMemSorter != null) {
- long sorterMemoryUsage = inMemSorter.getMemoryUsage();
- inMemSorter = null;
- taskMemoryManager.releaseExecutionMemory(sorterMemoryUsage);
- }
allocatedPages.clear();
currentPage = null;
- currentPagePosition = -1;
- freeSpaceInCurrentPage = 0;
+ pageCursor = 0;
return memoryFreed;
}
@@ -332,16 +304,16 @@ private long freeMemory() {
*/
public void cleanupResources() {
freeMemory();
+ if (inMemSorter != null) {
+ long sorterMemoryUsage = inMemSorter.getMemoryUsage();
+ inMemSorter = null;
+ releaseMemory(sorterMemoryUsage);
+ }
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
logger.error("Unable to delete spill file {}", spill.file.getPath());
}
}
- if (inMemSorter != null) {
- long sorterMemoryUsage = inMemSorter.getMemoryUsage();
- inMemSorter = null;
- taskMemoryManager.releaseExecutionMemory(sorterMemoryUsage);
- }
}
/**
@@ -354,15 +326,8 @@ private void growPointerArrayIfNecessary() throws IOException {
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
- final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
- final long memoryAcquired = taskMemoryManager.acquireExecutionMemory(memoryToGrowPointerArray);
- if (memoryAcquired < memoryToGrowPointerArray) {
- taskMemoryManager.releaseExecutionMemory(memoryAcquired);
- spill();
- } else {
- inMemSorter.expandPointerArray();
- taskMemoryManager.releaseExecutionMemory(oldPointerArrayMemoryUsage);
- }
+      acquireMemory(oldPointerArrayMemoryUsage); // TODO: the memory actually required may be less
+ inMemSorter.expandPointerArray();
}
}
@@ -370,93 +335,42 @@ private void growPointerArrayIfNecessary() throws IOException {
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the memory manager and spill if the requested memory can not be obtained.
*
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * @param required the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
* that exceed the page size are handled via a different code path which uses
* special overflow pages).
*/
- private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
+ private void acquireNewPageIfNecessary(int required) throws IOException {
growPointerArrayIfNecessary();
- if (requiredSpace > freeSpaceInCurrentPage) {
- logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
- freeSpaceInCurrentPage);
- // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
- // without using the free space at the end of the current page. We should also do this for
- // BytesToBytesMap.
- if (requiredSpace > pageSizeBytes) {
- throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- pageSizeBytes + ")");
- } else {
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- spill();
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
- }
- }
- currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = pageSizeBytes;
- allocatedPages.add(currentPage);
- }
+ if (currentPage == null ||
+        pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
+ // TODO: try to find space in previous pages
+ currentPage = allocatePage(pageSizeBytes);
+ pageCursor = currentPage.getBaseOffset();
+ allocatedPages.add(currentPage);
}
}
/**
* Write a record to the shuffle sorter.
*/
- public void insertRecord(
- Object recordBaseObject,
- long recordBaseOffset,
- int lengthInBytes,
- int partitionId) throws IOException {
+ public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
+ throws IOException {
+    // for tests: force a spill once the record-count threshold is exceeded
if (numRecordsInsertedSinceLastSpill > numElementsForSpillThreshold) {
- spill();
+ spill(0);
}
- growPointerArrayIfNecessary();
- // Need 4 bytes to store the record length.
- final int totalSpaceRequired = lengthInBytes + 4;
-
- // --- Figure out where to insert the new record ----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
- long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- spill();
- overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
- }
- }
- allocatedPages.add(overflowPage);
- dataPage = overflowPage;
- dataPagePosition = overflowPage.getBaseOffset();
- } else {
- // The record is small enough to fit in a regular data page, but the current page might not
- // have enough space to hold it (or no pages have been allocated yet).
- acquireNewPageIfNecessary(totalSpaceRequired);
- dataPage = currentPage;
- dataPagePosition = currentPagePosition;
- // Update bookkeeping information
- freeSpaceInCurrentPage -= totalSpaceRequired;
- currentPagePosition += totalSpaceRequired;
- }
- final Object dataPageBaseObject = dataPage.getBaseObject();
-
- final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
- Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
- dataPagePosition += 4;
- Platform.copyMemory(
- recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes);
+ final int required = length + 4;
+ acquireNewPageIfNecessary(required);
+
+ final Object base = currentPage.getBaseObject();
+ final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
+ Platform.putInt(base, pageCursor, length);
+ pageCursor += 4;
+ Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
+ pageCursor += length;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, partitionId);
numRecordsInsertedSinceLastSpill += 1;
@@ -475,6 +389,9 @@ public SpillInfo[] closeAndGetSpills() throws IOException {
// Do not count the final file towards the spill count.
writeSortedFile(true);
freeMemory();
+ long sorterMemoryUsage = inMemSorter.getMemoryUsage();
+ inMemSorter = null;
+ releaseMemory(sorterMemoryUsage);
}
return spills.toArray(new SpillInfo[spills.size()]);
} catch (IOException e) {
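For reference, insertRecord() above stores each record as a 4-byte length prefix followed by the payload, and registers an encoded page-number/offset pointer with the in-memory sorter. Decoding a record from such a pointer looks roughly like this (illustrative helper, not part of the patch):

import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;

final class RecordPointerExample {
  // Copy out one record written by insertRecord(): the pointer addresses the
  // 4-byte length prefix; the record bytes follow immediately after it.
  static byte[] readRecord(TaskMemoryManager tmm, long recordAddress) {
    final Object base = tmm.getPage(recordAddress);
    final long offset = tmm.getOffsetInPage(recordAddress);
    final int length = Platform.getInt(base, offset);
    final byte[] copy = new byte[length];
    Platform.copyMemory(base, offset + 4, copy, Platform.BYTE_ARRAY_OFFSET, length);
    return copy;
  }
}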
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index a8dee6c6101c1..ca18e510e8c62 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -50,6 +50,10 @@ public ShuffleInMemorySorter(int initialSize) {
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
}
+ public void reset() {
+ pointerArrayInsertPosition = 0;
+ }
+
public void expandPointerArray() {
final long[] oldArray = pointerArray;
// Guard against overflow:
@@ -59,7 +63,7 @@ public void expandPointerArray() {
}
public boolean hasSpaceForAnotherRecord() {
- return pointerArrayInsertPosition + 1 < pointerArray.length;
+ return pointerArrayInsertPosition < pointerArray.length;
}
public long getMemoryUsage() {
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index f6c5c944bd77b..fcb3f3064794c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -245,7 +245,7 @@ void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
@VisibleForTesting
void forceSorterToSpill() throws IOException {
assert (sorter != null);
- sorter.spill();
+ sorter.spill(Long.MAX_VALUE);
}
/**
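With spill() now taking a size argument, "spill everything" is expressed by passing an effectively unbounded request, as forceSorterToSpill() does above:

import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;

final class ForceSpillExample {
  // Ask a consumer to release as much memory as it can (illustrative helper).
  static long forceSpill(MemoryConsumer consumer) throws IOException {
    return consumer.spill(Long.MAX_VALUE);
  }
}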
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index f035bdac810bd..42c6ac34dc22c 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -18,14 +18,19 @@
package org.apache.spark.unsafe.map;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
@@ -33,7 +38,8 @@
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
-import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
/**
* An append-only hash map where keys and values are contiguous regions of bytes.
@@ -54,7 +60,7 @@
* is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
-public final class BytesToBytesMap {
+public final class BytesToBytesMap extends MemoryConsumer {
private final Logger logger = LoggerFactory.getLogger(BytesToBytesMap.class);
@@ -62,27 +68,22 @@ public final class BytesToBytesMap {
private static final HashMapGrowthStrategy growthStrategy = HashMapGrowthStrategy.DOUBLING;
- /**
- * Special record length that is placed after the last record in a data page.
- */
- private static final int END_OF_PAGE_MARKER = -1;
-
private final TaskMemoryManager taskMemoryManager;
/**
* A linked list for tracking all allocated data pages so that we can free all of our memory.
*/
-  private final List<MemoryBlock> dataPages = new LinkedList<MemoryBlock>();
+  private final LinkedList<MemoryBlock> dataPages = new LinkedList<>();
/**
* The data page that will be used to store keys and values for new hashtable entries. When this
* page becomes full, a new page will be allocated and this pointer will change to point to that
* new page.
*/
- private MemoryBlock currentDataPage = null;
+ private MemoryBlock currentPage = null;
/**
- * Offset into `currentDataPage` that points to the location where new data can be inserted into
+ * Offset into `currentPage` that points to the location where new data can be inserted into
* the page. This does not incorporate the page's base offset.
*/
private long pageCursor = 0;
@@ -164,13 +165,19 @@ public final class BytesToBytesMap {
private long peakMemoryUsedBytes = 0L;
+ private final BlockManager blockManager;
+ private MapIterator destructiveIterator = null;
+
public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
+ BlockManager blockManager,
int initialCapacity,
double loadFactor,
long pageSizeBytes,
boolean enablePerfMetrics) {
+ super(taskMemoryManager, pageSizeBytes);
this.taskMemoryManager = taskMemoryManager;
+ this.blockManager = blockManager;
this.loadFactor = loadFactor;
this.loc = new Location();
this.pageSizeBytes = pageSizeBytes;
@@ -187,18 +194,13 @@ public BytesToBytesMap(
TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
}
allocate(initialCapacity);
-
- // Acquire a new page as soon as we construct the map to ensure that we have at least
- // one page to work with. Otherwise, other operators in the same task may starve this
- // map (SPARK-9747).
- acquireNewPage();
}
public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
int initialCapacity,
long pageSizeBytes) {
- this(taskMemoryManager, initialCapacity, 0.70, pageSizeBytes, false);
+ this(taskMemoryManager, initialCapacity, pageSizeBytes, false);
}
public BytesToBytesMap(
@@ -208,6 +210,7 @@ public BytesToBytesMap(
boolean enablePerfMetrics) {
this(
taskMemoryManager,
+ SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
initialCapacity,
0.70,
pageSizeBytes,
@@ -219,61 +222,146 @@ public BytesToBytesMap(
*/
public int numElements() { return numElements; }
-  public static final class BytesToBytesMapIterator implements Iterator<Location> {
+  public final class MapIterator implements Iterator<Location> {
- private final int numRecords;
-    private final Iterator<MemoryBlock> dataPagesIterator;
+ private int numRecords;
private final Location loc;
private MemoryBlock currentPage = null;
- private int currentRecordNumber = 0;
+ private int recordsInPage = 0;
private Object pageBaseObject;
private long offsetInPage;
    // Whether this iterator is destructive or not. When it is true, it frees each page as it
    // moves on to the next one.
private boolean destructive = false;
- private BytesToBytesMap bmap;
+    private LinkedList<UnsafeSorterSpillWriter> spillWriters =
+      new LinkedList<>();
+ private UnsafeSorterSpillReader reader = null;
- private BytesToBytesMapIterator(
-      int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc,
- boolean destructive, BytesToBytesMap bmap) {
+ private MapIterator(int numRecords, Location loc, boolean destructive) {
this.numRecords = numRecords;
- this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
this.destructive = destructive;
- this.bmap = bmap;
- if (dataPagesIterator.hasNext()) {
- advanceToNextPage();
+ if (destructive) {
+ destructiveIterator = this;
}
}
private void advanceToNextPage() {
- if (destructive && currentPage != null) {
- dataPagesIterator.remove();
- this.bmap.taskMemoryManager.freePage(currentPage);
+ synchronized (this) {
+ int nextIdx = dataPages.indexOf(currentPage) + 1;
+ if (destructive && currentPage != null) {
+ dataPages.remove(currentPage);
+ freePage(currentPage);
+          nextIdx--;
+ }
+ if (dataPages.size() > nextIdx) {
+ currentPage = dataPages.get(nextIdx);
+ pageBaseObject = currentPage.getBaseObject();
+ offsetInPage = currentPage.getBaseOffset();
+ recordsInPage = Platform.getInt(pageBaseObject, offsetInPage);
+ offsetInPage += 4;
+ } else {
+ currentPage = null;
+ try {
+ reader = spillWriters.removeFirst().getReader(blockManager);
+ recordsInPage = -1;
+ } catch (IOException e) {
+          // the Scala Iterator interface does not allow checked exceptions, so rethrow unchecked
+ Platform.throwException(e);
+ }
+ }
}
- currentPage = dataPagesIterator.next();
- pageBaseObject = currentPage.getBaseObject();
- offsetInPage = currentPage.getBaseOffset();
}
@Override
public boolean hasNext() {
- return currentRecordNumber != numRecords;
+ return numRecords > 0;
}
@Override
public Location next() {
- int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
- if (totalLength == END_OF_PAGE_MARKER) {
+ if (recordsInPage == 0) {
advanceToNextPage();
- totalLength = Platform.getInt(pageBaseObject, offsetInPage);
}
- loc.with(currentPage, offsetInPage);
- offsetInPage += 4 + totalLength;
- currentRecordNumber++;
- return loc;
+ numRecords--;
+ if (currentPage != null) {
+ int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
+ loc.with(currentPage, offsetInPage);
+ offsetInPage += 4 + totalLength;
+        recordsInPage--;
+ return loc;
+ } else {
+ assert(reader != null);
+ if (!reader.hasNext()) {
+ advanceToNextPage();
+ }
+ try {
+ reader.loadNext();
+ } catch (IOException e) {
+        // the Scala Iterator interface does not allow checked exceptions, so rethrow unchecked
+ Platform.throwException(e);
+ }
+ loc.with(reader.getBaseObject(), reader.getBaseOffset(), reader.getRecordLength());
+ return loc;
+ }
+ }
+
+ public long spill(long numBytes) throws IOException {
+ synchronized (this) {
+ if (!destructive || dataPages.size() == 1) {
+ return 0L;
+ }
+
+ // TODO: use existing ShuffleWriteMetrics
+ ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+
+ long released = 0L;
+ while (dataPages.size() > 0) {
+ MemoryBlock block = dataPages.getLast();
+ // The currentPage is used, cannot be released
+ if (block == currentPage) {
+ break;
+ }
+
+ Object base = block.getBaseObject();
+ long offset = block.getBaseOffset();
+ int numRecords = Platform.getInt(base, offset);
+ offset += 4;
+ final UnsafeSorterSpillWriter writer =
+ new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
+ while (numRecords-- > 0) {
+ int length = Platform.getInt(base, offset);
+ writer.write(base, offset + 4, length, 0);
+ offset += 4 + length;
+ }
+ writer.close();
+ spillWriters.add(writer);
+//      taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+// @Override
+// public BoxedUnit apply() {
+// File file = writer.getFile();
+// if (file != null && file.exists()) {
+// if (!file.delete()) {
+// logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+// }
+// }
+// return null;
+// }
+// });
+
+ dataPages.removeLast();
+ freePage(block);
+ released += block.size();
+
+ if (released > numBytes) {
+ break;
+ }
+ }
+
+ return released;
+ }
}
@Override
@@ -290,8 +378,8 @@ public void remove() {
* If any other lookups or operations are performed on this map while iterating over it, including
* `lookup()`, the behavior of the returned iterator is undefined.
*/
- public BytesToBytesMapIterator iterator() {
- return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, false, this);
+ public MapIterator iterator() {
+ return new MapIterator(numElements, loc, false);
}
/**
@@ -304,8 +392,8 @@ public BytesToBytesMapIterator iterator() {
* If any other lookups or operations are performed on this map while iterating over it, including
* `lookup()`, the behavior of the returned iterator is undefined.
*/
- public BytesToBytesMapIterator destructiveIterator() {
- return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, true, this);
+ public MapIterator destructiveIterator() {
+ return new MapIterator(numElements, loc, true);
}
/**
@@ -314,11 +402,8 @@ public BytesToBytesMapIterator destructiveIterator() {
*
* This function always return the same {@link Location} instance to avoid object allocation.
*/
- public Location lookup(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyRowLengthBytes) {
- safeLookup(keyBaseObject, keyBaseOffset, keyRowLengthBytes, loc);
+ public Location lookup(Object keyBase, long keyOffset, int keyLength) {
+ safeLookup(keyBase, keyOffset, keyLength, loc);
return loc;
}
@@ -327,18 +412,14 @@ public Location lookup(
*
* This is a thread-safe version of `lookup`, could be used by multiple threads.
*/
- public void safeLookup(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyRowLengthBytes,
- Location loc) {
+ public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
assert(bitset != null);
assert(longArray != null);
if (enablePerfMetrics) {
numKeyLookups++;
}
- final int hashcode = HASHER.hashUnsafeWords(keyBaseObject, keyBaseOffset, keyRowLengthBytes);
+ final int hashcode = HASHER.hashUnsafeWords(keyBase, keyOffset, keyLength);
int pos = hashcode & mask;
int step = 1;
while (true) {
@@ -354,16 +435,16 @@ public void safeLookup(
if ((int) (stored) == hashcode) {
// Full hash code matches. Let's compare the keys for equality.
loc.with(pos, hashcode, true);
- if (loc.getKeyLength() == keyRowLengthBytes) {
+ if (loc.getKeyLength() == keyLength) {
final MemoryLocation keyAddress = loc.getKeyAddress();
- final Object storedKeyBaseObject = keyAddress.getBaseObject();
- final long storedKeyBaseOffset = keyAddress.getBaseOffset();
+        final Object storedKeyBase = keyAddress.getBaseObject();
+        final long storedKeyOffset = keyAddress.getBaseOffset();
final boolean areEqual = ByteArrayMethods.arrayEquals(
- keyBaseObject,
- keyBaseOffset,
- storedKeyBaseObject,
- storedKeyBaseOffset,
- keyRowLengthBytes
+ keyBase,
+ keyOffset,
+          storedKeyBase,
+          storedKeyOffset,
+ keyLength
);
if (areEqual) {
return;
@@ -410,18 +491,18 @@ private void updateAddressesAndSizes(long fullKeyAddress) {
taskMemoryManager.getOffsetInPage(fullKeyAddress));
}
- private void updateAddressesAndSizes(final Object page, final long offsetInPage) {
- long position = offsetInPage;
- final int totalLength = Platform.getInt(page, position);
+ private void updateAddressesAndSizes(final Object base, final long offset) {
+ long position = offset;
+ final int totalLength = Platform.getInt(base, position);
position += 4;
- keyLength = Platform.getInt(page, position);
+ keyLength = Platform.getInt(base, position);
position += 4;
valueLength = totalLength - keyLength - 4;
- keyMemoryLocation.setObjAndOffset(page, position);
+ keyMemoryLocation.setObjAndOffset(base, position);
position += keyLength;
- valueMemoryLocation.setObjAndOffset(page, position);
+ valueMemoryLocation.setObjAndOffset(base, position);
}
private Location with(int pos, int keyHashcode, boolean isDefined) {
@@ -443,6 +524,19 @@ private Location with(MemoryBlock page, long offsetInPage) {
return this;
}
+ /**
+     * This is only used for spilling.
+     */
+ private Location with(Object base, long offset, int length) {
+ this.isDefined = true;
+ this.memoryPage = null;
+ keyLength = Platform.getInt(base, offset);
+ valueLength = length - 4 - keyLength;
+ keyMemoryLocation.setObjAndOffset(base, offset + 4);
+ valueMemoryLocation.setObjAndOffset(base, offset + 4 + keyLength);
+ return this;
+ }
+
/**
* Returns the memory page that contains the current record.
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
@@ -517,9 +611,9 @@ public int getValueLength() {
* As an example usage, here's the proper way to store a new key:
*
*
- * Location loc = map.lookup(keyBaseObject, keyBaseOffset, keyLengthInBytes);
+ * Location loc = map.lookup(keyBase, keyOffset, keyLength);
* if (!loc.isDefined()) {
- * if (!loc.putNewKey(keyBaseObject, keyBaseOffset, keyLengthInBytes, ...)) {
+ * if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) {
* // handle failure to grow map (by spilling, for example)
* }
* }
@@ -531,16 +625,11 @@ public int getValueLength() {
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
- public boolean putNewKey(
- Object keyBaseObject,
- long keyBaseOffset,
- int keyLengthBytes,
- Object valueBaseObject,
- long valueBaseOffset,
- int valueLengthBytes) {
+ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
+ Object valueBase, long valueOffset, int valueLength) {
assert (!isDefined) : "Can only set value once for a key";
- assert (keyLengthBytes % 8 == 0);
- assert (valueLengthBytes % 8 == 0);
+ assert (keyLength % 8 == 0);
+ assert (valueLength % 8 == 0);
assert(bitset != null);
assert(longArray != null);
@@ -552,86 +641,32 @@ public boolean putNewKey(
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (value)
- final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;
-
- // --- Figure out where to insert the new record ---------------------------------------------
-
- final MemoryBlock dataPage;
- final Object dataPageBaseObject;
- final long dataPageInsertOffset;
- boolean useOverflowPage = requiredSize > pageSizeBytes - 8;
- if (useOverflowPage) {
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- final long overflowPageSize = requiredSize + 8;
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- logger.debug("Failed to acquire {} bytes of memory", overflowPageSize);
- return false;
- }
- dataPages.add(overflowPage);
- dataPage = overflowPage;
- dataPageBaseObject = overflowPage.getBaseObject();
- dataPageInsertOffset = overflowPage.getBaseOffset();
- } else if (currentDataPage == null || pageSizeBytes - 8 - pageCursor < requiredSize) {
- // The record can fit in a data page, but either we have not allocated any pages yet or
- // the current page does not have enough space.
- if (currentDataPage != null) {
- // There wasn't enough space in the current page, so write an end-of-page marker:
- final Object pageBaseObject = currentDataPage.getBaseObject();
- final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
- Platform.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
- }
- if (!acquireNewPage()) {
+ final long recordLength = 8 + keyLength + valueLength;
+ if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
+ if (!acquireNewPage(recordLength + 4L)) {
return false;
}
- dataPage = currentDataPage;
- dataPageBaseObject = currentDataPage.getBaseObject();
- dataPageInsertOffset = currentDataPage.getBaseOffset();
- } else {
- // There is enough space in the current data page.
- dataPage = currentDataPage;
- dataPageBaseObject = currentDataPage.getBaseObject();
- dataPageInsertOffset = currentDataPage.getBaseOffset() + pageCursor;
}
// --- Append the key and value data to the current data page --------------------------------
-
- long insertCursor = dataPageInsertOffset;
-
- // Compute all of our offsets up-front:
- final long recordOffset = insertCursor;
- insertCursor += 4;
- final long keyLengthOffset = insertCursor;
- insertCursor += 4;
- final long keyDataOffsetInPage = insertCursor;
- insertCursor += keyLengthBytes;
- final long valueDataOffsetInPage = insertCursor;
- insertCursor += valueLengthBytes; // word used to store the value size
-
- Platform.putInt(dataPageBaseObject, recordOffset,
- keyLengthBytes + valueLengthBytes + 4);
- Platform.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
- // Copy the key
- Platform.copyMemory(
- keyBaseObject, keyBaseOffset, dataPageBaseObject, keyDataOffsetInPage, keyLengthBytes);
- // Copy the value
- Platform.copyMemory(valueBaseObject, valueBaseOffset, dataPageBaseObject,
- valueDataOffsetInPage, valueLengthBytes);
-
- // --- Update bookeeping data structures -----------------------------------------------------
-
- if (useOverflowPage) {
- // Store the end-of-page marker at the end of the data page
- Platform.putInt(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
- } else {
- pageCursor += requiredSize;
- }
-
+ final Object base = currentPage.getBaseObject();
+ long offset = currentPage.getBaseOffset() + pageCursor;
+ final long recordOffset = offset;
+ Platform.putInt(base, offset, keyLength + valueLength + 4);
+ Platform.putInt(base, offset + 4, keyLength);
+ offset += 8;
+ Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
+ offset += keyLength;
+ Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
+
+ // --- Update bookkeeping data structures -----------------------------------------------------
+ offset = currentPage.getBaseOffset();
+ Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
+ pageCursor += recordLength;
numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
- dataPage, recordOffset);
+ currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
@@ -647,16 +682,24 @@ public boolean putNewKey(
* Acquire a new page from the memory manager.
* @return whether there is enough space to allocate the new page.
*/
- private boolean acquireNewPage() {
- MemoryBlock newPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (newPage == null) {
- logger.debug("Failed to acquire {} bytes of memory", pageSizeBytes);
+ private boolean acquireNewPage(long required) {
+ try {
+ currentPage = allocatePage(required);
+ dataPages.add(currentPage);
+ Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
+ pageCursor = 4;
+ return true;
+ } catch (IOException e) {
return false;
}
- dataPages.add(newPage);
- pageCursor = 0;
- currentDataPage = newPage;
- return true;
+ }
+
+ @Override
+ public long spill(long size) throws IOException {
+ if (destructiveIterator != null) {
+ return destructiveIterator.spill(size);
+ }
+ return 0L;
}
/**
@@ -691,7 +734,7 @@ public void free() {
while (dataPagesIterator.hasNext()) {
MemoryBlock dataPage = dataPagesIterator.next();
dataPagesIterator.remove();
- taskMemoryManager.freePage(dataPage);
+ freePage(dataPage);
}
assert(dataPages.isEmpty());
}
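To summarize the new page layout that putNewKey() and the iterator above agree on: each data page starts with a 4-byte record count, and each record is stored as [4-byte total length][4-byte key length][key bytes][value bytes]. A sketch of walking one page under that layout (inferred from the code above, not an API of the map):

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

final class MapPageLayoutExample {
  // Walk the records in one BytesToBytesMap data page:
  // [int numRecords][ [int totalLength][int keyLength][key][value] ]*
  static void scan(MemoryBlock page) {
    final Object base = page.getBaseObject();
    long offset = page.getBaseOffset();
    int numRecords = Platform.getInt(base, offset);
    offset += 4;
    while (numRecords-- > 0) {
      final int totalLength = Platform.getInt(base, offset);
      final int keyLength = Platform.getInt(base, offset + 4);
      final int valueLength = totalLength - keyLength - 4;
      // key bytes start at offset + 8; value bytes of length valueLength follow
      // at offset + 8 + keyLength; the next record starts 4 + totalLength later
      offset += 4 + totalLength;
    }
  }
}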
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e317ea391c556..e554695f4e3e5 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -17,12 +17,11 @@
package org.apache.spark.util.collection.unsafe.sort;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
-import javax.annotation.Nullable;
-
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
@@ -32,24 +31,22 @@
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.storage.BlockManager;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
/**
* External sorter based on {@link UnsafeInMemorySorter}.
*/
-public final class UnsafeExternalSorter {
+public final class UnsafeExternalSorter extends MemoryConsumer {
private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
- private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
- private final int initialSize;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
@@ -70,13 +67,13 @@ public final class UnsafeExternalSorter {
// These variables are reset after spilling:
@Nullable private UnsafeInMemorySorter inMemSorter;
- // Whether the in-mem sorter is created internally, or passed in from outside.
- // If it is passed in from outside, we shouldn't release the in-mem sorter's memory.
- private boolean isInMemSorterExternal = false;
+  // The amount of memory acquired for the in-memory sorter
+ private long acquiredMem = 0L;
+
private MemoryBlock currentPage = null;
- private long currentPagePosition = -1;
- private long freeSpaceInCurrentPage = 0;
+ private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
+ private SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
TaskMemoryManager taskMemoryManager,
@@ -112,26 +109,26 @@ private UnsafeExternalSorter(
int initialSize,
long pageSizeBytes,
@Nullable UnsafeInMemorySorter existingInMemorySorter) throws IOException {
+ super(taskMemoryManager, pageSizeBytes);
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
this.recordComparator = recordComparator;
this.prefixComparator = prefixComparator;
- this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- this.pageSizeBytes = pageSizeBytes;
+ // TODO: metrics tracking + integration with shuffle write metrics
+ // need to connect the write metrics to task metrics so we count the spill IO somewhere.
this.writeMetrics = new ShuffleWriteMetrics();
if (existingInMemorySorter == null) {
- initializeForWriting();
- // Acquire a new page as soon as we construct the sorter to ensure that we have at
- // least one page to work with. Otherwise, other operators in the same task may starve
- // this sorter (SPARK-9709). We don't need to do this if we already have an existing sorter.
- acquireNewPage();
+ this.inMemSorter =
+ new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
+ acquireMemory(inMemSorter.getMemoryUsage());
+ acquiredMem = inMemSorter.getMemoryUsage();
} else {
- this.isInMemSorterExternal = true;
+ acquiredMem = 0;
this.inMemSorter = existingInMemorySorter;
}
@@ -147,25 +144,7 @@ public BoxedUnit apply() {
});
}
- // TODO: metrics tracking + integration with shuffle write metrics
- // need to connect the write metrics to task metrics so we count the spill IO somewhere.
- /**
- * Allocates new sort data structures. Called when creating the sorter and after each spill.
- */
- private void initializeForWriting() throws IOException {
- // Note: Do not track memory for the pointer array for now because of SPARK-10474.
- // In more detail, in TungstenAggregate we only reserve a page, but when we fall back to
- // sort-based aggregation we try to acquire a page AND a pointer array, which inevitably
- // fails if all other memory is already occupied. It should be safe to not track the array
- // because its memory footprint is frequently much smaller than that of a page. This is a
- // temporary hack that we should address in 1.6.0.
- // TODO: track the pointer array memory!
- this.writeMetrics = new ShuffleWriteMetrics();
- this.inMemSorter =
- new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
- this.isInMemSorterExternal = false;
- }
/**
* Marks the current page as no-more-space-available, and as a result, either allocate a
@@ -173,13 +152,16 @@ private void initializeForWriting() throws IOException {
*/
@VisibleForTesting
public void closeCurrentPage() {
- freeSpaceInCurrentPage = 0;
+ if (currentPage != null) {
+ pageCursor = currentPage.getBaseOffset() + currentPage.size();
+ }
}
/**
* Sort and spill the current records in response to memory pressure.
*/
- public void spill() throws IOException {
+ @Override
+ public long spill(long size) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -187,6 +169,10 @@ public void spill() throws IOException {
spillWriters.size(),
spillWriters.size() > 1 ? " times" : " time");
+ if (readingIterator != null) {
+ return readingIterator.spill();
+ }
+
// We only write out contents of the inMemSorter if it is not empty.
if (inMemSorter.numRecords() > 0) {
final UnsafeSorterSpillWriter spillWriter =
@@ -202,6 +188,8 @@ public void spill() throws IOException {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();
+
+ inMemSorter.reset();
}
final long spillSize = freeMemory();
@@ -210,7 +198,7 @@ public void spill() throws IOException {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
- initializeForWriting();
+ return spillSize;
}
/**
@@ -246,7 +234,7 @@ public int getNumberOfAllocatedPages() {
}
/**
- * Free this sorter's in-memory data structures, including its data pages and pointer array.
+ * Free this sorter's data pages.
*
* @return the number of bytes freed.
*/
@@ -254,14 +242,12 @@ private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
- taskMemoryManager.freePage(block);
+ freePage(block);
memoryFreed += block.size();
}
- // TODO: track in-memory sorter memory usage (SPARK-10474)
allocatedPages.clear();
currentPage = null;
- currentPagePosition = -1;
- freeSpaceInCurrentPage = 0;
+ pageCursor = 0;
return memoryFreed;
}
@@ -285,6 +271,11 @@ private void deleteSpillFiles() {
public void cleanupResources() {
deleteSpillFiles();
freeMemory();
+ if (inMemSorter != null) {
+ inMemSorter = null;
+ releaseMemory(acquiredMem);
+ acquiredMem = 0;
+ }
}
/**
@@ -295,8 +286,15 @@ public void cleanupResources() {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
- // TODO: track the pointer array memory! (SPARK-10474)
- inMemSorter.expandPointerArray();
+      // assume that the memory used by the pointer array will double after expansion
+      long needed = inMemSorter.getMemoryUsage();
+      acquireMemory(needed); // could trigger spilling, possibly of this sorter itself
+ if (inMemSorter.hasSpaceForAnotherRecord()) {
+ releaseMemory(needed);
+ } else {
+ acquiredMem += needed;
+ inMemSorter.expandPointerArray();
+ }
}
}
@@ -304,101 +302,38 @@ private void growPointerArrayIfNecessary() throws IOException {
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the memory manager and spill if the requested memory can not be obtained.
*
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * @param required the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
* that exceed the page size are handled via a different code path which uses
* special overflow pages).
*/
- private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
- assert (requiredSpace <= pageSizeBytes);
- if (requiredSpace > freeSpaceInCurrentPage) {
- logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
- freeSpaceInCurrentPage);
- // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
- // without using the free space at the end of the current page. We should also do this for
- // BytesToBytesMap.
- if (requiredSpace > pageSizeBytes) {
- throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- pageSizeBytes + ")");
- } else {
- acquireNewPage();
- }
+ private void acquireNewPageIfNecessary(int required) throws IOException {
+ if (currentPage == null ||
+ pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
+ // TODO: try to find space on previous pages
+ currentPage = allocatePage(required);
+ pageCursor = currentPage.getBaseOffset();
+ allocatedPages.add(currentPage);
}
}
- /**
- * Acquire a new page from the memory manager.
- *
- * If there is not enough space to allocate the new page, spill all existing ones
- * and try again. If there is still not enough space, report error to the caller.
- */
- private void acquireNewPage() throws IOException {
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- spill();
- currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
- if (currentPage == null) {
- throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
- }
- }
- currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = pageSizeBytes;
- allocatedPages.add(currentPage);
- }
-
/**
* Write a record to the sorter.
*/
- public void insertRecord(
- Object recordBaseObject,
- long recordBaseOffset,
- int lengthInBytes,
- long prefix) throws IOException {
+ public void insertRecord(Object recordBase, long recordOffset, int length, long prefix)
+ throws IOException {
growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
- final int totalSpaceRequired = lengthInBytes + 4;
-
- // --- Figure out where to insert the new record ----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
- long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- spill();
- overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
- }
- }
- allocatedPages.add(overflowPage);
- dataPage = overflowPage;
- dataPagePosition = overflowPage.getBaseOffset();
- } else {
- // The record is small enough to fit in a regular data page, but the current page might not
- // have enough space to hold it (or no pages have been allocated yet).
- acquireNewPageIfNecessary(totalSpaceRequired);
- dataPage = currentPage;
- dataPagePosition = currentPagePosition;
- // Update bookkeeping information
- freeSpaceInCurrentPage -= totalSpaceRequired;
- currentPagePosition += totalSpaceRequired;
- }
- final Object dataPageBaseObject = dataPage.getBaseObject();
-
- // --- Insert the record ----------------------------------------------------------------------
-
- final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
- Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
- dataPagePosition += 4;
- Platform.copyMemory(
- recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes);
+ final int required = length + 4;
+ acquireNewPageIfNecessary(required);
+
+ final Object base = currentPage.getBaseObject();
+ final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
+ Platform.putInt(base, pageCursor, length);
+ pageCursor += 4;
+ Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
+ pageCursor += length;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}
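
With the overflow-page branch removed, every record written by insertRecord() shares a
single on-page layout: a 4-byte length header followed by the record bytes, addressed via
encodePageNumberAndOffset; records larger than the default page size are now handled by
allocatePage(required) itself rather than by dedicated overflow pages. As a minimal decoding
sketch (editorial, not part of the patch; it assumes the TaskMemoryManager#getPage /
#getOffsetInPage pair used elsewhere in this codebase):

    Object base = taskMemoryManager.getPage(recordAddress);
    long offset = taskMemoryManager.getOffsetInPage(recordAddress);
    int length = Platform.getInt(base, offset);   // the 4-byte length header
    long dataOffset = offset + 4;                 // record bytes start here
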
@@ -411,59 +346,23 @@ public void insertRecord(
*
* record length = key length + value length + 4
*/
- public void insertKVRecord(
- Object keyBaseObj, long keyOffset, int keyLen,
- Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
+ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen,
+ Object valueBase, long valueOffset, int valueLen, long prefix) throws IOException {
growPointerArrayIfNecessary();
- final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
-
- // --- Figure out where to insert the new record ----------------------------------------------
-
- final MemoryBlock dataPage;
- long dataPagePosition;
- boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
- if (useOverflowPage) {
- long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
- // The record is larger than the page size, so allocate a special overflow page just to hold
- // that record.
- MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- spill();
- overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
- if (overflowPage == null) {
- throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
- }
- }
- allocatedPages.add(overflowPage);
- dataPage = overflowPage;
- dataPagePosition = overflowPage.getBaseOffset();
- } else {
- // The record is small enough to fit in a regular data page, but the current page might not
- // have enough space to hold it (or no pages have been allocated yet).
- acquireNewPageIfNecessary(totalSpaceRequired);
- dataPage = currentPage;
- dataPagePosition = currentPagePosition;
- // Update bookkeeping information
- freeSpaceInCurrentPage -= totalSpaceRequired;
- currentPagePosition += totalSpaceRequired;
- }
- final Object dataPageBaseObject = dataPage.getBaseObject();
-
- // --- Insert the record ----------------------------------------------------------------------
-
- final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
- Platform.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
- dataPagePosition += 4;
-
- Platform.putInt(dataPageBaseObject, dataPagePosition, keyLen);
- dataPagePosition += 4;
-
- Platform.copyMemory(keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
- dataPagePosition += keyLen;
-
- Platform.copyMemory(valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);
+ final int required = keyLen + valueLen + 4 + 4;
+ acquireNewPageIfNecessary(required);
+
+ final Object base = currentPage.getBaseObject();
+ final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
+ Platform.putInt(base, pageCursor, keyLen + valueLen + 4);
+ pageCursor += 4;
+ Platform.putInt(base, pageCursor, keyLen);
+ pageCursor += 4;
+ Platform.copyMemory(keyBase, keyOffset, base, pageCursor, keyLen);
+ pageCursor += keyLen;
+ Platform.copyMemory(valueBase, valueOffset, base, pageCursor, valueLen);
+ pageCursor += valueLen;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
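
insertKVRecord() takes the same single-page path with the layout
[keyLen + valueLen + 4 : int][keyLen : int][key bytes][value bytes]: the outer length counts
the inner key-length header but not itself, which is why `required` is keyLen + valueLen + 8
while the stored length is keyLen + valueLen + 4. A hedged decoding sketch mirroring the
writes above:

    int recordLen = Platform.getInt(base, offset);     // keyLen + valueLen + 4
    int keyLen = Platform.getInt(base, offset + 4);
    long keyOffset = offset + 8;                       // key bytes start here
    long valueOffset = keyOffset + keyLen;             // value bytes follow the key
    int valueLen = recordLen - 4 - keyLen;
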
@@ -475,10 +374,10 @@ public void insertKVRecord(
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
- final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
- int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
+ readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
+ int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
- return inMemoryIterator;
+ return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
@@ -486,9 +385,131 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
}
spillWriters.clear();
- spillMerger.addSpillIfNotEmpty(inMemoryIterator);
+ spillMerger.addSpillIfNotEmpty(readingIterator);
return spillMerger.getSortedIterator();
}
}
+
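+  /**
+   * An iterator over the in-memory sorted records that can, on demand, spill
+   * the records it has not yet consumed to disk and continue serving them from
+   * the spill file, releasing the pages that backed them.
+   */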
+ class SpillableIterator extends UnsafeSorterIterator {
+ private Object baseObject = null;
+ private long baseOffset = 0;
+ private long keyPrefix = 0;
+ private int recordLength = 0;
+
+ private boolean cached = false;
+ private MemoryBlock lastPage = null;
+
+ private UnsafeSorterIterator upstream;
+
+ public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) {
+ this.upstream = inMemIterator;
+ }
+
+ public long spill() throws IOException {
+ synchronized (this) {
+ if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && upstream.hasNext())) {
+ return 0L;
+ }
+
+ UnsafeInMemorySorter.SortedIterator inMemIterator =
+ (UnsafeInMemorySorter.SortedIterator) upstream;
+
+      // Back up the current record so the caller can keep reading it after the
+      // hand-off to the spill file below.
+ baseObject = inMemIterator.getBaseObject();
+ baseOffset = inMemIterator.getBaseOffset();
+ keyPrefix = inMemIterator.getKeyPrefix();
+ recordLength = inMemIterator.getRecordLength();
+
+ final UnsafeSorterSpillWriter spillWriter =
+ new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
+ inMemIterator.numRecordsLeft());
+ while (inMemIterator.hasNext()) {
+ inMemIterator.loadNext();
+ final Object baseObject = inMemIterator.getBaseObject();
+ final long baseOffset = inMemIterator.getBaseOffset();
+ final int recordLength = inMemIterator.getRecordLength();
+ spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
+ }
+ spillWriter.close();
+ upstream = spillWriter.getReader(blockManager);
+
+ cached = true;
+      // Release all pages except the one backing the cached record.
+ long released = 0L;
+ for (MemoryBlock block : allocatedPages) {
+ if (block.getBaseObject() != baseObject) {
+ freePage(block);
+ released += block.size();
+ } else {
+ lastPage = block;
+ }
+ }
+ allocatedPages.clear();
+ return released;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ synchronized (this) {
+ return upstream.hasNext();
+ }
+ }
+
+ @Override
+ public void loadNext() throws IOException {
+ synchronized (this) {
+ if (cached) {
+          // Advancing past the cached in-memory record; its page is no longer needed.
+ if (lastPage != null) {
+ freePage(lastPage);
+ lastPage = null;
+ }
+ cached = false;
+ }
+ upstream.loadNext();
+ }
+ }
+
+ @Override
+ public Object getBaseObject() {
+ synchronized (this) {
+ if (cached) {
+ return baseObject;
+ }
+ return upstream.getBaseObject();
+ }
+ }
+
+ @Override
+ public long getBaseOffset() {
+ synchronized (this) {
+ if (cached) {
+ return baseOffset;
+ }
+ return upstream.getBaseOffset();
+ }
+ }
+
+ @Override
+ public int getRecordLength() {
+ synchronized (this) {
+ if (cached) {
+ return recordLength;
+ }
+ return upstream.getRecordLength();
+ }
+ }
+
+ @Override
+ public long getKeyPrefix() {
+ synchronized (this) {
+ if (cached) {
+ return keyPrefix;
+ }
+ return upstream.getKeyPrefix();
+ }
+ }
+ }
}
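
The subtle part of SpillableIterator is that spill() may be called while a caller is
positioned on a record: that record is cached, the page backing it is retained as
lastPage while every other page is freed, and the page itself is only released on the
next loadNext(). A hedged usage sketch of that contract (hypothetical caller, not part
of the patch; it assumes a sorter that has not spilled yet, so getSortedIterator()
returns the SpillableIterator directly):

    UnsafeSorterIterator it = sorter.getSortedIterator();
    it.loadNext();                                  // current record lives on some page P
    long freed = ((SpillableIterator) it).spill();  // unread tail goes to disk; P is kept
    long prefix = it.getKeyPrefix();                // still answered from the cached record
    it.loadNext();                                  // first read from the spill file; P is freed
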
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 5aad72c374c37..45596fba8b2fe 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -89,6 +89,10 @@ public UnsafeInMemorySorter(
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
}
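+  /**
+   * Resets the insertion position so the pointer array can be reused for new
+   * records, e.g. after the existing ones have been spilled to disk.
+   */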
+ public void reset() {
+ pointerArrayInsertPosition = 0;
+ }
+
/**
* @return the number of records that have been inserted into this sorter.
*/
@@ -100,12 +104,8 @@ public long getMemoryUsage() {
return pointerArray.length * 8L;
}
- static long getMemoryRequirementsForPointerArray(long numEntries) {
- return numEntries * 2L * 8L;
- }
-
public boolean hasSpaceForAnotherRecord() {
- return pointerArrayInsertPosition + 2 < pointerArray.length;
+ return pointerArrayInsertPosition + 2 <= pointerArray.length;
}
public void expandPointerArray() {
@@ -158,6 +158,10 @@ public boolean hasNext() {
return position < sortBufferInsertPosition;
}
+    public int numRecordsLeft() {
+      // Each record occupies two entries in the sort buffer (record pointer +
+      // key prefix), so convert the remaining entry count into a record count.
+      return (sortBufferInsertPosition - position) / 2;
+    }
+
@Override
public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes
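
The hasSpaceForAnotherRecord() change above fixes an off-by-one: each record occupies two
pointer-array entries (record pointer plus key prefix), so with insertion position p and
array length n there is room for one more record exactly when p + 2 <= n. A worked check:

    // n = 4 entries, one record already inserted, so p = 2:
    //   old: p + 2 <  n  ->  4 <  4  -> false (array grows although entries 2-3 are free)
    //   new: p + 2 <= n  ->  4 <= 4  -> true  (exactly one more record fits)
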
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 501dfe77d13cb..039e940a357ea 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -20,18 +20,18 @@
import java.io.*;
import com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
* of the file format).
*/
-final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
+public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
private final File file;
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index e59a84ff8d118..234e21140a1dd 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -35,7 +35,7 @@
*
* [# of records (int)] [[len (int)][prefix (long)][data (bytes)]...]
*/
-final class UnsafeSorterSpillWriter {
+public final class UnsafeSorterSpillWriter {
static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
index 7fb2f92ca80e8..4ba4d7ab9641b 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
@@ -17,20 +17,24 @@
package org.apache.spark.shuffle.sort;
-import org.apache.spark.shuffle.sort.PackedRecordPointer;
+import java.io.IOException;
+
import org.junit.Test;
-import static org.junit.Assert.*;
import org.apache.spark.SparkConf;
import org.apache.spark.memory.GrantEverythingMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.memory.TaskMemoryManager;
-import static org.apache.spark.shuffle.sort.PackedRecordPointer.*;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+
+import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
+import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PARTITION_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class PackedRecordPointerSuite {
@Test
- public void heap() {
+ public void heap() throws IOException {
final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new GrantEverythingMemoryManager(conf), 0);
@@ -49,7 +53,7 @@ public void heap() {
}
@Test
- public void offHeap() {
+ public void offHeap() throws IOException {
final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "true");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new GrantEverythingMemoryManager(conf), 0);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index d65926949c036..00f3c1eea7545 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -49,6 +49,7 @@
import org.apache.spark.io.SnappyCompressionCodec;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.serializer.*;
import org.apache.spark.scheduler.MapStatus;
@@ -344,9 +345,7 @@ private void testMergingSpills(
}
assertEquals(sumOfPartitionSizes, mergedOutputFile.length());
- assertEquals(
- HashMultiset.create(dataToWrite),
- HashMultiset.create(readRecordsFromFile()));
+ assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten());
@@ -411,7 +410,7 @@ public void writeEnoughDataToTriggerSpill() throws Exception {
dataToWrite.add(new Tuple2