core/src/main/scala/org/apache/spark/ui/JettyUtils.scala (13 additions, 3 deletions)
@@ -17,7 +17,7 @@

 package org.apache.spark.ui

-import java.net.{InetSocketAddress, URL}
+import java.net.{InetSocketAddress, URL, URLDecoder}
 import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

@@ -147,15 +147,25 @@ private[spark] object JettyUtils extends Logging {
     val holder : FilterHolder = new FilterHolder()
     holder.setClassName(filter)
     // Get any parameters for each filter
-    val paramName = "spark." + filter + ".params"
-    val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+    var paramName = "spark." + filter + ".params"
+    var params = conf.get(paramName, "").split(',').map(_.trim()).toSet
     params.foreach {
       case param : String =>
         if (!param.isEmpty) {
           val parts = param.split("=")
           if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
         }
     }
+    paramName = "spark." + filter + ".encodedparams"
+    params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+    params.foreach {
+      case param : String =>
+        if (!param.isEmpty) {
+          val parts = param.split("=")
+          if (parts.length == 2) holder.setInitParameter(parts(0),
+            URLDecoder.decode(parts(1), "UTF-8"))
+        }
+    }
     val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
       DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
     handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
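
The point of the separate "encodedparams" variant: addFilters splits the whole property value on ',' and each entry on '=', so a parameter value that itself contains either character (for example a comma-separated proxy list) would be torn apart. URL-encoding the value hides those characters from the splitter, and the URLDecoder.decode call above restores them. A minimal round-trip sketch of that scheme (the object name and sample value are illustrative, not from the patch):

    import java.net.{URLDecoder, URLEncoder}

    object EncodedParamsRoundTrip extends App {
      // A value that would break naive comma/equals splitting.
      val rawValue = "host1:8088,host2:8088"

      // Producer side (what the AM does): encode the value before joining entries.
      val encoded = "PROXY_HOSTS=" + URLEncoder.encode(rawValue, "UTF-8")
      // encoded == "PROXY_HOSTS=host1%3A8088%2Chost2%3A8088" -- no bare ',' or '=' left.

      // Consumer side (what addFilters does): split entries, then decode each value.
      encoded.split(',').map(_.trim).foreach { param =>
        val parts = param.split("=")
        if (parts.length == 2) {
          println(parts(0) + " -> " + URLDecoder.decode(parts(1), "UTF-8"))
          // prints: PROXY_HOSTS -> host1:8088,host2:8088
        }
      }
    }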
YarnRMClientImpl.scala (yarn/alpha)

@@ -75,7 +75,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
   }

   override def getProxyHostAndPort(conf: YarnConfiguration) =
-    YarnConfiguration.getProxyHostAndPort(conf)
+    List(YarnConfiguration.getProxyHostAndPort(conf))

   override def getMaxRegAttempts(conf: YarnConfiguration) =
     conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
ApplicationMaster.scala (yarn/common)

@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

 import java.io.IOException
 import java.net.Socket
+import java.net.URLEncoder
 import java.util.concurrent.atomic.AtomicReference

 import scala.collection.JavaConversions._
@@ -32,6 +33,7 @@ import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -174,7 +176,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     if (sc == null) {
       finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
     } else {
-      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+      registerAM(sc.ui.appUIAddress, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
Contributor: Hmm, I thought when I originally added this support that YARN didn't like having the scheme on it. Perhaps they fixed it in Hadoop 2.x when they added support for https. I need to investigate that.

Author: Please see YARN-1553.

Contributor: But that patch is pretty recent. If adding the scheme breaks YARN 2.3 and older, then Spark should detect which version is running before setting the address.

Author: This check was originally introduced in YARN-1203, so from Hadoop 2.2 onwards it uses the scheme passed by the AppMaster.

Contributor: So what @tgravescs said applies; this code needs to work with older Hadoop versions.

Author: So for older versions we need to pass the URL as before, and for newer versions we pass the URL with the scheme. What's the best way to check the Hadoop/YARN version? Could you please point me to a code snippet?

Contributor: Let me investigate a bit more and I'll get back to you.

Contributor: Yes, so unfortunately I went and verified that Hadoop 0.23 can't have the scheme; it automatically adds the scheme itself (it only supports http://).

This is already calling down into the stable/alpha versions of YarnRMClientImpl.register. We could potentially pass the whole URI, including the scheme, and then have the alpha version strip off the scheme, as sketched below.

Also, you modified the call in runDriver to registerAM, which covers cluster mode; I assume the same change needs to happen in runExecutorLauncher for yarn-client mode.
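
A sketch of that scheme-stripping idea, purely hypothetical and not part of this patch: the common code would always pass the full URI, and the alpha client would drop the scheme before handing the address to the 0.23 RM:

    // Hypothetical helper for the yarn/alpha client: Hadoop 0.23 prepends
    // "http://" itself, so strip whatever scheme the AM passed along.
    def stripScheme(uiAddress: String): String = {
      val schemeEnd = uiAddress.indexOf("://")
      if (schemeEnd == -1) uiAddress else uiAddress.substring(schemeEnd + 3)
    }

    assert(stripScheme("http://host:4040") == "host:4040")
    assert(stripScheme("https://host:4040") == "host:4040")
    assert(stripScheme("host:4040") == "host:4040")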

       try {
         userThread.join()
       } finally {
@@ -324,17 +326,24 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter() = {
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    val proxy = client.getProxyHostAndPort(yarnConf)
-    val parts = proxy.split(":")
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-
+    val proxies = client.getProxyHostsAndPorts(yarnConf)
+    var sbProxies = new StringBuilder
+    var sbUrlBases = new StringBuilder
+    for (proxy <- proxies) {
+      sbProxies ++= proxy.split(":")(0)
+      sbProxies += ','
+      sbUrlBases ++= WebAppUtils.getHttpSchemePrefix(yarnConf)
+      sbUrlBases ++= proxy
+      sbUrlBases ++= System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+      sbUrlBases += ','
+    }
+    var params = "PROXY_HOSTS=" + URLEncoder.encode(sbProxies.toString(), "UTF-8") + ","
+    params ++= "PROXY_URI_BASES=" + URLEncoder.encode(sbUrlBases.toString(), "UTF-8")
     if (isDriver) {
       System.setProperty("spark.ui.filters", amFilter)
-      System.setProperty(s"spark.$amFilter.params", params)
+      System.setProperty(s"spark.$amFilter.encodedparams", params)
     } else {
-      actor ! AddWebUIFilter(amFilter, params, proxyBase)
+      actor ! AddWebUIFilter(amFilter, params, sbUrlBases.toString())
     }
   }

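To make the string-building concrete, here is what the loop would produce for a hypothetical two-RM HA setup (hostnames and proxy base invented for illustration); note the trailing ',' each StringBuilder is left with:

    // Assuming proxies == Buffer("rm1.example.com:8088", "rm2.example.com:8088"),
    // an http-only cluster, and a proxy base of "/proxy/app_1":
    //
    //   sbProxies.toString  == "rm1.example.com,rm2.example.com,"   (ports dropped)
    //   sbUrlBases.toString == "http://rm1.example.com:8088/proxy/app_1," +
    //                          "http://rm2.example.com:8088/proxy/app_1,"
    //
    // Both values are URL-encoded before being joined into params, so their
    // embedded commas become "%2C" and survive the comma-splitting that
    // JettyUtils.addFilters performs on the "encodedparams" property:
    //
    //   params.startsWith("PROXY_HOSTS=rm1.example.com%2Crm2.example.com%2C,")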
YarnRMClient.scala (yarn/common)

@@ -59,7 +59,7 @@ trait YarnRMClient {
   def getAttemptId(): ApplicationAttemptId

   /** Returns the RM's proxy host and port. */
-  def getProxyHostAndPort(conf: YarnConfiguration): String
+  def getProxyHostsAndPorts(conf: YarnConfiguration): scala.collection.mutable.Buffer[String]

   /** Returns the maximum number of attempts to register the AM. */
   def getMaxRegAttempts(conf: YarnConfiguration): Int
YarnRMClientImpl.scala (yarn/stable)

@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn

 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._

 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -68,7 +69,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     appAttemptId
   }

-  override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+  override def getProxyHostsAndPorts(conf: YarnConfiguration) =
+    WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)

   override def getMaxRegAttempts(conf: YarnConfiguration) =
     conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
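
The scala.collection.JavaConversions._ import added above is presumably what bridges the types here: WebAppUtils.getProxyHostsAndPortsForAmFilter returns a java.util.List<String>, and the implicit conversion lets that value satisfy the trait's mutable.Buffer[String] return type. A minimal sketch of the same pattern, with stand-in data:

    import scala.collection.JavaConversions._
    import scala.collection.mutable

    // The implicit asScalaBuffer conversion wraps a java.util.List[String]
    // as a mutable.Buffer[String] view over the same underlying list.
    val javaList: java.util.List[String] = java.util.Arrays.asList("rm1:8088", "rm2:8088")
    val proxies: mutable.Buffer[String] = javaList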