Skip to content

Commit 85c6ce6

Browse files
jinxing authored and
Tom Graves committed
[SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service.
## What changes were proposed in this pull request? When an application contains a large number of shuffle blocks, the NodeManager requires a lot of memory to keep metadata (`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle blocks is large enough, the NodeManager can run out of memory (OOM). This PR proposes lazy initialization of `FileSegmentManagedBuffer` in the shuffle service. ## How was this patch tested? Manually tested. Author: jinxing <[email protected]> Closes #17744 from jinxing64/SPARK-20426.
1 parent 561e9cc commit 85c6ce6

File tree

4 files changed

+29
-20
lines changed

4 files changed

+29
-20
lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
2323
import java.util.HashMap;
24-
import java.util.List;
24+
import java.util.Iterator;
2525
import java.util.Map;
2626

2727
import com.codahale.metrics.Gauge;
@@ -30,7 +30,6 @@
3030
import com.codahale.metrics.MetricSet;
3131
import com.codahale.metrics.Timer;
3232
import com.google.common.annotations.VisibleForTesting;
33-
import com.google.common.collect.Lists;
3433
import org.slf4j.Logger;
3534
import org.slf4j.LoggerFactory;
3635

@@ -93,14 +92,25 @@ protected void handleMessage(
9392
OpenBlocks msg = (OpenBlocks) msgObj;
9493
checkAuth(client, msg.appId);
9594

96-
List<ManagedBuffer> blocks = Lists.newArrayList();
97-
long totalBlockSize = 0;
98-
for (String blockId : msg.blockIds) {
99-
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
100-
totalBlockSize += block != null ? block.size() : 0;
101-
blocks.add(block);
102-
}
103-
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
95+
Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
96+
private int index = 0;
97+
98+
@Override
99+
public boolean hasNext() {
100+
return index < msg.blockIds.length;
101+
}
102+
103+
@Override
104+
public ManagedBuffer next() {
105+
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
106+
msg.blockIds[index]);
107+
index++;
108+
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
109+
return block;
110+
}
111+
};
112+
113+
long streamId = streamManager.registerStream(client.getClientId(), iter);
104114
if (logger.isTraceEnabled()) {
105115
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
106116
streamId,
@@ -109,7 +119,6 @@ protected void handleMessage(
109119
getRemoteAddress(client.getChannel()));
110120
}
111121
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
112-
metrics.blockTransferRateBytes.mark(totalBlockSize);
113122
} finally {
114123
responseDelayContext.stop();
115124
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ public void testOpenShuffleBlocks() {
8888
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
8989
.toByteBuffer();
9090
handler.receive(client, openBlocks, callback);
91-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
92-
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
9391

9492
ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
9593
verify(callback, times(1)).onSuccess(response.capture());
@@ -107,6 +105,8 @@ public void testOpenShuffleBlocks() {
107105
assertEquals(block0Marker, buffers.next());
108106
assertEquals(block1Marker, buffers.next());
109107
assertFalse(buffers.hasNext());
108+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
109+
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
110110

111111
// Verify open block request latency metrics
112112
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,8 @@ public void testFetchWrongExecutor() throws Exception {
216216
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
217217
FetchResult execFetch = fetchBlocks("exec-0",
218218
new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
219-
// Both still fail, as we start by checking for all block.
220-
assertTrue(execFetch.successBlocks.isEmpty());
221-
assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
219+
assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
220+
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
222221
}
223222

224223
@Test

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ class NettyBlockRpcServer(
5656

5757
message match {
5858
case openBlocks: OpenBlocks =>
59-
val blocks: Seq[ManagedBuffer] =
60-
openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
59+
val blocksNum = openBlocks.blockIds.length
60+
val blocks = for (i <- (0 until blocksNum).view)
61+
yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
6162
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
62-
logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
63-
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
63+
logTrace(s"Registered streamId $streamId with $blocksNum buffers")
64+
responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)
6465

6566
case uploadBlock: UploadBlock =>
6667
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.

0 commit comments

Comments
 (0)