Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
64b7c85
Factor cross-task memory arbitration into own component.
JoshRosen Oct 28, 2015
8396ed6
Hacky WIP approach at supporting fixed limit on off-heap execution me…
JoshRosen Oct 28, 2015
f3f44fe
MemoryManager internals refactoring (WIP)
JoshRosen Oct 29, 2015
eb3180a
Gradually fix test issues; address thread-safety (WIP).
JoshRosen Oct 29, 2015
d795524
Merge remote-tracking branch 'origin/master' into offheap-memory-acco…
JoshRosen Oct 29, 2015
82fffab
Fix scalstyle.
JoshRosen Oct 29, 2015
641e9e5
Merge remote-tracking branch 'origin/master' into offheap-memory-acco…
JoshRosen Oct 30, 2015
4d97f69
Merge remote-tracking branch 'origin/master' into offheap-memory-acco…
JoshRosen Nov 1, 2015
b59dab9
Attempt at fixing merge conflicts; refactor to use MemoryMode.
JoshRosen Nov 1, 2015
8bbc111
Fix double-free of pages.
JoshRosen Nov 1, 2015
df2168f
Fixes to execution evicting storage.
JoshRosen Nov 1, 2015
144e680
Avoid unnecessary ensureFreeSpace() calls
JoshRosen Nov 1, 2015
542dd56
Fix memory leak detection test in TaskMemoryManager.
JoshRosen Nov 2, 2015
709ecf2
Fix TaskMemoryManagerSuite.
JoshRosen Nov 2, 2015
1356cdb
Another fix to freeing pages
JoshRosen Nov 2, 2015
d8ffd35
Fix failing test in UnsafeShuffleWriterSuite.
JoshRosen Nov 2, 2015
8e12eb4
Add more comments.
JoshRosen Nov 2, 2015
418f9b3
Fix TODO related to logging of non-consumer memory.
JoshRosen Nov 2, 2015
b0d569d
Add missing newline for Scalastyle.
JoshRosen Nov 2, 2015
820fa38
Document thread-safety.
JoshRosen Nov 2, 2015
f976faa
Update comment about spilling + tungsten pages.
JoshRosen Nov 2, 2015
71cd9b0
Import ordering
JoshRosen Nov 2, 2015
852b5c9
Address commented-out test asserts in UnifiedMemoryManagerSuite.
JoshRosen Nov 2, 2015
6a5204f
Add rudimentary test for off-heap accounting.
JoshRosen Nov 2, 2015
9ec29c3
Fix scalastyle.
JoshRosen Nov 3, 2015
2fb52b5
De-allocate pages when freeing leaked memory in TaskMemoryManager.
JoshRosen Nov 3, 2015
96705b8
Comment rewording
JoshRosen Nov 3, 2015
b5fb705
Rename acquireMemory to acquireOnHeapMemory in order to disambiguate.
JoshRosen Nov 4, 2015
a0c5668
Change exception and log levels
JoshRosen Nov 4, 2015
9904c4f
Merge remote-tracking branch 'origin/master' into offheap-memory-acco…
JoshRosen Nov 6, 2015
da89862
Merge remote-tracking branch 'origin/master' into offheap-memory-acco…
JoshRosen Nov 6, 2015
c761736
Fix test compilation.
JoshRosen Nov 6, 2015
32398bb
Address review feedback and fix bug.
JoshRosen Nov 6, 2015
eac53f1
Minor update to calculation of page sizes.
JoshRosen Nov 6, 2015
55feee0
Correct previous bug fix, which misidentified problem.
JoshRosen Nov 6, 2015
1e5eefa
Fix typo.
JoshRosen Nov 6, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.spark.memory;


import java.io.IOException;

import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;


