From 62ec336fd3c600a5646d3614287cbb1de72e930d Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 30 Dec 2014 20:12:39 +0800
Subject: [PATCH 1/9] spark.port.maxRetries doesn't work

---
 core/src/main/scala/org/apache/spark/SparkConf.scala     | 2 +-
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 4 ++++
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 8 +++-----
 .../apache/spark/deploy/yarn/ExecutorRunnableUtil.scala  | 4 +++-
 4 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index c14764f773982..728c38f1a86b3 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -372,5 +372,5 @@ private[spark] object SparkConf {
   /**
    * Return whether the given config is a Spark port config.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 57bc3d4e4ae36..1b6411928db64 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -176,6 +176,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   logInfo(s"Running Spark version $SPARK_VERSION")

   private[spark] val conf = config.clone()
+  val portRetriesConf = conf.getOption("spark.port.maxRetries")
+  if (portRetriesConf.isDefined) {
+    System.setProperty("spark.port.maxRetries", portRetriesConf.get)
+  }
   conf.validateSettings()

   /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0d771baaa6abc..f323870463b1c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1691,15 +1691,12 @@ private[spark] object Utils extends Logging {
   /**
    * Default maximum number of retries when binding to a port before giving up.
    */
-  val portMaxRetries: Int = {
+  lazy val portMaxRetries: Int = {
     if (sys.props.contains("spark.testing")) {
       // Set a higher number of retries for tests...
       sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
     }
   }

@@ -1719,6 +1716,7 @@ private[spark] object Utils extends Logging {
       serviceName: String = "",
       maxRetries: Int = portMaxRetries): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    logInfo(s"Starting service$serviceString on port $port with maximum $maxRetries retries. ")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 22d73ecf6d010..205c1e538fe83 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -76,7 +76,9 @@ trait ExecutorRunnableUtil extends Logging {
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
     sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+      filter { case (k, v) =>
+        k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
+      }.
       foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     sparkConf.getAkkaConf.

From 191face9291c8d455223858882ef509406a8826d Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 30 Dec 2014 20:34:54 +0800
Subject: [PATCH 2/9] invalid value name

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f323870463b1c..dda1aa8d2f670 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1716,7 +1716,7 @@ private[spark] object Utils extends Logging {
       serviceName: String = "",
       maxRetries: Int = portMaxRetries): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
-    logInfo(s"Starting service$serviceString on port $port with maximum $maxRetries retries. ")
+    logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {

From 396c226b36a784d2aad846080cc377309c76bcb3 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Wed, 31 Dec 2014 10:35:24 +0800
Subject: [PATCH 3/9] make the grammar more like scala

---
 core/src/main/scala/org/apache/spark/SparkContext.scala     | 6 ++----
 .../org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala | 5 ++---
 2 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1b6411928db64..89542d5622a34 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -176,10 +176,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   logInfo(s"Running Spark version $SPARK_VERSION")

   private[spark] val conf = config.clone()
-  val portRetriesConf = conf.getOption("spark.port.maxRetries")
-  if (portRetriesConf.isDefined) {
-    System.setProperty("spark.port.maxRetries", portRetriesConf.get)
-  }
+  conf.getOption("spark.port.maxRetries")
+    .foreach(portRetriesConf => System.setProperty("spark.port.maxRetries", portRetriesConf))
   conf.validateSettings()

   /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 205c1e538fe83..356f5ea27aa9e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -75,10 +75,9 @@ trait ExecutorRunnableUtil extends Logging {
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) =>
+    sparkConf.getAll.filter { case (k, v) =>
        k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
-      }.
+    }.
       foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     sparkConf.getAkkaConf.
From f450cd15c6d054454e35bf0afba617bf6207cbaa Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Mon, 12 Jan 2015 16:45:31 +0800
Subject: [PATCH 4/9] startServiceOnPort will use a SparkConf arg

---
 core/src/main/scala/org/apache/spark/HttpServer.scala     | 2 +-
 core/src/main/scala/org/apache/spark/SparkContext.scala   | 2 --
 .../org/apache/spark/network/nio/ConnectionManager.scala  | 2 +-
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala  | 2 +-
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala     | 7 ++++---
 6 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 912558d0cab7d..d8d6deef1c33a 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -57,7 +57,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName, new SparkConf())
       server = actualServer
       port = actualPort
     }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9807e392423c1..3bf3acd245d8f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -176,8 +176,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   logInfo(s"Running Spark version $SPARK_VERSION")

   private[spark] val conf = config.clone()
-  conf.getOption("spark.port.maxRetries")
-    .foreach(portRetriesConf => System.setProperty("spark.port.maxRetries", portRetriesConf))
   conf.validateSettings()

   /**
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 3340fca08014e..76c313b27f0d6 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
       serverChannel.socket.bind(new InetSocketAddress(port))
       (serverChannel, serverChannel.socket.getLocalPort)
     }
-    Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
+    Utils.startServiceOnPort[ServerSocketChannel](port, startService, name, conf)
     serverChannel.register(selector, SelectionKey.OP_ACCEPT)

     val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 2a27d49d2de05..3bc1b8a84c551 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
       }
     }

-    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName, conf)
     ServerInfo(server, boundPort, collection)
   }

diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index db2531dc171f8..06828e9eea4b1 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
     val startService: Int => (ActorSystem, Int) = { actualPort =>
       doCreateActorSystem(name, host, actualPort, conf, securityManager)
     }
-    Utils.startServiceOnPort(port, startService, name)
+    Utils.startServiceOnPort(port, startService, name, conf)
   }

   private def doCreateActorSystem(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f6997fa4d0f4f..310eaa4c82fd4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1692,12 +1692,12 @@ private[spark] object Utils extends Logging {
   /**
    * Default maximum number of retries when binding to a port before giving up.
    */
-  lazy val portMaxRetries: Int = {
+  def portMaxRetries(conf: SparkConf): Int = {
     if (sys.props.contains("spark.testing")) {
       // Set a higher number of retries for tests...
       sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
     } else {
-      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(16)
+      conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
     }
   }

@@ -1715,8 +1715,9 @@ private[spark] object Utils extends Logging {
       startPort: Int,
       startService: Int => (T, Int),
       serviceName: String = "",
-      maxRetries: Int = portMaxRetries): (T, Int) = {
+      conf: SparkConf): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    val maxRetries = portMaxRetries(conf)
     logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port

From 67bcb46c99e645e158ca1f67122ce943daa7030b Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Mon, 12 Jan 2015 18:58:54 +0800
Subject: [PATCH 5/9] put conf at 3rd position, modify suite class, add comments

---
 core/src/main/scala/org/apache/spark/HttpServer.scala    | 2 +-
 .../org/apache/spark/network/nio/ConnectionManager.scala | 2 +-
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 2 +-
 .../src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 9 +++++----
 .../apache/spark/streaming/flume/FlumeStreamSuite.scala  | 2 +-
 .../apache/spark/streaming/mqtt/MQTTStreamSuite.scala    | 3 ++-
 7 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index d8d6deef1c33a..aeeb2b385d864 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -57,7 +57,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName, new SparkConf())
+        Utils.startServiceOnPort[Server](requestedPort, doStart, new SparkConf(), serverName)
       server = actualServer
       port = actualPort
     }
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 76c313b27f0d6..03c4137ca0a81 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -174,7 +174,7 @@ private[nio] class ConnectionManager(
       serverChannel.socket.bind(new InetSocketAddress(port))
       (serverChannel, serverChannel.socket.getLocalPort)
     }
-    Utils.startServiceOnPort[ServerSocketChannel](port, startService, name, conf)
+    Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
     serverChannel.register(selector, SelectionKey.OP_ACCEPT)

     val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 3bc1b8a84c551..88fed833f922d 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
       }
     }

-    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName, conf)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
     ServerInfo(server, boundPort, collection)
   }

diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 06828e9eea4b1..4c9b1e3c46f0f 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
     val startService: Int => (ActorSystem, Int) = { actualPort =>
       doCreateActorSystem(name, host, actualPort, conf, securityManager)
     }
-    Utils.startServiceOnPort(port, startService, name, conf)
+    Utils.startServiceOnPort(port, startService, conf, name)
   }

   private def doCreateActorSystem(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 310eaa4c82fd4..8988d8467ef32 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1706,16 +1706,17 @@ private[spark] object Utils extends Logging {
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    *
    * @param startPort The initial port to start the service on.
-   * @param maxRetries Maximum number of retries to attempt.
-   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
+   * @param conf Used to get maximum number of retries.
+   * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
-      serviceName: String = "",
-      conf: SparkConf): (T, Int) = {
+      conf: SparkConf,
+      serviceName: String = ""
+    ): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
     val maxRetries = portMaxRetries(conf)
     logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 13943ed5442b9..f333e3891b5f0 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, conf)._2
   }

   /** Setup and start the streaming context */
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 98fe6cb301f52..2e4b8a7c406a8 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.apache.spark.SparkConf

 class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {

@@ -101,7 +102,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, new SparkConf())._2
   }

   def publishData(data: String): Unit = {

From 61a370d00e699c11ed4af7c08fb8b620cd84678b Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 13 Jan 2015 10:53:18 +0800
Subject: [PATCH 6/9] some minor fixes

---
 .../scala/org/apache/spark/HttpFileServer.scala      |  3 ++-
 .../main/scala/org/apache/spark/HttpServer.scala     |  3 ++-
 .../src/main/scala/org/apache/spark/SparkConf.scala  |  6 ++++--
 core/src/main/scala/org/apache/spark/SparkEnv.scala  |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala     | 13 ++++++-------
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala  | 10 +++-------
 6 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index edc3889c9ae51..677c5e0f89d72 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,6 +24,7 @@ import com.google.common.io.Files
 import org.apache.spark.util.Utils

 private[spark] class HttpFileServer(
+    conf: SparkConf,
     securityManager: SecurityManager,
     requestedPort: Int = 0)
   extends Logging {
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
+    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index aeeb2b385d864..fa22787ce7ea3 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * around a Jetty server.
  */
 private[spark] class HttpServer(
+    conf: SparkConf,
     resourceBase: File,
     securityManager: SecurityManager,
     requestedPort: Int = 0,
@@ -57,7 +58,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, new SparkConf(), serverName)
+        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
       server = actualServer
       port = actualPort
     }
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 728c38f1a86b3..93d98db5e3f5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,7 +370,9 @@ private[spark] object SparkConf {
   }

   /**
-   * Return whether the given config is a Spark port config.
+   * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.contains(".port")
+  def isSparkPortConf(name: String): Boolean = {
+    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 43436a1697000..4d418037bd33f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -312,7 +312,7 @@ object SparkEnv extends Logging {
     val httpFileServer =
       if (isDriver) {
         val fileServerPort = conf.getInt("spark.fileserver.port", 0)
-        val server = new HttpFileServer(securityManager, fileServerPort)
+        val server = new HttpFileServer(conf, securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8988d8467ef32..0c21c3d1b8a4b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1693,11 +1693,12 @@ private[spark] object Utils extends Logging {
    * Default maximum number of retries when binding to a port before giving up.
    */
   def portMaxRetries(conf: SparkConf): Int = {
-    if (sys.props.contains("spark.testing")) {
+    val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
+    if (conf.contains("spark.testing")) {
       // Set a higher number of retries for tests...
-      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
+      maxRetries.getOrElse(100)
     } else {
-      conf.getOption("spark.port.maxRetries").map(_.toInt).getOrElse(16)
+      maxRetries.getOrElse(16)
     }
   }

@@ -1708,18 +1709,16 @@ private[spark] object Utils extends Logging {
    * @param startPort The initial port to start the service on.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
-   * @param conf Used to get maximum number of retries.
+   * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
    * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
       conf: SparkConf,
-      serviceName: String = ""
-    ): (T, Int) = {
+      serviceName: String = ""): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
     val maxRetries = portMaxRetries(conf)
-    logInfo(s"Starting service$serviceString on port $startPort with maximum $maxRetries retries. ")
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c5f8b4eb138cb..c537da9f67552 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -148,13 +148,9 @@ class ExecutorRunnable(
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.filter { case (k, v) =>
-      k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.equals("spark.port.maxRetries")
-    }.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
-    sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+    sparkConf.getAll
+      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
+      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
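
The predicate reworked in patch 6 above is easy to sanity-check in isolation. Here is a minimal standalone copy (for illustration only; the real method lives on the private[spark] SparkConf companion object):

    object PortConfCheck {
      // Same logic as the revised isSparkPortConf: match spark.*.port keys
      // as well as spark.port.* keys.
      def isSparkPortConf(name: String): Boolean =
        (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")

      def main(args: Array[String]): Unit = {
        Seq("spark.driver.port",      // true:  spark.*.port
            "spark.port.maxRetries",  // true:  spark.port.*
            "spark.executor.memory")  // false: neither pattern
          .foreach(name => println(s"$name -> ${isSparkPortConf(name)}"))
      }
    }

Widening the match to spark.port.* is what lets spark.port.maxRetries count as a port config, so the isExecutorStartupConf filter in the YARN change above can forward it to executors alongside the akka and auth settings.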
From 7cdfd9842e61642eaf1a8a5e0841551798b8985c Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 13 Jan 2015 10:57:26 +0800
Subject: [PATCH 7/9] fit for new HttpServer constructor

---
 .../main/scala/org/apache/spark/broadcast/HttpBroadcast.scala   | 2 +-
 .../src/main/scala/org/apache/spark/repl/SparkIMain.scala       | 2 +-
 repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 31f0a462f84d8..8271cdb2d789f 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -153,7 +153,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+    server = new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 646c68e60c2e9..b646f0b6f0868 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -106,7 +106,7 @@ import org.apache.spark.util.Utils
   val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
   /** Jetty server that will serve our classes to worker nodes */
   val classServerPort = conf.getInt("spark.replClassServer.port", 0)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
   private var currentSettings: Settings = initialSettings
   var printResults = true // whether to print result lines
   var totalSilence = false // whether to print anything
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 5e93a71995072..69e44d4f916e1 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -32,7 +32,7 @@ object Main extends Logging {
   val s = new Settings()
   s.processArguments(List("-Yrepl-class-based",
     "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
     "-Yrepl-sync"), true)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf))
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
   var sparkContext: SparkContext = _
   var interp = new SparkILoop // this is a public var because tests reset it.
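
With the configuration threaded through every HttpServer and startServiceOnPort call site, the retry budget now travels inside SparkConf rather than through a JVM system property. A hedged usage sketch (the app name and the value 32 are arbitrary example choices):

    import org.apache.spark.{SparkConf, SparkContext}

    // Give every service started by this driver up to 32 extra bind attempts.
    // The same key can also be supplied externally, e.g. with
    // spark-submit --conf spark.port.maxRetries=32.
    val conf = new SparkConf()
      .setAppName("port-retries-demo")
      .set("spark.port.maxRetries", "32")
    val sc = new SparkContext(conf)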
From 2d86d650b0bdd48d5022b2d22ffa22f6b010a661 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 13 Jan 2015 11:05:36 +0800
Subject: [PATCH 8/9] fix line length

---
 .../main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 8271cdb2d789f..d312e5ca23cea 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+    server =
+        new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)

From 8cdf96dd7ec41ade12c49c3a2a9f59fd4e842a0c Mon Sep 17 00:00:00 2001
From: WangTao
Date: Tue, 13 Jan 2015 11:15:14 +0800
Subject: [PATCH 9/9] indent thing

---
 .../main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index d312e5ca23cea..31d6958c403b3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -154,7 +154,7 @@ private[broadcast] object HttpBroadcast extends Logging {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
     server =
-        new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+      new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
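
After the full series, every call site resolves the retry budget through Utils.portMaxRetries(conf). Its decision order reduces to the following simplified mirror, with the conf lookup abstracted to a plain function so the sketch stays dependency-free:

    // Explicit spark.port.maxRetries always wins; otherwise test runs
    // (spark.testing set) get 100 retries and normal runs get 16.
    def portMaxRetries(get: String => Option[String]): Int = {
      val configured = get("spark.port.maxRetries").map(_.toInt)
      if (get("spark.testing").isDefined) configured.getOrElse(100)
      else configured.getOrElse(16)
    }

    portMaxRetries(Map("spark.port.maxRetries" -> "8").get)  // 8
    portMaxRetries(Map("spark.testing" -> "true").get)       // 100
    portMaxRetries(Map.empty[String, String].get)            // 16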