
Commit 4a46b88

Marcelo Vanzin committed
[SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.

This avoids bringing up yet another HTTP server on the driver, and instead
reuses the file server already managed by the driver's RpcEnv. As a bonus,
the repl now inherits the security features of the network library.

There's also a small change to create the directory for storing classes
under the root temp dir for the application (instead of directly under
java.io.tmpdir).

Author: Marcelo Vanzin <[email protected]>

Closes #9923 from vanzin/SPARK-11563.
1 parent 2ecbe02 commit 4a46b88

File tree

15 files changed: +183 -98 lines changed

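
As a rough sketch of the flow this commit sets up (a toy model, not Spark's real classes; the FileServer stand-in, the host:port, and the paths are illustrative): the driver publishes the REPL's class output directory through its RpcEnv file server, records the resulting URI in the conf, and executors resolve class files relative to that URI. The SparkContext and NettyStreamManager diffs below show the real wiring.

```scala
import java.io.File

// Toy model of the end-to-end flow (NOT Spark's real classes; host, port,
// and paths are illustrative placeholders).
object ReplClassFlow {

  // Stand-in for rpcEnv.fileServer: maps a base URI onto a local directory.
  final class FileServer(hostPort: String) {
    private var served = Map.empty[String, File]

    def addDirectory(baseUri: String, path: File): String = {
      served += baseUri -> path
      s"spark://$hostPort$baseUri"
    }

    // Resolve a request for baseUri/relPath to a file under that directory.
    def resolve(baseUri: String, relPath: String): File =
      new File(served(baseUri), relPath)
  }

  def main(args: Array[String]): Unit = {
    val fileServer = new FileServer("driver-host:7777")
    var conf = Map("spark.repl.class.outputDir" -> "/tmp/repl-classes")

    // Driver side (mirrors the SparkContext change in this commit): publish
    // the REPL output dir and record the URI executors should use.
    conf.get("spark.repl.class.outputDir").foreach { dir =>
      conf += "spark.repl.class.uri" -> fileServer.addDirectory("/classes", new File(dir))
    }

    // Executor side: class bytes are fetched relative to this URI.
    println(conf("spark.repl.class.uri"))                // spark://driver-host:7777/classes
    println(fileServer.resolve("/classes", "Foo.class")) // /tmp/repl-classes/Foo.class
  }
}
```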

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 5 additions & 0 deletions

@@ -71,6 +71,11 @@ private[spark] class HttpFileServer(
     serverUri + "/jars/" + file.getName
   }
 
+  def addDirectory(path: String, resourceBase: String): String = {
+    httpServer.addDirectory(path, resourceBase)
+    serverUri + path
+  }
+
   def addFileToDir(file: File, dir: File) : String = {
     // Check whether the file is a directory. If it is, throw a more meaningful exception.
     // If we don't catch this, Guava throws a very confusing error message:

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 17 additions & 9 deletions

@@ -23,10 +23,9 @@ import org.eclipse.jetty.server.ssl.SslSocketConnector
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
 import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
-
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
+import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
 import org.eclipse.jetty.util.thread.QueuedThreadPool
 
 import org.apache.spark.util.Utils
@@ -52,6 +51,11 @@ private[spark] class HttpServer(
 
   private var server: Server = null
   private var port: Int = requestedPort
+  private val servlets = {
+    val handler = new ServletContextHandler()
+    handler.setContextPath("/")
+    handler
+  }
 
   def start() {
     if (server != null) {
@@ -65,6 +69,14 @@ private[spark] class HttpServer(
     }
   }
 
+  def addDirectory(contextPath: String, resourceBase: String): Unit = {
+    val holder = new ServletHolder()
+    holder.setInitParameter("resourceBase", resourceBase)
+    holder.setInitParameter("pathInfoOnly", "true")
+    holder.setServlet(new DefaultServlet())
+    servlets.addServlet(holder, contextPath.stripSuffix("/") + "/*")
+  }
+
   /**
    * Actually start the HTTP server on the given port.
    *
@@ -85,21 +97,17 @@ private[spark] class HttpServer(
     val threadPool = new QueuedThreadPool
     threadPool.setDaemon(true)
     server.setThreadPool(threadPool)
-    val resHandler = new ResourceHandler
-    resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-    val handlerList = new HandlerList
-    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+    addDirectory("/", resourceBase.getAbsolutePath)
 
     if (securityManager.isAuthenticationEnabled()) {
       logDebug("HttpServer is using security")
       val sh = setupSecurityHandler(securityManager)
       // make sure we go through security handler to get resources
-      sh.setHandler(handlerList)
+      sh.setHandler(servlets)
       server.setHandler(sh)
     } else {
       logDebug("HttpServer is not using security")
-      server.setHandler(handlerList)
+      server.setHandler(servlets)
     }
 
     server.start()
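
For context on the Jetty servlet API used above, here is a minimal standalone sketch of the same directory-serving setup. The object name, port 8080, and the /tmp/classes path are placeholders for this sketch only; Spark's HttpServer binds its own port and resource base at runtime.

```scala
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}

// Standalone sketch of servlet-based directory serving (placeholder names).
object DirectoryServerSketch {
  def main(args: Array[String]): Unit = {
    val server = new Server(8080)
    val servlets = new ServletContextHandler()
    servlets.setContextPath("/")

    // One DefaultServlet per served directory. "pathInfoOnly" makes the
    // servlet resolve files against resourceBase using only the part of
    // the URL after the servlet mapping (i.e. after "/classes").
    val holder = new ServletHolder(new DefaultServlet())
    holder.setInitParameter("resourceBase", "/tmp/classes")
    holder.setInitParameter("pathInfoOnly", "true")
    servlets.addServlet(holder, "/classes/*")

    server.setHandler(servlets)
    server.start()
    server.join() // GET /classes/<relative-path> now serves files from /tmp/classes
  }
}
```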

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 0 deletions

@@ -457,6 +457,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _env = createSparkEnv(_conf, isLocal, listenerBus)
     SparkEnv.set(_env)
 
+    // If running the REPL, register the repl's output dir with the file server.
+    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
+      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
+      _conf.set("spark.repl.class.uri", replUri)
+    }
+
     _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
     _statusTracker = new SparkStatusTracker(this)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 3 deletions

@@ -364,9 +364,9 @@ private[spark] class Executor(
         val _userClassPathFirst: java.lang.Boolean = userClassPathFirst
         val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader")
           .asInstanceOf[Class[_ <: ClassLoader]]
-        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
-          classOf[ClassLoader], classOf[Boolean])
-        constructor.newInstance(conf, classUri, parent, _userClassPathFirst)
+        val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv],
+          classOf[String], classOf[ClassLoader], classOf[Boolean])
+        constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst)
       } catch {
         case _: ClassNotFoundException =>
           logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 18 additions & 0 deletions

@@ -179,6 +179,24 @@ private[spark] trait RpcEnvFileServer {
    */
  def addJar(file: File): String
 
+  /**
+   * Adds a local directory to be served via this file server.
+   *
+   * @param baseUri Leading URI path (files can be retrieved by appending their relative
+   *                path to this base URI). This cannot be "files" nor "jars".
+   * @param path Path to the local directory.
+   * @return URI for the root of the directory in the file server.
+   */
+  def addDirectory(baseUri: String, path: File): String
+
+  /** Validates and normalizes the base URI for directories. */
+  protected def validateDirectoryUri(baseUri: String): String = {
+    val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
+    require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
+      "Directory URI cannot be /files nor /jars.")
+    fixedBaseUri
+  }
+
 }
 
 private[spark] case class RpcEnvConfig(
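
The normalization done by validateDirectoryUri is easy to exercise in isolation; the helper below is copied from the trait above, with a few example inputs:

```scala
object ValidateUriDemo {
  // Copied from RpcEnvFileServer.validateDirectoryUri above.
  def validateDirectoryUri(baseUri: String): String = {
    val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
    require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
      "Directory URI cannot be /files nor /jars.")
    fixedBaseUri
  }

  def main(args: Array[String]): Unit = {
    println(validateDirectoryUri("classes"))   // /classes
    println(validateDirectoryUri("/classes/")) // /classes
    println(validateDirectoryUri("/a/b/"))     // /a/b
    // validateDirectoryUri("/files") would throw IllegalArgumentException.
  }
}
```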

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 5 additions & 0 deletions

@@ -273,6 +273,11 @@ private[akka] class AkkaFileServer(
     getFileServer().addJar(file)
   }
 
+  override def addDirectory(baseUri: String, path: File): String = {
+    val fixedBaseUri = validateDirectoryUri(baseUri)
+    getFileServer().addDirectory(fixedBaseUri, path.getAbsolutePath())
+  }
+
   def shutdown(): Unit = {
     if (httpFileServer != null) {
       httpFileServer.stop()

core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala

Lines changed: 21 additions & 1 deletion

@@ -25,12 +25,22 @@ import org.apache.spark.rpc.RpcEnvFileServer
 
 /**
  * StreamManager implementation for serving files from a NettyRpcEnv.
+ *
+ * Three kinds of resources can be registered in this manager, all backed by actual files:
+ *
+ * - "/files": a flat list of files; used as the backend for [[SparkContext.addFile]].
+ * - "/jars": a flat list of files; used as the backend for [[SparkContext.addJar]].
+ * - arbitrary directories; all files under the directory become available through the manager,
+ *   respecting the directory's hierarchy.
+ *
+ * Only streaming (openStream) is supported.
  */
 private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   extends StreamManager with RpcEnvFileServer {
 
   private val files = new ConcurrentHashMap[String, File]()
   private val jars = new ConcurrentHashMap[String, File]()
+  private val dirs = new ConcurrentHashMap[String, File]()
 
   override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
     throw new UnsupportedOperationException()
@@ -41,7 +51,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
     val file = ftype match {
       case "files" => files.get(fname)
       case "jars" => jars.get(fname)
-      case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
+      case other =>
+        val dir = dirs.get(ftype)
+        require(dir != null, s"Invalid stream URI: $ftype not found.")
+        new File(dir, fname)
     }
 
     require(file != null && file.isFile(), s"File not found: $streamId")
@@ -60,4 +73,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
     s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
   }
 
+  override def addDirectory(baseUri: String, path: File): String = {
+    val fixedBaseUri = validateDirectoryUri(baseUri)
+    require(dirs.putIfAbsent(fixedBaseUri.stripPrefix("/"), path) == null,
+      s"URI '$fixedBaseUri' already registered.")
+    s"${rpcEnv.address.toSparkURL}$fixedBaseUri"
+  }
+
 }
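
To see how a fetch URI maps onto these maps: the first path segment of a stream URI selects the resource kind ("files", "jars", or a registered directory), and the remainder is a path relative to that directory. The sketch below mirrors the match expression above; how openStream actually splits the stream id into ftype and fname happens outside the hunk shown, so the parsing here is an assumption for illustration.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Toy resolver mirroring the openStream match above; the stream-id parsing
// is an assumption, since that code is outside the hunk shown.
object StreamResolverSketch {
  private val files = new ConcurrentHashMap[String, File]()
  private val jars = new ConcurrentHashMap[String, File]()
  private val dirs = new ConcurrentHashMap[String, File]()

  def resolve(streamId: String): File = {
    val Array(ftype, fname) = streamId.stripPrefix("/").split("/", 2)
    ftype match {
      case "files" => files.get(fname) // flat map: addFile entries
      case "jars"  => jars.get(fname)  // flat map: addJar entries
      case other =>                    // everything else: a registered directory
        val dir = dirs.get(other)
        require(dir != null, s"Invalid stream URI: $other not found.")
        new File(dir, fname)           // preserves the subtree layout
    }
  }

  def main(args: Array[String]): Unit = {
    dirs.put("classes", new File("/tmp/repl-classes"))
    println(resolve("/classes/com/example/Foo.class"))
    // -> /tmp/repl-classes/com/example/Foo.class
  }
}
```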

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 23 additions & 2 deletions

@@ -734,9 +734,28 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val jar = new File(tempDir, "jar")
     Files.write(UUID.randomUUID().toString(), jar, UTF_8)
 
+    val dir1 = new File(tempDir, "dir1")
+    assert(dir1.mkdir())
+    val subFile1 = new File(dir1, "file1")
+    Files.write(UUID.randomUUID().toString(), subFile1, UTF_8)
+
+    val dir2 = new File(tempDir, "dir2")
+    assert(dir2.mkdir())
+    val subFile2 = new File(dir2, "file2")
+    Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
+
     val fileUri = env.fileServer.addFile(file)
     val emptyUri = env.fileServer.addFile(empty)
     val jarUri = env.fileServer.addJar(jar)
+    val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
+    val dir2Uri = env.fileServer.addDirectory("/dir2", dir2)
+
+    // Try registering directories with invalid names.
+    Seq("/files", "/jars").foreach { uri =>
+      intercept[IllegalArgumentException] {
+        env.fileServer.addDirectory(uri, dir1)
+      }
+    }
 
     val destDir = Utils.createTempDir()
     val sm = new SecurityManager(conf)
@@ -745,15 +764,17 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val files = Seq(
       (file, fileUri),
       (empty, emptyUri),
-      (jar, jarUri))
+      (jar, jarUri),
+      (subFile1, dir1Uri + "/file1"),
+      (subFile2, dir2Uri + "/file2"))
     files.foreach { case (f, uri) =>
       val destFile = new File(destDir, f.getName())
       Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
       assert(Files.equal(f, destFile))
     }
 
     // Try to download files that do not exist.
-    Seq("files", "jars").foreach { root =>
+    Seq("files", "jars", "dir1").foreach { root =>
       intercept[Exception] {
         val uri = env.address.toSparkURL + s"/$root/doesNotExist"
         Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)

docs/configuration.md

Lines changed: 0 additions & 8 deletions

@@ -1053,14 +1053,6 @@ Apart from these, the following properties are also available, and may be useful
     to port + maxRetries.
   </td>
 </tr>
-<tr>
-  <td><code>spark.replClassServer.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the driver's HTTP class server to listen on.
-    This is only relevant for the Spark shell.
-  </td>
-</tr>
 <tr>
   <td><code>spark.rpc.numRetries</code></td>
   <td>3</td>

docs/security.md

Lines changed: 0 additions & 8 deletions

@@ -169,14 +169,6 @@ configure those ports.
   <td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
     instead.</td>
 </tr>
-<tr>
-  <td>Executor</td>
-  <td>Driver</td>
-  <td>(random)</td>
-  <td>Class file server</td>
-  <td><code>spark.replClassServer.port</code></td>
-  <td>Jetty-based. Only used in Spark shells.</td>
-</tr>
 <tr>
   <td>Executor / Driver</td>
   <td>Executor / Driver</td>
