Changes from all commits (35 commits)
b968e0e
LIHADOOP-48527 Enable Shuffle writer to push blocks to remote shuffle…
May 8, 2020
510f504
LIHADOOP-53496 Not logging all block push exceptions on the client
otterc May 15, 2020
2d9d27d
LIHADOOP-54374 Separate the configurations for connection creation ti…
zhouyejoe Jun 25, 2020
3d74277
LIHADOOP-54370 Not to retry on certain exceptions when pushing blocks
otterc Jun 24, 2020
02cff3b
Made the code compile, added more tests, introduced ShuffleBlockPushId
otterc Nov 6, 2020
770c25c
Renamed isPushBasedShuffleEnabled to isPushShuffleEnabled and fixed t…
otterc Nov 10, 2020
d429fb3
Added Since tags all the newly introduced classes
otterc Nov 10, 2020
3d10b20
Explicitly converting ArrayBuffer to Seq so that build with SBT succeeds
otterc Nov 11, 2020
c19bcf2
Update core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
otterc Nov 12, 2020
f411944
Addressed the typos and other comments
otterc Nov 12, 2020
bd3649f
Moved push of data out of ShuffleWriter and addressed other minor com…
otterc Nov 18, 2020
bf4b277
LIHADOOP-48527 Enable Shuffle writer to push blocks to remote shuffle…
May 8, 2020
a8a350c
LIHADOOP-54370 Not to retry on certain exceptions when pushing blocks
otterc Jun 24, 2020
0b951a7
Made the code compile, added more tests, introduced ShuffleBlockPushId
otterc Nov 6, 2020
7d16198
Added Since tags all the newly introduced classes
otterc Nov 10, 2020
415c2d0
Moved push of data out of ShuffleWriter and addressed other minor com…
otterc Nov 18, 2020
28d8098
Renamed PushShuffleSupport to PushShufleComponent and changed initiat…
otterc Nov 18, 2020
5b725a2
Renamed PushShuffleComponent to PushShuffleWriterComponent
otterc Nov 19, 2020
a2f6635
Not changing the default value of connectionCreationTimeout
otterc Nov 21, 2020
4a2aef7
Renamed PushShuffleWriterComponent to ShuffleBlockPusher and addresse…
otterc Nov 23, 2020
1e9fb08
Addressed review comments
otterc Nov 25, 2020
462af7a
Fixed the documentation for numPushThreads
otterc Nov 25, 2020
a88ffd6
Changed the initiateBlockPush method, added tests, and stop pushing c…
otterc Dec 1, 2020
08386f3
Update core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPushe…
otterc Dec 2, 2020
d5370db
Addressed review comments
otterc Dec 2, 2020
fd0e98b
Addressed review comments
otterc Dec 3, 2020
8c9a482
Changed the default of maxBlockSizeToPush to 1M
otterc Dec 4, 2020
f8632b3
Removed the stopPushing flag which is not needed
otterc Dec 4, 2020
6d0fade
Addressed review comments
otterc Dec 4, 2020
7d70f82
Update core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPushe…
otterc Dec 17, 2020
23cc502
Update core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPushe…
otterc Dec 17, 2020
bcebb13
Addressed Mridul's comments
otterc Dec 17, 2020
762ac1e
Update core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPushe…
otterc Dec 19, 2020
6aae02a
Update core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPushe…
otterc Dec 19, 2020
21ea881
Coverting blockSize to Int during preparation of push request
otterc Dec 19, 2020
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

@@ -254,7 +254,7 @@ TransportClient createClient(InetSocketAddress address)
       // Disable Nagle's Algorithm since we don't want packets to wait
       .option(ChannelOption.TCP_NODELAY, true)
       .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
       .option(ChannelOption.ALLOCATOR, pooledAllocator);

     if (conf.receiveBuf() > 0) {
@@ -280,9 +280,10 @@ public void initChannel(SocketChannel ch) {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.await(conf.connectionTimeoutMs())) {
+    if (!cf.await(conf.connectionCreationTimeoutMs())) {
       throw new IOException(
-        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
+        String.format("Connecting to %s timed out (%s ms)",
+          address, conf.connectionCreationTimeoutMs()));
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
     }
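For concreteness, a hedged sketch of how the two timeouts can now be tuned independently for the shuffle module. The key names follow the spark.<module>.io.* pattern that getConfKey builds in TransportConf; the values here are illustrative, not recommendations.

```scala
import org.apache.spark.SparkConf

// Illustrative only: with this change, the TCP connect timeout no longer
// shares spark.<module>.io.connectionTimeout with the idle-connection timeout.
val conf = new SparkConf()
  // Idle timeout: how long an established shuffle connection may sit unused.
  .set("spark.shuffle.io.connectionTimeout", "120s")
  // New key from this PR: how long connection establishment may take.
  .set("spark.shuffle.io.connectionCreationTimeout", "30s")
```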
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

@@ -19,6 +19,7 @@

 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;

 import com.google.common.primitives.Ints;
 import io.netty.util.NettyRuntime;
@@ -31,6 +32,7 @@ public class TransportConf {
   private final String SPARK_NETWORK_IO_MODE_KEY;
   private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
   private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
+  private final String SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY;
   private final String SPARK_NETWORK_IO_BACKLOG_KEY;
   private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
   private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
@@ -54,6 +56,7 @@ public TransportConf(String module, ConfigProvider conf) {
     SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
     SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
     SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
+    SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY = getConfKey("io.connectionCreationTimeout");
     SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
     SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
     SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
@@ -94,7 +97,7 @@ public boolean preferDirectBufs() {
     return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
   }

-  /** Connect timeout in milliseconds. Default 120 secs. */
+  /** Connection idle timeout in milliseconds. Default 120 secs. */
   public int connectionTimeoutMs() {
     long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
       conf.get("spark.network.timeout", "120s"));
@@ -103,6 +106,14 @@ public int connectionTimeoutMs() {
     return (int) defaultTimeoutMs;
   }

+  /** Connection creation timeout in milliseconds. Defaults to connectionTimeoutMs (120 secs). */
+  public int connectionCreationTimeoutMs() {
+    long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
+    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
+      conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000;
+    return (int) defaultTimeoutMs;
+  }
+
   /** Number of concurrent connections between two nodes for fetching data. */
   public int numConnectionsPerPeer() {
     return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
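The fallback chain in connectionCreationTimeoutMs() is easy to misread, so here is a minimal, self-contained sketch of the resolution order. It assumes a plain Map stands in for ConfigProvider and only simple "<n>s" time strings (the real JavaUtils.timeStringAsSec accepts more formats).

```scala
// Sketch of the connectionCreationTimeoutMs() resolution, not the real class.
def connectionCreationTimeoutMs(
    conf: Map[String, String],
    connectionTimeoutMs: Int): Int = {
  // Fall back to the idle timeout expressed in seconds, mirroring the
  // TimeUnit.MILLISECONDS.toSeconds conversion above.
  val fallback = s"${connectionTimeoutMs / 1000}s"
  val raw = conf.getOrElse("spark.shuffle.io.connectionCreationTimeout", fallback)
  raw.stripSuffix("s").toInt * 1000
}

// Unset: the creation timeout inherits the idle timeout (120s by default).
assert(connectionCreationTimeoutMs(Map.empty, 120000) == 120000)
// Set explicitly: only connection establishment is capped at 30s.
assert(connectionCreationTimeoutMs(
  Map("spark.shuffle.io.connectionCreationTimeout" -> "30s"), 120000) == 30000)
```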
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

@@ -31,7 +31,6 @@
 import scala.Tuple2;
 import scala.collection.Iterator;

-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,8 +177,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     }
   }

-  @VisibleForTesting
-  long[] getPartitionLengths() {
+  @Override
+  public long[] getPartitionLengths() {
     return partitionLengths;
   }

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

@@ -88,6 +88,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {

   @Nullable private MapStatus mapStatus;
   @Nullable private ShuffleExternalSorter sorter;
+  @Nullable private long[] partitionLengths;
   private long peakMemoryUsedBytes = 0;

   /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
@@ -219,7 +220,6 @@ void closeAndWriteOutput() throws IOException {
     serOutputStream = null;
     final SpillInfo[] spills = sorter.closeAndGetSpills();
     sorter = null;
-    final long[] partitionLengths;
     try {
       partitionLengths = mergeSpills(spills);
     } finally {
@@ -543,4 +543,9 @@ public void close() throws IOException {
       channel.close();
     }
   }
+
+  @Override
+  public long[] getPartitionLengths() {
+    return partitionLengths;
+  }
 }
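Promoting getPartitionLengths() into the public ShuffleWriter contract lets code outside the writers decide which map-output blocks are worth pushing. As a hedged illustration (this helper is not from the PR), a caller could filter partitions against spark.shuffle.push.maxBlockSizeToPush like this:

```scala
// Hypothetical helper: pick the reduce partitions small enough to push.
// partitionLengths(i) is the byte length of this map task's output for
// reduce partition i, as now exposed by ShuffleWriter#getPartitionLengths.
def pushableReducePartitions(
    partitionLengths: Array[Long],
    maxBlockSizeToPush: Long): Seq[Int] = {
  partitionLengths.zipWithIndex.collect {
    // Skip empty partitions and blocks above the push threshold; oversized
    // blocks are left to be fetched the original way.
    case (length, reduceId) if length > 0 && length <= maxBlockSizeToPush =>
      reduceId
  }.toSeq
}
```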
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -47,7 +47,7 @@ import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler._
-import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher}
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -325,6 +325,7 @@ private[spark] class Executor(
       case NonFatal(e) =>
         logWarning("Unable to stop heartbeater", e)
     }
+    ShuffleBlockPusher.stop()
     threadPool.shutdown()

     // Notify plugins that executor is shutting down so they can terminate cleanly
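The single ShuffleBlockPusher.stop() call in Executor.stop() implies an executor-wide resource held by a companion object. A minimal sketch of that shutdown pattern, assuming the shared resource is a push thread pool; the object, field, and method bodies here are illustrative, only the stop() hook is from the PR.

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Sketch of a process-wide pusher companion object; not the PR's code.
object BlockPusherSketch {
  // Lazily created, shared by every task running on this executor.
  @volatile private var pool: ExecutorService = null

  def getOrCreatePool(numThreads: Int): ExecutorService = synchronized {
    if (pool == null) {
      pool = Executors.newFixedThreadPool(numThreads)
    }
    pool
  }

  // Called once from Executor.stop(), mirroring ShuffleBlockPusher.stop().
  def stop(): Unit = synchronized {
    if (pool != null) {
      pool.shutdown()
      pool = null
    }
  }
}
```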
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2023,4 +2023,33 @@ package object config {
       .version("3.1.0")
       .doubleConf
       .createWithDefault(5)
+
+  private[spark] val SHUFFLE_NUM_PUSH_THREADS =
+    ConfigBuilder("spark.shuffle.push.numPushThreads")
+      .doc("Specify the number of threads in the block pusher pool. These threads assist " +
+        "in creating connections and pushing blocks to remote shuffle services. By default, " +
+        "the thread pool size is equal to the number of Spark executor cores.")
+      .version("3.2.0")
+      .intConf
+      .createOptional
+
+  private[spark] val SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH =
+    ConfigBuilder("spark.shuffle.push.maxBlockSizeToPush")
+      .doc("The max size of an individual block to push to the remote shuffle services. " +
+        "Blocks larger than this threshold are not pushed to be merged remotely; they are " +
+        "fetched by the executors in the original manner.")
+      .version("3.2.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("1m")
+
+  private[spark] val SHUFFLE_MAX_BLOCK_BATCH_SIZE_FOR_PUSH =
+    ConfigBuilder("spark.shuffle.push.maxBlockBatchSize")
+      .doc("The max size of a batch of shuffle blocks to be grouped into a single push request.")
+      .version("3.2.0")
+      .bytesConf(ByteUnit.BYTE)
+      // The default is 3m, deliberately greater than the 2m default of
+      // TransportConf#memoryMapBytes: if this also defaulted to 2m, each batch of blocks
+      // would very likely be loaded into memory with memory mapping, which has higher
+      // overhead for small, MB-sized chunks of data.
+      .createWithDefaultString("3m")
Review comment (Member): This should be 2m according to the comment?

Reply (Contributor Author): The comment actually means that it should be higher than 2m. If it is 2m, then each block will be loaded in memory, which increases memory overhead. I will make the comment clearer.

Reply (Contributor Author): done
}
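For reference, a hedged example of wiring the three new knobs together in application code; the keys are taken verbatim from the config entries above, the values are illustrative, and the documented defaults apply when they are omitted.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Size of the block-push thread pool; defaults to the executor's core count.
  .set("spark.shuffle.push.numPushThreads", "8")
  // Blocks above this size are fetched the original way instead of pushed.
  .set("spark.shuffle.push.maxBlockSizeToPush", "1m")
  // Kept above TransportConf#memoryMapBytes (2m default) so push batches are
  // streamed rather than memory-mapped.
  .set("spark.shuffle.push.maxBlockBatchSize", "3m")
```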