Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
try {
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
checkNumExecutorsFailed()
allocateMissingExecutor()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move these 2 lines up above the allocateResources() call? That way the max-failed check happens first, then we add in any missing executors, and then we call allocate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the value of yarnAllocator.getNumExecutorsFailed will change only after the call to yarnAllocator.allocateResources().

This is appropriate: after the value of yarnAllocator.getNumExecutorsFailed changes, checkNumExecutorsFailed is called immediately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct that it isn't updated until that is called, and in this case it really doesn't matter too much since we are in a while loop and after this we will just go to the reporter thread and continue to loop. I just find it odd to add in missing executors but then not call allocate immediately afterwards (we sleep and then we possibly break from the loop). For that reason I would prefer these to be moved up. Yes, we have one extra unneeded call to them, but I think it flows better and will be more resilient to other code changes in the future.
Also note that launchReporterThread runs the logic the same way, so right now you are doing the failed check and allocateMissing twice in a row the first time it goes into the reporter thread.

If you really have objections to that, then we could make one call to allocateResources() outside of the while loop and then do checkFailed, allocateMissing, and then allocateResources inside the while loop.

yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
Expand Down Expand Up @@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}

/**
 * Requests extra containers when fewer executors are running or pending than were asked for.
 *
 * The shortfall is `args.numExecutors` minus the allocator's running count minus its
 * pending-allocate count. Nothing is requested unless that shortfall is positive.
 */
private def allocateMissingExecutor(): Unit = {
  // Read the three counters in the same order as the subtraction they feed.
  val wanted = args.numExecutors
  val running = yarnAllocator.getNumExecutorsRunning
  val pending = yarnAllocator.getNumPendingAllocate
  val shortfall = wanted - running - pending

  if (shortfall > 0) {
    logInfo(
      "Allocating %d containers to make up for (potentially) lost containers".format(shortfall))
    yarnAllocator.addResourceRequests(shortfall)
  }
}

/**
 * Finishes the application master with a FAILED status once the allocator's failed-executor
 * count has reached `maxNumExecutorFailures`.
 */
private def checkNumExecutorsFailed(): Unit = {
  val failedSoFar = yarnAllocator.getNumExecutorsFailed
  val limitReached = failedSoFar >= maxNumExecutorFailures

  if (limitReached) {
    finishApplicationMaster(
      FinalApplicationStatus.FAILED,
      "max number of executor failures reached")
  }
}

private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime

val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
checkNumExecutorsFailed()
allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure

yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
allocateMissingExecutor()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly here, can you move this up above allocateResources?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this to have similar logic: allocate outside the loop, then inside the loop add the missing executors and then allocate?

yarnAllocator.allocateResources()
Thread.sleep(100)
}

logInfo("All executors have launched.")
}

/**
 * Tops up the outstanding container requests so that running plus pending executors
 * reaches `args.numExecutors`.
 *
 * Does nothing when the running and pending counts already cover the requested number.
 */
private def allocateMissingExecutor(): Unit = {
  val requested = args.numExecutors
  val alreadyRunning = yarnAllocator.getNumExecutorsRunning
  val awaitingAllocation = yarnAllocator.getNumPendingAllocate
  val stillMissing = requested - alreadyRunning - awaitingAllocation

  if (stillMissing > 0) {
    logInfo(
      "Allocating %d containers to make up for (potentially) lost containers".format(stillMissing))
    yarnAllocator.addResourceRequests(stillMissing)
  }
}

// TODO: We might want to extend this to allocate more containers in case they die!
Expand All @@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
while (!driverClosed) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
Expand Down