Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,7 @@ public LongArray allocateArray(long size) {
long required = size * 8L;
MemoryBlock page = taskMemoryManager.allocatePage(required, this);
if (page == null || page.size() < required) {
long got = 0;
if (page != null) {
got = page.size();
taskMemoryManager.freePage(page, this);
}
taskMemoryManager.showMemoryUsage();
throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
throwOom(page, required);
}
used += required;
return new LongArray(page);
Expand All @@ -116,13 +110,7 @@ public void freeArray(LongArray array) {
protected MemoryBlock allocatePage(long required) {
MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
if (page == null || page.size() < required) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While here, pull this check into the method and call it "checkAllocation" or something?

Copy link
Member Author

@original-brownbear original-brownbear Sep 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen I didn't do this on purpose so that allocatePage and allocateArray compile smaller since the condition isn't true very often. Not worth the duplication here?

long got = 0;
if (page != null) {
got = page.size();
taskMemoryManager.freePage(page, this);
}
taskMemoryManager.showMemoryUsage();
throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
throwOom(page, required);
}
used += page.size();
return page;
Expand Down Expand Up @@ -152,4 +140,14 @@ public void freeMemory(long size) {
taskMemoryManager.releaseExecutionMemory(size, this);
used -= size;
}

private void throwOom(final MemoryBlock page, final long required) {
long got = 0;
if (page != null) {
got = page.size();
taskMemoryManager.freePage(page, this);
}
taskMemoryManager.showMemoryUsage();
throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
}
}
24 changes: 10 additions & 14 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,7 @@ private void advanceToNextPage() {
} else {
currentPage = null;
if (reader != null) {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
}
}
handleFailedDelete();
}
try {
Closeables.close(reader, /* swallowIOException = */ false);
Expand All @@ -307,13 +301,7 @@ private void advanceToNextPage() {
public boolean hasNext() {
if (numRecords == 0) {
if (reader != null) {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
}
}
handleFailedDelete();
}
}
return numRecords > 0;
Expand Down Expand Up @@ -403,6 +391,14 @@ public long spill(long numBytes) throws IOException {
public void remove() {
throw new UnsupportedOperationException();
}

private void handleFailedDelete() {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
inMemSorter.numRecords());
spillWriters.add(spillWriter);
final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
final Object baseObject = sortedRecords.getBaseObject();
final long baseOffset = sortedRecords.getBaseOffset();
final int recordLength = sortedRecords.getRecordLength();
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();
spillIterator(inMemSorter.getSortedIterator(), spillWriter);
}

final long spillSize = freeMemory();
Expand Down Expand Up @@ -488,6 +480,18 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
}
}

private static void spillIterator(UnsafeSorterIterator inMemIterator,
UnsafeSorterSpillWriter spillWriter) throws IOException {
while (inMemIterator.hasNext()) {
inMemIterator.loadNext();
final Object baseObject = inMemIterator.getBaseObject();
final long baseOffset = inMemIterator.getBaseOffset();
final int recordLength = inMemIterator.getRecordLength();
spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
}
spillWriter.close();
}

/**
* An UnsafeSorterIterator that support spilling.
*/
Expand All @@ -503,6 +507,7 @@ class SpillableIterator extends UnsafeSorterIterator {
this.numRecords = inMemIterator.getNumRecords();
}

@Override
public int getNumRecords() {
return numRecords;
}
Expand All @@ -521,14 +526,7 @@ public long spill() throws IOException {
// Iterate over the records that have not been returned and spill them.
final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
while (inMemIterator.hasNext()) {
inMemIterator.loadNext();
final Object baseObject = inMemIterator.getBaseObject();
final long baseOffset = inMemIterator.getBaseOffset();
final int recordLength = inMemIterator.getRecordLength();
spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
}
spillWriter.close();
spillIterator(inMemIterator, spillWriter);
spillWriters.add(spillWriter);
nextUpstream = spillWriter.getReader(serializerManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,6 @@ public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
}

// Do word alignment for this row and grow the row buffer if needed.
// todo: remove this after we make unsafe array data word align.
public void alignToWords(int numBytes) {
final int remainder = numBytes & 0x07;

if (remainder > 0) {
final int paddingBytes = 8 - remainder;
holder.grow(paddingBytes);

for (int i = 0; i < paddingBytes; i++) {
Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
holder.cursor++;
}
}
}

public void write(int ordinal, boolean value) {
final long offset = getFieldOffset(ordinal);
Platform.putLong(holder.buffer, offset, 0L);
Expand Down