ExecutorAllocationManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.ControlThrowable
+import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
@@ -245,14 +245,15 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Reset the allocation manager to the initial state. Currently this will only be called in
-   * yarn-client mode when AM re-registers after a failure.
+   * Reset the allocation manager when the cluster manager loses track of the driver's state.
+   * This is currently only done in YARN client mode, when the AM is restarted.
+   *
+   * This method forgets any state about existing executors, and forces the scheduler to
+   * re-evaluate the number of needed executors the next time it's run.
    */
   def reset(): Unit = synchronized {
     initializing = true
-    addTime = 0L
     numExecutorsTarget = initialNumExecutors
     numExecutorsToAdd = 1
 
     executorsPendingToRemove.clear()
     removeTimes.clear()
   }
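
For reference, here is a minimal, self-contained sketch of the state that the new reset() wipes. The field names mirror the diff above, but the class is a simplified stand-in rather than Spark's ExecutorAllocationManager:

    import scala.collection.mutable

    class AllocationState(initialNumExecutors: Int) {
      var initializing = true
      var numExecutorsTarget = initialNumExecutors
      var numExecutorsToAdd = 1
      val executorsPendingToRemove = mutable.Set.empty[String]
      val removeTimes = mutable.Map.empty[String, Long]

      // Forget everything learned about the running executors; the next
      // scheduling pass re-derives the target from a clean slate.
      def reset(): Unit = synchronized {
        initializing = true
        numExecutorsTarget = initialNumExecutors
        numExecutorsToAdd = 1
        executorsPendingToRemove.clear()
        removeTimes.clear()
      }
    }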

@@ -376,8 +377,17 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }
 
-    val addRequestAcknowledged = testing ||
-      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    val addRequestAcknowledged = try {
+      testing ||
+        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    } catch {
+      case NonFatal(e) =>
+        // Use INFO level so the error doesn't show up by default in shells. Errors here are
+        // more commonly caused by YARN AM restarts, which are a recoverable issue and generate
+        // a lot of noisy output.
+        logInfo("Error reaching cluster manager.", e)
+        false
+    }
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
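
The NonFatal extractor imported at the top of this file is what keeps the new catch block safe: it matches ordinary exceptions but lets fatal errors and flow-control throwables (such as the ControlThrowable already imported here) propagate. A small standalone demonstration:

    import scala.util.control.NonFatal

    object NonFatalDemo extends App {
      def guarded(body: => Boolean): Boolean =
        try body catch {
          case NonFatal(e) =>
            println(s"swallowed: $e")  // recoverable: log and fall through to false
            false
        }

      println(guarded(throw new RuntimeException("cluster manager unreachable")))  // false
      println(guarded(true))                                                       // true
      // An OutOfMemoryError or InterruptedException would NOT match NonFatal
      // and would propagate out of guarded().
    }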

ApplicationMaster.scala
@@ -127,7 +127,6 @@ private[spark] class ApplicationMaster(
   private var nextAllocationInterval = initialAllocationInterval
 
   private var rpcEnv: RpcEnv = null
-  private var amEndpoint: RpcEndpointRef = _
 
   // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
   private val sparkContextPromise = Promise[SparkContext]()

@@ -397,32 +396,26 @@ private[spark] class ApplicationMaster(
       securityMgr,
       localResources)
 
+    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
+    // that when the driver sends an initial executor request (e.g. after an AM restart),
+    // the allocator is ready to service requests.
+    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
+
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }
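
The ordering matters because an endpoint can start receiving messages as soon as it is registered, and a driver that survived an AM restart sends its executor request almost immediately. A simplified illustration of the window this change closes; the types below are stand-ins, not Spark's:

    object EndpointOrderingSketch extends App {
      final class Allocator {
        def requestTotal(n: Int): Boolean = true
      }

      @volatile var allocator: Allocator = null

      // What the AM endpoint does when a RequestExecutors-style message arrives.
      def onRequestExecutors(n: Int): Boolean = Option(allocator) match {
        case Some(a) => a.requestTotal(n)  // allocator ready: request is serviced
        case None    => false              // allocator missing: request is dropped
      }

      // Old order: endpoint visible before the allocator exists.
      println(onRequestExecutors(4))   // false -- the driver's request is lost
      // New order: create the allocator first, then register the endpoint.
      allocator = new Allocator
      println(onRequestExecutors(4))   // true
    }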

   /**
-   * Create an [[RpcEndpoint]] that communicates with the driver.
-   *
-   * In cluster mode, the AM and the driver belong to same process
-   * so the AMEndpoint need not monitor lifecycle of the driver.
-   *
-   * @return A reference to the driver's RPC endpoint.
+   * @return An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
-  private def runAMEndpoint(
-      host: String,
-      port: String,
-      isClusterMode: Boolean): RpcEndpointRef = {
-    val driverEndpoint = rpcEnv.setupEndpointRef(
+  private def createSchedulerRef(host: String, port: String): RpcEndpointRef = {
+    rpcEnv.setupEndpointRef(
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
-    amEndpoint =
-      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
-    driverEndpoint
   }
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
-    addAmIpFilter()
+    addAmIpFilter(None)
     userClassThread = startUserApplication()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has

@@ -434,10 +427,9 @@ private[spark] class ApplicationMaster(
         Duration(totalWaitTime, TimeUnit.MILLISECONDS))
       if (sc != null) {
         rpcEnv = sc.env.rpcEnv
-        val driverRef = runAMEndpoint(
+        val driverRef = createSchedulerRef(
           sc.getConf.get("spark.driver.host"),
-          sc.getConf.get("spark.driver.port"),
-          isClusterMode = true)
+          sc.getConf.get("spark.driver.port"))
         registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null

@@ -462,7 +454,7 @@ private[spark] class ApplicationMaster(
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
       clientMode = true)
     val driverRef = waitForSparkDriver()
-    addAmIpFilter()
+    addAmIpFilter(Some(driverRef))
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)
 
@@ -610,20 +602,21 @@ private[spark] class ApplicationMaster(
 
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
-
-    runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false)
+    createSchedulerRef(driverHost, driverPort.toString)
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter() = {
+  private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     val params = client.getAmIpFilterParams(yarnConf, proxyBase)
-    if (isClusterMode) {
-      System.setProperty("spark.ui.filters", amFilter)
-      params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
-    } else {
-      amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+    driver match {
+      case Some(d) =>
+        d.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+
+      case None =>
+        System.setProperty("spark.ui.filters", amFilter)
+        params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
     }
   }
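
Folding the old isClusterMode branch into an Option parameter lets each call site state its intent: the cluster-mode path passes None and the client-mode path passes Some(driverRef). A condensed sketch of the dispatch with stand-in types (the filter parameters below are invented for illustration; the real ones come from client.getAmIpFilterParams):

    object AmIpFilterSketch extends App {
      case class AddWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String)
      trait Ref { def send(msg: Any): Unit }

      val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
      val params = Map("PROXY_HOSTS" -> "rm.example.com")  // invented example value

      def addAmIpFilter(driver: Option[Ref]): Unit = driver match {
        case Some(d) =>
          // Client mode: the driver runs in another JVM, so ship the filter over RPC.
          d.send(AddWebUIFilter(amFilter, params, proxyBase = "/proxy/app_1"))
        case None =>
          // Cluster mode: the driver shares this JVM; plain system properties suffice.
          System.setProperty("spark.ui.filters", amFilter)
          params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
      }

      val driverRef: Ref = new Ref { def send(msg: Any): Unit = println(s"to driver: $msg") }
      addAmIpFilter(Some(driverRef))   // client mode: message goes to the driver
      addAmIpFilter(None)              // cluster mode: system properties are set
    }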

@@ -694,20 +687,13 @@ private[spark] class ApplicationMaster(
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
-  private class AMEndpoint(
-      override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
+  private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
     extends RpcEndpoint with Logging {
 
     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
     }
 
-    override def receive: PartialFunction[Any, Unit] = {
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver.send(x)
-    }
-
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         Option(allocator) match {
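
With AddWebUIFilter now sent straight from addAmIpFilter to the driver, the deleted receive block has no remaining purpose, and the endpoint is reduced to announcing itself on startup plus answering executor requests. A stripped-down sketch of that startup handshake, using a hand-rolled stand-in for Spark's RpcEndpoint:

    object AmEndpointSketch extends App {
      trait Ref { def send(msg: Any): Unit }

      case class RegisterClusterManager(self: String)

      // On registration, the endpoint announces itself to the driver's
      // scheduler backend, which replies with executor requests later.
      final class AMEndpoint(driver: Ref) {
        def onStart(): Unit = driver.send(RegisterClusterManager("YarnAM"))
      }

      val driver: Ref = new Ref {
        def send(msg: Any): Unit = println(s"driver received: $msg")
      }
      new AMEndpoint(driver).onStart()  // driver received: RegisterClusterManager(YarnAM)
    }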

YarnSchedulerBackend.scala
@@ -69,9 +69,6 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Scheduler extension services. */
   private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
 
-  // Flag to specify whether this schedulerBackend should be reset.
-  private var shouldResetOnAmRegister = false
-
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *

@@ -262,13 +259,7 @@ private[spark] abstract class YarnSchedulerBackend(
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
-        if (!shouldResetOnAmRegister) {
-          shouldResetOnAmRegister = true
-        } else {
-          // AM is already registered before, this potentially means that AM failed and
-          // a new one registered after the failure. This will only happen in yarn-client mode.
-          reset()
-        }
+        reset()

Contributor:
Is it OK to trigger reset even in the first attempt?

Contributor Author:
Yes. There are no executors when the first attempt registers with the driver, so everything reset() does basically amounts to a no-op.

Contributor:
Thanks for the explanation, let me try your patch locally.

       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
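
Regarding the question in the thread above, here is a toy check of the author's claim that reset() changes nothing on a freshly started application. The State class is a simplified stand-in, not Spark's:

    object FirstRegistrationNoOp extends App {
      final case class State(
          initializing: Boolean = true,
          numExecutorsTarget: Int = 2,          // stands in for initialNumExecutors
          numExecutorsToAdd: Int = 1,
          executorsPendingToRemove: Set[String] = Set.empty)

      // reset() rewrites every field back to its initial value.
      def reset(s: State): State = State()

      val fresh = State()                        // state at first AM registration
      assert(reset(fresh) == fresh)              // no executors yet: reset is a no-op
      println("reset() on first registration is a no-op")

      val afterFailure = State(initializing = false, numExecutorsTarget = 10,
        executorsPendingToRemove = Set("exec-7"))
      assert(reset(afterFailure) == fresh)       // after an AM restart it truly resets
    }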