From e8256e5278da8a8b4aaefb7638a2683e2102ab4a Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 16 Aug 2014 08:20:44 +0800 Subject: [PATCH 01/17] support https in spark web ui --- .../apache/spark/deploy/DeployMessage.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 7 +-- .../spark/deploy/master/WorkerInfo.scala | 6 +-- .../apache/spark/deploy/worker/Worker.scala | 4 +- .../spark/deploy/worker/WorkerArguments.scala | 3 ++ .../org/apache/spark/ui/JettyUtils.scala | 49 ++++++++++++++++++- .../scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../scala/org/apache/spark/ui/WebUI.scala | 6 ++- .../spark/deploy/JsonProtocolSuite.scala | 2 +- 9 files changed, 68 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index a7368f9f3dfbe..352ad2a7980e6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -39,7 +39,7 @@ private[deploy] object DeployMessages { port: Int, cores: Int, memory: Int, - webUiPort: Int, + workerWebUiUrl: String, publicAddress: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index cfa2c028a807b..7d5df1d3ea75c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -122,7 +122,8 @@ private[spark] class Master( // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() - masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort + val masterWebUiUrlPrefix = conf.get("spark.http.policy") + "://" + masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) @@ -190,7 +191,7 @@ private[spark] class Master( System.exit(0) } - case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) @@ -200,7 +201,7 @@ private[spark] class Master( sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - sender, workerUiPort, publicAddress) + sender, workerWebUiUrl, publicAddress) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! 
RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index c5fa9cf7d7c2d..0335894bae51a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -30,7 +30,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val actor: ActorRef, - val webUiPort: Int, + val webUiAddress: String, val publicAddress: String) extends Serializable { @@ -99,10 +99,6 @@ private[spark] class WorkerInfo( coresUsed -= driver.desc.cores } - def webUiAddress : String = { - "http://" + this.publicAddress + ":" + this.webUiPort - } - def setState(state: WorkerState.Value) = { this.state = state } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 80fde7e4b2624..40753ba519631 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -78,6 +78,7 @@ private[spark] class Worker( var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) + var workerWebUiUrl: String = _ @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -138,6 +139,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() + workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) @@ -163,7 +165,7 @@ private[spark] class Worker( for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + actor ! 
RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 1e295aaa48c30..f49bdd35c3f36 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -53,6 +53,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") } + if (conf.contains("worker.ui.port")) { + webUiPort = conf.get("worker.ui.port").toInt + } parse(args.toList) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 6b4689291097f..2950a01fc2948 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -26,7 +26,7 @@ import scala.language.implicitConversions import scala.util.{Failure, Success, Try} import scala.xml.Node -import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.{Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool @@ -35,6 +35,8 @@ import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector /** * Utilities for launching a web server using Jetty's HTTP Server class @@ -183,7 +185,8 @@ private[spark] object JettyUtils extends Logging { // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { - val server = new Server(new InetSocketAddress(hostName, currentPort)) + val server = new Server + server.addConnector(getConnector(currentPort, conf)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -207,6 +210,48 @@ private[spark] object JettyUtils extends Logging { private def attachPrefix(basePath: String, relativePath: String): String = { if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") } + + private def getConnector(port: Int, conf: SparkConf): Connector = { + val https = getHttpPolicy(conf) + if (https) { + buildSslSelectChannelConnector(port, conf) + } else { + conf.set("spark.http.policy", "http") + val connector = new SelectChannelConnector + connector.setPort(port) + connector + } + } + + private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = + { + val connector = new SslSelectChannelConnector + connector.setPort(port) + + val context = connector.getSslContextFactory + val needAuth = conf.getBoolean("spark.client.https.need-auth", false) + context.setNeedClientAuth(needAuth) + context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword")) + if (conf.contains("spark.ssl.server.keystore.location")) { + context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) + context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password")) + context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) + } + if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { + 
context.setTrustStore(conf.get("spark.ssl.server.truststore.location")) + context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) + context.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) + } + connector + } + + def getHttpPolicy(conf: SparkConf): Boolean = { + if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) { + true + } else { + false + } + } } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 6c788a37dc70b..548f305db06b5 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -97,7 +97,9 @@ private[spark] class SparkUI( */ private[spark] def appUIHostPort = publicHostName + ":" + boundPort - private[spark] def appUIAddress = s"http://$appUIHostPort" + private def appUiAddressPrefix = conf.get("spark.http.policy") + + private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort" } private[spark] object SparkUI { diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 5f52f95088007..d498661ccdaf5 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -99,7 +99,11 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + if (conf.get("spark.http.policy").equals("https")) { + logInfo("Started %s at https://%s:%d".format(className, publicHostName, boundPort)) + } else { + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + } } catch { case e: Exception => logError("Failed to bind %s".format(className), e) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 31aa7ec837f43..545be8eac8afe 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://host:8080", "publicAddress") workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } From 8f7cc967e6fd1b9d0b3768543afe06211e2178fa Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 16 Aug 2014 10:27:15 +0800 Subject: [PATCH 02/17] add unit test --- .../org/apache/spark/ui/JettyUtils.scala | 12 +++--- core/src/test/resources/spark.keystore | Bin 0 -> 1383 bytes .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../scala/org/apache/spark/ui/UISuite.scala | 40 ++++++++++++++++++ 4 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 core/src/test/resources/spark.keystore diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 2950a01fc2948..f72f46b10610d 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -230,13 +230,13 @@ private[spark] object JettyUtils extends Logging { val context = connector.getSslContextFactory val needAuth = conf.getBoolean("spark.client.https.need-auth", false) + context.setNeedClientAuth(needAuth) - context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword")) - if (conf.contains("spark.ssl.server.keystore.location")) { - context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) - context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password")) - context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) - } + context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) + context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) + context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) + context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) + if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { context.setTrustStore(conf.get("spark.ssl.server.truststore.location")) context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) diff --git a/core/src/test/resources/spark.keystore b/core/src/test/resources/spark.keystore new file mode 100644 index 0000000000000000000000000000000000000000..f30716b57b30298e78e85011d7595bb42d6a85d7 GIT binary patch literal 1383 zcmezO_TO6u1_mY|W&~sI#Dc`+jMU;q4hU zrO=-|ukKbAC9>v6Xj~6^dGz-;rEIy-hpu9VYZQKRFxVT-dox2Ha~PrWerl|?>*gSJ~<=YJoCs7 z*}f&=)m!s=l$QKIcjfZVg^^3XXEFT`vO68ixrQZv8(+};usDIh+i!Q@T=VR`^89pT z6UnxZD&GWt=NYjq{y2LTm;dYUELm3^{5SfoauvOH{%7jM zPd9ov=e?iU^JzzTZ}X+bIj3AJ&nEJhmq@bQG`idUM*r_}6^5YVRe!GJFIIo??*6>C z7vIbpE||~zQ!sDui>F(=K72@uKDoT)BuD=)FFucpI|Dxnt`Wa(wwCK?M(X@4_g0qv ztpD_f>7yp2OTnRV-yp{hjyJClXHNV+>q73A6Efx(7W|w4hRtoVLjNK5_p&_tOri@G z{Lh`aywmi#B-gd1RwMiDXR`Sds`^gv$v>tTp*Z=;ypD7=&V}2nyWfYn94Z5+Yt{%o zQv*w2N=^W#ngfr2=%p^1Top_ze^fw_@olmx$#AyB~F$QUX>2RAg% zNA@=`KQlM>G8i;=GBq|b>|Y()b*zay^1{UnwF`D`zQs^EAxidc%t7#mf$4xWh?;QP`aNzKxWvBigJX3kB%Cn;TgK>#Ki-+Ei zc;e5llU;DN!SO-MqGZpx%c~L(NKLurv%1*WaJBx7*DqEbh)WH4UC(G58aeeo6Eh`HsJjn8~h_Q&2Yt_uD=KU+=u`I5WC28qf3udlRWFG)yjv46m)qD=0ESebC ziYo0X-*0sLL%{Uzrn!0-|1q(j+ZbcPrTDqLXr@NmG%jVavp<7ao@I!_uIxq zi}0s+x-40>E*VS9-92;o$X37h9T#V&HuGLNv9?}$<0-YDvbCT06;8F*j&j~ua5_~f w#jTO2x_9%}r!KmNKLjS12(B>byIgicKkS7jrwSLx5-rOqYs=I>YaZYR0Q-$KZvX%Q literal 0 HcmV?d00001 diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 545be8eac8afe..f6c7c19c41014 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://host:8080", "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80", "publicAddress") workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 038746d2eda4b..597d3dd72d162 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -115,6 +115,30 @@ class UISuite extends FunSuite { 
assert(boundPort1 != boundPort2) } + test("jetty with https selects different port under contention") { + val startPort = 4040 + val server = new Server(startPort) + + Try { server.start() } match { + case Success(s) => + case Failure(e) => + // Either case server port is busy hence setup for test complete + } + val sparkConf = new SparkConf() + .set("spark.http.policy", "https") + .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + val serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + val serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } + test("jetty binds to port 0 correctly") { val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) @@ -128,6 +152,22 @@ class UISuite extends FunSuite { } } + test("jetty with https binds to port 0 correctly") { + val sparkConf = new SparkConf() + .set("spark.http.policy", "https") + .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + val serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + Try { new ServerSocket(boundPort) } match { + case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) + case Failure(e) => + } + } + test("verify appUIAddress contains the scheme") { withSpark(new SparkContext("local", "test")) { sc => val uiAddress = sc.ui.appUIAddress From c90d84e0c70a88742feb453c6f1517163878c003 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 21 Sep 2014 22:44:55 +0800 Subject: [PATCH 03/17] fix according to comments --- .../org/apache/spark/deploy/DeployMessage.scala | 3 +-- .../org/apache/spark/deploy/master/Master.scala | 10 +++++++--- .../apache/spark/deploy/master/WorkerInfo.scala | 3 +-- .../org/apache/spark/deploy/worker/Worker.scala | 11 ++++++++--- .../spark/deploy/worker/WorkerArguments.scala | 3 --- .../scala/org/apache/spark/ui/JettyUtils.scala | 14 +++----------- .../main/scala/org/apache/spark/ui/SparkUI.scala | 2 -- .../src/main/scala/org/apache/spark/ui/WebUI.scala | 12 +++++++----- .../test/scala/org/apache/spark/ui/UISuite.scala | 6 ++++-- 9 files changed, 31 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 352ad2a7980e6..964ede01922b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -39,8 +39,7 @@ private[deploy] object DeployMessages { port: Int, cores: Int, memory: Int, - workerWebUiUrl: String, - publicAddress: String) + workerWebUiUrl: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") assert (port > 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7d5df1d3ea75c..a7e928e5908b3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -122,7 +122,11 @@ private[spark] class Master( // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() - val masterWebUiUrlPrefix = conf.get("spark.http.policy") + "://" + val masterWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { + "https://" + } else{ + "http://" + } masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) @@ -191,7 +195,7 @@ private[spark] class Master( System.exit(0) } - case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl, publicAddress) => + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) @@ -201,7 +205,7 @@ private[spark] class Master( sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - sender, workerWebUiUrl, publicAddress) + sender, workerWebUiUrl) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 0335894bae51a..f775d0d783e0f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -30,8 +30,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val actor: ActorRef, - val webUiAddress: String, - val publicAddress: String) + val webUiAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 40753ba519631..616b2d01b64c1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -78,7 +78,7 @@ private[spark] class Worker( var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) - var workerWebUiUrl: String = _ + var workerWebUiUrl: String = "" @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -139,7 +139,12 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort + val workerWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { + "https://" + } else{ + "http://" + } + workerWebUiUrl = workerWebUiUrlPrefix + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) @@ -165,7 +170,7 @@ private[spark] class Worker( for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) - actor ! 
RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress) + actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index f49bdd35c3f36..1e295aaa48c30 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -53,9 +53,6 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") } - if (conf.contains("worker.ui.port")) { - webUiPort = conf.get("worker.ui.port").toInt - } parse(args.toList) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index f72f46b10610d..9e4bad4a54a84 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -30,13 +30,13 @@ import org.eclipse.jetty.server.{Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils -import org.eclipse.jetty.server.nio.SelectChannelConnector -import org.eclipse.jetty.server.ssl.SslSelectChannelConnector /** * Utilities for launching a web server using Jetty's HTTP Server class @@ -212,11 +212,10 @@ private[spark] object JettyUtils extends Logging { } private def getConnector(port: Int, conf: SparkConf): Connector = { - val https = getHttpPolicy(conf) + val https = conf.get("spark.ui.https.enabled", "false").toBoolean if (https) { buildSslSelectChannelConnector(port, conf) } else { - conf.set("spark.http.policy", "http") val connector = new SelectChannelConnector connector.setPort(port) connector @@ -245,13 +244,6 @@ private[spark] object JettyUtils extends Logging { connector } - def getHttpPolicy(conf: SparkConf): Boolean = { - if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) { - true - } else { - false - } - } } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 548f305db06b5..c450160e50648 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -97,8 +97,6 @@ private[spark] class SparkUI( */ private[spark] def appUIHostPort = publicHostName + ":" + boundPort - private def appUiAddressPrefix = conf.get("spark.http.policy") - private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort" } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index d498661ccdaf5..2e98c6ebd1c53 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -91,6 +91,12 @@ private[spark] abstract class WebUI( } } + def appUiAddressPrefix = if(conf.get("spark.ui.https.enabled", "false").toBoolean) { + "https" + } else { + "http" 
+ } + /** Initialize all components of the server. */ def initialize() @@ -99,11 +105,7 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - if (conf.get("spark.http.policy").equals("https")) { - logInfo("Started %s at https://%s:%d".format(className, publicHostName, boundPort)) - } else { - logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) - } + logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 597d3dd72d162..13990628b98f1 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -124,8 +124,10 @@ class UISuite extends FunSuite { case Failure(e) => // Either case server port is busy hence setup for test complete } + //keytool -export -keystore core/src/test/resources/spark.keystore -alias spark -file /home/wf/code/spark2/core/src/test/resources/spark.cer -storepass 123456 + val sparkConf = new SparkConf() - .set("spark.http.policy", "https") + .set("spark.ui.https.enabled", "true") .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") val serverInfo1 = JettyUtils.startJettyServer( "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) @@ -154,7 +156,7 @@ class UISuite extends FunSuite { test("jetty with https binds to port 0 correctly") { val sparkConf = new SparkConf() - .set("spark.http.policy", "https") + .set("spark.ui.https.enabled", "true") .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) From de8d1bd5cc3c840230d6d5dd4a4866bf0f0f7575 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 21 Sep 2014 23:00:55 +0800 Subject: [PATCH 04/17] fix scalastyle --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2e98c6ebd1c53..b16c3e346d2e7 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -105,7 +105,8 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format(className, publicHostName, boundPort)) + logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format( + className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) From 35074fd465dee6c3dae8e0a43f2787aed50f4537 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 21 Sep 2014 23:27:32 +0800 Subject: [PATCH 05/17] fix workerinfo in JsonProtocolSuite --- .../test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 2 +- core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index f6c7c19c41014..fe7e0cb98941e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80", "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 13990628b98f1..cedb593cc801d 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -124,7 +124,6 @@ class UISuite extends FunSuite { case Failure(e) => // Either case server port is busy hence setup for test complete } - //keytool -export -keystore core/src/test/resources/spark.keystore -alias spark -file /home/wf/code/spark2/core/src/test/resources/spark.cer -storepass 123456 val sparkConf = new SparkConf() .set("spark.ui.https.enabled", "true") From 9591c9c5b90e0d1d65b4a48b08b53f6a3ed3e49d Mon Sep 17 00:00:00 2001 From: scwf Date: Mon, 22 Sep 2014 00:27:37 +0800 Subject: [PATCH 06/17] import org.eclipse.jetty.server.Server to fix test error --- core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index fffc29ec5034e..ac503e4dc756e 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -24,6 +24,7 @@ import scala.io.Source import scala.util.{Failure, Success, Try} import org.eclipse.jetty.servlet.ServletContextHandler +import org.eclipse.jetty.server.Server import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ From 333334adcefca2f52eeef58884b62eed87e65d15 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 25 Sep 2014 08:04:35 +0800 Subject: [PATCH 07/17] fix comments --- .../apache/spark/deploy/worker/Worker.scala | 2 +- .../org/apache/spark/ui/JettyUtils.scala | 51 ++++++++++++++----- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 98165277911ad..f073799a95d75 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -138,7 +138,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - val workerWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { + val workerWebUiUrlPrefix = if ( conf.get("spark.ui.https.enabled", "false").toBoolean) { "https://" } else{ "http://" diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 9e4bad4a54a84..574f9f165528c 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -21,6 +21,8 @@ import java.net.{InetSocketAddress, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import org.eclipse.jetty.util.ssl.SslContextFactory + import scala.annotation.tailrec import scala.language.implicitConversions import scala.util.{Failure, Success, Try} @@ -186,7 +188,9 @@ private[spark] object JettyUtils extends Logging { // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server - server.addConnector(getConnector(currentPort, conf)) + val connector = getConnector(currentPort, conf) + connector.setHost(hostName) + server.addConnector(connector) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -222,28 +226,47 @@ private[spark] object JettyUtils extends Logging { } } - private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = - { - val connector = new SslSelectChannelConnector - connector.setPort(port) + private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { - val context = connector.getSslContextFactory + val ctxFactory = new SslContextFactory() val needAuth = conf.getBoolean("spark.client.https.need-auth", false) - context.setNeedClientAuth(needAuth) - context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) - context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) - context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) - context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) + conf.getAll + .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } + .foreach { case (k, v) => setSslContextFactoryProps(k,v,ctxFactory) } + + ctxFactory.setNeedClientAuth(needAuth) + ctxFactory.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) + ctxFactory.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) + ctxFactory.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) + ctxFactory.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { - context.setTrustStore(conf.get("spark.ssl.server.truststore.location")) - context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) - context.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) + ctxFactory.setTrustStore(conf.get("spark.ssl.server.truststore.location")) + ctxFactory.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) + ctxFactory.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) } + + val connector = new SslSelectChannelConnector(ctxFactory) + connector.setPort(port) connector } + private def setSslContextFactoryProps( + key: String, value: String, ctxFactory:SslContextFactory) = { + key match { + case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) + case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value) + case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value) + case "spark.ui.ssl.server.keystore.type" => ctxFactory.setKeyStoreType(value) + case "spark.ui.ssl.server.truststore.location" => ctxFactory.setTrustStore(value) + case "spark.ui.ssl.server.truststore.password" => 
ctxFactory.setTrustStorePassword(value) + case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) + } + ctxFactory + + } + } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 8584ac054d507..bd49559afff77 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -92,7 +92,7 @@ private[spark] abstract class WebUI( } } - def appUiAddressPrefix = if(conf.get("spark.ui.https.enabled", "false").toBoolean) { + def appUiAddressPrefix = if (conf.get("spark.ui.https.enabled", "false").toBoolean) { "https" } else { "http" From 64d7dc0eb846b7aba6ab42e4157b8f65bfb3f133 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 17:22:22 +0800 Subject: [PATCH 08/17] add redirect from http to https --- .../apache/spark/deploy/master/Master.scala | 7 +- .../apache/spark/deploy/worker/Worker.scala | 7 +- .../org/apache/spark/ui/JettyUtils.scala | 120 ++++++++++++------ .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../scala/org/apache/spark/ui/WebUI.scala | 8 +- .../scala/org/apache/spark/util/Utils.scala | 13 +- .../scala/org/apache/spark/ui/UISuite.scala | 29 +++-- 7 files changed, 110 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 00d0216ebd94f..f66caa6355d88 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -123,12 +123,7 @@ private[spark] class Master( // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() - val masterWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { - "https://" - } else{ - "http://" - } - masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f073799a95d75..d590c262e9a39 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -138,12 +138,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - val workerWebUiUrlPrefix = if ( conf.get("spark.ui.https.enabled", "false").toBoolean) { - "https://" - } else{ - "http://" - } - workerWebUiUrl = workerWebUiUrlPrefix + publicAddress + ":" + webUi.boundPort + workerWebUiUrl = "http://" + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 574f9f165528c..b60eae337cb90 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,23 +17,24 @@ package org.apache.spark.ui -import 
java.net.{InetSocketAddress, URL} +import java.net.URL import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import org.eclipse.jetty.http.HttpStatus import org.eclipse.jetty.util.ssl.SslContextFactory -import scala.annotation.tailrec +import scala.collection.mutable.StringBuilder import scala.language.implicitConversions -import scala.util.{Failure, Success, Try} import scala.xml.Node -import org.eclipse.jetty.server.{Connector, Server} +import org.eclipse.jetty.server.{Request, Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.eclipse.jetty.server.nio.SelectChannelConnector import org.eclipse.jetty.server.ssl.SslSelectChannelConnector + import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} @@ -175,22 +176,38 @@ private[spark] object JettyUtils extends Logging { * found. Return the jetty Server object, the chosen port, and a mutable collection of handlers. */ def startJettyServer( - hostName: String, - port: Int, - handlers: Seq[ServletContextHandler], - conf: SparkConf, - serverName: String = ""): ServerInfo = { + hostName: String, + port: Int, + handlers: Seq[ServletContextHandler], + conf: SparkConf, + serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection - collection.setHandlers(handlers.toArray) addFilters(handlers, conf) // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server - val connector = getConnector(currentPort, conf) - connector.setHost(hostName) - server.addConnector(connector) + // Create a connector on port currentPort to listen for HTTP requests + val httpConnector = new SelectChannelConnector() + httpConnector.setPort(currentPort) + httpConnector.setHost(hostName) + + if (conf.get("spark.ui.https.enabled", "false").toBoolean) { + val securePort = (currentPort + 8000) % 65536 + val schema = "https" + // Create a connector on port currentPort+1 to listen for HTTPS requests + val connector = buildSslSelectChannelConnector(securePort, conf) + connector.setHost(hostName) + server.setConnectors(Seq(httpConnector,connector).toArray) + + // redirect the HTTP requests to HTTPS port + val newHandlers = Seq(createRedirectHttpsHandler(securePort, schema)) ++ handlers + collection.setHandlers(newHandlers.toArray) + } else { + server.addConnector(httpConnector) + collection.setHandlers(handlers.toArray) + } val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -210,43 +227,69 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - /** Attach a prefix to the given path, but avoid returning an empty path */ - private def attachPrefix(basePath: String, relativePath: String): String = { - if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") + private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { + val builder = newURIBuilder(scheme, server, port) + builder.append(path) + if (query != null && query.length > 0) builder.append('?').append(query) + builder.toString + } + + private def newURIBuilder(scheme: String, server: String, port: Int) = { + val builder = new StringBuilder + appendSchemeHostPort(builder, scheme, server, port) + builder } - private def getConnector(port: Int, conf: SparkConf): Connector = { - val https 
= conf.get("spark.ui.https.enabled", "false").toBoolean - if (https) { - buildSslSelectChannelConnector(port, conf) + private def appendSchemeHostPort(url: StringBuilder, scheme: String, server: String, port: Int) { + if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { + url.append(scheme).append("://").append('[').append(server).append(']') } else { - val connector = new SelectChannelConnector - connector.setPort(port) - connector + url.append(scheme).append("://").append(server) + } + if (port > 0) { + url.append(':').append(port) } } + def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { + val redirectHandler: ContextHandler = new ContextHandler + redirectHandler.setContextPath("/") + redirectHandler.setHandler(new AbstractHandler { + @Override def handle( + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { + if (baseRequest.isSecure) { + return + } + if (securePort > 0) { + val url = newURI(schema, baseRequest.getServerName, securePort, + baseRequest.getRequestURI, baseRequest.getQueryString) + response.setContentLength(0) + response.sendRedirect(url) + } + else { + response.sendError(HttpStatus.FORBIDDEN_403, "!Secure") + } + baseRequest.setHandled(true) + } + }) + redirectHandler + } + + /** Attach a prefix to the given path, but avoid returning an empty path */ + private def attachPrefix(basePath: String, relativePath: String): String = { + if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") + } + private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { val ctxFactory = new SslContextFactory() - val needAuth = conf.getBoolean("spark.client.https.need-auth", false) - conf.getAll .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } .foreach { case (k, v) => setSslContextFactoryProps(k,v,ctxFactory) } - ctxFactory.setNeedClientAuth(needAuth) - ctxFactory.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) - ctxFactory.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) - ctxFactory.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) - ctxFactory.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) - - if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { - ctxFactory.setTrustStore(conf.get("spark.ssl.server.truststore.location")) - ctxFactory.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) - ctxFactory.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) - } - val connector = new SslSelectChannelConnector(ctxFactory) connector.setPort(port) connector @@ -255,6 +298,7 @@ private[spark] object JettyUtils extends Logging { private def setSslContextFactoryProps( key: String, value: String, ctxFactory:SslContextFactory) = { key match { + case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean) case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value) case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value) @@ -263,8 +307,8 @@ private[spark] object JettyUtils extends Logging { case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value) case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) } - ctxFactory + ctxFactory } } diff --git 
a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 283629ea1254c..cccd59d122a92 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -99,7 +99,7 @@ private[spark] class SparkUI( */ private[spark] def appUIHostPort = publicHostName + ":" + boundPort - private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort" + private[spark] def appUIAddress = s"http://$appUIHostPort" } private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index bd49559afff77..1ddaf947741fd 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -92,12 +92,6 @@ private[spark] abstract class WebUI( } } - def appUiAddressPrefix = if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - "https" - } else { - "http" - } - /** Initialize all components of the server. */ def initialize() @@ -106,7 +100,7 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format( + logInfo(s"Started %s at http://%s:%d".format( className, publicHostName, boundPort)) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ed063844323af..4c054f996d16e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -24,6 +24,7 @@ import java.util.{Properties, Locale, Random, UUID} import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} import org.apache.log4j.PropertyConfigurator +import org.eclipse.jetty.util.MultiException import scala.collection.JavaConversions._ import scala.collection.Map @@ -1415,9 +1416,10 @@ private[spark] object Utils extends Logging { for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536 + println(s"start $serviceName at tryport: $tryPort") try { val (service, port) = startService(tryPort) - logInfo(s"Successfully started service$serviceString on port $port.") + println(s"Successfully started service$serviceString on port $port.") return (service, port) } catch { case e: Exception if isBindCollision(e) => @@ -1429,7 +1431,7 @@ private[spark] object Utils extends Logging { exception.setStackTrace(e.getStackTrace) throw exception } - logWarning(s"Service$serviceString could not bind on port $tryPort. " + + println(s"Service$serviceString could not bind on port $tryPort. 
" + s"Attempting port ${tryPort + 1}.") } } @@ -1447,8 +1449,11 @@ private[spark] object Utils extends Logging { return true } isBindCollision(e.getCause) - case e: Exception => isBindCollision(e.getCause) - case _ => false + case e: MultiException => e.getThrowables.exists(isBindCollision) + case e: Exception => + isBindCollision(e.getCause) + case _ => + return false } } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index ac503e4dc756e..66d1237605a5c 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -24,7 +24,6 @@ import scala.io.Source import scala.util.{Failure, Success, Try} import org.eclipse.jetty.servlet.ServletContextHandler -import org.eclipse.jetty.server.Server import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -126,28 +125,27 @@ class UISuite extends FunSuite { } test("jetty with https selects different port under contention") { - val startPort = 4040 - val server = new Server(startPort) - - Try { server.start() } match { - case Success(s) => - case Failure(e) => - // Either case server port is busy hence setup for test complete - } + val server = new ServerSocket(0) + val startPort = server.getLocalPort val sparkConf = new SparkConf() .set("spark.ui.https.enabled", "true") - .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.ssl.server.keystore.password", "123456") + .set("spark.ui.ssl.server.keystore.keypassword", "123456") val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1") val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server2") // Allow some wiggle room in case ports on the machine are under contention val boundPort1 = serverInfo1.boundPort val boundPort2 = serverInfo2.boundPort assert(boundPort1 != startPort) assert(boundPort2 != startPort) assert(boundPort1 != boundPort2) + serverInfo1.server.stop() + serverInfo2.server.stop() + server.close() } test("jetty binds to port 0 correctly") { @@ -165,8 +163,10 @@ class UISuite extends FunSuite { test("jetty with https binds to port 0 correctly") { val sparkConf = new SparkConf() - .set("spark.ui.https.enabled", "true") - .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.https.enabled", "false") + .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.ssl.server.keystore.password", "123456") + .set("spark.ui.ssl.server.keystore.keypassword", "123456") val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) val server = serverInfo.server @@ -177,6 +177,7 @@ class UISuite extends FunSuite { case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) case Failure(e) => } + serverInfo.server.stop() } test("verify appUIAddress contains the scheme") { From 89bf98644477050b149891c6cceb344316970652 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 17:25:30 +0800 Subject: [PATCH 09/17] revert debug code --- 
core/src/main/scala/org/apache/spark/util/Utils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4c054f996d16e..fa14a839b170c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1416,10 +1416,10 @@ private[spark] object Utils extends Logging { for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536 - println(s"start $serviceName at tryport: $tryPort") + logInfo(s"start $serviceName at tryport: $tryPort") try { val (service, port) = startService(tryPort) - println(s"Successfully started service$serviceString on port $port.") + logInfo(s"Successfully started service$serviceString on port $port.") return (service, port) } catch { case e: Exception if isBindCollision(e) => @@ -1431,7 +1431,7 @@ private[spark] object Utils extends Logging { exception.setStackTrace(e.getStackTrace) throw exception } - println(s"Service$serviceString could not bind on port $tryPort. " + + logInfo(s"Service$serviceString could not bind on port $tryPort. " + s"Attempting port ${tryPort + 1}.") } } From 677b746d4e1ef56cebfe0ec06b1e7ba7c5cd1206 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 21:09:01 +0800 Subject: [PATCH 10/17] add https/ssl to docs --- .../org/apache/spark/ui/JettyUtils.scala | 10 ++-- .../scala/org/apache/spark/ui/WebUI.scala | 3 +- .../scala/org/apache/spark/util/Utils.scala | 2 +- docs/security.md | 56 ++++++++++++++++++- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b60eae337cb90..295ccd3669884 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -176,11 +176,11 @@ private[spark] object JettyUtils extends Logging { * found. Return the jetty Server object, the chosen port, and a mutable collection of handlers. 
*/ def startJettyServer( - hostName: String, - port: Int, - handlers: Seq[ServletContextHandler], - conf: SparkConf, - serverName: String = ""): ServerInfo = { + hostName: String, + port: Int, + handlers: Seq[ServletContextHandler], + conf: SparkConf, + serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection addFilters(handlers, conf) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 1ddaf947741fd..5d88ca403a674 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -100,8 +100,7 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo(s"Started %s at http://%s:%d".format( - className, publicHostName, boundPort)) + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ec38119f554e9..dd496d0d2a3ed 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1455,7 +1455,7 @@ private[spark] object Utils extends Logging { exception.setStackTrace(e.getStackTrace) throw exception } - logInfo(s"Service$serviceString could not bind on port $tryPort. " + + logWarning(s"Service$serviceString could not bind on port $tryPort. " + s"Attempting port ${tryPort + 1}.") } } diff --git a/docs/security.md b/docs/security.md index ec0523184d665..64fd65e6a95bc 100644 --- a/docs/security.md +++ b/docs/security.md @@ -11,12 +11,66 @@ Spark currently supports authentication via a shared secret. Authentication can ## Web UI -The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting and by using [Jetty https/SSL](http://www.eclipse.org/jetty/documentation/current/configuring-ssl.html) via the `spark.ui.https.enabled` setting. + +### Authentication + +A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. 
On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.

Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces.

Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications.

+### Encryption
+
+Spark uses SSL (Secure Sockets Layer) to establish an encrypted link between the UI server and the browser client. The config `spark.ui.https.enabled` is the switch that turns encryption on; the other SSL-related configs are as follows:
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.ui.ssl.server.keystore.keypassword</code></td>
+  <td>(none)</td>
+  <td>The password for the specific key within the key store.</td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.server.keystore.location</code></td>
+  <td>(none)</td>
+  <td>The file or URL of the SSL Key store.</td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.server.keystore.password</code></td>
+  <td>(none)</td>
+  <td>The password for the key store.</td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.server.keystore.type</code></td>
+  <td>JKS</td>
+  <td>The type of the key store (default "JKS").</td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.client.https.needAuth</code></td>
+  <td>(none)</td>
+  <td>
+    Set true if SSL needs client authentication.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.server.truststore.location</code></td>
+  <td>(none)</td>
+  <td>The file name or URL of the trust store location.</td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.server.truststore.password</code></td>
+  <td>(none)</td>
+  <td>The password for the trust store.</td>
+</tr>
+<tr>
+  <td><code>spark.ui.ssl.server.truststore.type</code></td>
+  <td>JKS</td>
+  <td>The type of the trust store (default "JKS").</td>
+</tr>
+</table>
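For illustration only (not part of the patch), an application could opt into the HTTPS UI with the properties above roughly as in the sketch below; the master URL, keystore path, and passwords are placeholders.

```scala
// Hypothetical usage sketch for the properties introduced by this patch series.
// The property names follow the table above; paths and passwords are placeholders.
import org.apache.spark.{SparkConf, SparkContext}

object HttpsUiExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("https-ui-example")
      .set("spark.ui.https.enabled", "true")
      .set("spark.ui.ssl.server.keystore.location", "/path/to/keystore.jks")
      .set("spark.ui.ssl.server.keystore.password", "storePassword")
      .set("spark.ui.ssl.server.keystore.keypassword", "keyPassword")

    val sc = new SparkContext(conf)
    // With spark.ui.https.enabled set, plain HTTP requests to the UI port are
    // redirected to the HTTPS connector that JettyUtils binds on a nearby port.
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}
```

If client authentication were required, `spark.ui.ssl.client.https.needAuth` and the `spark.ui.ssl.server.truststore.*` properties above could be supplied in the same way.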
+ ## Event Logging If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access. From a4ce9239e8ec912e0ba7d9a2a132de8b97cc8f71 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 22:25:00 +0800 Subject: [PATCH 11/17] fix docs --- docs/security.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/security.md b/docs/security.md index 64fd65e6a95bc..0e1c8422df558 100644 --- a/docs/security.md +++ b/docs/security.md @@ -50,9 +50,7 @@ Spark use SSL(Secure Sockets Layer) to establish an encrypted link between UI se spark.ui.ssl.client.https.needAuth (none) - - Set true if SSL needs client authentication. - + Set true if SSL needs client authentication. spark.ui.ssl.server.truststore.location From 6c31dc71a932083e158ef9c6737f33ebe9de1348 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 23:09:02 +0800 Subject: [PATCH 12/17] fix code format --- .../org/apache/spark/ui/JettyUtils.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 295ccd3669884..58559c94ff15c 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,9 +194,13 @@ private[spark] object JettyUtils extends Logging { httpConnector.setHost(hostName) if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - val securePort = (currentPort + 8000) % 65536 + val securePort = if (currentPort + 1 > 65536) { + currentPort - 1 + } else { + currentPort + 1 + } val schema = "https" - // Create a connector on port currentPort+1 to listen for HTTPS requests + // Create a connector on port securePort to listen for HTTPS requests val connector = buildSslSelectChannelConnector(securePort, conf) connector.setHost(hostName) server.setConnectors(Seq(httpConnector,connector).toArray) @@ -251,15 +255,15 @@ private[spark] object JettyUtils extends Logging { } } - def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { + private def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { val redirectHandler: ContextHandler = new ContextHandler redirectHandler.setContextPath("/") redirectHandler.setHandler(new AbstractHandler { @Override def handle( - target: String, - baseRequest: Request, - request: HttpServletRequest, - response: HttpServletResponse): Unit = { + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { if (baseRequest.isSecure) { return } @@ -268,8 +272,7 @@ private[spark] object JettyUtils extends Logging { baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) response.sendRedirect(url) - } - else { + }else { response.sendError(HttpStatus.FORBIDDEN_403, "!Secure") } 
baseRequest.setHandled(true) @@ -284,11 +287,10 @@ private[spark] object JettyUtils extends Logging { } private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { - val ctxFactory = new SslContextFactory() conf.getAll .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } - .foreach { case (k, v) => setSslContextFactoryProps(k,v,ctxFactory) } + .foreach { case (k, v) => setSslContextFactoryProps(k, v, ctxFactory) } val connector = new SslSelectChannelConnector(ctxFactory) connector.setPort(port) From 7a898fbbb566172ee33f462357bb4fb02cced237 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 2 Oct 2014 00:08:41 +0800 Subject: [PATCH 13/17] fix securePort --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 58559c94ff15c..b306e2b0e15ab 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,10 +194,12 @@ private[spark] object JettyUtils extends Logging { httpConnector.setHost(hostName) if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - val securePort = if (currentPort + 1 > 65536) { - currentPort - 1 + // do not use 1 - 1024 ports for securePort + val tmpPort = (currentPort + 1) % 65536 + val securePort = if ( tmpPort <= 1024) { + tmpPort + 1024 } else { - currentPort + 1 + tmpPort } val schema = "https" // Create a connector on port securePort to listen for HTTPS requests From e5c87cb596ab3ae16ae52458c8e1edf6d1d3647a Mon Sep 17 00:00:00 2001 From: scwf Date: Fri, 3 Oct 2014 20:19:57 +0800 Subject: [PATCH 14/17] fix comments by JoshRosen --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 9 ++------- docs/security.md | 5 +++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b306e2b0e15ab..35e4ef044ccd8 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,13 +194,8 @@ private[spark] object JettyUtils extends Logging { httpConnector.setHost(hostName) if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - // do not use 1 - 1024 ports for securePort - val tmpPort = (currentPort + 1) % 65536 - val securePort = if ( tmpPort <= 1024) { - tmpPort + 1024 - } else { - tmpPort - } + // / If the new port wraps around, do not try a privilege port + val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024 val schema = "https" // Create a connector on port securePort to listen for HTTPS requests val connector = buildSslSelectChannelConnector(securePort, conf) diff --git a/docs/security.md b/docs/security.md index 0e1c8422df558..83198d4ec0496 100644 --- a/docs/security.md +++ b/docs/security.md @@ -27,6 +27,11 @@ Spark use SSL(Secure Sockets Layer) to establish an encrypted link between UI se + + + + + From a48c6fc585c7fdb9c859b00f9b7d7d46836706b8 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 5 Oct 2014 08:22:17 +0800 Subject: [PATCH 15/17] address JoshRosen's comments --- .../org/apache/spark/ui/JettyUtils.scala | 36 ++++++------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 
51a422f067d6d..194b6a943ffdf 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -232,27 +232,17 @@ private[spark] object JettyUtils extends Logging { } private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { - val builder = newURIBuilder(scheme, server, port) - builder.append(path) - if (query != null && query.length > 0) builder.append('?').append(query) - builder.toString - } - - private def newURIBuilder(scheme: String, server: String, port: Int) = { val builder = new StringBuilder - appendSchemeHostPort(builder, scheme, server, port) - builder - } - private def appendSchemeHostPort(url: StringBuilder, scheme: String, server: String, port: Int) { if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { - url.append(scheme).append("://").append('[').append(server).append(']') + builder.append(scheme).append("://").append('[').append(server).append(']') } else { - url.append(scheme).append("://").append(server) - } - if (port > 0) { - url.append(':').append(port) + builder.append(scheme).append("://").append(server) } + builder.append(':').append(port) + builder.append(path) + if (query != null && query.length > 0) builder.append('?').append(query) + builder.toString } private def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { @@ -267,14 +257,10 @@ private[spark] object JettyUtils extends Logging { if (baseRequest.isSecure) { return } - if (securePort > 0) { - val url = newURI(schema, baseRequest.getServerName, securePort, - baseRequest.getRequestURI, baseRequest.getQueryString) - response.setContentLength(0) - response.sendRedirect(url) - }else { - response.sendError(HttpStatus.FORBIDDEN_403, "!Secure") - } + val url = newURI(schema, baseRequest.getServerName, securePort, + baseRequest.getRequestURI, baseRequest.getQueryString) + response.setContentLength(0) + response.sendRedirect(url) baseRequest.setHandled(true) } }) @@ -298,7 +284,7 @@ private[spark] object JettyUtils extends Logging { } private def setSslContextFactoryProps( - key: String, value: String, ctxFactory:SslContextFactory) = { + key: String, value: String, ctxFactory: SslContextFactory) = { key match { case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean) case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) From 2dadb2f206a81d5c45c898833dd9561f53bdaf0e Mon Sep 17 00:00:00 2001 From: scwf Date: Tue, 7 Oct 2014 10:22:16 +0800 Subject: [PATCH 16/17] address vanzin's comments --- .../org/apache/spark/ui/JettyUtils.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 194b6a943ffdf..9598779f9e999 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -199,14 +199,14 @@ private[spark] object JettyUtils extends Logging { if (conf.get("spark.ui.https.enabled", "false").toBoolean) { // / If the new port wraps around, do not try a privilege port val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024 - val schema = "https" + val scheme = "https" // Create a connector on port securePort to listen for HTTPS requests val connector = buildSslSelectChannelConnector(securePort, conf) connector.setHost(hostName) 
server.setConnectors(Seq(httpConnector,connector).toArray) // redirect the HTTP requests to HTTPS port - val newHandlers = Seq(createRedirectHttpsHandler(securePort, schema)) ++ handlers + val newHandlers = Seq(createRedirectHttpsHandler(securePort, scheme)) ++ handlers collection.setHandlers(newHandlers.toArray) } else { server.addConnector(httpConnector) @@ -231,7 +231,8 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { + // to generate a new url string scheme://server:port+path + private def newURL(scheme: String, server: String, port: Int, path: String, query: String) = { val builder = new StringBuilder if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { @@ -245,22 +246,23 @@ private[spark] object JettyUtils extends Logging { builder.toString } - private def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { + private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { val redirectHandler: ContextHandler = new ContextHandler redirectHandler.setContextPath("/") redirectHandler.setHandler(new AbstractHandler { - @Override def handle( - target: String, - baseRequest: Request, - request: HttpServletRequest, - response: HttpServletResponse): Unit = { + override def handle( + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { if (baseRequest.isSecure) { return } - val url = newURI(schema, baseRequest.getServerName, securePort, + val httpsURL = newURL(scheme, baseRequest.getServerName, securePort, baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) - response.sendRedirect(url) + response.encodeRedirectURL(httpsURL) + response.sendRedirect(httpsURL) baseRequest.setHandled(true) } }) @@ -295,8 +297,6 @@ private[spark] object JettyUtils extends Logging { case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value) case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) } - - ctxFactory } } From 3b01d3a66d21854b5e053de89f3d301fee58b315 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 10 Oct 2014 16:03:27 +0800 Subject: [PATCH 17/17] add reference to method newURI --- .../main/scala/org/apache/spark/ui/JettyUtils.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 9598779f9e999..371002abe4c5b 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -231,8 +231,10 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - // to generate a new url string scheme://server:port+path - private def newURL(scheme: String, server: String, port: Int, path: String, query: String) = { + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. 
Based on: + // https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/ + // jetty/util/URIUtil.java#L726-L733 + private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { val builder = new StringBuilder if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { @@ -258,11 +260,11 @@ private[spark] object JettyUtils extends Logging { if (baseRequest.isSecure) { return } - val httpsURL = newURL(scheme, baseRequest.getServerName, securePort, + val httpsURI = newURI(scheme, baseRequest.getServerName, securePort, baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) - response.encodeRedirectURL(httpsURL) - response.sendRedirect(httpsURL) + response.encodeRedirectURL(httpsURI) + response.sendRedirect(httpsURI) baseRequest.setHandled(true) } })
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.ui.https.enabled</code></td>
+  <td>false</td>
+  <td>Whether to enable https in web ui.</td>
+</tr>
 <tr>
   <td><code>spark.ui.ssl.server.keystore.keypassword</code></td>
   <td>(none)</td>
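To make the port selection and redirect URL handling in the JettyUtils changes above easier to follow, here is a small self-contained sketch (an editorial illustration, not part of the patch) that mirrors the `securePort` expression from PATCH 14 and the `newURI` helper from PATCH 17, with a few sample values:

```scala
// Standalone sketch mirroring the patch's logic; all names are local to this example.
object HttpsRedirectSketch {
  // Same expression as PATCH 14: the HTTPS connector uses the next port,
  // but wrap-around never lands on a privileged (< 1024) port.
  def securePort(currentPort: Int): Int =
    (currentPort + 1 - 1024) % (65536 - 1024) + 1024

  // Mirrors newURI from PATCH 17: IPv6 literals are wrapped in brackets,
  // then the port, path, and optional query string are appended.
  def newURI(scheme: String, server: String, port: Int, path: String, query: String): String = {
    val builder = new StringBuilder
    if (server.indexOf(':') >= 0 && server.charAt(0) != '[') {
      builder.append(scheme).append("://").append('[').append(server).append(']')
    } else {
      builder.append(scheme).append("://").append(server)
    }
    builder.append(':').append(port)
    builder.append(path)
    if (query != null && query.length > 0) builder.append('?').append(query)
    builder.toString
  }

  def main(args: Array[String]): Unit = {
    println(securePort(4040))   // 4041: the common case, simply the next port
    println(securePort(65535))  // 1024: wraps around without hitting a privileged port
    println(newURI("https", "example.com", 4041, "/stages", "id=1"))
    // https://example.com:4041/stages?id=1
    println(newURI("https", "::1", 4041, "/", null))
    // https://[::1]:4041/
  }
}
```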