
Commit 04dc112
Marcelo Vanzin committed

Use driver <-> AM communication to send "remove executor" request.

1 parent 8855b97

6 files changed: +44 -47 lines
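This commit replaces the allocator's locally stored CoarseGrainedSchedulerBackend reference with a message sent over the existing driver <-> AM RPC channel. Before the per-file diffs, here is a minimal, self-contained sketch of that flow. RemoveExecutor and the send() semantics mirror the real Spark names; the EndpointRef trait and demo object are stand-ins for Spark's RPC layer, not code from this commit.

    // Stand-in for CoarseGrainedClusterMessages.RemoveExecutor.
    case class RemoveExecutor(executorId: String, reason: String)

    // Stand-in for org.apache.spark.rpc.RpcEndpointRef: fire-and-forget delivery.
    trait EndpointRef {
      def send(message: Any): Unit
    }

    // Stand-in for the driver-side YarnSchedulerBackend endpoint, which now
    // handles RemoveExecutor in its receive() partial function.
    class DriverEndpoint extends EndpointRef {
      override def send(message: Any): Unit = message match {
        case RemoveExecutor(executorId, reason) =>
          println(s"driver: removing executor $executorId ($reason)")
        case other =>
          println(s"driver: unhandled message $other")
      }
    }

    object FlowDemo extends App {
      val driverRef: EndpointRef = new DriverEndpoint
      // What YarnAllocator now does when YARN unexpectedly releases a container,
      // instead of calling a locally stored backend reference:
      driverRef.send(RemoveExecutor("1", "Yarn deallocated the executor 1 (container ...)"))
    }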

core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
Lines changed: 2 additions & 0 deletions

@@ -108,6 +108,8 @@ private[spark] abstract class YarnSchedulerBackend(
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)

+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
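Two added lines suffice here because Spark endpoints express receive as a PartialFunction[Any, Unit], so handling a new message type is just another case clause. A small sketch under that assumption, using stand-in message types rather than the real CoarseGrainedClusterMessages:

    // Illustrative stand-ins for the real message classes.
    case class AddWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String)
    case class RemoveExecutor(executorId: String, reason: String)

    object ReceiveDemo extends App {
      // The handler is a partial function over Any, mirroring receive() above.
      val receive: PartialFunction[Any, Unit] = {
        case AddWebUIFilter(name, _, _) =>
          println(s"adding web UI filter $name")
        case RemoveExecutor(executorId, reason) =>
          println(s"removing executor $executorId: $reason")
      }

      // Messages outside the handled set are simply not defined for the handler.
      assert(receive.isDefinedAt(RemoveExecutor("1", "container lost")))
      receive(RemoveExecutor("1", "container lost"))
    }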

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 18 additions & 16 deletions

@@ -77,8 +77,6 @@ private[spark] class ApplicationMaster(
   @volatile private var allocator: YarnAllocator = _
   private val allocatorLock = new Object()

-  @volatile private var backend: CoarseGrainedSchedulerBackend = _
-
   // Fields used in client mode.
   private var rpcEnv: RpcEnv = null
   private var amEndpoint: RpcEndpointRef = _

@@ -220,20 +218,22 @@ private[spark] class ApplicationMaster(
     }
   }

-  private def sparkContextInitialized(sc: SparkContext, backend: CoarseGrainedSchedulerBackend) = {
+  private def sparkContextInitialized(sc: SparkContext) = {
     sparkContextRef.synchronized {
       sparkContextRef.compareAndSet(null, sc)
       sparkContextRef.notifyAll()
     }
-    this.backend = backend
-    if (null != allocator) allocator.setScheduler(backend)
   }

   private def sparkContextStopped(sc: SparkContext) = {
     sparkContextRef.compareAndSet(sc, null)
   }

-  private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
+  private def registerAM(
+      _rpcEnv: RpcEnv,
+      driverRef: RpcEndpointRef,
+      uiAddress: String,
+      securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()

     val appId = client.getAttemptId().getApplicationId().toString()

@@ -250,13 +250,13 @@ private[spark] class ApplicationMaster(
       RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
     allocator = client.register(driverUrl,
+      driverRef,
       yarnConf,
       _sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
       uiAddress,
       historyAddress,
       securityMgr)
-    if (null != backend) allocator.setScheduler(backend)

     allocator.allocateResources()
     reporterThread = launchReporterThread()

@@ -267,17 +267,20 @@ private[spark] class ApplicationMaster(
    *
    * In cluster mode, the AM and the driver belong to same process
    * so the AMEndpoint need not monitor lifecycle of the driver.
+   *
+   * @return A reference to the driver's RPC endpoint.
    */
   private def runAMEndpoint(
       host: String,
       port: String,
-      isClusterMode: Boolean): Unit = {
+      isClusterMode: Boolean): RpcEndpointRef = {
     val driverEndpoint = rpcEnv.setupEndpointRef(
       SparkEnv.driverActorSystemName,
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
     amEndpoint =
       rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+    driverEndpoint
   }

   private def runDriver(securityMgr: SecurityManager): Unit = {

@@ -295,21 +298,21 @@ private[spark] class ApplicationMaster(
         "Timed out waiting for SparkContext.")
     } else {
       rpcEnv = sc.env.rpcEnv
-      runAMEndpoint(
+      val driverRef = runAMEndpoint(
         sc.getConf.get("spark.driver.host"),
         sc.getConf.get("spark.driver.port"),
         isClusterMode = true)
-      registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+      registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
   }

   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
     val port = sparkConf.getInt("spark.yarn.am.port", 0)
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
-    waitForSparkDriver()
+    val driverRef = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+    registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)

     // In client mode the actor will stop the reporter thread.
     reporterThread.join()

@@ -433,7 +436,7 @@ private[spark] class ApplicationMaster(
     }
   }

-  private def waitForSparkDriver(): Unit = {
+  private def waitForSparkDriver(): RpcEndpointRef = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
     val hostport = args.userArgs(0)

@@ -617,9 +620,8 @@ object ApplicationMaster extends Logging {
     }
   }

-  private[spark] def sparkContextInitialized(sc: SparkContext,
-      backend: CoarseGrainedSchedulerBackend): Unit = {
-    master.sparkContextInitialized(sc, backend)
+  private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
+    master.sparkContextInitialized(sc)
   }

   private[spark] def sparkContextStopped(sc: SparkContext): Boolean = {
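With the backend field gone, sparkContextInitialized() is reduced to the handshake it always contained: publish the SparkContext into an AtomicReference under the reference's monitor and wake any waiters. A simplified, self-contained sketch of that pattern (String stands in for SparkContext; all names here are illustrative, not Spark's):

    import java.util.concurrent.atomic.AtomicReference

    object ContextHandshake {
      // String stands in for SparkContext in this sketch.
      private val contextRef = new AtomicReference[String](null)

      // Called from the user-class thread, like sparkContextInitialized(sc).
      def publish(sc: String): Unit = contextRef.synchronized {
        contextRef.compareAndSet(null, sc)
        contextRef.notifyAll()
      }

      // Called from the AM thread that needs the context before registering.
      def awaitContext(timeoutMs: Long): Option[String] = contextRef.synchronized {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (contextRef.get() == null && System.currentTimeMillis() < deadline) {
          contextRef.wait(100)
        }
        Option(contextRef.get())
      }
    }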

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
Lines changed: 14 additions & 26 deletions

@@ -36,7 +36,9 @@ import org.apache.log4j.{Level, Logger}

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._

 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding

@@ -53,6 +55,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
  */
 private[yarn] class YarnAllocator(
     driverUrl: String,
+    driverRef: RpcEndpointRef,
     conf: Configuration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],

@@ -90,7 +93,6 @@ private[yarn] class YarnAllocator(
   private[yarn] val executorIdToContainer = new HashMap[String, Container]

   private var numUnexpectedContainerRelease = 0L
-  private var backend: CoarseGrainedSchedulerBackend = _
   private val containerIdToExecutorId = new HashMap[ContainerId, String]

   // Executor memory in MB.

@@ -390,13 +392,8 @@ private[yarn] class YarnAllocator(
   private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
-
-      var needNotify = false
-      if (releasedContainers.contains(containerId)) {
-        // Already marked the container for release, so remove it from
-        // `releasedContainers`.
-        releasedContainers.remove(containerId)
-      } else {
+      val alreadyReleased = releasedContainers.remove(containerId)
+      if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1

@@ -423,7 +420,6 @@ private[yarn] class YarnAllocator(
             ". Diagnostics: " + completedContainer.getDiagnostics)
           numExecutorsFailed += 1
         }
-        needNotify = true
       }

       if (allocatedContainerToHostMap.containsKey(containerId)) {

@@ -440,13 +436,16 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.remove(containerId)
       }

-      val executorIdOpt = containerIdToExecutorId.remove(containerId)
-      if (executorIdOpt.isDefined) executorIdToContainer.remove(executorIdOpt.get)
+      containerIdToExecutorId.remove(containerId).foreach { eid =>
+        executorIdToContainer.remove(eid)

-      if (needNotify && executorIdOpt.isDefined) {
-        // The executor could have gone away (like no route to host, node failure, etc)
-        // Notify backend about the failure of the executor
-        notifyBackend(executorIdOpt.get, containerId)
+        if (!alreadyReleased) {
+          // The executor could have gone away (like no route to host, node failure, etc)
+          // Notify backend about the failure of the executor
+          numUnexpectedContainerRelease += 1
+          driverRef.send(RemoveExecutor(eid,
+            s"Yarn deallocated the executor $eid (container $containerId)"))
+        }
       }
     }
   }

@@ -456,19 +455,8 @@ private[yarn] class YarnAllocator(
     amClient.releaseAssignedContainer(container.getId())
   }

-  private[yarn] def notifyBackend(executorId: String, containerId: ContainerId): Unit = {
-    numUnexpectedContainerRelease += 1
-    if (null != backend) {
-      backend.removeExecutor(executorId,
-        "Yarn deallocated the executor (" + executorId + ") container " + containerId)
-    }
-  }
-
   private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease

-  private[yarn] def setScheduler(backend: CoarseGrainedSchedulerBackend): Unit = synchronized {
-    this.backend = backend
-  }
 }

 private object YarnAllocator {
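The contains-then-remove rewrite above works because Scala's mutable.Set#remove returns true exactly when the element was present, so the old needNotify flag falls out of a single call. A small self-contained sketch of that behavior (container IDs are plain strings here):

    import scala.collection.mutable

    object AlreadyReleasedDemo extends App {
      val releasedContainers = mutable.Set("container_01")

      // We had asked YARN to release this one: remove() returns true.
      val alreadyReleased = releasedContainers.remove("container_01")

      // This one completed without us asking: remove() returns false,
      // which is exactly the "notify the driver" case.
      val unexpected = !releasedContainers.remove("container_02")

      assert(alreadyReleased && unexpected)
    }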

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
Lines changed: 4 additions & 1 deletion

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils

@@ -56,6 +57,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   */
  def register(
      driverUrl: String,
+     driverRef: RpcEndpointRef,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      preferredNodeLocations: Map[String, Set[SplitInfo]],

@@ -73,7 +75,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
+      securityMgr)
   }

   /**

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
Lines changed: 1 addition & 2 deletions

@@ -29,8 +29,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnSchedule
   logInfo("Created YarnClusterScheduler")

   override def postStartHook() {
-    ApplicationMaster.sparkContextInitialized(sc,
-      this.backend.asInstanceOf[CoarseGrainedSchedulerBackend])
+    ApplicationMaster.sparkContextInitialized(sc)
     super.postStartHook()
     logInfo("YarnClusterScheduler.postStartHook done")
   }

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
Lines changed: 5 additions & 2 deletions

@@ -26,14 +26,16 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.mockito.Mockito._
+
 import org.apache.spark.{SecurityManager, SparkFunSuite}
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo

-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
 class MockResolver extends DNSToSwitchMapping {

   override def resolve(names: JList[String]): JList[String] = {

@@ -91,6 +93,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       "--class", "SomeClass")
     new YarnAllocator(
       "not used",
+      mock(classOf[RpcEndpointRef]),
       conf,
       sparkConf,
       rmClient,
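The diff only wires a Mockito mock into the allocator's constructor, but the same mock could also be used to assert on the new driver notification. A hypothetical verify-style check, with a stand-in trait so the sketch is self-contained (the real test would mock org.apache.spark.rpc.RpcEndpointRef directly, as the diff does):

    import org.mockito.Mockito._

    // Illustrative stand-ins; not part of this commit.
    trait EndpointRef { def send(message: Any): Unit }
    case class RemoveExecutor(executorId: String, reason: String)

    object VerifyDemo extends App {
      val driverRef = mock(classOf[EndpointRef])

      // Simulate what processCompletedContainers() now does on an unexpected release:
      driverRef.send(RemoveExecutor("1", "Yarn deallocated the executor 1 (container c1)"))

      // The mock records interactions, so the outgoing message can be asserted on:
      verify(driverRef).send(RemoveExecutor("1", "Yarn deallocated the executor 1 (container c1)"))
    }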
