1717
1818package org .apache .spark .memory ;
1919
20- import java .util .*;
20+ import java .io .IOException ;
21+ import java .util .BitSet ;
22+ import java .util .HashMap ;
2123
2224import com .google .common .annotations .VisibleForTesting ;
2325import org .slf4j .Logger ;
@@ -100,13 +102,19 @@ public class TaskMemoryManager {
100102 */
101103 private final boolean inHeap ;
102104
105+ /**
106+ * The size of memory granted to each consumer.
107+ */
108+ private HashMap <MemoryConsumer , Long > consumers ;
109+
103110 /**
104111 * Construct a new TaskMemoryManager.
105112 */
106113 public TaskMemoryManager (MemoryManager memoryManager , long taskAttemptId ) {
107114 this .inHeap = memoryManager .tungstenMemoryIsAllocatedInHeap ();
108115 this .memoryManager = memoryManager ;
109116 this .taskAttemptId = taskAttemptId ;
117+ this .consumers = new HashMap <>();
110118 }
111119
112120 /**
@@ -117,13 +125,75 @@ public long acquireExecutionMemory(long size) {
117125 return memoryManager .acquireExecutionMemory (size , taskAttemptId );
118126 }
119127
128+ /**
129+ * Acquire N bytes of memory for a consumer. If there is no enough memory, it will call
130+ * spill() of consumers to release more memory.
131+ *
132+ * @return number of bytes successfully granted (<= N).
133+ */
134+ public long acquireExecutionMemory (long size , MemoryConsumer consumer ) throws IOException {
135+ synchronized (this ) {
136+ long got = acquireExecutionMemory (size );
137+
138+ if (got < size && consumer != null ) {
139+ // call spill() on itself to release some memory
140+ consumer .spill (size - got );
141+ got += acquireExecutionMemory (size - got );
142+
143+ if (got < size ) {
144+ long needed = size - got ;
145+ // call spill() on other consumers to release memory
146+ for (MemoryConsumer c : consumers .keySet ()) {
147+ if (c != consumer ) {
148+ needed -= c .spill (size - got );
149+ if (needed < 0 ) {
150+ break ;
151+ }
152+ }
153+ }
154+ got += acquireExecutionMemory (size - got );
155+ }
156+ }
157+
158+ long old = 0L ;
159+ if (consumers .containsKey (consumer )) {
160+ old = consumers .get (consumer );
161+ }
162+ consumers .put (consumer , got + old );
163+
164+ return got ;
165+ }
166+ }
167+
120168 /**
121169 * Release N bytes of execution memory.
122170 */
123171 public void releaseExecutionMemory (long size ) {
124172 memoryManager .releaseExecutionMemory (size , taskAttemptId );
125173 }
126174
175+ /**
176+ * Release N bytes of execution memory for a MemoryConsumer.
177+ */
178+ public void releaseExecutionMemory (long size , MemoryConsumer consumer ) {
179+ synchronized (this ) {
180+ if (consumer != null && consumers .containsKey (consumer )) {
181+ long old = consumers .get (consumer );
182+ if (old > size ) {
183+ consumers .put (consumer , old - size );
184+ } else {
185+ if (old < size ) {
186+ // TODO
187+ }
188+ consumers .remove (consumer );
189+ }
190+ } else {
191+ // TODO
192+ }
193+ memoryManager .releaseExecutionMemory (size , taskAttemptId );
194+ }
195+ }
196+
127197 public long pageSizeBytes () {
128198 return memoryManager .pageSizeBytes ();
129199 }
@@ -134,12 +204,27 @@ public long pageSizeBytes() {
134204 *
135205 * Returns `null` if there was not enough memory to allocate the page.
136206 */
137- public MemoryBlock allocatePage (long size ) {
207+ public MemoryBlock allocatePage (long size ) throws IOException {
208+ return allocatePage (size , null );
209+ }
210+
211+ /**
212+ * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
213+ * intended for allocating large blocks of Tungsten memory that will be shared between operators.
214+ *
215+ * Returns `null` if there was not enough memory to allocate the page.
216+ */
217+ public MemoryBlock allocatePage (long size , MemoryConsumer consumer ) throws IOException {
138218 if (size > MAXIMUM_PAGE_SIZE_BYTES ) {
139219 throw new IllegalArgumentException (
140220 "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes" );
141221 }
142222
223+ long acquired = acquireExecutionMemory (size , consumer );
224+ if (acquired <= 0 ) {
225+ return null ;
226+ }
227+
143228 final int pageNumber ;
144229 synchronized (this ) {
145230 pageNumber = allocatedPages .nextClearBit (0 );
@@ -149,14 +234,6 @@ public MemoryBlock allocatePage(long size) {
149234 }
150235 allocatedPages .set (pageNumber );
151236 }
152- final long acquiredExecutionMemory = acquireExecutionMemory (size );
153- if (acquiredExecutionMemory != size ) {
154- releaseExecutionMemory (acquiredExecutionMemory );
155- synchronized (this ) {
156- allocatedPages .clear (pageNumber );
157- }
158- return null ;
159- }
160237 final MemoryBlock page = memoryManager .tungstenMemoryAllocator ().allocate (size );
161238 page .pageNumber = pageNumber ;
162239 pageTable [pageNumber ] = page ;
@@ -170,6 +247,13 @@ public MemoryBlock allocatePage(long size) {
170247 * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}.
171248 */
172249 public void freePage (MemoryBlock page ) {
250+ freePage (page , null );
251+ }
252+
253+ /**
254+ * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}.
255+ */
256+ public void freePage (MemoryBlock page , MemoryConsumer consumer ) {
173257 assert (page .pageNumber != -1 ) :
174258 "Called freePage() on memory that wasn't allocated with allocatePage()" ;
175259 assert (allocatedPages .get (page .pageNumber ));
@@ -182,7 +266,7 @@ public void freePage(MemoryBlock page) {
182266 }
183267 long pageSize = page .size ();
184268 memoryManager .tungstenMemoryAllocator ().free (page );
185- releaseExecutionMemory (pageSize );
269+ releaseExecutionMemory (pageSize , consumer );
186270 }
187271
188272 /**
0 commit comments