Skip to content

Commit aff827c

Browse files
committed
review commit
1 parent 5c376e0 commit aff827c

File tree

2 files changed

+34
-45
lines changed

2 files changed

+34
-45
lines changed

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 22 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -252,26 +252,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
252252
try {
253253
logInfo("Allocating " + args.numExecutors + " executors.")
254254
// Wait until all containers have finished
255-
// TODO: This is a bit ugly. Can we make it nicer?
256-
// TODO: Handle container failure
257255
yarnAllocator.addResourceRequests(args.numExecutors)
258256
// Exits the loop if the user thread exits.
259-
val startTime = System.currentTimeMillis()
260-
var usedTime = 0L
261-
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors
262-
&& userThread.isAlive) && (usedTime < 1000L * 60 * 10)) {
263-
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
264-
finishApplicationMaster(FinalApplicationStatus.FAILED,
265-
"max number of executor failures reached")
266-
}
257+
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
267258
yarnAllocator.allocateResources()
268-
val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed
269-
if (numExecutorsFailed > 0) {
270-
yarnAllocator.addResourceRequests(numExecutorsFailed)
271-
}
259+
checkNumExecutorsFailed()
260+
allocateMissingExecutor()
272261
ApplicationMaster.incrementAllocatorLoop(1)
273262
Thread.sleep(100)
274-
usedTime = System.currentTimeMillis() - startTime
275263
}
276264
} finally {
277265
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -297,23 +285,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
297285
}
298286
}
299287

288+
private def allocateMissingExecutor() {
289+
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
290+
yarnAllocator.getNumPendingAllocate
291+
if (missingExecutorCount > 0) {
292+
logInfo("Allocating %d containers to make up for (potentially) lost containers".
293+
format(missingExecutorCount))
294+
yarnAllocator.addResourceRequests(missingExecutorCount)
295+
}
296+
}
297+
298+
private def checkNumExecutorsFailed() {
299+
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
300+
finishApplicationMaster(FinalApplicationStatus.FAILED,
301+
"max number of executor failures reached")
302+
}
303+
}
304+
300305
private def launchReporterThread(_sleepTime: Long): Thread = {
301306
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
302307

303308
val t = new Thread {
304309
override def run() {
305310
while (userThread.isAlive) {
306-
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
307-
finishApplicationMaster(FinalApplicationStatus.FAILED,
308-
"max number of executor failures reached")
309-
}
310-
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
311-
yarnAllocator.getNumPendingAllocate
312-
if (missingExecutorCount > 0) {
313-
logInfo("Allocating %d containers to make up for (potentially) lost containers".
314-
format(missingExecutorCount))
315-
yarnAllocator.addResourceRequests(missingExecutorCount)
316-
}
311+
checkNumExecutorsFailed()
312+
allocateMissingExecutor()
317313
sendProgress()
318314
Thread.sleep(sleepTime)
319315
}

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 12 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -200,25 +200,24 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
200200

201201
logInfo("Allocating " + args.numExecutors + " executors.")
202202
// Wait until all containers have finished
203-
// TODO: This is a bit ugly. Can we make it nicer?
204-
// TODO: Handle container failure
205-
206203
yarnAllocator.addResourceRequests(args.numExecutors)
207-
val startTime = System.currentTimeMillis()
208-
var usedTime = 0L
209-
while (((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed))
210-
&& (usedTime < 1000L * 60 * 10)) {
204+
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
211205
yarnAllocator.allocateResources()
212-
val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed
213-
if (numExecutorsFailed > 0) {
214-
yarnAllocator.addResourceRequests(numExecutorsFailed)
215-
}
206+
allocateMissingExecutor()
216207
Thread.sleep(100)
217-
usedTime = System.currentTimeMillis() - startTime
218208
}
219209

220210
logInfo("All executors have launched.")
211+
}
221212

213+
private def allocateMissingExecutor() {
214+
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
215+
yarnAllocator.getNumPendingAllocate
216+
if (missingExecutorCount > 0) {
217+
logInfo("Allocating %d containers to make up for (potentially) lost containers".
218+
format(missingExecutorCount))
219+
yarnAllocator.addResourceRequests(missingExecutorCount)
220+
}
222221
}
223222

224223
// TODO: We might want to extend this to allocate more containers in case they die !
@@ -228,13 +227,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
228227
val t = new Thread {
229228
override def run() {
230229
while (!driverClosed) {
231-
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
232-
yarnAllocator.getNumPendingAllocate
233-
if (missingExecutorCount > 0) {
234-
logInfo("Allocating %d containers to make up for (potentially) lost containers".
235-
format(missingExecutorCount))
236-
yarnAllocator.addResourceRequests(missingExecutorCount)
237-
}
230+
allocateMissingExecutor()
238231
sendProgress()
239232
Thread.sleep(sleepTime)
240233
}

0 commit comments

Comments
 (0)