diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 0e3750fdde415..f575a0d65e80d 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -31,14 +31,18 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo var httpServer : HttpServer = null var serverUri : String = null - def initialize() { + def initialize(port: Option[Int]) { baseDir = Utils.createTempDir() fileDir = new File(baseDir, "files") jarDir = new File(baseDir, "jars") fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(baseDir, securityManager) + httpServer = if (port.isEmpty) { + new HttpServer(baseDir, securityManager) + } else { + new HttpServer(baseDir, securityManager, port.get) + } httpServer.start() serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 7e9b517f901a2..e0c76feb124a2 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.File +import org.apache.spark.network.PortManager import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} @@ -41,45 +42,54 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * as well as classes created by the interpreter when the user types in code. This is just a wrapper * around a Jetty server. */ -private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager) - extends Logging { +private[spark] class HttpServer(resourceBase: File, + securityManager: SecurityManager, + localPort: Int = 0) extends Logging { private var server: Server = null - private var port: Int = -1 + private var port: Int = localPort + + private def startOnPort(startPort: Int): (Server, Int) = { + val server = new Server() + val connector = new SocketConnector + connector.setMaxIdleTime(60*1000) + connector.setSoLingerTime(-1) + connector.setPort(startPort) + server.addConnector(connector) + + val threadPool = new QueuedThreadPool + threadPool.setDaemon(true) + server.setThreadPool(threadPool) + val resHandler = new ResourceHandler + resHandler.setResourceBase(resourceBase.getAbsolutePath) + + val handlerList = new HandlerList + handlerList.setHandlers(Array(resHandler, new DefaultHandler)) + + if (securityManager.isAuthenticationEnabled()) { + logDebug("HttpServer is using security") + val sh = setupSecurityHandler(securityManager) + // make sure we go through security handler to get resources + sh.setHandler(handlerList) + server.setHandler(sh) + } else { + logDebug("HttpServer is not using security") + server.setHandler(handlerList) + } + + server.start() + val actualPort = server.getConnectors()(0).getLocalPort() + + (server, actualPort) + } def start() { if (server != null) { throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - server = new Server() - val connector = new SocketConnector - connector.setMaxIdleTime(60*1000) - connector.setSoLingerTime(-1) - connector.setPort(0) - server.addConnector(connector) - - val threadPool = new QueuedThreadPool - threadPool.setDaemon(true) - server.setThreadPool(threadPool) - val resHandler = new ResourceHandler - resHandler.setResourceBase(resourceBase.getAbsolutePath) - - val handlerList = new HandlerList - handlerList.setHandlers(Array(resHandler, new DefaultHandler)) - - if (securityManager.isAuthenticationEnabled()) { - logDebug("HttpServer is using security") - val sh = setupSecurityHandler(securityManager) - // make sure we go through security handler to get resources - sh.setHandler(handlerList) - server.setHandler(sh) - } else { - logDebug("HttpServer is not using security") - server.setHandler(handlerList) - } - - server.start() - port = server.getConnectors()(0).getLocalPort() + val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort) + server = actualServer + port = actualPort } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8f70744d804d9..6e272f7ff2211 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -231,7 +231,7 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { val server = new HttpFileServer(securityManager) - server.initialize() + server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt)) conf.set("spark.fileserver.uri", server.serverUri) server } else { diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 487456467b23b..eaf3b69a7c5a6 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) - server = new HttpServer(broadcastDir, securityManager) + val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0) + server = new HttpServer(broadcastDir, securityManager, broadcastListenPort) server.start() serverUri = server.uri logInfo("Broadcast server started at " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 8a1cdb812962e..7e498fb78a06e 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -102,7 +102,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.setReuseAddress(true) serverChannel.socket.setReceiveBufferSize(256 * 1024) - serverChannel.socket.bind(new InetSocketAddress(port)) + private def startService(port: Int) = { + serverChannel.socket.bind(new InetSocketAddress(port)) + (serverChannel, port) + } + PortManager.startWithIncrements(port, 3, startService) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala new file mode 100644 index 0000000000000..f9ad9629fe80a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/PortManager.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network + +import java.net.InetSocketAddress + +import org.apache.spark.{Logging, SparkException} +import org.eclipse.jetty.server.Server + +private[spark] object PortManager extends Logging +{ + + /** + * Start service on given port, or attempt to fall back to the n+1 port for a certain number of + * retries + * + * @param startPort + * @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4 + * total attempts, on ports n, n+1, n+2, and n+3 + * @param startService Function to start service on a given port. Expected to throw a java.net + * .BindException if the port is already in use + * @tparam T + * @throws SparkException When unable to start service in the given number of attempts + * @return + */ + def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)): + (T, Int) = { + for( offset <- 0 to maxRetries) { + val tryPort = startPort + offset + try { + return startService(tryPort) + } catch { + case e: java.net.BindException => { + if (!e.getMessage.contains("Address already in use") || + offset == (maxRetries-1)) { + throw e + } + logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) + } + case e: Exception => throw e + } + } + throw new SparkException(s"Couldn't start service on port $startPort") + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0db0a5bc7341b..ae55a0ace5b4f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -63,7 +63,8 @@ private[spark] class BlockManager( val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) - val connectionManager = new ConnectionManager(0, conf, securityManager) + val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf, + securityManager) implicit val futureExecContext = connectionManager.futureExecContext diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ad8b6c0e51a78..2183c54f8c672 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -322,7 +322,7 @@ configure those ports. Browser - Driver + Application 4040 Web UI spark.ui.port @@ -372,18 +372,37 @@ configure those ports. - Driver and other Workers Worker + Application (random) - - - - None - Jetty-based. Each of these services starts on a random port that cannot be configured + File server for files and jars + spark.fileserver.port + Jetty-based + + + Worker + Application + (random) + HTTP Broadcast + spark.broadcast.port + Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager + instead + + + Worker + Spark Shell + (random) + Class file server (Spark Shell only) + spark.replClassServer.port + Jetty-based + + + Worker + Other Workers + (random) + Block Manager port + spark.blockManager.port + Raw socket via ServerSocketChannel diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 3842c291d0b7b..c62c3eb64f6a7 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -102,7 +102,8 @@ import org.apache.spark.util.Utils val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - val classServer = new HttpServer(outputDir, new SecurityManager(conf)) + val classServerListenPort = conf.getInt("spark.replClassServer.port", 0) + val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort) private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything