From f35afa215a214d1b5ff8296e5e20b00044ee9f02 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Wed, 3 Aug 2016 00:09:22 +0800
Subject: [PATCH 1/5] link application page to master page

---
 .../src/main/scala/org/apache/spark/SparkContext.scala      |  3 ++-
 .../scala/org/apache/spark/deploy/DeployMessage.scala       |  4 +++-
 .../spark/deploy/client/StandaloneAppClient.scala           |  4 +++-
 .../deploy/client/StandaloneAppClientListener.scala         |  2 ++
 .../scala/org/apache/spark/deploy/master/Master.scala       |  4 ++--
 .../spark/deploy/master/ui/ApplicationPage.scala            |  2 ++
 .../org/apache/spark/scheduler/SchedulerBackend.scala       |  2 ++
 .../org/apache/spark/scheduler/SparkListener.scala          |  3 ++-
 .../scheduler/cluster/StandaloneSchedulerBackend.scala      | 10 ++++++++++
 .../scala/org/apache/spark/ui/exec/ExecutorsPage.scala      |  6 +++++-
 .../scala/org/apache/spark/ui/exec/ExecutorsTab.scala       |  3 +++
 .../org/apache/spark/deploy/JsonProtocolSuite.scala         |  2 +-
 .../apache/spark/deploy/client/AppClientSuite.scala         |  2 ++
 13 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d48e2b420d718..2f31c637fd60e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2157,7 +2157,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
+      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls,
+      schedulerBackend.getMasterWebUiUrl))
   }

   /** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 34c0696bfc4e5..b0a2298cd54f1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -127,7 +127,8 @@ private[deploy] object DeployMessages {

   // Master to AppClient

-  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
+  case class RegisteredApplication(appId: String, master: RpcEndpointRef,
+    masterWebUiUrl: String) extends DeployMessage

   // TODO(matei): replace hostPort with host
   case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -176,6 +177,7 @@ private[deploy] object DeployMessages {
       host: String,
       port: Int,
       restPort: Option[Int],
+      webUiUrl: String,
       workers: Array[WorkerInfo],
       activeApps: Array[ApplicationInfo],
       completedApps: Array[ApplicationInfo],
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index a9df732df93ca..805845bb3068a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -156,7 +156,7 @@ private[spark] class StandaloneAppClient(
     }

     override def receive: PartialFunction[Any, Unit] = {
-      case RegisteredApplication(appId_, masterRef) =>
+      case RegisteredApplication(appId_, masterRef, masterWebUiUrl) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
@@ -166,6 +166,7 @@ private[spark] class StandaloneAppClient(
        registered.set(true)
        master = Some(masterRef)
        listener.connected(appId.get)
+        listener.masterChanged(masterWebUiUrl)

      case ApplicationRemoved(message) =>
        markDead("Master removed our application: %s".format(message))
@@ -188,6 +189,7 @@ private[spark] class StandaloneAppClient(
      case MasterChanged(masterRef, masterWebUiUrl) =>
        logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
        master = Some(masterRef)
+        listener.masterChanged(masterWebUiUrl)
        alreadyDisconnected = false
        masterRef.send(MasterChangeAcknowledged(appId.get))
    }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 370b16ce4213a..92a16de227837 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -37,4 +37,6 @@ private[spark] trait StandaloneAppClientListener {
       fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

   def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+
+  def masterChanged(masterWebUiUrl: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f8aac3008cefa..67203239620be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -233,7 +233,7 @@ private[deploy] class Master(
       registerApplication(app)
       logInfo("Registered app " + description.name + " with ID " + app.id)
       persistenceEngine.addApplication(app)
-      driver.send(RegisteredApplication(app.id, self))
+      driver.send(RegisteredApplication(app.id, self, masterWebUiUrl))
       schedule()
     }

@@ -471,7 +471,7 @@ private[deploy] class Master(

     case RequestMasterState =>
       context.reply(MasterStateResponse(
-        address.host, address.port, restServerBoundPort,
+        address.host, address.port, restServerBoundPort, masterWebUiUrl,
         workers.toArray, apps.toArray, completedApps.toArray,
         drivers.toArray, completedDrivers.toArray, state))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 8875fc223250d..2bb8170696201 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -35,6 +35,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
+    val masterWebUiUrl = state.webUiUrl
     val app = state.activeApps.find(_.id == appId)
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
@@ -81,6 +82,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
           }
         }
+        <a href={masterWebUiUrl}>Back to Master</a>
        </div>
      </div>;
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
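Note: taken together, the hunks above form one delivery path. The Master now includes its web UI URL in RegisteredApplication, and StandaloneAppClient forwards it to the new masterChanged callback, both on initial registration and on every MasterChanged failover, so implementations should treat the latest value as authoritative. Below is a minimal sketch of a listener consuming the callback; it is hypothetical and not part of the patch, and the remaining signatures simply mirror the existing trait.

package org.apache.spark.deploy.client

import java.util.concurrent.atomic.AtomicReference

// Hypothetical consumer of the new callback; only masterChanged is interesting.
class RecordingAppClientListener extends StandaloneAppClientListener {
  // AtomicReference because the callback fires on the client's RPC thread
  // while readers (e.g. a web UI) may poll from another thread.
  private val masterUi = new AtomicReference[Option[String]](None)

  override def connected(appId: String): Unit = ()
  override def disconnected(): Unit = ()
  override def dead(reason: String): Unit = ()
  override def executorAdded(
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit = ()
  override def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int]): Unit = ()

  // New in this patch: remember the most recent master web UI URL.
  // Last write wins, which is the desired semantics after a failover,
  // since the old master's UI is no longer reachable.
  override def masterChanged(masterWebUiUrl: String): Unit = {
    masterUi.set(Some(masterWebUiUrl))
  }

  def currentMasterWebUiUrl: Option[String] = masterUi.get()
}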
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 8801a761afae3..e7c0616bbca0c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -56,4 +56,6 @@ private[spark] trait SchedulerBackend {
    */
   def getDriverLogUrls: Option[Map[String, String]] = None

+  def getMasterWebUiUrl: Option[String] = None
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 7618dfeeedf8d..d5dc9fa2b1b99 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -126,7 +126,8 @@ case class SparkListenerApplicationStart(
     time: Long,
     sparkUser: String,
     appAttemptId: Option[String],
-    driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
+    driverLogs: Option[Map[String, String]] = None,
+    masterWebUiUrl: Option[String] = None) extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe9ddb80..fa778bcf527e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -157,6 +157,16 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }

+  var masterWebUiUrl: Option[String] = None
+
+  override def masterChanged(webUiUrl: String): Unit = {
+    masterWebUiUrl = Some(webUiUrl)
+  }
+
+  override def getMasterWebUiUrl(): Option[String] = {
+    masterWebUiUrl
+  }
+
   override def sufficientResourcesRegistered(): Boolean = {
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 287390b87bd73..2be3a35bf09bd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -54,6 +54,10 @@ private[ui] class ExecutorsPage(
   private val GCTimePercent = 0.1

   def render(request: HttpServletRequest): Seq[Node] = {
+    val masterWebUiUrl = listener.masterWebUiUrl
+    val backLink = masterWebUiUrl.map(
+      link => <a href={link}>Back to Master</a>
+    ).getOrElse(Seq.empty)
     val content =
       <div>
         {
@@ -63,7 +67,7 @@ private[ui] class ExecutorsPage(
           <script>setThreadDumpEnabled({threadDumpEnabled})</script>
         }
       </div>;

-    UIUtils.headerSparkPage("Executors", content, parent, useDataTables = true)
+    UIUtils.headerSparkPage("Executors", backLink ++ content, parent, useDataTables = true)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 676f4457510c2..64ed843467093 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -45,6 +45,8 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
 @DeveloperApi
 class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
     extends SparkListener {
+  var masterWebUiUrl: Option[String] = None
+
   val executorToTotalCores = HashMap[String, Int]()
   val executorToTasksMax = HashMap[String, Int]()
   val executorToTasksActive = HashMap[String, Int]()
@@ -89,6 +91,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
       }
       storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
     }
+    masterWebUiUrl = applicationStart.masterWebUiUrl
   }

   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 2d48e75cfbd96..e1228d824ee25 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -69,7 +69,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
     val activeDrivers = Array(createDriverInfo())
     val completedDrivers = Array(createDriverInfo())
     val stateResponse = new MasterStateResponse(
-      "host", 8080, None, workers, activeApps, completedApps,
+      "host", 8080, None, "host:port", workers, activeApps, completedApps,
       activeDrivers, completedDrivers, RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index f6ef9d15ddee2..7e57ebe509805 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -187,6 +187,8 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
       deadReasonList.add(reason)
     }

+    def masterChanged(masterWebUiUrl: String): Unit = { }
+
     def executorAdded(
         id: String,
         workerId: String,

From 1f56f460e263ef098ef5c28153c623698a2b05e7 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Sat, 6 Aug 2016 09:41:32 +0800
Subject: [PATCH 2/5] Remove the link on the application summary page

---
 core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala | 1 -
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 .../org/apache/spark/deploy/master/ui/ApplicationPage.scala     | 2 --
 .../test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala  | 2 +-
 4 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index b0a2298cd54f1..a492dcaf9cd2a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -177,7 +177,6 @@ private[deploy] object DeployMessages {
       host: String,
       port: Int,
       restPort: Option[Int],
-      webUiUrl: String,
       workers: Array[WorkerInfo],
       activeApps: Array[ApplicationInfo],
       completedApps: Array[ApplicationInfo],
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 67203239620be..55c7f1be10752 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -471,7 +471,7 @@ private[deploy] class Master(

     case RequestMasterState =>
       context.reply(MasterStateResponse(
-        address.host, address.port, restServerBoundPort, masterWebUiUrl,
+        address.host, address.port, restServerBoundPort,
         workers.toArray, apps.toArray, completedApps.toArray,
         drivers.toArray, completedDrivers.toArray, state))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 2bb8170696201..8875fc223250d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -35,7 +35,6 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
-    val masterWebUiUrl = state.webUiUrl
     val app = state.activeApps.find(_.id == appId)
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
@@ -82,7 +81,6 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
           }
         }
-        <a href={masterWebUiUrl}>Back to Master</a>
        </div>
      </div>;
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
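Note: after this patch the master's ApplicationPage no longer renders the back-link (that page already lives on the master's own UI), and MasterStateResponse loses the extra field again; the URL now reaches the driver's UI only through the SparkListenerApplicationStart event added in patch 1, which ExecutorsListener.onApplicationStart copies into masterWebUiUrl. A sketch of the event as a standalone deployment would populate it; all values here are illustrative, not taken from the patch.

import org.apache.spark.scheduler.SparkListenerApplicationStart

// Illustrative values only; in practice SparkContext fills these in, taking
// masterWebUiUrl from SchedulerBackend.getMasterWebUiUrl (None on cluster
// managers without a standalone master).
val event = SparkListenerApplicationStart(
  appName = "example-app",                         // hypothetical
  appId = Some("app-20160806094132-0000"),         // hypothetical
  time = System.currentTimeMillis(),
  sparkUser = "spark",
  appAttemptId = None,
  driverLogs = None,
  masterWebUiUrl = Some("http://master-host:8080") // hypothetical
)
// Because the new parameter defaults to None, existing call sites keep
// compiling, and non-standalone deployments simply carry None.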
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index e1228d824ee25..2d48e75cfbd96 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -69,7 +69,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
     val activeDrivers = Array(createDriverInfo())
     val completedDrivers = Array(createDriverInfo())
     val stateResponse = new MasterStateResponse(
-      "host", 8080, None, "host:port", workers, activeApps, completedApps,
+      "host", 8080, None, workers, activeApps, completedApps,
       activeDrivers, completedDrivers, RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)

From 0192b374695af4e28cbbe264c75406966ad36f82 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Sat, 6 Aug 2016 09:42:28 +0800
Subject: [PATCH 3/5] Update MimaExcludes

---
 project/MimaExcludes.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 56061559feff9..3cdd1e5f99dfc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -39,6 +39,13 @@ object MimaExcludes {
     Seq(
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
+    ) ++
+    Seq(
+      // [SPARK-16856] [WEBUI] [CORE] Link the application's executor page to the master's UI
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this")
     )
   }

From e3707db1649bf084d75a86bbf2fa755b7dc526d1 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Sat, 6 Aug 2016 10:30:20 +0800
Subject: [PATCH 4/5] remove blanks

---
 project/MimaExcludes.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e14e3d5bd6783..90750f3c4ef06 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,7 +42,7 @@ object MimaExcludes {
     ) ++
     Seq(
       // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
-      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select") 
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select")
     ) ++
     Seq(
       // [SPARK-16856] [WEBUI] [CORE] Link the application's executor page to the master's UI

From 76e68eb70187f977aa569fde50422018061c8bcf Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Sat, 6 Aug 2016 10:51:36 +0800
Subject: [PATCH 5/5] update MimaExcludes

---
 project/MimaExcludes.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 90750f3c4ef06..33a9c955598b8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,9 +46,9 @@ object MimaExcludes {
     ) ++
     Seq(
       // [SPARK-16856] [WEBUI] [CORE] Link the application's executor page to the master's UI
-      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$")
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply")
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy")
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this")
     )
   }
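Note: MiMa flags the SparkListener change because adding a field to a case class alters the synthetic apply/copy/constructor signatures (and the companion object's parent types), even though the new parameter has a default value. Patch 3 introduced the exclusions but omitted the commas between the Seq elements, so it does not compile; patch 4 only strips a trailing blank, and patch 5 adds the missing commas. For reference, the final exclusion list as a standalone sketch with the import it needs (assumes the sbt-mima-plugin artifacts are on the classpath):

import com.typesafe.tools.mima.core._

// [SPARK-16856] exclusions, one Seq element per filter; the trailing commas
// are required inside a Seq(...) literal, which is exactly what patch 5 fixes.
val spark16856Excludes = Seq(
  ProblemFilters.exclude[MissingTypesProblem](
    "org.apache.spark.scheduler.SparkListenerApplicationStart$"),
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.scheduler.SparkListenerApplicationStart.apply"),
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.scheduler.SparkListenerApplicationStart.copy"),
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.scheduler.SparkListenerApplicationStart.this")
)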