Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure

// Exists the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
&& !isFinished) {
checkNumExecutorsFailed()
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
Expand Down Expand Up @@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
while (userThread.isAlive && !isFinished) {
checkNumExecutorsFailed()
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
Expand All @@ -327,6 +322,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}

// Fails the application once too many executors have died: marks the AM as
// FAILED and then stops the SparkContext (if one exists) so the user thread
// also terminates instead of hanging forever.
private def checkNumExecutorsFailed() {
  if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
    logInfo("max number of executor failures reached")
    finishApplicationMaster(FinalApplicationStatus.FAILED,
      "max number of executor failures reached")
    // make sure to stop the user thread
    Option(ApplicationMaster.sparkContextRef.get()) match {
      case Some(sc) =>
        logInfo("Invoking sc stop from checkNumExecutorsFailed")
        sc.stop()
      case None =>
        logError("sparkContext is null when should shutdown")
    }
  }
}

private def sendProgress() {
logDebug("Sending progress")
// Simulated with an allocate request with no nodes requested ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
!isFinished) {
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
checkNumExecutorsFailed()
Expand All @@ -271,7 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

val t = new Thread {
override def run() {
while (!driverClosed) {
while (!driverClosed && !isFinished) {
checkNumExecutorsFailed()
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
yarnAllocator.allocateResources()
// Exits the loop if the user thread exits.

var iters = 0
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
&& !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
iters += 1
}
}
logInfo("All executors have launched.")
Expand All @@ -271,8 +270,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

// Checks whether the number of failed executors has reached the configured
// maximum. If so, finishes the ApplicationMaster with a FAILED status and
// stops the SparkContext so the user thread shuts down as well.
private def checkNumExecutorsFailed() {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
logInfo("max number of executor failures reached")
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
// make sure to stop the user thread
val sparkContext = ApplicationMaster.sparkContextRef.get()
if (sparkContext != null) {
logInfo("Invoking sc stop from checkNumExecutorsFailed")
sparkContext.stop()
} else {
// NOTE(review): sparkContextRef may legitimately still be null if executors
// fail before user code creates the context — confirm this is truly an error.
logError("sparkContext is null when should shutdown")
}
}
}

Expand All @@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

val t = new Thread {
override def run() {
while (userThread.isAlive) {
while (userThread.isAlive && !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
logDebug("Sending progress")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Wait until all containers have launched
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
!isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
Expand Down Expand Up @@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

val t = new Thread {
override def run() {
while (!driverClosed) {
while (!driverClosed && !isFinished) {
checkNumExecutorsFailed()
allocateMissingExecutor()
logDebug("Sending progress")
Expand Down