diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 7e706bcc42f04..d99ae49546b02 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils private[spark] class HttpFileServer( conf: SparkConf, securityManager: SecurityManager, - requestedPort: Int = 0) + requestedPort: String = "0") extends Logging { var baseDir : File = null diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 8de3a6c04df34..5fd11aa5f86ce 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -46,12 +46,12 @@ private[spark] class HttpServer( conf: SparkConf, resourceBase: File, securityManager: SecurityManager, - requestedPort: Int = 0, + requestedPort: String = "0", serverName: String = "HTTP server") extends Logging { private var server: Server = null - private var port: Int = requestedPort + private var port: Int = 0 def start() { if (server != null) { diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a185954089528..253500f551df4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -184,7 +184,7 @@ object SparkEnv extends Logging { assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!") assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") val hostname = conf.get("spark.driver.host") - val port = conf.get("spark.driver.port").toInt + val port = conf.get("spark.driver.port") create( conf, SparkContext.DRIVER_IDENTIFIER, @@ -205,7 +205,7 @@ object SparkEnv extends Logging { conf: SparkConf, executorId: String, hostname: 
String, - port: Int, + port: String, numCores: Int, isLocal: Boolean): SparkEnv = { val env = create( @@ -228,7 +228,7 @@ object SparkEnv extends Logging { conf: SparkConf, executorId: String, hostname: String, - port: Int, + port: String, isDriver: Boolean, isLocal: Boolean, listenerBus: LiveListenerBus = null, @@ -345,7 +345,7 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { - val fileServerPort = conf.getInt("spark.fileserver.port", 0) + val fileServerPort = conf.get("spark.fileserver.port", "0") val server = new HttpFileServer(conf, securityManager, fileServerPort) server.initialize() conf.set("spark.fileserver.uri", server.serverUri) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index b69af639f7862..8bf21d327b23d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -152,7 +152,7 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast") - val broadcastPort = conf.getInt("spark.broadcast.port", 0) + val broadcastPort = conf.get("spark.broadcast.port", "0") server = new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") server.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 848b62f9de71b..207c22a8c9e8e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -195,7 +195,7 @@ object Client { Logger.getRootLogger.setLevel(driverArgs.logLevel) val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) + "driverClient", Utils.localHostName(), "0", 
conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely for (m <- driverArgs.masters) { diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 0550f00a172ab..5f1cdc85b0174 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -56,7 +56,7 @@ class LocalSparkCluster( /* Start the Master */ val (masterSystem, masterPort, webUiPort, _) = - Master.startSystemAndActor(localHostname, 0, 0, _conf) + Master.startSystemAndActor(localHostname, "0", "0", _conf) masterWebUIPort = webUiPort masterActorSystems += masterSystem val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort @@ -64,7 +64,7 @@ class LocalSparkCluster( /* Start the Workers */ for (workerNum <- 1 to numWorkers) { - val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, + val (workerSystem, _) = Worker.startSystemAndActor(localHostname, "0", "0", coresPerWorker, memoryPerWorker, masters, null, Some(workerNum), _conf) workerActorSystems += workerSystem } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index 40835b9550586..ff35b636d0095 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -46,7 +46,7 @@ private[spark] object TestClient { def main(args: Array[String]) { val url = args(0) val conf = new SparkConf - val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0, + val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), "0", conf = conf, securityManager = new SecurityManager(conf)) val desc = new 
ApplicationDescription("TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 5a0eb585a9049..d92c77684992e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -46,7 +46,7 @@ class HistoryServer( conf: SparkConf, provider: ApplicationHistoryProvider, securityManager: SecurityManager, - port: Int) + port: String) extends WebUI(securityManager, port, conf) with Logging with UIRoot { // How many applications to retain @@ -225,7 +225,7 @@ object HistoryServer extends Logging { .newInstance(conf) .asInstanceOf[ApplicationHistoryProvider] - val port = conf.getInt("spark.history.ui.port", 18080) + val port = conf.get("spark.history.ui.port", "18080") val server = new HistoryServer(conf, provider, securityManager, port) server.bind() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fccceb3ea528b..b4041434a18af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -52,7 +52,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger private[master] class Master( host: String, port: Int, - webUiPort: Int, + webUiPort: String, val securityMgr: SecurityManager, val conf: SparkConf) extends Actor with ActorLogReceive with Logging with LeaderElectable { @@ -129,7 +129,7 @@ private[master] class Master( private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true) private val restServer = if (restServerEnabled) { - val port = conf.getInt("spark.master.rest.port", 6066) + val port = 
conf.get("spark.master.rest.port", "6066") Some(new StandaloneRestServer(host, port, conf, self, masterUrl)) } else { None @@ -931,8 +931,8 @@ private[deploy] object Master extends Logging { */ def startSystemAndActor( host: String, - port: Int, - webUiPort: Int, + port: String, + webUiPort: String, conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala index 435b9b12f83b8..5714d1af00a5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala @@ -25,8 +25,8 @@ import org.apache.spark.util.{IntParam, Utils} */ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() - var port = 7077 - var webUiPort = 8080 + var port = "7077" + var webUiPort = "8080" var propertiesFile: String = null // Check for settings in environment variables @@ -34,10 +34,10 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { host = System.getenv("SPARK_MASTER_HOST") } if (System.getenv("SPARK_MASTER_PORT") != null) { - port = System.getenv("SPARK_MASTER_PORT").toInt + port = System.getenv("SPARK_MASTER_PORT") } if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) { - webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt + webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT") } parse(args.toList) @@ -46,7 +46,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) if (conf.contains("spark.master.ui.port")) { - webUiPort = conf.get("spark.master.ui.port").toInt + webUiPort = 
conf.get("spark.master.ui.port") } private def parse(args: List[String]): Unit = args match { @@ -60,11 +60,11 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { host = value parse(tail) - case ("--port" | "-p") :: IntParam(value) :: tail => + case ("--port" | "-p") :: value :: tail => port = value parse(tail) - case "--webui-port" :: IntParam(value) :: tail => + case "--webui-port" :: value :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 2111a8581f2e4..dd17dafc89e4e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -29,7 +29,7 @@ import org.apache.spark.util.RpcUtils * Web UI server for the standalone master. */ private[master] -class MasterWebUI(val master: Master, requestedPort: Int) +class MasterWebUI(val master: Master, requestedPort: String) extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot { diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala index 894cb78d8591a..41aca3729d954 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -23,9 +23,9 @@ import org.apache.spark.util.{IntParam, Utils} private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() - var port = 7077 + var port = "7077" var name = "Spark Cluster" - var webUiPort = 8081 + var webUiPort = "8081" var masterUrl: String = _ var zookeeperUrl: Option[String] = None var propertiesFile: String = _ @@ 
-40,11 +40,11 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: host = value parse(tail) - case ("--port" | "-p") :: IntParam(value) :: tail => + case ("--port" | "-p") :: value :: tail => port = value parse(tail) - case ("--webui-port" | "-p") :: IntParam(value) :: tail => + case ("--webui-port" | "-p") :: value :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala index 3f693545a0349..cc333ba648b70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -27,7 +27,7 @@ import org.apache.spark.ui.{SparkUI, WebUI} */ private[spark] class MesosClusterUI( securityManager: SecurityManager, - port: Int, + port: String, conf: SparkConf, dispatcherPublicAddress: String, val scheduler: MesosClusterScheduler) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index 2e78d03e5c0cc..e4b7df5269b59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -48,7 +48,7 @@ import org.apache.spark.util.Utils */ private[spark] abstract class RestSubmissionServer( val host: String, - val requestedPort: Int, + val requestedPort: String, val masterConf: SparkConf) extends Logging { protected val submitRequestServlet: SubmitRequestServlet protected val killRequestServlet: KillRequestServlet diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 502b9bb701ccf..b7fd235890da1 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -50,7 +50,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} */ private[deploy] class StandaloneRestServer( host: String, - requestedPort: Int, + requestedPort: String, masterConf: SparkConf, masterActor: ActorRef, masterUrl: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 8198296eeb341..088316f85d0f0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -40,7 +40,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} */ private[spark] class MesosRestServer( host: String, - requestedPort: Int, + requestedPort: String, masterConf: SparkConf, scheduler: MesosClusterScheduler) extends RestSubmissionServer(host, requestedPort, masterConf) { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index d1a12b01e78f7..16da4be8e7508 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -39,7 +39,7 @@ object DriverWrapper { case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() val rpcEnv = RpcEnv.create("Driver", - Utils.localHostName(), 0, conf, new SecurityManager(conf)) + Utils.localHostName(), "0", conf, new SecurityManager(conf)) rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl)) val currentLoader = Thread.currentThread.getContextClassLoader diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ebc6cd76c6afd..0fd231f594253 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} private[worker] class Worker( host: String, port: Int, - webUiPort: Int, + webUiPort: String, cores: Int, memory: Int, masterAkkaUrls: Array[String], @@ -59,7 +59,6 @@ private[worker] class Worker( import context.dispatcher Utils.checkHost(host, "Expected hostname") - assert (port > 0) // For worker and executor IDs private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") @@ -271,8 +270,8 @@ private[worker] class Worker( INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) } case Some(_) => - logInfo("Not spawning another attempt to register with the master, since there is an" + - " attempt scheduled already.") + logInfo("Not spawning another attempt to register with the master, " + + "since there is an attempt scheduled already.") } } @@ -283,7 +282,8 @@ private[worker] class Worker( changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) if (CLEANUP_ENABLED) { - logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + logInfo(s"Worker cleanup enabled; " + + s"old application directories will be deleted in: $workDir") context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis, CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup) } @@ -413,7 +413,7 @@ private[worker] class Worker( case KillExecutor(masterUrl, appId, execId) => if (masterUrl != activeMasterUrl) { - logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId) + logWarning(s"Invalid Master ($masterUrl) attempted to launch executor $execId") } else { val fullId = appId + "/" + execId 
executors.get(fullId) match { @@ -456,7 +456,8 @@ private[worker] class Worker( case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR => - logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") + logWarning(s"Driver $driverId failed " + + s"with unrecoverable exception: ${exception.get}") case DriverState.FAILED => logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => @@ -537,8 +538,8 @@ private[deploy] object Worker extends Logging { def startSystemAndActor( host: String, - port: Int, - webUiPort: Int, + port: String, + webUiPort: String, cores: Int, memory: Int, masterUrls: Array[String], diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 9678631da9f6f..ca300ccd21a86 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -27,8 +27,8 @@ import org.apache.spark.SparkConf */ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() - var port = 0 - var webUiPort = 8081 + var port = "0" + var webUiPort = "8081" var cores = inferDefaultCores() var memory = inferDefaultMemory() var masters: Array[String] = null @@ -37,7 +37,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { // Check for settings in environment variables if (System.getenv("SPARK_WORKER_PORT") != null) { - port = System.getenv("SPARK_WORKER_PORT").toInt + port = System.getenv("SPARK_WORKER_PORT") } if (System.getenv("SPARK_WORKER_CORES") != null) { cores = System.getenv("SPARK_WORKER_CORES").toInt @@ -46,7 +46,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY")) } if 
(System.getenv("SPARK_WORKER_WEBUI_PORT") != null) { - webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt + webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT") } if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") @@ -58,7 +58,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) if (conf.contains("spark.worker.ui.port")) { - webUiPort = conf.get("spark.worker.ui.port").toInt + webUiPort = conf.get("spark.worker.ui.port") } checkWorkerMemory() @@ -74,7 +74,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { host = value parse(tail) - case ("--port" | "-p") :: IntParam(value) :: tail => + case ("--port" | "-p") :: value :: tail => port = value parse(tail) @@ -90,7 +90,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { workDir = value parse(tail) - case "--webui-port" :: IntParam(value) :: tail => + case "--webui-port" :: value :: tail => webUiPort = value parse(tail) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index b3bb5f911dbd7..072bfc2f2ca23 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -34,7 +34,7 @@ private[worker] class WorkerWebUI( val worker: Worker, val workDir: File, - requestedPort: Int) + requestedPort: String) extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI") with Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f3a26f54a81fb..b0757cf07a680 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -149,7 +149,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf - val port = executorConf.getInt("spark.executor.port", 0) + val port = executorConf.get("spark.executor.port", "0") val fetcher = RpcEnv.create( "driverPropsFetcher", hostname, diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index cfd672e1d8a97..f9d2c7c7ecf3f 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -66,7 +66,7 @@ private[spark] class MesosExecutorBackend val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) val conf = new SparkConf(loadDefaults = true).setAll(properties) - val port = conf.getInt("spark.executor.port", 0) + val port = conf.get("spark.executor.port", "0") val env = SparkEnv.createExecutorEnv( conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index d650d5fe73087..408261910df3f 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -71,7 +71,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage (server, server.getPort) } - val portToTry = conf.getInt("spark.blockManager.port", 0) + val portToTry = conf.get("spark.blockManager.port", "0") Utils.startServiceOnPort(portToTry, 
startService, conf, getClass.getName)._1 } diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 497871ed6d5e5..74b825b9a4c88 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -42,7 +42,7 @@ import scala.util.Try import scala.util.control.NonFatal private[nio] class ConnectionManager( - port: Int, + port: String, conf: SparkConf, securityManager: SecurityManager, name: String = "Connection manager") @@ -1015,7 +1015,7 @@ private[spark] object ConnectionManager { def main(args: Array[String]) { val conf = new SparkConf - val manager = new ConnectionManager(9999, conf, new SecurityManager(conf)) + val manager = new ConnectionManager("9999", conf, new SecurityManager(conf)) manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { println("Received [" + msg + "] from [" + id + "]") None diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index b2aec160635c7..d057f632926ea 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -63,7 +63,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa override def init(blockDataManager: BlockDataManager): Unit = { this.blockDataManager = blockDataManager cm = new ConnectionManager( - conf.getInt("spark.blockManager.port", 0), + conf.get("spark.blockManager.port", "0"), conf, securityManager, "Connection manager for block manager") diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index 12b6b28d4d7ec..9a060728b1043 100644 --- 
a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -44,7 +44,7 @@ private[spark] object RpcEnv { def create( name: String, host: String, - port: Int, + port: String, conf: SparkConf, securityManager: SecurityManager): RpcEnv = { // Using Reflection to create the RpcEnv to avoid to depend on Akka directly @@ -54,7 +54,6 @@ private[spark] object RpcEnv { } - /** * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote @@ -145,7 +144,7 @@ private[spark] case class RpcEnvConfig( conf: SparkConf, name: String, host: String, - port: Int, + port: String, securityManager: SecurityManager) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 06e616220c706..add2b0bb636cd 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -205,7 +205,7 @@ private[spark] object JettyUtils extends Logging { */ def startJettyServer( hostName: String, - port: Int, + port: String, handlers: Seq[ServletContextHandler], conf: SparkConf, serverName: String = ""): ServerInfo = { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 3788916cf39bb..66f5de88a97a6 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -120,14 +120,14 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) } private[spark] object SparkUI { - val DEFAULT_PORT = 4040 + val DEFAULT_PORT = "4040" val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" val DEFAULT_POOL_NAME = "default" val DEFAULT_RETAINED_STAGES = 1000 val DEFAULT_RETAINED_JOBS = 1000 - def getUIPort(conf: SparkConf): Int = { - 
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) + def getUIPort(conf: SparkConf): String = { + conf.get("spark.ui.port", SparkUI.DEFAULT_PORT) } def createLiveUI( diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 594df15e9cc85..e8cf09b97ab31 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -38,7 +38,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} */ private[spark] abstract class WebUI( val securityManager: SecurityManager, - port: Int, + port: String, conf: SparkConf, basePath: String = "", name: String = "") diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 96aa2fe164703..037847148cca5 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -47,7 +47,7 @@ private[spark] object AkkaUtils extends Logging { def createActorSystem( name: String, host: String, - port: Int, + port: String, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val startService: Int => (ActorSystem, Int) = { actualPort => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 693e1a0a3d5f0..e79b4d1f4da09 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,7 +21,7 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer -import java.util.{PriorityQueue, Properties, Locale, Random, UUID} +import java.util.{Locale, PriorityQueue, Properties, Random, UUID} import java.util.concurrent._ import javax.net.ssl.HttpsURLConnection @@ -1962,30 +1962,42 @@ private[spark] object Utils extends Logging { * Attempt to start a service on the 
given port, or fail after a number of attempts. * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). * - * @param startPort The initial port to start the service on. + * @param port The minimum and maximum port to start the service on, separated by colon. + * If only one number is set, take it as the minimum. * @param startService Function to start service on a given port. * This is expected to throw java.net.BindException on port collision. * @param conf A SparkConf used to get the maximum number of retries when binding to a port. * @param serviceName Name of the service. */ def startServiceOnPort[T]( - startPort: Int, + port: String, startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { - - require(startPort == 0 || (1024 <= startPort && startPort < 65536), - "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") + val ports = port.split(":", 2) + val (minPort, maxPort, maxRetries) = if (ports.length == 2) { + val _minPort = ports(0).toInt + val _maxPort = ports(1).toInt + (_minPort, _maxPort, _maxPort - _minPort) + } else { + val _minPort = ports(0).toInt + (_minPort, 65535, portMaxRetries(conf)) + } + require(minPort == 0 || (1024 <= minPort && minPort <= 65535), + s"Minimum port ${minPort} should be between 1024 and 65535 (inclusive)," + + " or 0 for a random free port.") + require((1024 <= maxPort && maxPort <= 65535), + s"Maximum port ${maxPort} should be between 1024 and 65535 (inclusive).") + require(minPort <= maxPort, s"Minimum port ${minPort} should not be" + + s" greater than the maximum port ${maxPort}.") val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" - val maxRetries = portMaxRetries(conf) for (offset <- 0 to maxRetries) { - // Do not increment port if startPort is 0, which is treated as a special port - val tryPort = if (startPort == 0) { - startPort + val tryPort = if (minPort == 0) { + minPort } else { - // If the new port 
wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 + // If the user specifies minimum and maximum ports, retry within the range. + (offset % (maxPort - minPort + 1)) + minPort } try { val (service, port) = startService(tryPort) @@ -2006,7 +2018,7 @@ private[spark] object Utils extends Logging { } } // Should never happen - throw new SparkException(s"Failed to start service$serviceString on port $startPort") + throw new SparkException(s"Failed to start service$serviceString on port $minPort") } /** diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index bff2d10b9946c..47515d055a298 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -176,7 +176,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("HttpFileServer should work with SSL") { val sparkConf = sparkSSLConfig() val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() @@ -192,7 +192,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { sparkConf.set("spark.authenticate.secret", "good") val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() @@ -208,7 +208,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { sparkConf.set("spark.authenticate.secret", "bad") val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() @@ -223,7 +223,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("HttpFileServer should not work with SSL when the server is untrusted") { val sparkConf = sparkSSLConfigUntrusted() 
val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) + val server = new HttpFileServer(sparkConf, sm, "0") try { server.initialize() diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6ed057a7cab97..14e3d83a80a2f 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.storage.BlockManagerId class MapOutputTrackerSuite extends FunSuite { private val conf = new SparkConf - def createRpcEnv(name: String, host: String = "localhost", port: Int = 0, + def createRpcEnv(name: String, host: String = "localhost", port: String = "0", securityManager: SecurityManager = new SecurityManager(conf)): RpcEnv = { RpcEnv.create(name, host, port, conf, securityManager) } @@ -113,13 +113,13 @@ class MapOutputTrackerSuite extends FunSuite { test("remote fetch") { val hostname = "localhost" - val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(conf)) + val rpcEnv = createRpcEnv("spark", hostname, "0", new SecurityManager(conf)) val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf)) - val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf)) + val slaveRpcEnv = createRpcEnv("spark-slave", hostname, "0", new SecurityManager(conf)) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index e10dd4cf837aa..6a6e24e2bb949 100644 --- 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -58,7 +58,7 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with provider.checkForLogs() val securityManager = new SecurityManager(conf) - server = new HistoryServer(conf, provider, securityManager, 18080) + server = new HistoryServer(conf, provider, securityManager, "18080") server.initialize() server.bind() port = server.boundPort diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index f97e5ff6db31d..19d25238805ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -135,7 +135,7 @@ class MasterSuite extends FunSuite with Matchers with Eventually { ) val (actorSystem, port, uiPort, restPort) = - Master.startSystemAndActor("127.0.0.1", 7077, 8080, conf) + Master.startSystemAndActor("127.0.0.1", "7077", "8080", conf) try { Await.result(actorSystem.actorSelection("/user/Master").resolveOne(10 seconds), 10 seconds) diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index f4d548d9e7720..eac3651ebd46c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -400,13 +400,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach { val conf = new SparkConf val localhost = Utils.localHostName() val securityManager = new SecurityManager(conf) - val (_actorSystem, _) = AkkaUtils.createActorSystem(name, localhost, 0, conf, securityManager) + val (_actorSystem, _) = AkkaUtils.createActorSystem(name, localhost, 
"0", conf, securityManager) val fakeMasterRef = _actorSystem.actorOf(Props(makeFakeMaster)) val _server = if (faulty) { - new FaultyStandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077") + new FaultyStandaloneRestServer(localhost, "0", conf, fakeMasterRef, "spark://fake:7077") } else { - new StandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077") + new StandaloneRestServer(localhost, "0", conf, fakeMasterRef, "spark://fake:7077") } val port = _server.start() // set these to clean them up after every test @@ -566,7 +566,7 @@ private class SmarterMaster extends Actor { */ private class FaultyStandaloneRestServer( host: String, - requestedPort: Int, + requestedPort: String, masterConf: SparkConf, masterActor: ActorRef, masterUrl: String) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala index 6a6f29dd613cd..f27545535539a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.FunSuite class WorkerWatcherSuite extends FunSuite { test("WorkerWatcher shuts down on valid disassociation") { val conf = new SparkConf() - val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) + val rpcEnv = RpcEnv.create("test", "localhost", "12345", conf, new SecurityManager(conf)) val targetWorkerUrl = "akka://test@1.2.3.4:1234/user/Worker" val targetWorkerAddress = AddressFromURIString(targetWorkerUrl) val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl) @@ -40,7 +40,7 @@ class WorkerWatcherSuite extends FunSuite { test("WorkerWatcher stays alive on invalid disassociation") { val conf = new SparkConf() - val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) + val rpcEnv = RpcEnv.create("test", "localhost", 
"12345", conf, new SecurityManager(conf)) val targetWorkerUrl = "akka://test@1.2.3.4:1234/user/Worker" val otherAkkaURL = "akka://test@4.3.2.1:1234/user/OtherActor" val otherAkkaAddress = AddressFromURIString(otherAkkaURL) diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala index 02424c59d6831..467e74c2d3834 100644 --- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala @@ -37,7 +37,7 @@ class ConnectionManagerSuite extends FunSuite { test("security default off") { val conf = new SparkConf val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var receivedMessage = false manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { receivedMessage = true @@ -62,14 +62,14 @@ class ConnectionManagerSuite extends FunSuite { conf.set("spark.authenticate.secret", "good") conf.set("spark.app.id", "app-id") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { numReceivedMessages += 1 None }) - val managerServer = new ConnectionManager(0, conf, securityManager) + val managerServer = new ConnectionManager("0", conf, securityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { numReceivedServerMessages += 1 @@ -99,7 +99,7 @@ class ConnectionManagerSuite extends FunSuite { conf.set("spark.app.id", "app-id") conf.set("spark.authenticate.secret", "good") val securityManager = new SecurityManager(conf) - val manager = new 
ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -109,7 +109,7 @@ class ConnectionManagerSuite extends FunSuite { val badconf = conf.clone.set("spark.authenticate.secret", "bad") val badsecurityManager = new SecurityManager(badconf) - val managerServer = new ConnectionManager(0, badconf, badsecurityManager) + val managerServer = new ConnectionManager("0", badconf, badsecurityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -138,7 +138,7 @@ class ConnectionManagerSuite extends FunSuite { conf.set("spark.authenticate", "false") conf.set("spark.authenticate.secret", "good") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -150,7 +150,7 @@ class ConnectionManagerSuite extends FunSuite { badconf.set("spark.authenticate", "true") badconf.set("spark.authenticate.secret", "good") val badsecurityManager = new SecurityManager(badconf) - val managerServer = new ConnectionManager(0, badconf, badsecurityManager) + val managerServer = new ConnectionManager("0", badconf, badsecurityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { numReceivedServerMessages += 1 @@ -188,7 +188,7 @@ class ConnectionManagerSuite extends FunSuite { val conf = new SparkConf conf.set("spark.authenticate", "false") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) var numReceivedMessages = 0 manager.onReceiveMessage((msg: Message, id: 
ConnectionManagerId) => { @@ -199,7 +199,7 @@ class ConnectionManagerSuite extends FunSuite { val badconf = new SparkConf badconf.set("spark.authenticate", "false") val badsecurityManager = new SecurityManager(badconf) - val managerServer = new ConnectionManager(0, badconf, badsecurityManager) + val managerServer = new ConnectionManager("0", badconf, badsecurityManager) var numReceivedServerMessages = 0 managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { @@ -234,8 +234,8 @@ class ConnectionManagerSuite extends FunSuite { val conf = new SparkConf conf.set("spark.authenticate", "false") val securityManager = new SecurityManager(conf) - val manager = new ConnectionManager(0, conf, securityManager) - val managerServer = new ConnectionManager(0, conf, securityManager) + val manager = new ConnectionManager("0", conf, securityManager) + val managerServer = new ConnectionManager("0", conf, securityManager) managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { throw new Exception("Custom exception text") }) @@ -264,12 +264,12 @@ class ConnectionManagerSuite extends FunSuite { clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s") val clientSecurityManager = new SecurityManager(clientConf) - val manager = new ConnectionManager(0, clientConf, clientSecurityManager) + val manager = new ConnectionManager("0", clientConf, clientSecurityManager) val serverConf = new SparkConf serverConf.set("spark.authenticate", "false") val serverSecurityManager = new SecurityManager(serverConf) - val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager) + val managerServer = new ConnectionManager("0", serverConf, serverSecurityManager) managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { // sleep 60 sec > ack timeout for simulating server slow down or hang up Thread.sleep(ackTimeoutS * 3 * 1000) diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala 
b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 21eb71d9acfbd..dd4470d558dc3 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -38,7 +38,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { val conf = new SparkConf() - env = createRpcEnv(conf, "local", 12345) + env = createRpcEnv(conf, "local", "12345") } override def afterAll(): Unit = { @@ -47,7 +47,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } } - def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv + def createRpcEnv(conf: SparkConf, name: String, port: String): RpcEnv test("send a message locally") { @volatile var message: String = null @@ -75,7 +75,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely") try { @@ -130,7 +130,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely") try { @@ -157,7 +157,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { val conf = new SparkConf() conf.set("spark.rpc.retry.wait", "0") conf.set("spark.rpc.numRetries", "1") - val anotherEnv = createRpcEnv(conf, "remote", 13345) + val anotherEnv = createRpcEnv(conf, "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout") try { @@ 
-415,7 +415,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely") try { @@ -455,7 +455,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef( "local", env.address, "sendWithReply-remotely-error") @@ -495,7 +495,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef( "local", env.address, "network-events") @@ -524,7 +524,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", "13345") // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef( "local", env.address, "sendWithReply-unserializable-error") diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala index a33a83db7bc9e..f44cb8b5ab274 100644 --- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.{SecurityManager, SparkConf} class AkkaRpcEnvSuite extends RpcEnvSuite { - override def createRpcEnv(conf: 
SparkConf, name: String, port: Int): RpcEnv = { + override def createRpcEnv(conf: SparkConf, name: String, port: String): RpcEnv = { new AkkaRpcEnvFactory().create( RpcEnvConfig(conf, name, "localhost", port, new SecurityManager(conf))) } @@ -37,7 +37,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { }) val conf = new SparkConf() val newRpcEnv = new AkkaRpcEnvFactory().create( - RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf))) + RpcEnvConfig(conf, "test", "localhost", "12346", new SecurityManager(conf))) try { val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint") assert(s"akka.tcp://local@${env.address}/user/test_endpoint" === diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index f647200402ecb..a9a0b6b369cf8 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -68,7 +68,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd } before { - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + rpcEnv = RpcEnv.create("test", "localhost", "0", conf, securityMgr) conf.set("spark.authenticate", "false") conf.set("spark.driver.port", rpcEnv.address.port.toString) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 151955ef7f435..64e25e84fa235 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -73,7 +73,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach } override def beforeEach(): Unit = { - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + rpcEnv = 
RpcEnv.create("test", "localhost", "0", conf, securityMgr) // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case System.setProperty("os.arch", "amd64") diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 77a038dc1720d..eece75c658c52 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -74,9 +74,9 @@ class UISuite extends FunSuite { val server = new ServerSocket(0) val startPort = server.getLocalPort val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) + "0.0.0.0", startPort.toString, Seq[ServletContextHandler](), new SparkConf) val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) + "0.0.0.0", startPort.toString, Seq[ServletContextHandler](), new SparkConf) // Allow some wiggle room in case ports on the machine are under contention val boundPort1 = serverInfo1.boundPort val boundPort2 = serverInfo2.boundPort @@ -90,7 +90,7 @@ class UISuite extends FunSuite { test("jetty binds to port 0 correctly") { val serverInfo = JettyUtils.startJettyServer( - "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) + "0.0.0.0", "0", Seq[ServletContextHandler](), new SparkConf) val server = serverInfo.server val boundPort = serverInfo.boundPort assert(server.getState === "STARTED") diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index ccdb3f571429d..d200e0722edd1 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -42,7 +42,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - 
val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -58,7 +58,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerBad.isAuthenticationEnabled() === true) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, conf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", conf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerEndpoint = @@ -76,7 +76,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -90,7 +90,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro badconf.set("spark.authenticate.secret", "good") val securityManagerBad = new SecurityManager(badconf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, badconf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", badconf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -122,7 +122,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = 
RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -138,7 +138,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerGood.isAuthenticationEnabled() === true) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", goodconf, securityManagerGood) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -170,7 +170,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -187,7 +187,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerBad.isAuthenticationEnabled() === false) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, badconf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", badconf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerEndpoint = @@ -203,7 +203,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", 
rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -215,7 +215,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val slaveConf = sparkSSLConfig() val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slaves", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slaves", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -248,7 +248,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -262,7 +262,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro slaveConf.set("spark.authenticate.secret", "good") val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) @@ -295,7 +295,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) 
System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -310,7 +310,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro slaveConf.set("spark.authenticate.secret", "bad") val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerEndpoint = @@ -327,7 +327,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val securityManager = new SecurityManager(conf) val hostname = "localhost" - val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager) + val rpcEnv = RpcEnv.create("spark", hostname, "0", conf, securityManager) System.setProperty("spark.hostPort", rpcEnv.address.hostPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -339,7 +339,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val slaveConf = sparkSSLConfig() val securityManagerBad = new SecurityManager(slaveConf) - val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, "0", slaveConf, securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) try { slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 016de4c63d1d2..c9183efa6531e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala 
@@ -111,7 +111,7 @@ object FeederActor { val Seq(host, port) = args.toSeq val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, + val actorSystem = AkkaUtils.createActorSystem("test", host, port, conf = conf, securityManager = new SecurityManager(conf))._1 val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 39e6754c81dbf..25d933b065395 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -77,7 +77,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L /** Find a free port */ private def findFreePort(): Int = { val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { + Utils.startServiceOnPort(candidatePort.toString, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 6dc4e9517d5a4..77696e6026d96 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -62,7 +62,7 @@ private class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + private var brokerPort = "9092" private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -108,7 +108,7 @@ private class KafkaTestUtils extends Logging { // Kafka broker startup 
Utils.startServiceOnPort(brokerPort, port => { - brokerPort = port + brokerPort = port.toString brokerConf = new KafkaConfig(brokerConfiguration) server = new KafkaServer(brokerConf) server.startup() @@ -183,7 +183,7 @@ private class KafkaTestUtils extends Logging { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") - props.put("port", brokerPort.toString) + props.put("port", brokerPort) props.put("log.dir", Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) props.put("log.flush.interval.messages", "1") diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index a19a72c58a705..3869cee9b1a31 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -115,7 +115,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { private def findFreePort(): Int = { val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { + Utils.startServiceOnPort(candidatePort.toString, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 35fb625645022..4f952b9cd4119 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -114,7 +114,7 @@ import org.apache.spark.annotation.DeveloperApi private val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - private val classServerPort = 
conf.getInt("spark.replClassServer.port", 0) + private val classServerPort = conf.get("spark.replClassServer.port", "0") private val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") private var currentSettings: Settings = initialSettings private var printResults = true // whether to print result lines diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 23804237bda80..2b3ab59b8ac17 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -60,7 +60,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche var tempDirectory: File = null before { - rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + rpcEnv = RpcEnv.create("test", "localhost", "0", conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 760e458972d98..d3846954d34cf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -291,7 +291,7 @@ private[spark] class ApplicationMaster( } private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { - val port = sparkConf.getInt("spark.yarn.am.port", 0) + val port = sparkConf.get("spark.yarn.am.port", "0") rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr) waitForSparkDriver() addAmIpFilter()