Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,42 @@ import com.google.common.io.Files
import org.apache.spark.util.Utils

private[spark] class HttpFileServer extends Logging {

var baseDir : File = null
var fileDir : File = null
var jarDir : File = null
var httpServer : HttpServer = null
var serverUri : String = null
def initialize() {

def initialize(port: Option[Int]) {
baseDir = Utils.createTempDir()
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir)
httpServer = if (port.isEmpty) new HttpServer(baseDir) else new HttpServer(baseDir, port.get)
httpServer.start()
serverUri = httpServer.uri
}

def stop() {
httpServer.stop()
}

def addFile(file: File) : String = {
addFileToDir(file, fileDir)
serverUri + "/files/" + file.getName
}

def addJar(file: File) : String = {
addFileToDir(file, jarDir)
serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File) : String = {
Files.copy(file, new File(dir, file.getName))
dir + "/" + file.getName
}

}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
private[spark] class HttpServer(resourceBase: File) extends Logging {
private[spark] class HttpServer(resourceBase: File, localPort: Int = 0) extends Logging {
private var server: Server = null
private var port: Int = -1
private var port: Int = localPort

def start() {
if (server != null) {
Expand All @@ -51,7 +51,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
connector.setSoLingerTime(-1)
connector.setPort(0)
connector.setPort(port)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
Expand All @@ -72,7 +72,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
throw new ServerStateException("Server is already stopped")
} else {
server.stop()
port = -1
port = 0
server = null
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ object SparkEnv extends Logging {
"spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")

val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
httpFileServer.initialize(conf.getOption("spark.fileserver.port").map(_.toInt))
conf.set("spark.fileserver.uri", httpFileServer.serverUri)

val metricsSystem = if (isDriver) {
Expand Down