/**
* An memory consumer of TaskMemoryManager, which support spilling.
*
* Note: this only supports allocation / spilling of Tungsten memory.
*/
public abstract class MemoryConsumer {

Expand All @@ -36,7 +36,6 @@ public abstract class MemoryConsumer {
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
this.taskMemoryManager = taskMemoryManager;
this.pageSize = pageSize;
this.used = 0;
}

protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
Expand Down Expand Up @@ -67,6 +66,8 @@ public void spill() throws IOException {
*
* Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
*
* Note: today, this only frees Tungsten-managed pages.
*
* @param size the amount of memory should be released
* @param trigger the MemoryConsumer that trigger this spilling
* @return the amount of released memory in bytes
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/spark/memory/MemoryMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.memory;

import org.apache.spark.annotation.Private;

@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}
72 changes: 48 additions & 24 deletions core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ public class TaskMemoryManager {
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
private final boolean inHeap;
final MemoryMode tungstenMemoryMode;

/**
* The size of memory granted to each consumer.
* Tracks spillable memory consumers.
*/
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;
Expand All @@ -115,7 +115,7 @@ public class TaskMemoryManager {
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
Expand All @@ -127,23 +127,30 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
*
* @return number of bytes successfully granted (<= N).
*/
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
public long acquireExecutionMemory(
long required,
MemoryMode mode,
MemoryConsumer consumer) {
assert(required >= 0);
// If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
// memory here, then it may not make sense to spill since that would only end up freeing
// off-heap memory. This is subject to change, though, so it may be risky to make this
// optimization now in case we forget to undo it late when making changes.
synchronized (this) {
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

// try to release memory from other consumers first, then we can reduce the frequency of
// Try to release memory from other consumers first, then we can reduce the frequency of
// spilling, avoid to have too many spilled files.
if (got < required) {
// Call spill() on other consumers to release memory
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0) {
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
logger.info("Task {} released {} from {} for {}", taskAttemptId,
if (released > 0 && mode == tungstenMemoryMode) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
Expand All @@ -161,10 +168,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
if (got < required && consumer != null) {
try {
long released = consumer.spill(required - got, consumer);
if (released > 0) {
logger.info("Task {} released {} from itself ({})", taskAttemptId,
if (released > 0 && mode == tungstenMemoryMode) {
logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
Expand All @@ -184,9 +191,9 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
/**
* Release N bytes of execution memory for a MemoryConsumer.
*/
public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
memoryManager.releaseExecutionMemory(size, taskAttemptId);
memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
}

/**
Expand All @@ -195,11 +202,19 @@ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
public void showMemoryUsage() {
logger.info("Memory used in task " + taskAttemptId);
synchronized (this) {
long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
if (c.getUsed() > 0) {
logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
long totalMemUsage = c.getUsed();
memoryAccountedForByConsumers += totalMemUsage;
if (totalMemUsage > 0) {
logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
}
}
long memoryNotAccountedFor =
memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
logger.info(
"{} bytes of memory were used by task {} but are not associated with specific consumers",
memoryNotAccountedFor, taskAttemptId);
}
}

Expand All @@ -214,15 +229,16 @@ public long pageSizeBytes() {
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
* Returns `null` if there was not enough memory to allocate the page.
* Returns `null` if there was not enough memory to allocate the page. May return a page that
* contains fewer bytes than requested, so callers should verify the size of returned pages.
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}

long acquired = acquireExecutionMemory(size, consumer);
long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
if (acquired <= 0) {
return null;
}
Expand All @@ -231,7 +247,7 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
releaseExecutionMemory(acquired, consumer);
releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
Expand Down Expand Up @@ -262,7 +278,7 @@ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize, consumer);
releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
}

/**
Expand All @@ -276,7 +292,7 @@ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
* @return an encoded page address.
*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
if (!inHeap) {
if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
Expand Down Expand Up @@ -305,7 +321,7 @@ private static long decodeOffset(long pagePlusOffsetAddress) {
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public Object getPage(long pagePlusOffsetAddress) {
if (inHeap) {
if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
Expand All @@ -323,7 +339,7 @@ public Object getPage(long pagePlusOffsetAddress) {
*/
public long getOffsetInPage(long pagePlusOffsetAddress) {
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
if (inHeap) {
if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
return offsetInPage;
} else {
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
Expand Down Expand Up @@ -351,11 +367,19 @@ public long cleanUpAllAllocatedMemory() {
}
consumers.clear();
}

for (MemoryBlock page : pageTable) {
if (page != null) {
memoryManager.tungstenMemoryAllocator().free(page);
}
}
Arrays.fill(pageTable, null);

return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}

/**
* Returns the memory consumption, in bytes, for the current task
* Returns the memory consumption, in bytes, for the current task.
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ object SparkEnv extends Logging {
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
new UnifiedMemoryManager(conf, numUsableCores)
UnifiedMemoryManager(conf, numUsableCores)
}

val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
Expand Down
Loading