From e00b6563aeb5569e0aeb391d7fe81fc82de28582 Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 31 May 2014 00:04:27 +0800 Subject: [PATCH 1/5] In some cases, yarn does not automatically restart the container --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 4 ++++ .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c1dfe3f53b40..174cc9b0ebc1 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -261,6 +261,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached") } + val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed + if (numExecutorsFailed > 0) { + yarnAllocator.addResourceRequests(numExecutorsFailed) + } yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a4ce8766d347..3a4cd02c28b1 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -206,6 +206,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp yarnAllocator.addResourceRequests(args.numExecutors) while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { yarnAllocator.allocateResources() + val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed + if (numExecutorsFailed > 0) { + yarnAllocator.addResourceRequests(numExecutorsFailed) + } Thread.sleep(100) } From 3c464bd36ba26343c9251eff31cdd4960c2ab8ff Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 31 May 2014 13:27:52 +0800 Subject: [PATCH 2/5] add time limit to allocateExecutors --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 8 ++++++-- .../org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 6 +++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 174cc9b0ebc1..7c09b685182a 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -256,18 +256,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // TODO: Handle container failure yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. - while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { + val startTime = System.currentTimeMillis() + var usedTime = 0L + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors + && userThread.isAlive) && (usedTime < 1000L * 60 * 10)) { if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached") } + yarnAllocator.allocateResources() val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed if (numExecutorsFailed > 0) { yarnAllocator.addResourceRequests(numExecutorsFailed) } - yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) + usedTime = System.currentTimeMillis() - startTime } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 3a4cd02c28b1..083d5fb56074 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -204,13 +204,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp // TODO: Handle container failure yarnAllocator.addResourceRequests(args.numExecutors) - while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { + val startTime = System.currentTimeMillis() + var usedTime = 0L + while (((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) + && (usedTime < 1000L * 60 * 10)) { yarnAllocator.allocateResources() val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed if (numExecutorsFailed > 0) { yarnAllocator.addResourceRequests(numExecutorsFailed) } Thread.sleep(100) + usedTime = System.currentTimeMillis() - startTime } logInfo("All executors have launched.") From aff827cff3b29bdfeadc2d1f704e69bf61889a9b Mon Sep 17 00:00:00 2001 From: witgo Date: Thu, 5 Jun 2014 17:31:32 +0800 Subject: [PATCH 3/5] review commit --- .../spark/deploy/yarn/ApplicationMaster.scala | 48 +++++++++---------- .../spark/deploy/yarn/ExecutorLauncher.scala | 31 +++++------- 2 files changed, 34 insertions(+), 45 deletions(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 7c09b685182a..844290f2edf7 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -252,26 +252,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, try { logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. - val startTime = System.currentTimeMillis() - var usedTime = 0L - while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors - && userThread.isAlive) && (usedTime < 1000L * 60 * 10)) { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } + while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { yarnAllocator.allocateResources() - val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed - if (numExecutorsFailed > 0) { - yarnAllocator.addResourceRequests(numExecutorsFailed) - } + checkNumExecutorsFailed() + allocateMissingExecutor() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) - usedTime = System.currentTimeMillis() - startTime } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, @@ -297,23 +285,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } + private def allocateMissingExecutor() { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } + } + + private def checkNumExecutorsFailed() { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of executor failures reached") + } + } + private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } + checkNumExecutorsFailed() + allocateMissingExecutor() sendProgress() Thread.sleep(sleepTime) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 083d5fb56074..24e34227fd7a 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -200,25 +200,24 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numExecutors) - val startTime = System.currentTimeMillis() - var usedTime = 0L - while (((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) - && (usedTime < 1000L * 60 * 10)) { + while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { yarnAllocator.allocateResources() - val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed - if (numExecutorsFailed > 0) { - yarnAllocator.addResourceRequests(numExecutorsFailed) - } + allocateMissingExecutor() Thread.sleep(100) - usedTime = System.currentTimeMillis() - startTime } logInfo("All executors have launched.") + } + private def allocateMissingExecutor() { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } } // TODO: We might want to extend this to allocate more containers in case they die ! @@ -228,13 +227,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp val t = new Thread { override def run() { while (!driverClosed) { - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } + allocateMissingExecutor() sendProgress() Thread.sleep(sleepTime) } From 32ac7af6f2c6700df5e187bbce51ce10035aadca Mon Sep 17 00:00:00 2001 From: witgo Date: Mon, 9 Jun 2014 23:19:42 +0800 Subject: [PATCH 4/5] review commit --- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 844290f2edf7..33a60d978c58 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -253,11 +253,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished yarnAllocator.addResourceRequests(args.numExecutors) + yarnAllocator.allocateResources() // Exits the loop if the user thread exits. while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { - yarnAllocator.allocateResources() checkNumExecutorsFailed() allocateMissingExecutor() + yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } From bc3aa660ad28955306b7756e506ead083e4c879d Mon Sep 17 00:00:00 2001 From: witgo Date: Tue, 10 Jun 2014 09:40:48 +0800 Subject: [PATCH 5/5] review commit --- .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 24e34227fd7a..d93e5bb0225d 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -201,9 +201,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished yarnAllocator.addResourceRequests(args.numExecutors) + yarnAllocator.allocateResources() while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { - yarnAllocator.allocateResources() allocateMissingExecutor() + yarnAllocator.allocateResources() Thread.sleep(100) }