diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index d3264a4bb3c81..f97e434c81e08 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -24,42 +24,42 @@ import com.google.common.io.Files import org.apache.spark.util.Utils private[spark] class HttpFileServer extends Logging { - + var baseDir : File = null var fileDir : File = null var jarDir : File = null var httpServer : HttpServer = null var serverUri : String = null - - def initialize() { + + def initialize(port: Option[Int]) { baseDir = Utils.createTempDir() fileDir = new File(baseDir, "files") jarDir = new File(baseDir, "jars") fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(baseDir) + httpServer = if (port.isEmpty) new HttpServer(baseDir) else new HttpServer(baseDir, port.get) httpServer.start() serverUri = httpServer.uri } - + def stop() { httpServer.stop() } - + def addFile(file: File) : String = { addFileToDir(file, fileDir) serverUri + "/files/" + file.getName } - + def addJar(file: File) : String = { addFileToDir(file, jarDir) serverUri + "/jars/" + file.getName } - + def addFileToDir(file: File, dir: File) : String = { Files.copy(file, new File(dir, file.getName)) dir + "/" + file.getName } - + } diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 759e68ee0cc61..dacd77132e39d 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -38,9 +38,9 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * as well as classes created by the interpreter when the user types in code. This is just a wrapper * around a Jetty server. */ -private[spark] class HttpServer(resourceBase: File) extends Logging { +private[spark] class HttpServer(resourceBase: File, localPort: Int = 0) extends Logging { private var server: Server = null - private var port: Int = -1 + private var port: Int = localPort def start() { if (server != null) { @@ -51,7 +51,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging { val connector = new SocketConnector connector.setMaxIdleTime(60*1000) connector.setSoLingerTime(-1) - connector.setPort(0) + connector.setPort(port) server.addConnector(connector) val threadPool = new QueuedThreadPool @@ -72,7 +72,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging { throw new ServerStateException("Server is already stopped") } else { server.stop() - port = -1 + port = 0 server = null } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 7ac65828f670f..01db413f89ed6 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -191,7 +191,7 @@ object SparkEnv extends Logging { "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher") val httpFileServer = new HttpFileServer() - httpFileServer.initialize() + httpFileServer.initialize(conf.getOption("spark.fileserver.port").map(_.toInt)) conf.set("spark.fileserver.uri", httpFileServer.serverUri) val metricsSystem = if (isDriver) {