From 254248a9080b420ef40dad5f72594d655a3b0c45 Mon Sep 17 00:00:00 2001 From: Benoy Antony Date: Fri, 29 Aug 2014 13:41:52 -0700 Subject: [PATCH 1/2] =?UTF-8?q?SPARK-3286=20-=20Cannot=20view=20Applicatio?= =?UTF-8?q?nMaster=20UI=20when=20Yarn=E2=80=99s=20url=20scheme=20is=20http?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 5f66a98e7528b..ce71dc7347006 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -174,7 +174,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc == null) { finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") } else { - registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf)) + registerAM(sc.ui.appUIAddress, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf)) try { userThread.join() } finally { From d109d9a8fded0bf96f43b6cfc4ece42c8f9fbbe0 Mon Sep 17 00:00:00 2001 From: Benoy Antony Date: Fri, 29 Aug 2014 15:44:21 -0700 Subject: [PATCH 2/2] [SPARK-3287] When ResourceManager High Availability is enabled, ApplicationMaster webUI is not displayed --- .../org/apache/spark/ui/JettyUtils.scala | 16 +++++++++--- .../spark/deploy/yarn/YarnRMClientImpl.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 25 +++++++++++++------ .../spark/deploy/yarn/YarnRMClient.scala | 2 +- .../spark/deploy/yarn/YarnRMClientImpl.scala | 4 ++- 5 files changed, 35 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 6b4689291097f..4b82d16326abc 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, URL} +import java.net.{InetSocketAddress, URL, URLDecoder} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} @@ -147,8 +147,8 @@ private[spark] object JettyUtils extends Logging { val holder : FilterHolder = new FilterHolder() holder.setClassName(filter) // Get any parameters for each filter - val paramName = "spark." + filter + ".params" - val params = conf.get(paramName, "").split(',').map(_.trim()).toSet + var paramName = "spark." + filter + ".params" + var params = conf.get(paramName, "").split(',').map(_.trim()).toSet params.foreach { case param : String => if (!param.isEmpty) { @@ -156,6 +156,16 @@ private[spark] object JettyUtils extends Logging { if (parts.length == 2) holder.setInitParameter(parts(0), parts(1)) } } + paramName = "spark." + filter + ".encodedparams" + params = conf.get(paramName, "").split(',').map(_.trim()).toSet + params.foreach { + case param : String => + if (!param.isEmpty) { + val parts = param.split("=") + if (parts.length == 2) holder.setInitParameter(parts(0), + URLDecoder.decode(parts(1), "UTF-8")) + } + } val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR, DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST) handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index cc5392192ec51..acf35a538db1b 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -75,7 +75,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC } override def getProxyHostAndPort(conf: YarnConfiguration) = - YarnConfiguration.getProxyHostAndPort(conf) + List(YarnConfiguration.getProxyHostAndPort(conf)) override def getMaxRegAttempts(conf: YarnConfiguration) = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ce71dc7347006..67bfbdb4a0462 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.io.IOException import java.net.Socket +import java.net.URLEncoder import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConversions._ @@ -32,6 +33,7 @@ import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.webapp.util.WebAppUtils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil @@ -324,17 +326,24 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, /** Add the Yarn IP filter that is required for properly securing the UI. */ private def addAmIpFilter() = { val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" - val proxy = client.getProxyHostAndPort(yarnConf) - val parts = proxy.split(":") - val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - val uriBase = "http://" + proxy + proxyBase - val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase - + val proxies = client.getProxyHostsAndPorts(yarnConf) + var sbProxies = new StringBuilder + var sbUrlBases = new StringBuilder + for (proxy <- proxies) { + sbProxies ++= proxy.split(":")(0) + sbProxies +=',' + sbUrlBases ++= WebAppUtils.getHttpSchemePrefix(yarnConf) + sbUrlBases ++= proxy + sbUrlBases ++= System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + sbUrlBases +=',' + } + var params = "PROXY_HOSTS=" + URLEncoder.encode(sbProxies.toString(), "UTF-8") + "," + params ++= "PROXY_URI_BASES=" + URLEncoder.encode(sbUrlBases.toString(), "UTF-8") if (isDriver) { System.setProperty("spark.ui.filters", amFilter) - System.setProperty(s"spark.$amFilter.params", params) + System.setProperty(s"spark.$amFilter.encodedparams", params) } else { - actor ! AddWebUIFilter(amFilter, params, proxyBase) + actor ! AddWebUIFilter(amFilter, params, sbUrlBases.toString()) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 922d7d1a854a5..2becdd2b5b20c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -59,7 +59,7 @@ trait YarnRMClient { def getAttemptId(): ApplicationAttemptId /** Returns the RM's proxy host and port. */ - def getProxyHostAndPort(conf: YarnConfiguration): String + def getProxyHostsAndPorts(conf: YarnConfiguration): scala.collection.mutable.Buffer[String] /** Returns the maximum number of attempts to register the AM. */ def getMaxRegAttempts(conf: YarnConfiguration): Int diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index e8b8d9bc722bd..041056a642959 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import scala.collection.{Map, Set} +import scala.collection.JavaConversions._ import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.protocolrecords._ @@ -68,7 +69,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC appAttemptId } - override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf) + override def getProxyHostsAndPorts(conf: YarnConfiguration) = + WebAppUtils.getProxyHostsAndPortsForAmFilter(conf) override def getMaxRegAttempts(conf: YarnConfiguration) = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)