diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index efb5f9d501e48..6fa15ac168bb0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2350,7 +2350,8 @@ class SparkContext(config: SparkConf) extends Logging {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
+      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls,
+      schedulerBackend.getMasterWebUiUrl))
   }
 
   /** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index ac09c6c497f8b..2fceffad6e7d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -127,7 +127,8 @@ private[deploy] object DeployMessages {
 
   // Master to AppClient
 
-  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
+  case class RegisteredApplication(appId: String, master: RpcEndpointRef,
+    masterWebUiUrl: String) extends DeployMessage
 
   // TODO(matei): replace hostPort with host
   case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 93f58ce63799f..2700896d499a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -153,7 +153,7 @@ private[spark] class StandaloneAppClient(
     }
 
     override def receive: PartialFunction[Any, Unit] = {
-      case RegisteredApplication(appId_, masterRef) =>
+      case RegisteredApplication(appId_, masterRef, masterWebUiUrl) =>
         // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
@@ -163,6 +163,7 @@ private[spark] class StandaloneAppClient(
         registered.set(true)
         master = Some(masterRef)
         listener.connected(appId.get)
+        listener.masterChanged(masterWebUiUrl)
 
       case ApplicationRemoved(message) =>
         markDead("Master removed our application: %s".format(message))
@@ -185,6 +186,7 @@ private[spark] class StandaloneAppClient(
       case MasterChanged(masterRef, masterWebUiUrl) =>
         logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
         master = Some(masterRef)
+        listener.masterChanged(masterWebUiUrl)
         alreadyDisconnected = false
         masterRef.send(MasterChangeAcknowledged(appId.get))
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 64255ec92b72a..6e8c182d2ddea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener {
 
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+
+  def masterChanged(masterWebUiUrl: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5f7c077fe202..3223c2de78e22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -264,7 +264,7 @@ private[deploy] class Master(
         registerApplication(app)
         logInfo("Registered app " + description.name + " with ID " + app.id)
         persistenceEngine.addApplication(app)
-        driver.send(RegisteredApplication(app.id, self))
+        driver.send(RegisteredApplication(app.id, self, masterWebUiUrl))
         schedule()
       }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 8801a761afae3..e7c0616bbca0c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -56,4 +56,6 @@ private[spark] trait SchedulerBackend {
    */
   def getDriverLogUrls: Option[Map[String, String]] = None
 
+  def getMasterWebUiUrl: Option[String] = None
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 7618dfeeedf8d..d5dc9fa2b1b99 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -126,7 +126,8 @@ case class SparkListenerApplicationStart(
     time: Long,
     sparkUser: String,
     appAttemptId: Option[String],
-    driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
+    driverLogs: Option[Map[String, String]] = None,
+    masterWebUiUrl: Option[String] = None) extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 7befdb0c1f64d..1581627d08516 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -160,6 +160,16 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
+  var masterWebUiUrl: Option[String] = None
+
+  override def masterChanged(webUiUrl: String): Unit = {
+    masterWebUiUrl = Some(webUiUrl)
+  }
+
+  override def getMasterWebUiUrl(): Option[String] = {
+    masterWebUiUrl
+  }
+
   override def sufficientResourcesRegistered(): Boolean = {
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 7953d77fd7ece..d721ccf5112eb 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -50,6 +50,10 @@ private[ui] class ExecutorsPage(
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
+    val masterWebUiUrl = listener.masterWebUiUrl
+    val backLink = masterWebUiUrl.map(
+      link => <h4><a href={link}>Back to Master</a></h4>
+    ).getOrElse(Seq.empty)
     val content =
       <div>
         {
@@ -60,7 +64,7 @@ private[ui] class ExecutorsPage(
         }
       </div>;
 
-    UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true)
+    UIUtils.headerSparkPage("Executors", backLink ++ content, parent, useDataTables = true)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 678571fd4f5ac..03ad4473505fa 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -63,6 +63,8 @@ private[ui] case class ExecutorTaskSummary(
 @DeveloperApi
 class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
     extends SparkListener {
+  var masterWebUiUrl: Option[String] = None
+
   var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
   var executorEvents = new ListBuffer[SparkListenerEvent]()
@@ -112,6 +114,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
         taskSummary.executorLogs = logs.toMap
       }
     }
+    masterWebUiUrl = applicationStart.masterWebUiUrl
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index bc58fb2a362a4..0fe6b7cd05a30 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -201,6 +201,8 @@ class AppClientSuite
       deadReasonList.add(reason)
     }
 
+    def masterChanged(masterWebUiUrl: String): Unit = { }
+
     def executorAdded(
         id: String,
         workerId: String,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2314d7f45cb21..066d79e8a8be9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,9 +54,16 @@ object MimaExcludes {
       // [SPARK-14743] Improve delegation token handling in secure cluster
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
+    ) ++
+    Seq(
       // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
+      // [SPARK-16856] [WEBUI] [CORE] Link the application's executor page to the master's UI
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
      // [SPARK-16967] Move Mesos to Module
      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
      // [SPARK-16240] ML persistence backward compatibility for LDA
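
A minimal sketch, not part of the patch, of how application-side code could consume the new masterWebUiUrl field that this change adds to SparkListenerApplicationStart. The listener class name, the println, and the registration call shown in the trailing comment are illustrative assumptions; the field is None on cluster managers whose SchedulerBackend does not override getMasterWebUiUrl.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Illustrative only: records the standalone Master's web UI URL when the
// application starts. masterWebUiUrl is an Option[String] and stays None
// unless the scheduler backend (here, StandaloneSchedulerBackend) reported one.
class MasterUiUrlListener extends SparkListener {
  @volatile private var masterUiUrl: Option[String] = None

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    masterUiUrl = event.masterWebUiUrl
    masterUiUrl.foreach(url => println("Standalone Master web UI: " + url))
  }
}

// Hypothetical registration from driver code:
//   sc.addSparkListener(new MasterUiUrlListener)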