From 5f5fda8244c59970d117a0e9d32bd8bddde348f2 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 17 Nov 2014 10:34:17 +0800 Subject: [PATCH 01/13] https -> http in pom --- pom.xml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 639ea22a1fda3..26fa0e823edbf 100644 --- a/pom.xml +++ b/pom.xml @@ -156,7 +156,7 @@ central Maven Repository - https://repo1.maven.org/maven2 + http://repo1.maven.org/maven2 true @@ -167,7 +167,7 @@ apache-repo Apache Repository - https://repository.apache.org/content/repositories/releases + http://repository.apache.org/content/repositories/releases true @@ -178,7 +178,7 @@ jboss-repo JBoss Repository - https://repository.jboss.org/nexus/content/repositories/releases + http://repository.jboss.org/nexus/content/repositories/releases true @@ -189,7 +189,7 @@ mqtt-repo MQTT Repository - https://repo.eclipse.org/content/repositories/paho-releases + http://repo.eclipse.org/content/repositories/paho-releases true @@ -200,7 +200,7 @@ cloudera-repo Cloudera Repository - https://repository.cloudera.com/artifactory/cloudera-repos + http://repository.cloudera.com/artifactory/cloudera-repos true @@ -222,7 +222,7 @@ spring-releases Spring Release Repository - https://repo.spring.io/libs-release + http://repo.spring.io/libs-release true @@ -234,7 +234,7 @@ spark-staging Spring Staging Repository - https://oss.sonatype.org/content/repositories/orgspark-project-1085 + http://oss.sonatype.org/content/repositories/orgspark-project-1085 true @@ -246,7 +246,7 @@ spark-staging-hive13 Spring Staging Repository Hive 13 - https://oss.sonatype.org/content/repositories/orgspark-project-1089/ + http://oss.sonatype.org/content/repositories/orgspark-project-1089/ true @@ -258,7 +258,7 @@ central - https://repo1.maven.org/maven2 + http://repo1.maven.org/maven2 true From 4203b7f0061e18e5057cf500e43f765c5ae1820d Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 17 Nov 2014 18:23:17 +0800 Subject: [PATCH 02/13] start port in specified rang ports --- .../main/scala/org/apache/spark/HttpServer.scala | 3 ++- .../scala/org/apache/spark/SecurityManager.scala | 3 ++- .../spark/network/nio/ConnectionManager.scala | 2 +- .../scala/org/apache/spark/ui/JettyUtils.scala | 2 +- .../scala/org/apache/spark/util/AkkaUtils.scala | 2 +- .../main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++------ 6 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 912558d0cab7d..310e87ec56576 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -57,7 +57,8 @@ private[spark] class HttpServer( } else { logInfo("Starting HTTP Server") val (actualServer, actualPort) = - Utils.startServiceOnPort[Server](requestedPort, doStart, serverName) + Utils.startServiceOnPort[Server]( + requestedPort, doStart, securityManager.sparkConf, serverName) server = actualServer port = actualPort } diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index dbff9d12b5ad7..abdbba0c7b907 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -144,7 +144,8 @@ import org.apache.spark.network.sasl.SecretKeyHolder * can take place. 
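A note on the HttpServer hunk above: Utils.startServiceOnPort now also receives the SparkConf (reached through the SecurityManager here), but the factory contract is unchanged — the function gets a candidate port and must return the started service together with the port it actually bound, throwing java.net.BindException on a collision so the helper can retry. A minimal sketch of a conforming factory, using a plain ServerSocket as an illustrative stand-in for Spark's Jetty server:

    import java.net.ServerSocket

    // Shape expected by startServiceOnPort: Int => (service, boundPort).
    // Reporting the port actually bound matters when the candidate port is 0.
    val startService: Int => (ServerSocket, Int) = { tryPort =>
      val socket = new ServerSocket(tryPort) // throws BindException if the port is taken
      (socket, socket.getLocalPort)
    }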
*/ -private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder { +private[spark] class SecurityManager( + val sparkConf: SparkConf) extends Logging with SecretKeyHolder { // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index df4b085d2251e..302b496b8a849 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -164,7 +164,7 @@ private[nio] class ConnectionManager( serverChannel.socket.bind(new InetSocketAddress(port)) (serverChannel, serverChannel.socket.getLocalPort) } - Utils.startServiceOnPort[ServerSocketChannel](port, startService, name) + Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 2a27d49d2de05..88fed833f922d 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging { } } - val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName) + val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName) ServerInfo(server, boundPort, collection) } diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 10010bdfa1a51..3505346ac44ba 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging { val startService: Int => (ActorSystem, Int) = { actualPort => doCreateActorSystem(name, host, actualPort, conf, securityManager) } - Utils.startServiceOnPort(port, startService, name) + Utils.startServiceOnPort(port, startService, conf, name) } private def doCreateActorSystem( diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index eb4a598dbf857..f830d4bd42905 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1652,25 +1652,28 @@ private[spark] object Utils extends Logging { * Attempt to start a service on the given port, or fail after a number of attempts. * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * - * @param startPort The initial port to start the service on. + * @param port The initial port to start the service on. * @param maxRetries Maximum number of retries to attempt. * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. * @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. 
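The SparkConf threaded through all of the call sites above is what lets the helper read the two new keys, spark.port.min and spark.port.max, in the Utils hunk that follows. They are set like any other configuration entry; a sketch with made-up values (note these keys only live until patch 06 below, which removes them again in favor of per-service port ranges):

    import org.apache.spark.SparkConf

    // Confine every retried service to a firewall-friendly window
    // (30000/30100 are illustrative values, not defaults).
    val conf = new SparkConf()
      .set("spark.port.min", "30000")
      .set("spark.port.max", "30100")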
*/ def startServiceOnPort[T]( - startPort: Int, + port: Int, startService: Int => (T, Int), + conf: SparkConf = new SparkConf(), serviceName: String = "", maxRetries: Int = portMaxRetries): (T, Int) = { val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + val startPort = conf.getInt("spark.port.min", 0) + val endPort = conf.getInt("spark.port.max", 65536) for (offset <- 0 to maxRetries) { - // Do not increment port if startPort is 0, which is treated as a special port - val tryPort = if (startPort == 0) { - startPort + // Do not increment port if port is 0, which is treated as a special port + val tryPort = if (port == 0) { + (startPort + Math.random() * (endPort - startPort)).toInt } else { // If the new port wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 + ((port + offset - 1024) % (65536 - 1024)) + 1024 } try { val (service, port) = startService(tryPort) From 1773b12115db5242feed4532126ff5a26cd35c7a Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 17 Nov 2014 19:58:08 +0800 Subject: [PATCH 03/13] Revert "https -> http in pom" This reverts commit 5f5fda8244c59970d117a0e9d32bd8bddde348f2. --- pom.xml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 26fa0e823edbf..639ea22a1fda3 100644 --- a/pom.xml +++ b/pom.xml @@ -156,7 +156,7 @@ central Maven Repository - http://repo1.maven.org/maven2 + https://repo1.maven.org/maven2 true @@ -167,7 +167,7 @@ apache-repo Apache Repository - http://repository.apache.org/content/repositories/releases + https://repository.apache.org/content/repositories/releases true @@ -178,7 +178,7 @@ jboss-repo JBoss Repository - http://repository.jboss.org/nexus/content/repositories/releases + https://repository.jboss.org/nexus/content/repositories/releases true @@ -189,7 +189,7 @@ mqtt-repo MQTT Repository - http://repo.eclipse.org/content/repositories/paho-releases + https://repo.eclipse.org/content/repositories/paho-releases true @@ -200,7 +200,7 @@ cloudera-repo Cloudera Repository - http://repository.cloudera.com/artifactory/cloudera-repos + https://repository.cloudera.com/artifactory/cloudera-repos true @@ -222,7 +222,7 @@ spring-releases Spring Release Repository - http://repo.spring.io/libs-release + https://repo.spring.io/libs-release true @@ -234,7 +234,7 @@ spark-staging Spring Staging Repository - http://oss.sonatype.org/content/repositories/orgspark-project-1085 + https://oss.sonatype.org/content/repositories/orgspark-project-1085 true @@ -246,7 +246,7 @@ spark-staging-hive13 Spring Staging Repository Hive 13 - http://oss.sonatype.org/content/repositories/orgspark-project-1089/ + https://oss.sonatype.org/content/repositories/orgspark-project-1089/ true @@ -258,7 +258,7 @@ central - http://repo1.maven.org/maven2 + https://repo1.maven.org/maven2 true From cd1a88dd462373ce890bfc0eaaa455fc82ee76b4 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Tue, 18 Nov 2014 11:42:22 +0800 Subject: [PATCH 04/13] fix 'else' branch and doc update --- .../main/scala/org/apache/spark/util/Utils.scala | 6 +++--- docs/configuration.md | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f830d4bd42905..59148fe1ea7da 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1665,15 +1665,15 @@ private[spark] object Utils extends Logging 
{
       serviceName: String = "",
       maxRetries: Int = portMaxRetries): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
-    val startPort = conf.getInt("spark.port.min", 0)
+    val startPort = conf.getInt("spark.port.min", 1024)
     val endPort = conf.getInt("spark.port.max", 65536)
     for (offset <- 0 to maxRetries) {
       // Do not increment port if port is 0, which is treated as a special port
       val tryPort = if (port == 0) {
         (startPort + Math.random() * (endPort - startPort)).toInt
       } else {
-        // If the new port wraps around, do not try a privilege port
-        ((port + offset - 1024) % (65536 - 1024)) + 1024
+        // If the new port wraps around, ensure it is in range(startPort, endPort)
+        ((port + offset - startPort) % (endPort - startPort)) + startPort
       }
       try {
         val (service, port) = startService(tryPort)
diff --git a/docs/configuration.md b/docs/configuration.md
index 8839162c3a13e..e29f48364f300 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -183,6 +183,20 @@ of the most common options to set are:
     Logs the effective SparkConf as INFO when a SparkContext is started.
   </td>
 </tr>
+<tr>
+  <td><code>spark.port.min</code></td>
+  <td>1024</td>
+  <td>
+    Minimum port for Spark services (UI, HttpServer, ConnectionManager, Akka).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.port.max</code></td>
+  <td>65536</td>
+  <td>
+    Maximum port for Spark services (UI, HttpServer, ConnectionManager, Akka).
+  </td>
+</tr>
 </table>

 Apart from these, the following properties are also available, and may be useful in some situations:

From da05d64f6ab0b97f86a481430ebcc3a7dc8099c3 Mon Sep 17 00:00:00 2001
From: scwf
Date: Sat, 22 Nov 2014 22:42:13 +0800
Subject: [PATCH 05/13] inclusive-exclusive issue fix

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 59148fe1ea7da..6ff75aed51421 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1670,10 +1670,10 @@ private[spark] object Utils extends Logging {
     for (offset <- 0 to maxRetries) {
       // Do not increment port if port is 0, which is treated as a special port
       val tryPort = if (port == 0) {
-        (startPort + Math.random() * (endPort - startPort + 1)).toInt
+        port
       } else {
         // If the new port wraps around, ensure it is in range(startPort, endPort)
-        ((port + offset - startPort) % (endPort - startPort)) + startPort
+        ((port + offset) % (endPort - startPort + 1)) + startPort
       }
       try {
         val (service, port) = startService(tryPort)

From d4da80b808c860b48d72c56756bbcd67f0fb0a94 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Mon, 27 Apr 2015 22:16:53 +0800
Subject: [PATCH 06/13] specify range for port retry

---
 .../org/apache/spark/HttpFileServer.scala       |  2 +-
 .../scala/org/apache/spark/HttpServer.scala     |  4 +-
 .../scala/org/apache/spark/SparkEnv.scala       |  8 ++--
 .../spark/broadcast/HttpBroadcast.scala         |  2 +-
 .../org/apache/spark/deploy/Client.scala        |  2 +-
 .../spark/deploy/LocalSparkCluster.scala        |  4 +-
 .../spark/deploy/client/TestClient.scala        |  2 +-
 .../spark/deploy/history/HistoryServer.scala    |  4 +-
 .../apache/spark/deploy/master/Master.scala     |  8 ++--
 .../spark/deploy/master/MasterArguments.scala   | 14 +++----
 .../spark/deploy/master/ui/MasterWebUI.scala    |  2 +-
 .../deploy/rest/StandaloneRestServer.scala      |  2 +-
 .../spark/deploy/worker/DriverWrapper.scala     |  2 +-
 .../apache/spark/deploy/worker/Worker.scala     | 15 ++++----
 .../spark/deploy/worker/WorkerArguments.scala   | 14 +++----
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  2 +-
.../CoarseGrainedExecutorBackend.scala | 2 +- .../spark/executor/MesosExecutorBackend.scala | 2 +- .../spark/network/nio/ConnectionManager.scala | 4 +- .../network/nio/NioBlockTransferService.scala | 2 +- .../scala/org/apache/spark/rpc/RpcEnv.scala | 4 +- .../org/apache/spark/ui/JettyUtils.scala | 2 +- .../scala/org/apache/spark/ui/SparkUI.scala | 6 +-- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- .../org/apache/spark/util/AkkaUtils.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 38 +++++++++++-------- .../org/apache/spark/FileServerSuite.scala | 8 ++-- .../apache/spark/MapOutputTrackerSuite.scala | 6 +-- .../rest/StandaloneRestSubmitSuite.scala | 8 ++-- .../deploy/worker/WorkerWatcherSuite.scala | 4 +- .../network/nio/ConnectionManagerSuite.scala | 26 ++++++------- .../org/apache/spark/rpc/RpcEnvSuite.scala | 18 ++++----- .../spark/rpc/akka/AkkaRpcEnvSuite.scala | 4 +- .../BlockManagerReplicationSuite.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 2 +- .../scala/org/apache/spark/ui/UISuite.scala | 6 +-- .../apache/spark/util/AkkaUtilsSuite.scala | 32 ++++++++-------- docs/configuration.md | 14 ------- .../examples/streaming/ActorWordCount.scala | 2 +- .../streaming/flume/FlumeStreamSuite.scala | 2 +- .../streaming/kafka/KafkaTestUtils.scala | 6 +-- .../streaming/mqtt/MQTTStreamSuite.scala | 2 +- .../org/apache/spark/repl/SparkIMain.scala | 2 +- .../streaming/ReceivedBlockHandlerSuite.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- 45 files changed, 145 insertions(+), 154 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 7e706bcc42f04..d99ae49546b02 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils private[spark] class HttpFileServer( conf: SparkConf, securityManager: SecurityManager, - requestedPort: Int = 0) + requestedPort: String = "0") extends Logging { var baseDir : File = null diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 8de3a6c04df34..5fd11aa5f86ce 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -46,12 +46,12 @@ private[spark] class HttpServer( conf: SparkConf, resourceBase: File, securityManager: SecurityManager, - requestedPort: Int = 0, + requestedPort: String = "0", serverName: String = "HTTP server") extends Logging { private var server: Server = null - private var port: Int = requestedPort + private var port: Int = 0 def start() { if (server != null) { diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 959aefabd8de4..c6b8342924d61 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -182,7 +182,7 @@ object SparkEnv extends Logging { assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!") assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") val hostname = conf.get("spark.driver.host") - val port = conf.get("spark.driver.port").toInt + val port = conf.get("spark.driver.port") create( conf, SparkContext.DRIVER_IDENTIFIER, @@ -203,7 +203,7 @@ object SparkEnv extends Logging { conf: SparkConf, executorId: String, hostname: 
String, - port: Int, + port: String, numCores: Int, isLocal: Boolean): SparkEnv = { val env = create( @@ -226,7 +226,7 @@ object SparkEnv extends Logging { conf: SparkConf, executorId: String, hostname: String, - port: Int, + port: String, isDriver: Boolean, isLocal: Boolean, listenerBus: LiveListenerBus = null, @@ -342,7 +342,7 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { - val fileServerPort = conf.getInt("spark.fileserver.port", 0) + val fileServerPort = conf.get("spark.fileserver.port", "0") val server = new HttpFileServer(conf, securityManager, fileServerPort) server.initialize() conf.set("spark.fileserver.uri", server.serverUri) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 4457c75e8b0fc..715ee223443f5 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -152,7 +152,7 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast") - val broadcastPort = conf.getInt("spark.broadcast.port", 0) + val broadcastPort = conf.get("spark.broadcast.port", "0") server = new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") server.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index c2c3e9a9e4827..3962e3ecf9a2b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -160,7 +160,7 @@ object Client { Logger.getRootLogger.setLevel(driverArgs.logLevel) val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) + "driverClient", Utils.localHostName(), "0", conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem)) diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index f0e77c2ba982b..692bdf436588c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -51,14 +51,14 @@ class LocalSparkCluster( val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false") /* Start the Master */ - val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf) + val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, "0", "0", _conf) masterActorSystems += masterSystem val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort val masters = Array(masterUrl) /* Start the Workers */ for (workerNum <- 1 to numWorkers) { - val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, + val (workerSystem, _) = Worker.startSystemAndActor(localHostname, "0", "0", coresPerWorker, memoryPerWorker, masters, null, Some(workerNum), _conf) workerActorSystems += workerSystem } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index 40835b9550586..ff35b636d0095 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -46,7 +46,7 @@ private[spark] object TestClient { def main(args: Array[String]) { val url = args(0) val conf = new SparkConf - val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0, + val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), "0", conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription("TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 56bef57e55392..9719dc321c248 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -44,7 +44,7 @@ class HistoryServer( conf: SparkConf, provider: ApplicationHistoryProvider, securityManager: SecurityManager, - port: Int) + port: String) extends WebUI(securityManager, port, conf) with Logging { // How many applications to retain @@ -189,7 +189,7 @@ object HistoryServer extends Logging { .newInstance(conf) .asInstanceOf[ApplicationHistoryProvider] - val port = conf.getInt("spark.history.ui.port", 18080) + val port = conf.get("spark.history.ui.port", "18080") val server = new HistoryServer(conf, provider, securityManager, port) server.bind() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index ff2eed6dee70a..a3c32350d0887 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -52,7 +52,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger private[master] class Master( host: String, port: Int, - webUiPort: Int, + webUiPort: String, val securityMgr: SecurityManager, val conf: SparkConf) extends Actor with ActorLogReceive with Logging with LeaderElectable { @@ -129,7 +129,7 @@ private[master] class Master( private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true) private val restServer = if (restServerEnabled) { - val port = conf.getInt("spark.master.rest.port", 6066) + val port = conf.get("spark.master.rest.port", "6066") Some(new StandaloneRestServer(host, port, self, masterUrl, conf)) } else { None @@ -923,8 +923,8 @@ private[deploy] object Master extends Logging { */ def startSystemAndActor( host: String, - port: Int, - webUiPort: Int, + port: String, + webUiPort: String, conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala index 435b9b12f83b8..5714d1af00a5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala @@ -25,8 +25,8 @@ import org.apache.spark.util.{IntParam, Utils} */ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() - var port = 7077 - var 
webUiPort = 8080 + var port = "7077" + var webUiPort = "8080" var propertiesFile: String = null // Check for settings in environment variables @@ -34,10 +34,10 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { host = System.getenv("SPARK_MASTER_HOST") } if (System.getenv("SPARK_MASTER_PORT") != null) { - port = System.getenv("SPARK_MASTER_PORT").toInt + port = System.getenv("SPARK_MASTER_PORT") } if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) { - webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt + webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT") } parse(args.toList) @@ -46,7 +46,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) if (conf.contains("spark.master.ui.port")) { - webUiPort = conf.get("spark.master.ui.port").toInt + webUiPort = conf.get("spark.master.ui.port") } private def parse(args: List[String]): Unit = args match { @@ -60,11 +60,11 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { host = value parse(tail) - case ("--port" | "-p") :: IntParam(value) :: tail => + case ("--port" | "-p") :: value :: tail => port = value parse(tail) - case "--webui-port" :: IntParam(value) :: tail => + case "--webui-port" :: value :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index aad9c87bdb987..006543fe56869 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.RpcUtils * Web UI server for the standalone master. 
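Since MasterArguments now forwards SPARK_MASTER_PORT, --port and --webui-port verbatim instead of through IntParam, a range written as "min:max" survives all the way down to Utils.startServiceOnPort, which parses it in the Utils hunk later in this patch. A sketch of the pass-through, with assumed range values:

    // e.g. exported before starting the master (hypothetical values):
    //   export SPARK_MASTER_PORT=7077:7087
    //   export SPARK_MASTER_WEBUI_PORT=8080:8085
    var port = "7077" // default stays a plain string, as in the diff
    if (System.getenv("SPARK_MASTER_PORT") != null) {
      port = System.getenv("SPARK_MASTER_PORT") // "7077:7087" passes through intact
    }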
*/ private[master] -class MasterWebUI(val master: Master, requestedPort: Int) +class MasterWebUI(val master: Master, requestedPort: String) extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging { val masterActorRef = master.self diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 2d6b8d4204795..c392675ec1780 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -60,7 +60,7 @@ import org.apache.spark.deploy.ClientArguments._ */ private[deploy] class StandaloneRestServer( host: String, - requestedPort: Int, + requestedPort: String, masterActor: ActorRef, masterUrl: String, masterConf: SparkConf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index d1a12b01e78f7..16da4be8e7508 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -39,7 +39,7 @@ object DriverWrapper { case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() val rpcEnv = RpcEnv.create("Driver", - Utils.localHostName(), 0, conf, new SecurityManager(conf)) + Utils.localHostName(), "0", conf, new SecurityManager(conf)) rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl)) val currentLoader = Thread.currentThread.getContextClassLoader diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 3ee2eb69e8a4e..0639caf9557d1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -44,8 +44,8 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} */ private[worker] class Worker( host: String, - port: Int, - webUiPort: Int, + port: String, + webUiPort: String, cores: Int, memory: Int, masterAkkaUrls: Array[String], @@ -58,7 +58,6 @@ private[worker] class Worker( import context.dispatcher Utils.checkHost(host, "Expected hostname") - assert (port > 0) // For worker and executor IDs private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") @@ -197,7 +196,7 @@ private[worker] class Worker( for (masterAkkaUrl <- masterAkkaUrls) { logInfo("Connecting to master " + masterAkkaUrl + "...") val actor = context.actorSelection(masterAkkaUrl) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + actor ! RegisterWorker(workerId, host, port.toInt, cores, memory, webUi.boundPort.toInt, publicAddress) } } @@ -236,7 +235,7 @@ private[worker] class Worker( */ if (master != null) { master ! RegisterWorker( - workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + workerId, host, port.toInt, cores, memory, webUi.boundPort.toInt, publicAddress) } else { // We are retrying the initial registration tryRegisterAllMasters() @@ -480,7 +479,7 @@ private[worker] class Worker( masterDisconnected() case RequestWorkerState => - sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, + sender ! 
WorkerStateResponse(host, port.toInt, workerId, executors.values.toList, finishedExecutors.values.toList, drivers.values.toList, finishedDrivers.values.toList, activeMasterUrl, cores, memory, coresUsed, memoryUsed, activeMasterWebUiUrl) @@ -539,8 +538,8 @@ private[deploy] object Worker extends Logging { def startSystemAndActor( host: String, - port: Int, - webUiPort: Int, + port: String, + webUiPort: String, cores: Int, memory: Int, masterUrls: Array[String], diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 88f9d880ac209..b75dc28a8107f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -27,8 +27,8 @@ import org.apache.spark.SparkConf */ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() - var port = 0 - var webUiPort = 8081 + var port = "0" + var webUiPort = "8081" var cores = inferDefaultCores() var memory = inferDefaultMemory() var masters: Array[String] = null @@ -37,7 +37,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { // Check for settings in environment variables if (System.getenv("SPARK_WORKER_PORT") != null) { - port = System.getenv("SPARK_WORKER_PORT").toInt + port = System.getenv("SPARK_WORKER_PORT") } if (System.getenv("SPARK_WORKER_CORES") != null) { cores = System.getenv("SPARK_WORKER_CORES").toInt @@ -46,7 +46,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY")) } if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) { - webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt + webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT") } if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") @@ -58,7 +58,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) if (conf.contains("spark.worker.ui.port")) { - webUiPort = conf.get("spark.worker.ui.port").toInt + webUiPort = conf.get("spark.worker.ui.port") } checkWorkerMemory() @@ -74,7 +74,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { host = value parse(tail) - case ("--port" | "-p") :: IntParam(value) :: tail => + case ("--port" | "-p") :: value :: tail => port = value parse(tail) @@ -90,7 +90,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { workDir = value parse(tail) - case "--webui-port" :: IntParam(value) :: tail => + case "--webui-port" :: value :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index b3bb5f911dbd7..072bfc2f2ca23 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -34,7 +34,7 @@ private[worker] class WorkerWebUI( val worker: Worker, val workDir: File, - requestedPort: Int) + requestedPort: String) extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI") with Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8af46f3327adb..81015457c0406 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -146,7 +146,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf - val port = executorConf.getInt("spark.executor.port", 0) + val port = executorConf.get("spark.executor.port", "0") val fetcher = RpcEnv.create( "driverPropsFetcher", hostname, diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index cfd672e1d8a97..f9d2c7c7ecf3f 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -66,7 +66,7 @@ private[spark] class MesosExecutorBackend val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) val conf = new SparkConf(loadDefaults = true).setAll(properties) - val port = conf.getInt("spark.executor.port", 0) + val port = conf.get("spark.executor.port", "0") val env = SparkEnv.createExecutorEnv( conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 16e905982cf64..b6a309cbcb775 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -42,7 +42,7 @@ import scala.util.Try import scala.util.control.NonFatal private[nio] class ConnectionManager( - port: Int, + port: String, conf: SparkConf, securityManager: SecurityManager, name: String = "Connection manager") @@ -1015,7 +1015,7 @@ private[spark] object ConnectionManager { def main(args: Array[String]) { val conf = new SparkConf - val manager = new ConnectionManager(9999, conf, new SecurityManager(conf)) + val manager = new ConnectionManager("9999", conf, new SecurityManager(conf)) manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { println("Received [" + msg + "] from [" + id + "]") None diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index b2aec160635c7..d057f632926ea 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -63,7 +63,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa override def init(blockDataManager: BlockDataManager): Unit = { this.blockDataManager = blockDataManager cm = new ConnectionManager( - conf.getInt("spark.blockManager.port", 0), + conf.get("spark.blockManager.port", "0"), conf, securityManager, "Connection manager for block manager") diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index a5336b7563802..561b7c154adfd 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ 
b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -116,7 +116,7 @@ private[spark] case class RpcEnvConfig( conf: SparkConf, name: String, host: String, - port: Int, + port: String, securityManager: SecurityManager) /** @@ -137,7 +137,7 @@ private[spark] object RpcEnv { def create( name: String, host: String, - port: Int, + port: String, conf: SparkConf, securityManager: SecurityManager): RpcEnv = { // Using Reflection to create the RpcEnv to avoid to depend on Akka directly diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a091ca650c60c..98585b072cb4c 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -200,7 +200,7 @@ private[spark] object JettyUtils extends Logging { */ def startJettyServer( hostName: String, - port: Int, + port: String, handlers: Seq[ServletContextHandler], conf: SparkConf, serverName: String = ""): ServerInfo = { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 580ab8b1325f8..e836aec3f6100 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -89,11 +89,11 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) } private[spark] object SparkUI { - val DEFAULT_PORT = 4040 + val DEFAULT_PORT = "4040" val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" - def getUIPort(conf: SparkConf): Int = { - conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) + def getUIPort(conf: SparkConf): String = { + conf.get("spark.ui.port", SparkUI.DEFAULT_PORT) } def createLiveUI( diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index f9860d1a5ce76..c3fcebd72d7fd 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -38,7 +38,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} */ private[spark] abstract class WebUI( securityManager: SecurityManager, - port: Int, + port: String, conf: SparkConf, basePath: String = "", name: String = "") diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index b725df3b44596..848f7739d0f39 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -47,7 +47,7 @@ private[spark] object AkkaUtils extends Logging { def createActorSystem( name: String, host: String, - port: Int, + port: String, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val startService: Int => (ActorSystem, Int) = { actualPort => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2168422705aaa..67f5fd4919492 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1916,35 +1916,41 @@ private[spark] object Utils extends Logging { * Attempt to start a service on the given port, or fail after a number of attempts. * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * - * @param port The initial port to start the service on. - * @param maxRetries Maximum number of retries to attempt. 
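With RpcEnv.create and AkkaUtils.createActorSystem taking the port as a string, a caller can pin an actor system to a window directly; the parsing and retry semantics are defined in the startServiceOnPort changes just below. A hedged sketch (the range is invented, and RpcEnv is private[spark], so this only type-checks inside the org.apache.spark package):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.RpcEnv

    val conf = new SparkConf()
    // Bind somewhere in 30000..30005 inclusive, retrying on collisions:
    val rpcEnv = RpcEnv.create("demo", "localhost", "30000:30005", conf,
      new SecurityManager(conf))
    println(rpcEnv.address.port) // the port that actually got bound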
- * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
+ * @param port The minimum and maximum port to start the service on, separated by a colon.
+ *             If only one number is given, it is taken as the minimum.
  * @param startService Function to start service on a given port.
  *                     This is expected to throw java.net.BindException on port collision.
  * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
  * @param serviceName Name of the service.
  */
  def startServiceOnPort[T](
-     port: Int,
+     port: String,
      startService: Int => (T, Int),
      conf: SparkConf,
      serviceName: String = ""): (T, Int) = {
-
-   require(startPort == 0 || (1024 <= startPort && startPort < 65536),
-     "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
+   val ports = port.split(":", 2)
+   val (minPort, maxPort) = if (ports.length == 2) {
+     (ports(0).toInt, ports(1).toInt)
+   } else {
+     (ports(0).toInt, 65535)
+   }
+   require(minPort == 0 || (1024 <= minPort && minPort <= 65535),
+     s"Minimum port ${minPort} should be between 1024 and 65535 (inclusive)," +
+     " or 0 for a random free port.")
+   require(maxPort == 0 || (1024 <= maxPort && maxPort <= 65535),
+     s"Maximum port ${maxPort} should be between 1024 and 65535 (inclusive)," +
+     " or 0 for a random free port.")
+   require(minPort <= maxPort, s"Minimum port ${minPort} should not be" +
+     s" greater than the maximum port ${maxPort}.")
    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    val maxRetries = portMaxRetries(conf)
-   val startPort = conf.getInt("spark.port.min", 1024)
-   val endPort = conf.getInt("spark.port.max", 65536)
    for (offset <- 0 to maxRetries) {
-     // specific a random port between 'spark.port.min' and 'spark.port.max'
-     // if port is 0
-     val tryPort = if (port == 0) {
-       (startPort + Math.random() * (endPort - startPort + 1)).toInt
+     val tryPort = if (minPort == 0) {
+       minPort
      } else {
-       // If the new port wraps around, do not try a privilege port
-       ((port + offset - 1024) % (65536 - 1024)) + 1024
+       // If the user specified minimum and maximum ports, retry within that range.
+ (offset % (maxPort - minPort + 1)) + minPort } try { val (service, port) = startService(tryPort) @@ -1965,7 +1971,7 @@ private[spark] object Utils extends Logging { } } // Should never happen - throw new SparkException(s"Failed to start service$serviceString on port $startPort") + throw new SparkException(s"Failed to start service$serviceString on port $minPort") } /** diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index a69e9b761f9a7..5a430fbd214e0 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -177,7 +177,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("HttpFileServer should work with SSL") { val sparkConf = sparkSSLConfig() val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() @@ -193,7 +193,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { sparkConf.set("spark.authenticate.secret", "good") val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() @@ -209,7 +209,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { sparkConf.set("spark.authenticate.secret", "bad") val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() @@ -224,7 +224,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("HttpFileServer should not work with SSL when the server is untrusted") { val sparkConf = sparkSSLConfigUntrusted() val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6ed057a7cab97..14e3d83a80a2f 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.storage.BlockManagerId class MapOutputTrackerSuite extends FunSuite { private val conf = new SparkConf - def createRpcEnv(name: String, host: String = "localhost", port: Int = 0, + def createRpcEnv(name: String, host: String = "localhost", port: String = "0", securityManager: SecurityManager = new SecurityManager(conf)): RpcEnv = { RpcEnv.create(name, host, port, conf, securityManager) } @@ -113,13 +113,13 @@ class MapOutputTrackerSuite extends FunSuite { test("remote fetch") { val hostname = "localhost" - val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) + val rpcEnv = createRpcEnv("spark", hostname, "0", new SecurityManager(conf)) val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf)) - val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf)) + val slaveRpcEnv = createRpcEnv("spark-slave", hostname, "0", new SecurityManager(conf)) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", 
rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 8e09976636386..89145cca9b9a9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -396,13 +396,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val conf = new SparkConf val localhost = Utils.localHostName() val securityManager = new SecurityManager(conf) - val (_actorSystem, _) = AkkaUtils.createActorSystem(name, localhost, 0, conf, securityManager) + val (_actorSystem, _) = AkkaUtils.createActorSystem(name, localhost, "0", conf, securityManager) val fakeMasterRef = _actorSystem.actorOf(Props(makeFakeMaster)) val _server = if (faulty) { - new FaultyStandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf) + new FaultyStandaloneRestServer(localhost, "0", fakeMasterRef, "spark://fake:7077", conf) } else { - new StandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf) + new StandaloneRestServer(localhost, "0", fakeMasterRef, "spark://fake:7077", conf) } val port = _server.start() // set these to clean them up after every test @@ -562,7 +562,7 @@ private class SmarterMaster extends Actor { */ private class FaultyStandaloneRestServer( host: String, - requestedPort: Int, + requestedPort: String, masterActor: ActorRef, masterUrl: String, masterConf: SparkConf) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala index 6a6f29dd613cd..f27545535539a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.FunSuite class WorkerWatcherSuite extends FunSuite { test("WorkerWatcher shuts down on valid disassociation") { val conf = new SparkConf() - val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) + val rpcEnv = RpcEnv.create("test", "localhost", "12345", conf, new SecurityManager(conf)) val targetWorkerUrl = "akka://test@1.2.3.4:1234/user/Worker" val targetWorkerAddress = AddressFromURIString(targetWorkerUrl) val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl) @@ -40,7 +40,7 @@ class WorkerWatcherSuite extends FunSuite { test("WorkerWatcher stays alive on invalid disassociation") { val conf = new SparkConf() - val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) + val rpcEnv = RpcEnv.create("test", "localhost", "12345", conf, new SecurityManager(conf)) val targetWorkerUrl = "akka://test@1.2.3.4:1234/user/Worker" val otherAkkaURL = "akka://test@4.3.2.1:1234/user/OtherActor" val otherAkkaAddress = AddressFromURIString(otherAkkaURL) diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala index 02424c59d6831..467e74c2d3834 100644 --- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala @@ -37,7 +37,7 @@ class ConnectionManagerSuite extends FunSuite { test("security default off") { val conf = new SparkConf val 
securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var receivedMessage = false manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { receivedMessage = true @@ -62,14 +62,14 @@ class ConnectionManagerSuite extends FunSuite { conf.set("spark.authenticate.secret", "good") conf.set("spark.app.id", "app-id") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { numReceivedMessages += 1 None }) - val managerServer = new ConnectionManager(0, conf, securityManager) + val managerServer = new ConnectionManager("0", conf, securityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { numReceivedServerMessages += 1 @@ -99,7 +99,7 @@ class ConnectionManagerSuite extends FunSuite { conf.set("spark.app.id", "app-id") conf.set("spark.authenticate.secret", "good") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -109,7 +109,7 @@ class ConnectionManagerSuite extends FunSuite { val badconf = conf.clone.set("spark.authenticate.secret", "bad") val badsecurityManager = new SecurityManager(badconf) - val managerServer = new ConnectionManager(0, badconf, badsecurityManager) + val managerServer = new ConnectionManager("0", badconf, badsecurityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -138,7 +138,7 @@ class ConnectionManagerSuite extends FunSuite { conf.set("spark.authenticate", "false") conf.set("spark.authenticate.secret", "good") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -150,7 +150,7 @@ class ConnectionManagerSuite extends FunSuite { badconf.set("spark.authenticate", "true") badconf.set("spark.authenticate.secret", "good") val badsecurityManager = new SecurityManager(badconf) - val managerServer = new ConnectionManager(0, badconf, badsecurityManager) + val managerServer = new ConnectionManager("0", badconf, badsecurityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { numReceivedServerMessages += 1 @@ -188,7 +188,7 @@ class ConnectionManagerSuite extends FunSuite { val conf = new SparkConf conf.set("spark.authenticate", "false") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -199,7 +199,7 @@ class ConnectionManagerSuite extends FunSuite { val badconf = new SparkConf badconf.set("spark.authenticate", "false") val badsecurityManager = new SecurityManager(badconf) - val managerServer = new ConnectionManager(0, badconf, badsecurityManager) + val managerServer = new 
ConnectionManager("0", badconf, badsecurityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -234,8 +234,8 @@ class ConnectionManagerSuite extends FunSuite { val conf = new SparkConf conf.set("spark.authenticate", "false") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) - val managerServer = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) + val managerServer = new ConnectionManager("0", conf, securityManager) managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { throw new Exception("Custom exception text") }) @@ -264,12 +264,12 @@ class ConnectionManagerSuite extends FunSuite { clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s") val clientSecurityManager = new SecurityManager(clientConf) - val manager = new ConnectionManager(0, clientConf, clientSecurityManager) + val manager = new ConnectionManager("0", clientConf, clientSecurityManager) val serverConf = new SparkConf serverConf.set("spark.authenticate", "false") val serverSecurityManager = new SecurityManager(serverConf) - val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager) + val managerServer = new ConnectionManager("0", serverConf, serverSecurityManager) managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { // sleep 60 sec > ack timeout for simulating server slow down or hang up Thread.sleep(ackTimeoutS * 3 * 1000) diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 44c88b00c442a..ad6f5f62486fc 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -38,7 +38,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { val conf = new SparkConf() - env = createRpcEnv(conf, "local", 12345) + env = createRpcEnv(conf, "local", "12345") } override def afterAll(): Unit = { @@ -47,7 +47,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } } - def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv + def createRpcEnv(conf: SparkConf, name: String, port: String): RpcEnv test("send a message locally") { @volatile var message: String = null @@ -75,7 +75,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,"13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely") try { @@ -130,7 +130,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely") try { @@ -157,7 +157,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { val conf = new SparkConf() conf.set("spark.rpc.retry.wait", "0") conf.set("spark.rpc.numRetries", "1") - val anotherEnv = createRpcEnv(conf, "remote", 13345) + val anotherEnv = createRpcEnv(conf, "remote", "13345") // Use anotherEnv to find out the 
RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout") try { @@ -415,7 +415,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely") try { @@ -455,7 +455,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef( "local", env.address, "sendWithReply-remotely-error") @@ -495,7 +495,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef( "local", env.address, "network-events") @@ -524,7 +524,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef( "local", env.address, "sendWithReply-unserializable-error") diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala index 58214c0637235..5cdb9d6ac0d60 100644 --- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.{SecurityManager, SparkConf} class AkkaRpcEnvSuite extends RpcEnvSuite { - override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = { + override def createRpcEnv(conf: SparkConf, name: String, port: String): RpcEnv = { new AkkaRpcEnvFactory().create( RpcEnvConfig(conf, name, "localhost", port, new SecurityManager(conf))) } @@ -37,7 +37,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { }) val conf = new SparkConf() val newRpcEnv = new AkkaRpcEnvFactory().create( - RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf))) + RpcEnvConfig(conf, "test", "localhost", "12346", new SecurityManager(conf))) try { val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint") assert("akka.tcp://local@localhost:12345/user/test_endpoint" === diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index ffa5162a31841..53a7dcc17586a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -68,7 +68,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd } before { - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + rpcEnv = RpcEnv.create("test", "localhost", "0", conf, securityMgr) conf.set("spark.authenticate", "false") conf.set("spark.driver.port", rpcEnv.address.port.toString) diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 7d82a7c66ad1a..2bdbfdc53dc77 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -73,7 +73,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach } override def beforeEach(): Unit = { - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + rpcEnv = RpcEnv.create("test", "localhost", "0", conf, securityMgr) // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case System.setProperty("os.arch", "amd64") diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 77a038dc1720d..eece75c658c52 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -74,9 +74,9 @@ class UISuite extends FunSuite { val server = new ServerSocket(0) val startPort = server.getLocalPort val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) + "0.0.0.0", startPort.toString, Seq[ServletContextHandler](), new SparkConf) val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) + "0.0.0.0", startPort.toString, Seq[ServletContextHandler](), new SparkConf) // Allow some wiggle room in case ports on the machine are under contention val boundPort1 = serverInfo1.boundPort val boundPort2 = serverInfo2.boundPort @@ -90,7 +90,7 @@ class UISuite extends FunSuite { test("jetty binds to port 0 correctly") { val serverInfo = JettyUtils.startJettyServer( - "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) + "0.0.0.0", "0", Seq[ServletContextHandler](), new SparkConf) val server = serverInfo.server val boundPort = serverInfo.boundPort assert(server.getState === "STARTED") diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index bec79fc4dc8f7..5e6a2861e007e 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -42,7 +42,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -58,7 +58,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerBad.isAuthenticationEnabled() === true) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, conf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", conf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerEndpoint = @@ -76,7 +76,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = 
RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -90,7 +90,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro badconf.set("spark.authenticate.secret", "good") val securityManagerBad = new SecurityManager(badconf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, badconf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", badconf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -122,7 +122,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -138,7 +138,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerGood.isAuthenticationEnabled() === true) - val slaveRpcEnv =RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood) + val slaveRpcEnv =RpcEnv.create("spark-slave", hostname, "0", goodconf, securityManagerGood) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -170,7 +170,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -187,7 +187,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerBad.isAuthenticationEnabled() === false) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, badconf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", badconf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerEndpoint = @@ -203,7 +203,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -215,7 +215,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val slaveConf = sparkSSLConfig() val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slaves", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slaves", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = 
slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -248,7 +248,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -262,7 +262,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro slaveConf.set("spark.authenticate.secret", "good") val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -295,7 +295,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -310,7 +310,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro slaveConf.set("spark.authenticate.secret", "bad") val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerEndpoint = @@ -327,7 +327,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -339,7 +339,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val slaveConf = sparkSSLConfig() val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) try { slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) diff --git a/docs/configuration.md b/docs/configuration.md index 0d95d817a585a..d587b91124cb8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -175,20 +175,6 @@ of the most common options to set are: Logs the effective SparkConf as INFO when a SparkContext is started. - - spark.port.min - 1024 - - Min port for spark(UI, HttpServer. ConnectionManager, Akka) - - - - spark.port.max - 65536 - - Max port for spark(UI, HttpServer. 
ConnectionManager, Akka) - - spark.master (none) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 92867b44be138..2460f7790e528 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -113,7 +113,7 @@ object FeederActor { val Seq(host, port) = args.toSeq val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, + val actorSystem = AkkaUtils.createActorSystem("test", host, port, conf = conf, securityManager = new SecurityManager(conf))._1 val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 39e6754c81dbf..25d933b065395 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -77,7 +77,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L /** Find a free port */ private def findFreePort(): Int = { val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { + Utils.startServiceOnPort(candidatePort.toString, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 13e9475065979..98025938c7f73 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -60,7 +60,7 @@ private class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + private var brokerPort = "9092" private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -106,7 +106,7 @@ private class KafkaTestUtils extends Logging { // Kafka broker startup Utils.startServiceOnPort(brokerPort, port => { - brokerPort = port + brokerPort = port.toString brokerConf = new KafkaConfig(brokerConfiguration) server = new KafkaServer(brokerConf) server.startup() @@ -181,7 +181,7 @@ private class KafkaTestUtils extends Logging { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") - props.put("port", brokerPort.toString) + props.put("port", brokerPort) props.put("log.dir", Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) props.put("log.flush.interval.messages", "1") diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index a19a72c58a705..3869cee9b1a31 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -115,7 +115,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { private def findFreePort(): Int = { val 
candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { + Utils.startServiceOnPort(candidatePort.toString, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 35fb625645022..4f952b9cd4119 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -114,7 +114,7 @@ import org.apache.spark.annotation.DeveloperApi private val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - private val classServerPort = conf.getInt("spark.replClassServer.port", 0) + private val classServerPort = conf.get("spark.replClassServer.port", "0") private val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") private var currentSettings: Settings = initialSettings private var printResults = true // whether to print result lines diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index c090eaec2928d..804138abd1d01 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -60,7 +60,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche var tempDirectory: File = null before { - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + rpcEnv = RpcEnv.create("test", "localhost", "0", conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 93ae45133ce24..0860264a11344 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -274,7 +274,7 @@ private[spark] class ApplicationMaster( } private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { - rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, 0, sparkConf, securityMgr) + rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, "0", sparkConf, securityMgr) waitForSparkDriver() addAmIpFilter() registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) From a01a20e5c06b19947fa638e3d6f666ba003095ab Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 28 Apr 2015 09:32:30 +0800 Subject: [PATCH 07/13] fix code style --- .../apache/spark/deploy/LocalSparkCluster.scala | 3 ++- .../org/apache/spark/deploy/worker/Worker.scala | 17 ++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 692bdf436588c..ac788a306340a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -51,7 +51,8 @@ class 
LocalSparkCluster( val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false") /* Start the Master */ - val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, "0", "0", _conf) + val (masterSystem, masterPort, _, _) = + Master.startSystemAndActor(localHostname, "0", "0", _conf) masterActorSystems += masterSystem val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort val masters = Array(masterUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 0639caf9557d1..cfd078ddd6d99 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -196,7 +196,8 @@ private[worker] class Worker( for (masterAkkaUrl <- masterAkkaUrls) { logInfo("Connecting to master " + masterAkkaUrl + "...") val actor = context.actorSelection(masterAkkaUrl) - actor ! RegisterWorker(workerId, host, port.toInt, cores, memory, webUi.boundPort.toInt, publicAddress) + actor ! RegisterWorker(workerId, host, port.toInt, cores, memory, + webUi.boundPort, publicAddress) } } @@ -235,7 +236,7 @@ private[worker] class Worker( */ if (master != null) { master ! RegisterWorker( - workerId, host, port.toInt, cores, memory, webUi.boundPort.toInt, publicAddress) + workerId, host, port.toInt, cores, memory, webUi.boundPort, publicAddress) } else { // We are retrying the initial registration tryRegisterAllMasters() @@ -269,8 +270,8 @@ private[worker] class Worker( INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) } case Some(_) => - logInfo("Not spawning another attempt to register with the master, since there is an" + " attempt scheduled already.") + logInfo("Not spawning another attempt to register with the master, " + "since there is an attempt scheduled already.") } } @@ -281,7 +282,8 @@ private[worker] class Worker( changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) if (CLEANUP_ENABLED) { - logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + logInfo(s"Worker cleanup enabled; " + s"old application directories will be deleted in: $workDir") context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) } @@ -414,7 +416,7 @@ private[worker] class Worker( case KillExecutor(masterUrl, appId, execId) => if (masterUrl != activeMasterUrl) { - logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId) + logWarning(s"Invalid Master ($masterUrl) attempted to launch executor $execId") } else { val fullId = appId + "/" + execId executors.get(fullId) match { @@ -457,7 +459,8 @@ private[worker] class Worker( case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR => - logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") + logWarning(s"Driver $driverId failed " + s"with unrecoverable exception: ${exception.get}") case DriverState.FAILED => logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => From db0d3eeb2d9c0ba9ee45bffe8ef063c3d7a8aaf9 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 28 Apr 2015 11:32:14 +0800 Subject: [PATCH 08/13] don't use String type for bound port --- .../scala/org/apache/spark/deploy/worker/Worker.scala | 9 ++++----- 1 file changed, 4 insertions(+), 
5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index cfd078ddd6d99..71f11986d0247 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -44,7 +44,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} */ private[worker] class Worker( host: String, - port: String, + port: Int, webUiPort: String, cores: Int, memory: Int, @@ -196,8 +196,7 @@ private[worker] class Worker( for (masterAkkaUrl <- masterAkkaUrls) { logInfo("Connecting to master " + masterAkkaUrl + "...") val actor = context.actorSelection(masterAkkaUrl) - actor ! RegisterWorker(workerId, host, port.toInt, cores, memory, - webUi.boundPort, publicAddress) + actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) } } @@ -235,7 +235,7 @@ private[worker] class Worker( */ if (master != null) { master ! RegisterWorker( - workerId, host, port.toInt, cores, memory, webUi.boundPort, publicAddress) + workerId, host, port, cores, memory, webUi.boundPort, publicAddress) } else { // We are retrying the initial registration tryRegisterAllMasters() @@ -482,7 +481,7 @@ private[worker] class Worker( masterDisconnected() case RequestWorkerState => - sender ! WorkerStateResponse(host, port.toInt, workerId, executors.values.toList, + sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, finishedExecutors.values.toList, drivers.values.toList, finishedDrivers.values.toList, activeMasterUrl, cores, memory, coresUsed, memoryUsed, activeMasterWebUiUrl) From 083760a93434131603b37a6dabdff734b6efb298 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 29 Apr 2015 14:09:24 +0800 Subject: [PATCH 09/13] based on the newest change https://github.com/apache/spark/pull/5144 --- .../deploy/mesos/MesosClusterDispatcherArguments.scala | 8 ++++---- .../org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala | 2 +- .../apache/spark/deploy/rest/RestSubmissionServer.scala | 2 +- .../apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala index 894cb78d8591a..41aca3729d954 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -23,9 +23,9 @@ import org.apache.spark.util.{IntParam, Utils} private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() - var port = 7077 + var port = "7077" var name = "Spark Cluster" - var webUiPort = 8081 + var webUiPort = "8081" var masterUrl: String = _ var zookeeperUrl: Option[String] = None var propertiesFile: String = _ @@ -40,11 +40,11 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: host = value parse(tail) - case ("--port" | "-p") :: IntParam(value) :: tail => + case ("--port" | "-p") :: value :: tail => port = value parse(tail) - case ("--webui-port" | "-p") :: IntParam(value) :: tail => + case ("--webui-port" | "-p") :: value :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala 
b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala index 4865d46dbc4ab..f26383cb2b538 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -27,7 +27,7 @@ import org.apache.spark.ui.{SparkUI, WebUI} */ private[spark] class MesosClusterUI( securityManager: SecurityManager, - port: Int, + port: String, conf: SparkConf, dispatcherPublicAddress: String, val scheduler: MesosClusterScheduler) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index 2e78d03e5c0cc..e4b7df5269b59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -48,7 +48,7 @@ import org.apache.spark.util.Utils */ private[spark] abstract class RestSubmissionServer( val host: String, - val requestedPort: Int, + val requestedPort: String, val masterConf: SparkConf) extends Logging { protected val submitRequestServlet: SubmitRequestServlet protected val killRequestServlet: KillRequestServlet diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index fd17a980c9319..3bc25d264caad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -40,7 +40,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} */ private[spark] class MesosRestServer( host: String, - requestedPort: Int, + requestedPort: String, masterConf: SparkConf, scheduler: MesosClusterScheduler) extends RestSubmissionServer(host, requestedPort, masterConf) { From 28a3adf1e58c03e0cb5c274ea5dfe60965878ae4 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Mon, 4 May 2015 17:01:57 +0800 Subject: [PATCH 10/13] avoid an overly large port range --- .../src/main/scala/org/apache/spark/util/Utils.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b858ce82f9d34..9a64e9920852e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,7 +21,7 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer -import java.util.{PriorityQueue, Properties, Locale, Random, UUID} +import java.util.{Locale, PriorityQueue, Properties, Random, UUID} import java.util.concurrent._ import javax.net.ssl.HttpsURLConnection @@ -1971,23 +1971,23 @@ private[spark] object Utils extends Logging { startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { + val maxRetries = portMaxRetries(conf) val ports = port.split(":", 2) val (minPort, maxPort) = if (ports.length == 2) { (ports(0).toInt, ports(1).toInt) } else { - (ports(0).toInt, 65535) + val _minPort = ports(0).toInt + (_minPort, math.min(65535, _minPort + maxRetries)) } require(minPort == 0 || (1024 <= minPort && minPort <= 65535), s"Minimum port ${minPort} should be between 1024 and 65535 (inclusive)," + " or 0 for a random free port.") - require(maxPort == 0 || (1024 <= maxPort && maxPort <= 65535), - s"Maximum port ${maxPort} should be 
between 1024 and 65535 (inclusive)," + - " or 0 for a random free port.") + require((1024 <= maxPort && maxPort <= 65535), + s"Maximum port ${maxPort} should be between 1024 and 65535 (inclusive).") require(minPort <= maxPort, s"Minimum ${minPort} port should not be" + s" less than the maximum ${maxPort}.") val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" - val maxRetries = portMaxRetries(conf) for (offset <- 0 to maxRetries) { val tryPort = if (minPort == 0) { minPort From 6e80ab00519a0163552cbdb4052a5fe5943e2a06 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 14 May 2015 20:40:50 +0800 Subject: [PATCH 11/13] fix spark.blockManager.port --- .../apache/spark/network/netty/NettyBlockTransferService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index d650d5fe73087..408261910df3f 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -71,7 +71,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage (server, server.getPort) } - val portToTry = conf.getInt("spark.blockManager.port", 0) + val portToTry = conf.get("spark.blockManager.port", "0") Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1 } From 248086575675c5027bcae656568ab6cac93c63de Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 15 May 2015 09:30:12 +0800 Subject: [PATCH 12/13] fix compile error --- .../apache/spark/network/netty/NettyBlockTransferService.scala | 2 +- .../org/apache/spark/deploy/history/HistoryServerSuite.scala | 2 +- .../test/scala/org/apache/spark/deploy/master/MasterSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 408261910df3f..dd422c1cfa5b3 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -81,7 +81,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage execId: String, blockIds: Array[String], listener: BlockFetchingListener): Unit = { - logTrace(s"Fetch blocks from $host:$port (executor id $execId)") + logTrace(s"Fetch blocks from $host:$port (executor id $execId`)") try { val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 318ab5dbc4804..1e70ea595c491 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -58,7 +58,7 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with provider.checkForLogs() val securityManager = new SecurityManager(conf) - server = new HistoryServer(conf, provider, securityManager, 18080) + server = new HistoryServer(conf, provider, securityManager, "18080") server.initialize() 
server.bind() port = server.boundPort diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 0faa8f650e5e1..d082024b3d403 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -131,7 +131,7 @@ class MasterSuite extends FunSuite with Matchers { ) val (actorSystem, port, uiPort, restPort) = - Master.startSystemAndActor("127.0.0.1", 7077, 8080, conf) + Master.startSystemAndActor("127.0.0.1", "7077", "8080", conf) try { Await.result(actorSystem.actorSelection("/user/Master").resolveOne(10 seconds), 10 seconds) From bb08eef5ff86d7257f1a819c0d68de3e658e2675 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 15 May 2015 11:47:18 +0800 Subject: [PATCH 13/13] fix careless typo --- .../apache/spark/network/netty/NettyBlockTransferService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index dd422c1cfa5b3..408261910df3f 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -81,7 +81,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage execId: String, blockIds: Array[String], listener: BlockFetchingListener): Unit = { - logTrace(s"Fetch blocks from $host:$port (executor id $execId`)") + logTrace(s"Fetch blocks from $host:$port (executor id $execId)") try { val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
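
Taken together, the series settles on one convention: a service port is configured as a string, either a single port ("7077"), an explicit range ("1024:4096"), or "0" for an OS-assigned ephemeral port, and a bare port is expanded into a window bounded by spark.port.maxRetries. Below is a minimal, self-contained sketch of that parsing, mirroring the logic PATCH 10 puts into Utils.startServiceOnPort; PortRangeSketch, resolveRange, the maxRetries parameter (standing in for Utils.portMaxRetries(conf)), and the up-front handling of "0" are illustrative choices, not code from the patches.

object PortRangeSketch {
  // Resolves a port spec such as "7077", "1024:4096", or "0" into an
  // inclusive (min, max) range. `maxRetries` stands in for the value
  // Utils.portMaxRetries(conf) derives from spark.port.maxRetries.
  def resolveRange(spec: String, maxRetries: Int): (Int, Int) = {
    val parts = spec.split(":", 2)
    val minPort = parts(0).toInt
    if (minPort == 0) {
      // "0" asks the OS for an ephemeral port, so there is no range to scan.
      (0, 0)
    } else {
      val maxPort =
        if (parts.length == 2) {
          parts(1).toInt
        } else {
          // A bare port is capped at minPort + maxRetries, matching the old
          // behaviour of retrying ports n, n+1, ..., n+maxRetries.
          math.min(65535, minPort + maxRetries)
        }
      require(1024 <= minPort && minPort <= 65535,
        s"Minimum port $minPort should be between 1024 and 65535 (inclusive)")
      require(1024 <= maxPort && maxPort <= 65535,
        s"Maximum port $maxPort should be between 1024 and 65535 (inclusive)")
      require(minPort <= maxPort,
        s"Minimum port $minPort should not exceed maximum port $maxPort")
      (minPort, maxPort)
    }
  }

  def main(args: Array[String]): Unit = {
    println(resolveRange("1024:4096", 16)) // explicit range -> (1024,4096)
    println(resolveRange("7077", 16))      // bare port      -> (7077,7093)
    println(resolveRange("0", 16))         // ephemeral      -> (0,0)
  }
}

The cap in the bare-port branch is the point of PATCH 10: before it, a single port declared the range (port, 65535) even though only spark.port.maxRetries attempts are ever made, so the declared range and the actual retry behaviour now agree.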