@@ -241,7 +241,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.executorLost(executorId, SlaveLost(reason))
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
case None => logError(s"Asked to remove non-existent executor $executorId")
case None => logInfo(s"Asked to remove non-existent executor $executorId")
}
}

@@ -109,6 +109,8 @@ private[spark] abstract class YarnSchedulerBackend(
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)

case RemoveExecutor(executorId, reason) =>
removeExecutor(executorId, reason)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
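Together with the YarnAllocator changes further down, this hunk is the receiving end of the new notification path: the allocator sends a RemoveExecutor message to this AM-facing endpoint, which forwards it to the driver backend (where the matching log line was downgraded from logError to logInfo in the first hunk, presumably because the executor may already have been dropped by the time the message arrives). A minimal standalone sketch of the pattern, in plain Scala rather than Spark's RPC classes — only the RemoveExecutor shape and the handler structure mirror the diff, everything else is illustrative:

```scala
// Standalone model of the new message path; the "send" here is a direct
// function call, not Spark's RpcEnv.
object RemoveExecutorFlow {
  case class RemoveExecutor(executorId: String, reason: String)

  // Stands in for the driver-side CoarseGrainedSchedulerBackend.removeExecutor.
  def driverRemoveExecutor(executorId: String, reason: String): Unit =
    println(s"driver: removing executor $executorId ($reason)")

  // Stands in for the AM-facing endpoint's receive partial function above.
  val amEndpointReceive: PartialFunction[Any, Unit] = {
    case RemoveExecutor(executorId, reason) =>
      driverRemoveExecutor(executorId, reason)
  }

  def main(args: Array[String]): Unit = {
    // What YarnAllocator now does when a container completes unexpectedly.
    amEndpointReceive(RemoveExecutor("1",
      "Yarn deallocated the executor 1 (container container_01)"))
  }
}
```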
@@ -229,7 +229,11 @@ private[spark] class ApplicationMaster(
sparkContextRef.compareAndSet(sc, null)
}

private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
private def registerAM(
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
uiAddress: String,
securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()

val appId = client.getAttemptId().getApplicationId().toString()
@@ -246,6 +250,7 @@ private[spark] class ApplicationMaster(
RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
@@ -262,17 +267,20 @@
*
* In cluster mode, the AM and the driver belong to same process
* so the AMEndpoint need not monitor lifecycle of the driver.
*
* @return A reference to the driver's RPC endpoint.
*/
private def runAMEndpoint(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
isClusterMode: Boolean): RpcEndpointRef = {
val driverEndpoint = rpcEnv.setupEndpointRef(
SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
driverEndpoint
}

private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -290,21 +298,21 @@ private[spark] class ApplicationMaster(
"Timed out waiting for SparkContext.")
} else {
rpcEnv = sc.env.rpcEnv
runAMEndpoint(
val driverRef = runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
}

private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.getInt("spark.yarn.am.port", 0)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
waitForSparkDriver()
val driverRef = waitForSparkDriver()
addAmIpFilter()
registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)

// In client mode the actor will stop the reporter thread.
reporterThread.join()
@@ -428,7 +436,7 @@ private[spark] class ApplicationMaster(
}
}

private def waitForSparkDriver(): Unit = {
private def waitForSparkDriver(): RpcEndpointRef = {
logInfo("Waiting for Spark driver to be reachable.")
var driverUp = false
val hostport = args.userArgs(0)
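The shape of the refactor in this file: both code paths now obtain a reference to the driver's RPC endpoint — runAMEndpoint returns it in cluster mode, waitForSparkDriver in client mode — and thread it through registerAM so the allocator can talk to the driver directly. A schematic sketch of that threading, with DriverRef and all method bodies as placeholders of mine; only the call shape mirrors the diff:

```scala
// Placeholder model of how the driver reference is threaded through;
// DriverRef stands in for RpcEndpointRef.
object DriverRefThreading {
  final case class DriverRef(address: String)

  // Cluster mode: the AM endpoint setup now returns the driver ref.
  def runAMEndpoint(host: String, port: String): DriverRef = DriverRef(s"$host:$port")

  // Client mode: waiting for the driver now returns the driver ref.
  def waitForSparkDriver(): DriverRef = DriverRef("driver-host:0")

  // The ref is passed down so the allocator can send RemoveExecutor to it.
  def registerAM(driverRef: DriverRef, uiAddress: String): Unit =
    println(s"AM registered; allocator will notify ${driverRef.address}")

  def main(args: Array[String]): Unit = {
    registerAM(runAMEndpoint("driver-host", "7077"), uiAddress = "")  // cluster mode path
    registerAM(waitForSparkDriver(), uiAddress = "")                  // client mode path
  }
}
```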
@@ -36,6 +36,9 @@ import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._

/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -52,6 +55,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
*/
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: Configuration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
@@ -88,6 +92,9 @@ private[yarn] class YarnAllocator(
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]

private var numUnexpectedContainerRelease = 0L
private val containerIdToExecutorId = new HashMap[ContainerId, String]

// Executor memory in MB.
protected val executorMemory = args.executorMemory
// Additional memory overhead.
@@ -184,6 +191,7 @@ private[yarn] class YarnAllocator(
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.remove(executorId).get
containerIdToExecutorId.remove(container.getId)
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
@@ -383,6 +391,7 @@ private[yarn] class YarnAllocator(

logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId

val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
@@ -413,12 +422,8 @@ private[yarn] class YarnAllocator(
private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId

if (releasedContainers.contains(containerId)) {
// Already marked the container for release, so remove it from
// `releasedContainers`.
releasedContainers.remove(containerId)
} else {
val alreadyReleased = releasedContainers.remove(containerId)
if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
@@ -460,13 +465,28 @@ private[yarn] class YarnAllocator(

allocatedContainerToHostMap.remove(containerId)
}

containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)

if (!alreadyReleased) {
Review comment (Contributor): aren't we already in an if (!alreadyReleased) up there? If I'm not wrong we don't need to check this here again.

Reply (Contributor): no, it comes out of the first if (!alreadyReleased) at line 423 of the diff (418 of the original) if you expand. The diff here just makes it look that way.

// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
numUnexpectedContainerRelease += 1
driverRef.send(RemoveExecutor(eid,
s"Yarn deallocated the executor $eid (container $containerId)"))
}
}
}
}

private def internalReleaseContainer(container: Container): Unit = {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}

private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease

}

private object YarnAllocator {
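On the review thread above about the repeated if (!alreadyReleased): as the reply notes, the first guard closes after the failure accounting, before the container/host map cleanup, so the check has to be restated before notifying the driver. A condensed standalone sketch of the resulting control flow — the identifiers mirror the diff, but keying the collections by plain strings and collecting the driver send into a buffer are simplifications of mine:

```scala
import scala.collection.mutable

// Condensed model of processCompletedContainers after this change; the real
// method also logs exit status and updates host/rack and failure bookkeeping.
object CompletedContainerFlow {
  case class RemoveExecutor(executorId: String, reason: String)

  val releasedContainers = mutable.HashSet[String]()          // released via killExecutor
  val containerIdToExecutorId = mutable.HashMap[String, String]()
  val executorIdToContainer = mutable.HashMap[String, String]()
  var numExecutorsRunning = 0
  var numUnexpectedContainerRelease = 0L
  val sentToDriver = mutable.Buffer[RemoveExecutor]()         // stands in for driverRef.send

  def processCompletedContainer(containerId: String): Unit = {
    // First guard: was this container released on purpose?
    val alreadyReleased = releasedContainers.remove(containerId)
    if (!alreadyReleased) {
      numExecutorsRunning -= 1
      // ... exit-status logging and failure accounting ...
    }                                                         // first guard ends here

    // ... allocatedHostToContainersMap / allocatedContainerToHostMap cleanup ...

    // Second guard: control is back outside the first block, so the check is
    // repeated before telling the driver the executor is gone.
    containerIdToExecutorId.remove(containerId).foreach { eid =>
      executorIdToContainer.remove(eid)
      if (!alreadyReleased) {
        numUnexpectedContainerRelease += 1
        sentToDriver += RemoveExecutor(eid,
          s"Yarn deallocated the executor $eid (container $containerId)")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    containerIdToExecutorId("c1") = "1"
    executorIdToContainer("1") = "c1"
    numExecutorsRunning = 1
    processCompletedContainer("c1")   // completed without being released -> unexpected
    println(sentToDriver)             // one RemoveExecutor message
  }
}
```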
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.webapp.util.WebAppUtils

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils

@@ -56,6 +57,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
*/
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -73,7 +75,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
securityMgr)
}

/**
@@ -27,10 +27,14 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}

import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.mockito.Mockito._

import org.apache.spark.{SecurityManager, SparkFunSuite}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo

class MockResolver extends DNSToSwitchMapping {
@@ -90,6 +94,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
"--class", "SomeClass")
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
conf,
sparkConf,
rmClient,
@@ -230,6 +235,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumPendingAllocate should be (1)
}

test("lost executor removed from backend") {
val handler = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))

handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())

val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
}
handler.updateResourceRequests()
handler.processCompletedContainers(statuses.toSeq)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}

test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +