From 1fe771028b85eb2298d860da02e2e5a062512027 Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 18 Jun 2014 17:01:53 +0800
Subject: [PATCH 1/8] Link the spark UI to RM ui in yarn-client mode

---
 .../cluster/CoarseGrainedClusterMessage.scala |  2 ++
 .../CoarseGrainedSchedulerBackend.scala       |  3 +++
 .../org/apache/spark/ui/JettyUtils.scala      |  5 ++--
 .../scala/org/apache/spark/ui/UIUtils.scala   | 11 ++++++++-
 .../scala/org/apache/spark/ui/WebUI.scala     |  2 +-
 .../cluster/YarnClientSchedulerBackend.scala  | 13 +++++++++++
 .../spark/deploy/yarn/ExecutorLauncher.scala  | 23 ++++++++++++++++---
 7 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 318e16552201..9f6c1d889433 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,4 +66,6 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
+  case class AddWebUIFilter(filter: String, proxyBase :String) extends CoarseGrainedClusterMessage
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0f5545e2ed65..a8b98ea46180 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -136,6 +136,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true
 
+      case AddWebUIFilter(filter, proxyBase) =>
+        addWebUIFilter(filter, proxyBase)
+        sender ! true
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index a2535e3c1c41..ec85ac91a640 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -139,7 +139,8 @@ private[spark] object JettyUtils extends Logging {
 
   /** Add filters, if any, to the given list of ServletContextHandlers */
   def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
-    val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
+    val filters: Array[String] = sys.props.get("spark.ui.filters").
+      getOrElse("").split(',').map(_.trim())
     filters.foreach {
       case filter : String =>
         if (!filter.isEmpty) {
@@ -148,7 +149,7 @@ private[spark] object JettyUtils extends Logging {
           holder.setClassName(filter)
           // Get any parameters for each filter
           val paramName = "spark." + filter + ".params"
-          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+          val params = sys.props.get(paramName).getOrElse("").split(',').map(_.trim()).toSet
           params.foreach {
             case param : String =>
               if (!param.isEmpty) {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 9cb50d9b83dd..e07aa2ee3a5a 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging {
   }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
-  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
+  def uiRoot: String = {
+    if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
+      System.getenv("APPLICATION_WEB_PROXY_BASE")
+    } else if (System.getProperty("spark.ui.proxyBase") != null) {
+      System.getProperty("spark.ui.proxyBase")
+    }
+    else {
+      ""
+    }
+  }
 
   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
 
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 856273e1d4e2..b49fb2a1528b 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -43,7 +43,7 @@ private[spark] abstract class WebUI(
   extends Logging {
 
   protected val tabs = ArrayBuffer[WebUITab]()
-  protected val handlers = ArrayBuffer[ServletContextHandler]()
+  protected[spark] val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
   protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0f9fdcfcb651..e47ccc1aef47 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.spark.ui.JettyUtils
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -48,6 +49,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
+    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
@@ -115,4 +117,15 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
+  override def addWebUIFilter(filter: String, proxyBase: String) {
+    if (filter != null && filter.nonEmpty && proxyBase != null && proxyBase.nonEmpty) {
+      logInfo(s"Add WebUI Filter. $filter")
+      val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+      System.setProperty("spark.ui.filters", amFilter)
+      System.setProperty(s"spark.$amFilter.params", filter)
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+      JettyUtils.addFilters(sc.ui.handlers, conf)
+    }
+  }
+
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index f71ad036ce0f..e16cdc2031eb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -31,10 +31,12 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }
 
@@ -99,6 +104,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     registerApplicationMaster()
     waitForSparkMaster()
+    // setup AmIpFilter for the SparkUI - do this before we start the UI
+    addAmIpFilter()
 
     // Allocate all containers
     allocateExecutors()
 
@@ -142,9 +149,19 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    // TODO: Find out client's Spark UI address and fill in here?
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+    val appUIAddress = sparkConf.getOption("spark.driver.appUIAddress").getOrElse("")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
+  }
+
+  // add the yarn amIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = WebAppUtils.getProxyHostAndPort(conf)
+    val parts : Array[String] = proxy.split(":")
+    val proxyBase =System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    actor ! AddWebUIFilter(amFilter, proxyBase)
   }
 
   private def waitForSparkMaster() {
From 142ee29ee84cf6e47a8cad401129a8e8b80be46a Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 24 Jun 2014 00:25:36 +0800
Subject: [PATCH 2/8] review commit

---
 .../cluster/CoarseGrainedClusterMessage.scala        |  3 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala      |  4 ++--
 .../scala/org/apache/spark/ui/JettyUtils.scala       |  5 ++---
 core/src/main/scala/org/apache/spark/ui/WebUI.scala  |  2 +-
 .../cluster/YarnClientSchedulerBackend.scala         | 14 +++++++-------
 .../spark/deploy/yarn/ExecutorLauncher.scala         |  7 ++++---
 6 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 9f6c1d889433..6abf6d930c15 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,6 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
-  case class AddWebUIFilter(filter: String, proxyBase :String) extends CoarseGrainedClusterMessage
+  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+    extends CoarseGrainedClusterMessage
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a8b98ea46180..f14666082c3e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -136,8 +136,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true
 
-      case AddWebUIFilter(filter, proxyBase) =>
-        addWebUIFilter(filter, proxyBase)
+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
         sender ! true
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index ec85ac91a640..a2535e3c1c41 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -139,8 +139,7 @@ private[spark] object JettyUtils extends Logging {
 
   /** Add filters, if any, to the given list of ServletContextHandlers */
   def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
-    val filters: Array[String] = sys.props.get("spark.ui.filters").
-      getOrElse("").split(',').map(_.trim())
+    val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
     filters.foreach {
       case filter : String =>
         if (!filter.isEmpty) {
@@ -149,7 +148,7 @@ private[spark] object JettyUtils extends Logging {
           holder.setClassName(filter)
           // Get any parameters for each filter
           val paramName = "spark." + filter + ".params"
-          val params = sys.props.get(paramName).getOrElse("").split(',').map(_.trim()).toSet
+          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
           params.foreach {
             case param : String =>
               if (!param.isEmpty) {
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index b49fb2a1528b..856273e1d4e2 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -43,7 +43,7 @@ private[spark] abstract class WebUI(
   extends Logging {
 
   protected val tabs = ArrayBuffer[WebUITab]()
-  protected[spark] val handlers = ArrayBuffer[ServletContextHandler]()
+  protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
   protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e47ccc1aef47..c0afa6d01998 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -117,14 +117,14 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
-  override def addWebUIFilter(filter: String, proxyBase: String) {
-    if (filter != null && filter.nonEmpty && proxyBase != null && proxyBase.nonEmpty) {
-      logInfo(s"Add WebUI Filter. $filter")
-      val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-      System.setProperty("spark.ui.filters", amFilter)
-      System.setProperty(s"spark.$amFilter.params", filter)
+  override def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+    if (filterParams != null && filterParams.nonEmpty && proxyBase != null &&
+      proxyBase.nonEmpty && filterName != null && filterName.nonEmpty) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      conf.set(s"spark.$filterName.params", filterParams)
       System.setProperty("spark.ui.proxyBase", proxyBase)
-      JettyUtils.addFilters(sc.ui.handlers, conf)
+      JettyUtils.addFilters(sc.ui.getHandlers, conf)
     }
   }
 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index e16cdc2031eb..debfafdb2a31 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -149,7 +149,7 @@
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    val appUIAddress = sparkConf.getOption("spark.driver.appUIAddress").getOrElse("")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
     logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
     amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
   }
@@ -158,10 +158,11 @@
   private def addAmIpFilter() {
     val proxy = WebAppUtils.getProxyHostAndPort(conf)
     val parts : Array[String] = proxy.split(":")
-    val proxyBase =System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val uriBase = "http://" + proxy + proxyBase
     val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    actor ! AddWebUIFilter(amFilter, proxyBase)
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }
 
   private def waitForSparkMaster() {
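Patch 2 responds to review feedback: the message now carries the filter class name (filterName) and its parameters (filterParams) separately instead of hard-coding AmIpFilter on the driver side, the sys.props lookups in JettyUtils revert to SparkConf, and the UI's handlers are reached through a getHandlers accessor rather than by widening the handlers field to protected[spark]. The filterParams string still travels as one comma-separated list; a sketch, with hypothetical values, of how such a list decomposes into the init parameters the filter eventually sees:

    // Not part of the patch: the shape of the filterParams payload.
    val filterParams =
      "PROXY_HOST=rm.example.com,PROXY_URI_BASE=http://rm.example.com:8088/proxy/application_1"
    val initParams: Map[String, String] = filterParams
      .split(',').map(_.trim).filter(_.nonEmpty)
      .map { kv =>
        val idx = kv.indexOf('=')
        kv.take(idx) -> kv.drop(idx + 1)  // split on the first '=' only
      }.toMap
    // initParams("PROXY_HOST") == "rm.example.com"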
From 3e9630b9c44d83fc8b0cfafa682fd1503ffd1665 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 24 Jun 2014 00:27:33 +0800
Subject: [PATCH 3/8] review commit

---
 .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index debfafdb2a31..b22d9888efd1 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -161,7 +161,7 @@
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val uriBase = "http://" + proxy + proxyBase
     val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" 
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }
 
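Patch 3 appears to change only trailing whitespace on the amFilterName line. Since addAmIpFilter() is now in its final shape, a worked example (application id hypothetical) of the values it assembles on the AM before messaging the driver:

    val proxy = "rm.example.com:8088"                        // WebAppUtils.getProxyHostAndPort(conf)
    val proxyBase = "/proxy/application_1400000000000_0001"  // APPLICATION_WEB_PROXY_BASE env var
    val uriBase = "http://" + proxy + proxyBase
    val amFilter = "PROXY_HOST=" + proxy.split(":")(0) + "," + "PROXY_URI_BASE=" + uriBase
    // amFilter == "PROXY_HOST=rm.example.com,PROXY_URI_BASE=" +
    //             "http://rm.example.com:8088/proxy/application_1400000000000_0001"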
From 689658659dd851a071995991229363479be5d8f6 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 24 Jun 2014 11:11:48 +0800
Subject: [PATCH 4/8] Add comments to addWebUIFilter

---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 12 ++++++++++++
 .../cluster/YarnClientSchedulerBackend.scala    | 12 ------------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f14666082c3e..563ad1bcbeb9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.ui.JettyUtils
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -279,6 +280,17 @@
     }
     false
   }
+
+  // Add filters to the SparkUI
+  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+    if (Seq(filterName, filterParams, proxyBase).forall(t => t != null && t.nonEmpty)) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      conf.set(s"spark.$filterName.params", filterParams)
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index c0afa6d01998..1b37c4bb13f4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.spark.ui.JettyUtils
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -117,15 +116,4 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
-  override def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
-    if (filterParams != null && filterParams.nonEmpty && proxyBase != null &&
-      proxyBase.nonEmpty && filterName != null && filterName.nonEmpty) {
-      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
-      conf.set("spark.ui.filters", filterName)
-      conf.set(s"spark.$filterName.params", filterParams)
-      System.setProperty("spark.ui.proxyBase", proxyBase)
-      JettyUtils.addFilters(sc.ui.getHandlers, conf)
-    }
-  }
-
 }
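Patch 4 hoists addWebUIFilter out of the YARN-specific backend into CoarseGrainedSchedulerBackend, so any backend receiving the message can install filters, and collapses the three null/empty checks into a single Seq(...).forall guard. A small sketch of that idiom's semantics:

    // Sketch of the validation introduced here: every argument must be
    // non-null and non-empty before the filter is installed.
    def allSet(values: String*): Boolean = values.forall(v => v != null && v.nonEmpty)

    allSet("AmIpFilter", "PROXY_HOST=a,PROXY_URI_BASE=b", "/proxy/app_1")  // true
    allSet("AmIpFilter", null, "/proxy/app_1")                             // false
    allSet("AmIpFilter", "", "/proxy/app_1")                               // false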
From 1b92a0729a856ef6edf1d48c6d365dc2e8c84c0f Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 27 Jun 2014 10:13:33 +0800
Subject: [PATCH 5/8] review commit

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 563ad1bcbeb9..9f085eef4672 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -283,11 +283,14 @@
 
   // Add filters to the SparkUI
   def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
-    if (Seq(filterName, filterParams, proxyBase).forall(t => t != null && t.nonEmpty)) {
+    if (proxyBase != null && proxyBase.nonEmpty) {
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+    }
+
+    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
       conf.set(s"spark.$filterName.params", filterParams)
-      System.setProperty("spark.ui.proxyBase", proxyBase)
       JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
     }
   }

From 210299caa55efd998e139b162b289d2b35c91fd6 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 15 Jul 2014 09:54:24 +0800
Subject: [PATCH 6/8] review commit

---
 .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b22d9888efd1..ed138cd3f946 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -104,7 +104,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     registerApplicationMaster()
     waitForSparkMaster()
-    // setup AmIpFilter for the SparkUI - do this before we start the UI
     addAmIpFilter()
 
     // Allocate all containers
     allocateExecutors()
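Patch 5 decouples the two concerns: spark.ui.proxyBase is now set whenever a proxy base arrives, even if the filter name or parameters are incomplete, so UI links get the proxy prefix either way. Patch 6 then drops a stale comment, since in yarn-client mode the UI is already running by the time the filter is added and "before we start the UI" no longer applies. For reference, an equivalent restatement (not a new API) of the lookup order patch 1 gave UIUtils.uiRoot, which is what makes setting the system property sufficient:

    // Env var (yarn-cluster, set by YARN) wins, then the system property
    // (yarn-client, set by addWebUIFilter), then the empty string.
    def uiRoot: String =
      sys.env.get("APPLICATION_WEB_PROXY_BASE")
        .orElse(sys.props.get("spark.ui.proxyBase"))
        .getOrElse("")

    def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
    // With spark.ui.proxyBase == "/proxy/application_1" (hypothetical):
    // prependBaseUri("/stages") == "/proxy/application_1/stages"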
From 1fbb9258ac7c3342c0c86d2c7f1e43bb3c95bc12 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 15 Jul 2014 23:00:50 +0800
Subject: [PATCH 7/8] add addAmIpFilter to yarn alpha

---
 .../spark/deploy/yarn/ExecutorLauncher.scala | 22 ++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index bfdb6232f511..1bad12f8b098 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -32,6 +32,7 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.deploy.SparkHadoopUtil
 
@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }
 
@@ -111,7 +115,7 @@
     }
 
     waitForSparkMaster()
-
+    addAmIpFilter()
     // Allocate all containers
     allocateExecutors()
 
@@ -171,7 +175,8 @@
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(appUIAddress)
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
+
+  // add the yarn amIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+    val parts: Array[String] = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
+  }
 
   private def waitForSparkMaster() {
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
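Patch 7 ports the same change to the yarn-alpha ExecutorLauncher; the substantive differences from the yarn-stable version are the proxy lookup (YarnConfiguration.getProxyHostAndPort, since the alpha Hadoop API has no WebAppUtils) and the lower-level registerApplicationMaster request object. Condensed sketch of the round trip both modules now share (wiring abbreviated to comments; names match the patch):

    case class AddWebUIFilter(filterName: String, filterParams: String, proxyBase: String)

    // AM side (ExecutorLauncher.addAmIpFilter):
    //   actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
    // Driver side (CoarseGrainedSchedulerBackend's actor):
    //   case AddWebUIFilter(filterName, filterParams, proxyBase) =>
    //     addWebUIFilter(filterName, filterParams, proxyBase)
    //     sender ! true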
From 6022bcd4277c6038baf7da42b890d04d14e9d262 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 15 Jul 2014 23:14:22 +0800
Subject: [PATCH 8/8] review commit

---
 .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 2 +-
 .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 1bad12f8b098..a86ad256dfa3 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -192,7 +192,7 @@
   // add the yarn amIpFilter that Yarn requires for properly securing the UI
   private def addAmIpFilter() {
     val proxy = YarnConfiguration.getProxyHostAndPort(conf)
-    val parts: Array[String] = proxy.split(":")
+    val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val uriBase = "http://" + proxy + proxyBase
     val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index ed138cd3f946..5ac95f379872 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -156,7 +156,7 @@
   // add the yarn amIpFilter that Yarn requires for properly securing the UI
   private def addAmIpFilter() {
     val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts : Array[String] = proxy.split(":")
+    val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val uriBase = "http://" + proxy + proxyBase
     val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
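Patch 8 is a final review nit: the : Array[String] annotations on parts are redundant because the type is inferred. After the full series, a yarn-client driver ends up in a state like the following (application id and host hypothetical), and the AM registers spark.driver.appUIAddress as the tracking URL, so the ResourceManager UI links straight to the live Spark UI while direct accesses are redirected through the RM proxy by AmIpFilter:

    // conf.get("spark.ui.filters")
    //   == "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    // conf.get("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params")
    //   == "PROXY_HOST=rm.example.com,PROXY_URI_BASE=http://rm.example.com:8088/proxy/application_1"
    // sys.props("spark.ui.proxyBase") == "/proxy/application_1"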