From 7126547da9ca6447d8c53bec8bb4094e11badb2e Mon Sep 17 00:00:00 2001
From: huangzhaowei
Date: Thu, 16 Apr 2015 09:37:20 +0800
Subject: [PATCH 1/2] Do not let Yarn Shuffle Server retry its server port.

---
 .../org/apache/spark/network/yarn/YarnShuffleService.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 63b21222e7b7..b6a888314e85 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -98,6 +98,13 @@ private boolean isAuthenticationEnabled() {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+
+    // It's better to let the NodeManager go down than to retry the port when
+    // `spark.shuffle.service.port` conflicts while starting the Spark Yarn
+    // Shuffle Server, because the retry mechanism would make the shuffle port
+    // inconsistent and cause clients to fail to find the port.
+    conf.setInt("spark.port.maxRetries", 0);
+
     TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests

From 962770c914a1a1928dccbf14a26df735ba4f77f3 Mon Sep 17 00:00:00 2001
From: huangzhaowei
Date: Fri, 17 Apr 2015 16:36:03 +0800
Subject: [PATCH 2/2] Disable port retry in StandaloneWorkerShuffleService and
 clone the Configuration in YarnShuffleService

---
 .../deploy/worker/StandaloneWorkerShuffleService.scala |  3 ++-
 .../spark/network/netty/SparkTransportConf.scala       | 10 +++++++++-
 .../apache/spark/network/server/TransportServer.java   |  3 ++-
 .../apache/spark/network/yarn/YarnShuffleService.java  |  9 +++++----
 4 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index b9798963bab0..b159fa74abed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -39,7 +39,8 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
 
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0,
+    disablePortRetry = true)
   private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index cef203006d68..5f3fcbaf6bbb 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -43,8 +43,12 @@ object SparkTransportConf {
    * @param numUsableCores if nonzero, this will restrict the server and client threads to only
    *                       use the given number of cores, rather than all of the machine's cores.
    *                       This restriction will only occur if these properties are not already set.
+   * @param disablePortRetry if true, the server will not retry its port. A long-running server
+   *                         should disable port retries, since the server and its clients have
+   *                         agreed on a specific port.
    */
-  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0,
+      disablePortRetry: Boolean = false): TransportConf = {
     val conf = _conf.clone
 
     // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
@@ -55,6 +59,10 @@ object SparkTransportConf {
       conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
     conf.set("spark.shuffle.io.clientThreads",
       conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
+
+    if (disablePortRetry) {
+      conf.set("spark.port.maxRetries", "0")
+    }
 
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index b7ce8541e565..f9a09c2e788c 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -122,7 +122,8 @@ public void close() {
   }
 
   /**
-   * Attempt to bind to the specified port up to a fixed number of retries.
+   * Attempt to bind on the given port, or fail after a number of attempts.
+   * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    * If all attempts fail after the max number of retries, exit.
    */
   private void bindRightPort(int portToBind) {
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index b6a888314e85..38fb15794fbc 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -98,17 +98,18 @@ private boolean isAuthenticationEnabled() {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+    Configuration newConf = new Configuration(conf);
 
     // It's better to let the NodeManager go down than to retry the port when
     // `spark.shuffle.service.port` conflicts while starting the Spark Yarn
     // Shuffle Server, because the retry mechanism would make the shuffle port
     // inconsistent and cause clients to fail to find the port.
-    conf.setInt("spark.port.maxRetries", 0);
+    newConf.setInt("spark.port.maxRetries", 0);
 
-    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(newConf));
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
-    boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+    boolean authEnabled = newConf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
     blockHandler = new ExternalShuffleBlockHandler(transportConf);
     RpcHandler rpcHandler = blockHandler;
     if (authEnabled) {
@@ -116,7 +117,7 @@ protected void serviceInit(Configuration conf) {
       rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
     }
 
-    int port = conf.getInt(
+    int port = newConf.getInt(
       SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
     TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
     shuffleServer = transportContext.createServer(port);