1717
1818package org .apache .spark .memory ;
1919
20+ import org .junit .After ;
2021import org .junit .Assert ;
2122import org .junit .Test ;
2223
2526
2627public class TaskMemoryManagerSuite {
2728
29+ static TaskMemoryManager manager ;
30+
31+ @ After
32+ public void after () {
33+ Assert .assertEquals (0 , manager .getMemoryConsumptionForThisTask ());
34+ Assert .assertEquals (0 , manager .cleanUpAllAllocatedMemory ());
35+ manager = null ;
36+ }
37+
2838 @ Test
2939 public void leakedPageMemoryIsDetected () {
30- final TaskMemoryManager manager = new TaskMemoryManager (
40+ manager = new TaskMemoryManager (
3141 new StaticMemoryManager (
3242 new SparkConf ().set ("spark.memory.offHeap.enabled" , "false" ),
3343 Long .MAX_VALUE ,
3444 Long .MAX_VALUE ,
3545 1 ),
3646 0 );
3747 final MemoryConsumer c = new TestMemoryConsumer (manager );
38- manager .allocatePage (4096 , c ); // leak memory
48+ final MemoryBlock block = manager .allocatePage (4096 , c ); // leak memory
3949 Assert .assertEquals (4096 , manager .getMemoryConsumptionForThisTask ());
4050 Assert .assertEquals (4096 , manager .cleanUpAllAllocatedMemory ());
51+ manager .freePage (block , c );
4152 }
4253
4354 @ Test
4455 public void encodePageNumberAndOffsetOffHeap () {
4556 final SparkConf conf = new SparkConf ()
4657 .set ("spark.memory.offHeap.enabled" , "true" )
4758 .set ("spark.memory.offHeap.size" , "1000" );
48- final TaskMemoryManager manager = new TaskMemoryManager (new TestMemoryManager (conf ), 0 );
59+ manager = new TaskMemoryManager (new TestMemoryManager (conf ), 0 );
4960 final MemoryConsumer c = new TestMemoryConsumer (manager , MemoryMode .OFF_HEAP );
5061 final MemoryBlock dataPage = manager .allocatePage (256 , c );
5162 // In off-heap mode, an offset is an absolute address that may require more than 51 bits to
@@ -58,7 +69,7 @@ public void encodePageNumberAndOffsetOffHeap() {
5869
5970 @ Test
6071 public void encodePageNumberAndOffsetOnHeap () {
61- final TaskMemoryManager manager = new TaskMemoryManager (
72+ manager = new TaskMemoryManager (
6273 new TestMemoryManager (new SparkConf ().set ("spark.memory.offHeap.enabled" , "false" )), 0 );
6374 final MemoryConsumer c = new TestMemoryConsumer (manager , MemoryMode .ON_HEAP );
6475 final MemoryBlock dataPage = manager .allocatePage (256 , c );
@@ -71,7 +82,7 @@ public void encodePageNumberAndOffsetOnHeap() {
7182 public void cooperativeSpilling () {
7283 final TestMemoryManager memoryManager = new TestMemoryManager (new SparkConf ());
7384 memoryManager .limit (100 );
74- final TaskMemoryManager manager = new TaskMemoryManager (memoryManager , 0 );
85+ manager = new TaskMemoryManager (memoryManager , 0 );
7586
7687 TestMemoryConsumer c1 = new TestMemoryConsumer (manager );
7788 TestMemoryConsumer c2 = new TestMemoryConsumer (manager );
@@ -106,14 +117,13 @@ public void cooperativeSpilling() {
106117
107118 c1 .free (0 );
108119 c2 .free (100 );
109- Assert .assertEquals (0 , manager .cleanUpAllAllocatedMemory ());
110120 }
111121
112122 @ Test
113123 public void cooperativeSpilling2 () {
114124 final TestMemoryManager memoryManager = new TestMemoryManager (new SparkConf ());
115125 memoryManager .limit (100 );
116- final TaskMemoryManager manager = new TaskMemoryManager (memoryManager , 0 );
126+ manager = new TaskMemoryManager (memoryManager , 0 );
117127
118128 TestMemoryConsumer c1 = new TestMemoryConsumer (manager );
119129 TestMemoryConsumer c2 = new TestMemoryConsumer (manager );
@@ -141,14 +151,13 @@ public void cooperativeSpilling2() {
141151 c1 .free (0 );
142152 c2 .free (80 );
143153 c3 .free (10 );
144- Assert .assertEquals (0 , manager .cleanUpAllAllocatedMemory ());
145154 }
146155
147156 @ Test
148157 public void shouldNotForceSpillingInDifferentModes () {
149158 final TestMemoryManager memoryManager = new TestMemoryManager (new SparkConf ());
150159 memoryManager .limit (100 );
151- final TaskMemoryManager manager = new TaskMemoryManager (memoryManager , 0 );
160+ manager = new TaskMemoryManager (memoryManager , 0 );
152161
153162 TestMemoryConsumer c1 = new TestMemoryConsumer (manager , MemoryMode .ON_HEAP );
154163 TestMemoryConsumer c2 = new TestMemoryConsumer (manager , MemoryMode .OFF_HEAP );
@@ -170,7 +179,7 @@ public void offHeapConfigurationBackwardsCompatibility() {
170179 final SparkConf conf = new SparkConf ()
171180 .set ("spark.unsafe.offHeap" , "true" )
172181 .set ("spark.memory.offHeap.size" , "1000" );
173- final TaskMemoryManager manager = new TaskMemoryManager (new TestMemoryManager (conf ), 0 );
182+ manager = new TaskMemoryManager (new TestMemoryManager (conf ), 0 );
174183 Assert .assertSame (MemoryMode .OFF_HEAP , manager .tungstenMemoryMode );
175184 }
176185
0 commit comments