Skip to content

Commit 837475b

Browse files
author
Andrew Or
committed
Show the REST port on the Master UI
1 parent d8d3717 commit 837475b

File tree

6 files changed

+49
-18
lines changed

6 files changed

+49
-18
lines changed

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ span.expand-details {
103103
float: right;
104104
}
105105

106+
span.stable-uri {
107+
font-size: 10pt;
108+
font-style: italic;
109+
color: gray;
110+
}
111+
106112
pre {
107113
font-size: 0.8em;
108114
}

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,22 @@ private[deploy] object DeployMessages {
148148

149149
// Master to MasterWebUI
150150

151-
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
152-
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
153-
activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
154-
status: MasterState) {
151+
case class MasterStateResponse(
152+
host: String,
153+
port: Int,
154+
stablePort: Option[Int],
155+
workers: Array[WorkerInfo],
156+
activeApps: Array[ApplicationInfo],
157+
completedApps: Array[ApplicationInfo],
158+
activeDrivers: Array[DriverInfo],
159+
completedDrivers: Array[DriverInfo],
160+
status: MasterState) {
155161

156162
Utils.checkHost(host, "Required hostname")
157163
assert (port > 0)
158164

159165
def uri = "spark://" + host + ":" + port
166+
def stableUri: Option[String] = stablePort.map { p => "spark://" + host + ":" + p }
160167
}
161168

162169
// WorkerWebUI to Worker

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,17 @@ object SparkSubmit {
112112
* Second, we use this launch environment to invoke the main method of the child
113113
* main class.
114114
*
115-
* Note that standalone cluster mode is an exception in that we do not invoke the
116-
* main method of a child class. Instead, we pass the submit parameters directly to
117-
* a REST client, which will submit the application using the stable REST protocol.
115+
* As of Spark 1.3, a stable REST-based application submission gateway is introduced.
116+
* If this is enabled, then we will run standalone cluster mode by passing the submit
117+
* parameters directly to a REST client, which will submit the application using the
118+
* REST protocol instead.
118119
*/
119120
private def submit(args: SparkSubmitArguments): Unit = {
120121
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
121-
// In standalone cluster mode, use the stable application submission REST protocol.
122-
// Otherwise, just call the main method of the child class.
123-
if (args.isStandaloneCluster) {
122+
val restKey = "spark.submit.rest.enabled"
123+
val restEnabled = args.sparkProperties.get(restKey).getOrElse("false").toBoolean
124+
if (args.isStandaloneCluster && restEnabled) {
125+
printStream.println("Running standalone cluster mode using the stable REST protocol.")
124126
// NOTE: since we mutate the values of some configs in `prepareSubmitEnvironment`, we
125127
// must update the corresponding fields in the original SparkSubmitArguments to reflect
126128
// these changes.

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,14 @@ private[spark] class Master(
124124

125125
// Alternative application submission gateway that is stable across Spark versions
126126
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
127-
private val restServerPort = conf.getInt("spark.master.rest.port", 17077)
128-
private val restServer = new StandaloneRestServer(this, host, restServerPort)
129-
if (restServerEnabled) {
130-
restServer.start()
131-
}
127+
private val restServer =
128+
if (restServerEnabled) {
129+
val port = conf.getInt("spark.master.rest.port", 17077)
130+
Some(new StandaloneRestServer(this, host, port))
131+
} else {
132+
None
133+
}
134+
private val restServerBoundPort = restServer.map(_.start())
132135

133136
override def preStart() {
134137
logInfo("Starting Spark master at " + masterUrl)
@@ -183,7 +186,7 @@ private[spark] class Master(
183186
recoveryCompletionTask.cancel()
184187
}
185188
webUi.stop()
186-
restServer.stop()
189+
restServer.foreach(_.stop())
187190
masterMetricsSystem.stop()
188191
applicationMetricsSystem.stop()
189192
persistenceEngine.close()
@@ -431,7 +434,9 @@ private[spark] class Master(
431434
}
432435

433436
case RequestMasterState => {
434-
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
437+
sender ! MasterStateResponse(
438+
host, port, restServerBoundPort,
439+
workers.toArray, apps.toArray, completedApps.toArray,
435440
drivers.toArray, completedDrivers.toArray, state)
436441
}
437442

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,15 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
7373
<div class="span12">
7474
<ul class="unstyled">
7575
<li><strong>URL:</strong> {state.uri}</li>
76+
{
77+
state.stableUri
78+
.map { uri =>
79+
<li>
80+
<strong>Stable URL:</strong> {uri}
81+
<span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
82+
</li> }
83+
.getOrElse { Seq.empty }
84+
}
7685
<li><strong>Workers:</strong> {state.workers.size}</li>
7786
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
7887
{state.workers.map(_.coresUsed).sum} Used</li>

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@ private[spark] abstract class SubmitRestServer(host: String, requestedPort: Int,
4141
protected val handler: SubmitRestServerHandler
4242
private var _server: Option[Server] = None
4343

44-
def start(): Unit = {
44+
/** Start the server and return the bound port. */
45+
def start(): Int = {
4546
val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, conf)
4647
_server = Some(server)
4748
logInfo(s"Started REST server for submitting applications on port $boundPort")
49+
boundPort
4850
}
4951

5052
def stop(): Unit = {

0 commit comments

Comments
 (0)