Commit 884ca71

witgo authored and tgravescs committed

[SPARK-1978] In some cases, spark-yarn does not automatically restart the failed container

Author: witgo <[email protected]>

Closes apache#921 from witgo/allocateExecutors and squashes the following commits:

bc3aa66 [witgo] review commit
8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
32ac7af [witgo] review commit
056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
04c6f7e [witgo] Merge branch 'master' into allocateExecutors
aff827c [witgo] review commit
5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
1faf4f4 [witgo] Merge branch 'master' into allocateExecutors
3c464bd [witgo] add time limit to allocateExecutors
e00b656 [witgo] In some cases, yarn does not automatically restart the container

1 parent a9a461c · commit 884ca71

File tree: 2 files changed (+34, -27 lines)
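The heart of the fix is a refill check that both daemons now run on every pass of their allocation and reporter loops: compare the requested executor count against the sum of running containers and requests still pending with the ResourceManager, and file fresh container requests for the shortfall, since YARN will not always restart a failed container on its own. Below is a minimal standalone sketch of that arithmetic; the Allocator trait here is a hypothetical stand-in for Spark's YARN allocator, reduced to the members the patch actually touches.

// Sketch only: Allocator is a made-up stand-in, not Spark's real allocator class.
trait Allocator {
  def getNumExecutorsRunning: Int            // containers currently live
  def getNumPendingAllocate: Int             // requests already filed with the RM
  def addResourceRequests(count: Int): Unit  // ask the RM for more containers
}

def allocateMissingExecutor(numExecutors: Int, yarnAllocator: Allocator) {
  // Anything neither running nor already requested is presumed lost.
  val missingExecutorCount = numExecutors - yarnAllocator.getNumExecutorsRunning -
    yarnAllocator.getNumPendingAllocate
  if (missingExecutorCount > 0) {
    println("Allocating %d containers to make up for (potentially) lost containers"
      .format(missingExecutorCount))
    yarnAllocator.addResourceRequests(missingExecutorCount)
  }
}

Subtracting getNumPendingAllocate is what keeps this from over-requesting: without it, each pass of the 100 ms polling loop below would ask for the same containers again before the first request had been filled.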

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 22 additions & 17 deletions

@@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     try {
       logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
-      // TODO: This is a bit ugly. Can we make it nicer?
-      // TODO: Handle container failure
       yarnAllocator.addResourceRequests(args.numExecutors)
+      yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+        checkNumExecutorsFailed()
+        allocateMissingExecutor()
         yarnAllocator.allocateResources()
         ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(100)
@@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }

+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
+  }
+
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime

     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          checkNumExecutorsFailed()
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
Lines changed: 12 additions & 10 deletions

@@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

     logInfo("Allocating " + args.numExecutors + " executors.")
     // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-
     yarnAllocator.addResourceRequests(args.numExecutors)
+    yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }

     logInfo("All executors have launched.")
+  }

+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
   }

   // TODO: We might want to extend this to allocate more containers in case they die !
@@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }
