Skip to content

Commit 2f523fa

Browse files
viirya authored and mridulm committed
[SPARK-19244][CORE] Sort MemoryConsumers according to their memory usage when spilling
## What changes were proposed in this pull request? In `TaskMemoryManager`, when we acquire memory by calling `acquireExecutionMemory` and we can't acquire the required memory, we will try to spill other memory consumers. Currently, we simply iterate over the memory consumers in a hash set. Normally, each time the consumers will be iterated in the same order. The first issue is that we might spill more consumers than necessary. For example, if consumer 1 uses 10MB and consumer 2 uses 50MB, and then consumer 3 acquires 100MB but we can only get 60MB, spilling is needed. We might spill both consumer 1 and consumer 2, but we actually just need to spill consumer 2 to get the required 100MB. The second issue is that if we spill consumer 1 in the first round of spilling, and after a while consumer 1 only uses 5MB, then consumer 4 may acquire some memory and spilling is needed again. Because we iterate over the memory consumers in the same order, we will spill consumer 1 again. So for consumer 1, we will produce many small spill files. This patch modifies the way we iterate over the memory consumers. It sorts the memory consumers by their memory usage, so the consumer using more memory will spill first. Once a consumer is spilled, even if it acquires a small amount of memory again, the next time spilling happens it will not be chosen to spill again as long as there are other consumers using more memory than it. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <[email protected]> Closes #16603 from viirya/sort-memoryconsumer-when-spill.
1 parent 52d4f61 commit 2f523fa

File tree

2 files changed

+78
-13
lines changed

2 files changed

+78
-13
lines changed

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020
import javax.annotation.concurrent.GuardedBy;
2121
import java.io.IOException;
2222
import java.util.Arrays;
23+
import java.util.ArrayList;
2324
import java.util.BitSet;
2425
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.TreeMap;
2529

2630
import com.google.common.annotations.VisibleForTesting;
2731
import org.slf4j.Logger;
@@ -144,23 +148,49 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
144148
// spilling, avoid to have too many spilled files.
145149
if (got < required) {
146150
// Call spill() on other consumers to release memory
151+
// Sort the consumers according to their memory usage. So we avoid spilling the same consumer
152+
// which is just spilled in last few times and re-spilling on it will produce many small
153+
// spill files.
154+
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
147155
for (MemoryConsumer c: consumers) {
148156
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
149-
try {
150-
long released = c.spill(required - got, consumer);
151-
if (released > 0) {
152-
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
153-
Utils.bytesToString(released), c, consumer);
154-
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
155-
if (got >= required) {
156-
break;
157-
}
157+
long key = c.getUsed();
158+
List<MemoryConsumer> list = sortedConsumers.get(key);
159+
if (list == null) {
160+
list = new ArrayList<>(1);
161+
sortedConsumers.put(key, list);
162+
}
163+
list.add(c);
164+
}
165+
}
166+
while (!sortedConsumers.isEmpty()) {
167+
// Get the consumer using the least memory that is at least the remaining required memory.
168+
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
169+
sortedConsumers.ceilingEntry(required - got);
170+
// No consumer has used memory more than the remaining required memory.
171+
// Get the consumer of largest used memory.
172+
if (currentEntry == null) {
173+
currentEntry = sortedConsumers.lastEntry();
174+
}
175+
List<MemoryConsumer> cList = currentEntry.getValue();
176+
MemoryConsumer c = cList.remove(cList.size() - 1);
177+
if (cList.isEmpty()) {
178+
sortedConsumers.remove(currentEntry.getKey());
179+
}
180+
try {
181+
long released = c.spill(required - got, consumer);
182+
if (released > 0) {
183+
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
184+
Utils.bytesToString(released), c, consumer);
185+
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
186+
if (got >= required) {
187+
break;
158188
}
159-
} catch (IOException e) {
160-
logger.error("error while calling spill() on " + c, e);
161-
throw new OutOfMemoryError("error while calling spill() on " + c + " : "
162-
+ e.getMessage());
163189
}
190+
} catch (IOException e) {
191+
logger.error("error while calling spill() on " + c, e);
192+
throw new OutOfMemoryError("error while calling spill() on " + c + " : "
193+
+ e.getMessage());
164194
}
165195
}
166196
}

core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,41 @@ public void cooperativeSpilling() {
109109
Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
110110
}
111111

112+
@Test
113+
public void cooperativeSpilling2() {
114+
final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
115+
memoryManager.limit(100);
116+
final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
117+
118+
TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
119+
TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
120+
TestMemoryConsumer c3 = new TestMemoryConsumer(manager);
121+
122+
c1.use(20);
123+
Assert.assertEquals(20, c1.getUsed());
124+
c2.use(80);
125+
Assert.assertEquals(80, c2.getUsed());
126+
c3.use(80);
127+
Assert.assertEquals(20, c1.getUsed()); // c1: not spilled
128+
Assert.assertEquals(0, c2.getUsed()); // c2: spilled as it has the required size of memory
129+
Assert.assertEquals(80, c3.getUsed());
130+
131+
c2.use(80);
132+
Assert.assertEquals(20, c1.getUsed()); // c1: not spilled
133+
Assert.assertEquals(0, c3.getUsed()); // c3: spilled as it has the required size of memory
134+
Assert.assertEquals(80, c2.getUsed());
135+
136+
c3.use(10);
137+
Assert.assertEquals(0, c1.getUsed()); // c1: spilled as it has the required size of memory
138+
Assert.assertEquals(80, c2.getUsed()); // c2: not spilled as spilling c1 already satisfies c3
139+
Assert.assertEquals(10, c3.getUsed());
140+
141+
c1.free(0);
142+
c2.free(80);
143+
c3.free(10);
144+
Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
145+
}
146+
112147
@Test
113148
public void shouldNotForceSpillingInDifferentModes() {
114149
final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());

0 commit comments

Comments
 (0)