
Commit 3f94e64

jinxing authored and cloud-fan committed
[SPARK-19659] Fetch big blocks to disk when shuffle-read.
## What changes were proposed in this pull request?

Currently the whole block is fetched into memory (off heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can be large in skew situations. If OOM happens during shuffle read, the job is killed and users are told to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting that parameter and allocating more memory can resolve the OOM, but the approach is not well suited to production environments, especially data warehouses. When using Spark SQL as the data engine in a warehouse, users want a unified parameter (e.g. memory) with less wasted resource (resource allocated but not used). The need is especially strong when migrating the data engine to Spark from another one (e.g. Hive), because tuning the parameter for thousands of SQLs one by one is very time consuming. Skew situations are not always easy to predict; when they happen it makes more sense to fetch remote blocks to disk for shuffle-read than to kill the job with OOM.

In this PR, I propose to fetch big blocks to disk (also mentioned in SPARK-3019):

1. Track the average size and also the outliers (larger than 2*avgSize) in MapStatus;
2. Request memory from `MemoryManager` before fetching blocks and release it back to `MemoryManager` when the `ManagedBuffer` is released;
3. Fetch remote blocks to disk when acquiring memory from `MemoryManager` fails, otherwise fetch to memory.

This is an improvement for memory control when shuffling blocks and helps avoid OOM in scenarios like:

1. A single huge block;
2. The sizes of many blocks are underestimated in `MapStatus`, so the actual footprint of the blocks is much larger than estimated.

## How was this patch tested?

Added unit tests in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.

Author: jinxing <[email protected]>

Closes #16989 from jinxing64/SPARK-19659.
1 parent 731462a commit 3f94e64
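To make the memory-vs-disk routing described above concrete, here is a minimal, hypothetical sketch of the decision the fetcher ends up making: stream a request to disk when it is larger than the `spark.reducer.maxReqSizeShuffleToMem` threshold introduced below, or when the memory the request would need cannot be reserved. The class name, the `AtomicLong` bookkeeping, and the pool size are illustrative stand-ins, not Spark's actual `MemoryManager` API.

import java.util.concurrent.atomic.AtomicLong;

public class FetchRoutingSketch {
  private final long maxReqSizeShuffleToMem; // mirrors spark.reducer.maxReqSizeShuffleToMem (200m default)
  private final long memoryPoolSize;         // stand-in for the memory a MemoryManager would grant
  private final AtomicLong memoryInUse = new AtomicLong(0);

  FetchRoutingSketch(long maxReqSizeShuffleToMem, long memoryPoolSize) {
    this.maxReqSizeShuffleToMem = maxReqSizeShuffleToMem;
    this.memoryPoolSize = memoryPoolSize;
  }

  /** Returns true if this fetch request should be streamed to disk rather than buffered in memory. */
  boolean shouldFetchToDisk(long requestSize) {
    if (requestSize > maxReqSizeShuffleToMem) {
      return true; // a giant request is never buffered in memory
    }
    // Otherwise try to reserve memory up front; fall back to disk when the pool is exhausted.
    while (true) {
      long current = memoryInUse.get();
      if (current + requestSize > memoryPoolSize) {
        return true;
      }
      if (memoryInUse.compareAndSet(current, current + requestSize)) {
        return false;
      }
    }
  }

  /** Called when the corresponding ManagedBuffer is released. */
  void release(long requestSize) {
    memoryInUse.addAndGet(-requestSize);
  }

  public static void main(String[] args) {
    FetchRoutingSketch router = new FetchRoutingSketch(200L << 20, 512L << 20);
    System.out.println(router.shouldFetchToDisk(300L << 20)); // true: above the 200m threshold
    System.out.println(router.shouldFetchToDisk(100L << 20)); // false: reserved from the pool
  }
}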

File tree

17 files changed: +254 -47 lines changed

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 21 additions & 0 deletions
@@ -23,6 +23,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import scala.Tuple2;
+
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
@@ -94,6 +96,25 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
     return nextChunk;
   }
 
+  @Override
+  public ManagedBuffer openStream(String streamChunkId) {
+    Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
+    return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
+  }
+
+  public static String genStreamChunkId(long streamId, int chunkId) {
+    return String.format("%d_%d", streamId, chunkId);
+  }
+
+  public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
+    String[] array = streamChunkId.split("_");
+    assert array.length == 2:
+      "Stream id and chunk index should be specified when open stream for fetching block.";
+    long streamId = Long.valueOf(array[0]);
+    int chunkIndex = Integer.valueOf(array[1]);
+    return new Tuple2<>(streamId, chunkIndex);
+  }
+
   @Override
   public void connectionTerminated(Channel channel) {
     // Close all streams which have been associated with the channel.
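The two helpers above encode and decode a "streamId_chunkIndex" string, which is what openStream() now receives when a client asks to stream a single shuffle chunk to disk. A small round-trip example, assuming spark-network-common and the Scala library (for Tuple2) are on the classpath:

import scala.Tuple2;

import org.apache.spark.network.server.OneForOneStreamManager;

public class StreamChunkIdRoundTrip {
  public static void main(String[] args) {
    // Encode stream 1234567, chunk 3 into the "streamId_chunkIndex" form used by openStream().
    String streamChunkId = OneForOneStreamManager.genStreamChunkId(1234567L, 3);
    // Decode it back, exactly as openStream() does before delegating to getChunk().
    Tuple2<Long, Integer> parsed = OneForOneStreamManager.parseStreamChunkId(streamChunkId);
    System.out.println(streamChunkId + " -> streamId=" + parsed._1() + ", chunkIndex=" + parsed._2());
  }
}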

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 5 additions & 2 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -86,14 +87,16 @@ public void fetchBlocks(
       int port,
       String execId,
       String[] blockIds,
-      BlockFetchingListener listener) {
+      BlockFetchingListener listener,
+      File[] shuffleFiles) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
+            new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf,
+              shuffleFiles).start();
           };
 
       int maxRetries = conf.maxIORetries();

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 60 additions & 2 deletions
@@ -17,19 +17,28 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.ChunkReceivedCallback;
 import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.StreamCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.OpenBlocks;
 import org.apache.spark.network.shuffle.protocol.StreamHandle;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
@@ -48,6 +57,8 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
+  private TransportConf transportConf = null;
+  private File[] shuffleFiles = null;
 
   private StreamHandle streamHandle = null;
 
@@ -56,12 +67,20 @@ public OneForOneBlockFetcher(
       String appId,
       String execId,
       String[] blockIds,
-      BlockFetchingListener listener) {
+      BlockFetchingListener listener,
+      TransportConf transportConf,
+      File[] shuffleFiles) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
    this.listener = listener;
     this.chunkCallback = new ChunkCallback();
+    this.transportConf = transportConf;
+    if (shuffleFiles != null) {
+      this.shuffleFiles = shuffleFiles;
+      assert this.shuffleFiles.length == blockIds.length:
+        "Number of shuffle files should equal to blocks";
+    }
   }
 
   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -100,7 +119,12 @@ public void onSuccess(ByteBuffer response) {
         // Immediately request all chunks -- we expect that the total size of the request is
         // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
         for (int i = 0; i < streamHandle.numChunks; i++) {
-          client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+          if (shuffleFiles != null) {
+            client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
+              new DownloadCallback(shuffleFiles[i], i));
+          } else {
+            client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+          }
         }
       } catch (Exception e) {
         logger.error("Failed while starting block fetches after success", e);
@@ -126,4 +150,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
       }
     }
   }
+
+  private class DownloadCallback implements StreamCallback {
+
+    private WritableByteChannel channel = null;
+    private File targetFile = null;
+    private int chunkIndex;
+
+    public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
+      this.targetFile = targetFile;
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+      this.chunkIndex = chunkIndex;
+    }
+
+    @Override
+    public void onData(String streamId, ByteBuffer buf) throws IOException {
+      channel.write(buf);
+    }
+
+    @Override
+    public void onComplete(String streamId) throws IOException {
+      channel.close();
+      ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
+        targetFile.length());
+      listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+    }
+
+    @Override
+    public void onFailure(String streamId, Throwable cause) throws IOException {
+      channel.close();
+      // On receipt of a failure, fail every block from chunkIndex onwards.
+      String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
+      failRemainingBlocks(remainingBlockIds, cause);
+    }
+  }
 }
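DownloadCallback above is the piece that turns an in-memory fetch into a streamed write: each onData buffer goes straight to a WritableByteChannel, and once the stream completes only a FileSegmentManagedBuffer handle to the file is kept. Below is a self-contained sketch of that write-to-disk pattern without the TransportClient plumbing; the temp-file name and sample buffers are made up for illustration.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;

public class StreamToDiskSketch {
  public static void main(String[] args) throws IOException {
    File targetFile = File.createTempFile("shuffle-block", ".data");
    // Like DownloadCallback: open a channel once, append each incoming buffer (onData),
    // then close it when the stream completes (onComplete).
    WritableByteChannel channel = Channels.newChannel(new FileOutputStream(targetFile));
    ByteBuffer[] incomingChunks = {
        ByteBuffer.wrap("first part of the block,".getBytes()),
        ByteBuffer.wrap(" second part of the block".getBytes())
    };
    for (ByteBuffer buf : incomingChunks) {
      channel.write(buf);
    }
    channel.close();
    // The real code then wraps the file in a FileSegmentManagedBuffer(conf, file, 0, file.length())
    // and hands it to BlockFetchingListener.onBlockFetchSuccess instead of an in-memory buffer.
    System.out.println("Wrote " + targetFile.length() + " bytes to " + targetFile);
    System.out.println(new String(Files.readAllBytes(targetFile.toPath())));
  }
}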

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java

Lines changed: 3 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.Closeable;
+import java.io.File;
 
 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
 public abstract class ShuffleClient implements Closeable {
@@ -40,5 +41,6 @@ public abstract void fetchBlocks(
       int port,
       String execId,
       String[] blockIds,
-      BlockFetchingListener listener);
+      BlockFetchingListener listener,
+      File[] shuffleFiles);
 }

common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java

Lines changed: 1 addition & 1 deletion
@@ -204,7 +204,7 @@ public void onBlockFetchFailure(String blockId, Throwable t) {
 
     String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
     OneForOneBlockFetcher fetcher =
-      new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
+      new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null);
     fetcher.start();
     blockFetchLatch.await();
     checkSecurityException(exception.get());

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 1 addition & 1 deletion
@@ -158,7 +158,7 @@ public void onBlockFetchFailure(String blockId, Throwable exception) {
             }
           }
         }
-      });
+      }, null);
 
     if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
       fail("Timeout getting response from the server");

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java

Lines changed: 6 additions & 1 deletion
@@ -46,8 +46,13 @@
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.OpenBlocks;
 import org.apache.spark.network.shuffle.protocol.StreamHandle;
+import org.apache.spark.network.util.MapConfigProvider;
+import org.apache.spark.network.util.TransportConf;
 
 public class OneForOneBlockFetcherSuite {
+
+  private static final TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
+
   @Test
   public void testFetchOne() {
     LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
@@ -126,7 +131,7 @@ private static BlockFetchingListener fetchBlocks(LinkedHashMap<String, ManagedBu
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
     OneForOneBlockFetcher fetcher =
-      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);
+      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener, conf, null);
 
     // Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123
     doAnswer(invocationOnMock -> {

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions
@@ -287,4 +287,10 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
+    ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
+      .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
+        "above this threshold. This is to avoid a giant request takes too much memory.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("200m")
 }
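The new entry is consumed internally through the config constant above, but it can be set like any other Spark property via its string key. A small sketch of lowering the threshold on a SparkConf; the 64m value is just an example, 200m being the default added here:

import org.apache.spark.SparkConf;

public class ShuffleToMemThresholdExample {
  public static void main(String[] args) {
    // Shuffle fetch requests larger than 64 MB will be streamed to disk instead of buffered in memory.
    SparkConf conf = new SparkConf()
      .setAppName("shuffle-to-disk-threshold-example")
      .set("spark.reducer.maxReqSizeShuffleToMem", "64m");
    System.out.println(conf.get("spark.reducer.maxReqSizeShuffleToMem"));
  }
}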

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 4 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.network
 
-import java.io.Closeable
+import java.io.{Closeable, File}
 import java.nio.ByteBuffer
 
 import scala.concurrent.{Future, Promise}
@@ -67,7 +67,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       port: Int,
       execId: String,
       blockIds: Array[String],
-      listener: BlockFetchingListener): Unit
+      listener: BlockFetchingListener,
+      shuffleFiles: Array[File]): Unit
 
   /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
@@ -100,7 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           ret.flip()
           result.success(new NioManagedBuffer(ret))
         }
-      })
+      }, shuffleFiles = null)
     ThreadUtils.awaitResult(result.future, Duration.Inf)
   }
 

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 5 additions & 2 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.netty
 
+import java.io.File
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
@@ -88,13 +89,15 @@ private[spark] class NettyBlockTransferService(
       port: Int,
       execId: String,
       blockIds: Array[String],
-      listener: BlockFetchingListener): Unit = {
+      listener: BlockFetchingListener,
+      shuffleFiles: Array[File]): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
       val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
         override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
           val client = clientFactory.createClient(host, port)
-          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
+          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener,
+            transportConf, shuffleFiles).start()
         }
       }
 
0 commit comments