@@ -17,6 +17,7 @@
 
 package org.apache.spark.memory;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -25,9 +26,19 @@
 
 public class TaskMemoryManagerSuite {
 
+  private TaskMemoryManager manager;
+
+  @After
+  public void after() {
+    Assert.assertNotNull(manager);
+    Assert.assertEquals(0, manager.getMemoryConsumptionForThisTask());
+    Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
+    manager = null;
+  }
+
   @Test
   public void leakedPageMemoryIsDetected() {
-    final TaskMemoryManager manager = new TaskMemoryManager(
+    manager = new TaskMemoryManager(
       new StaticMemoryManager(
         new SparkConf().set("spark.memory.offHeap.enabled", "false"),
         Long.MAX_VALUE,
@@ -38,14 +49,15 @@ public void leakedPageMemoryIsDetected() {
     manager.allocatePage(4096, c); // leak memory
     Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
     Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
+    Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
   }
 
   @Test
   public void encodePageNumberAndOffsetOffHeap() {
     final SparkConf conf = new SparkConf()
       .set("spark.memory.offHeap.enabled", "true")
       .set("spark.memory.offHeap.size", "1000");
-    final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
+    manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
     final MemoryBlock dataPage = manager.allocatePage(256, c);
     // In off-heap mode, an offset is an absolute address that may require more than 51 bits to
@@ -58,7 +70,7 @@ public void encodePageNumberAndOffsetOffHeap() {
 
   @Test
   public void encodePageNumberAndOffsetOnHeap() {
-    final TaskMemoryManager manager = new TaskMemoryManager(
+    manager = new TaskMemoryManager(
       new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
     final MemoryBlock dataPage = manager.allocatePage(256, c);
@@ -71,7 +83,7 @@ public void encodePageNumberAndOffsetOnHeap() {
   public void cooperativeSpilling() {
     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
     memoryManager.limit(100);
-    final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
+    manager = new TaskMemoryManager(memoryManager, 0);
 
     TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
     TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
@@ -106,14 +118,13 @@ public void cooperativeSpilling() {
 
     c1.free(0);
     c2.free(100);
-    Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
   }
 
   @Test
   public void cooperativeSpilling2() {
     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
     memoryManager.limit(100);
-    final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
+    manager = new TaskMemoryManager(memoryManager, 0);
 
     TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
     TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
@@ -141,14 +152,13 @@ public void cooperativeSpilling2() {
     c1.free(0);
     c2.free(80);
     c3.free(10);
-    Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
   }
 
   @Test
   public void shouldNotForceSpillingInDifferentModes() {
     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
     memoryManager.limit(100);
-    final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
+    manager = new TaskMemoryManager(memoryManager, 0);
 
     TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
     TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
@@ -170,7 +180,7 @@ public void offHeapConfigurationBackwardsCompatibility() {
     final SparkConf conf = new SparkConf()
       .set("spark.unsafe.offHeap", "true")
       .set("spark.memory.offHeap.size", "1000");
-    final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
+    manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
     Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode);
   }
 
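For reference, here is a minimal sketch of the pattern this change converges on: each test assigns the shared manager field instead of declaring a local, and a JUnit @After hook fails the test if any task memory is still allocated at teardown. The class name and the TestMemoryConsumer.use()/free() helpers are assumptions based on the calls visible in the diff, not a definitive copy of the suite.

package org.apache.spark.memory;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import org.apache.spark.SparkConf;

// Hypothetical suite illustrating the leak-check pattern introduced by this change.
public class LeakCheckedSuiteSketch {

  private TaskMemoryManager manager;

  @After
  public void after() {
    Assert.assertNotNull(manager);
    // Any memory a test forgot to release shows up here and fails the test.
    Assert.assertEquals(0, manager.getMemoryConsumptionForThisTask());
    Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
    manager = null;
  }

  @Test
  public void consumerReleasesEverythingItAcquired() {
    // Assign the shared field so the @After hook sees this test's manager.
    manager = new TaskMemoryManager(
      new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
    TestMemoryConsumer c = new TestMemoryConsumer(manager);
    c.use(100);  // acquire 100 bytes of execution memory (assumed helper, mirroring free() in the diff)
    c.free(100); // release it again so the teardown assertions pass
  }
}

Centralizing these assertions in @After is what lets the per-test Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory()) lines be dropped from cooperativeSpilling and cooperativeSpilling2 without losing coverage.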