From 7b93a846e258e7d86baa0b271ee01d1823a5dcc4 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 14 Sep 2015 10:47:18 +0800 Subject: [PATCH 1/5] add function --- .../org/apache/spark/ExecutorAllocationManager.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e2..cb04b5810b603 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -231,6 +231,14 @@ private[spark] class ExecutorAllocationManager( } executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) } + + /** + * Change the value of numExecutorsTarget. + */ + def reSetNumExecutorsTarget(): Unit ={ + logDebug(s"Now reset the value of numExecutorsTarget.") + numExecutorsTarget = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) + } /** * Stop the allocation manager. From d7ed6dc981551a9a31d1a0088850fc7bedfa6ed2 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 14 Sep 2015 10:48:21 +0800 Subject: [PATCH 2/5] using function --- .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 6a4b536dee191..fd282254f3231 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -91,6 +91,7 @@ private[spark] abstract class YarnSchedulerBackend( filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) } scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) } } + scheduler.sc.executorAllocationManager.foreach(_.reSetNumExecutorsTarget()) } override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { From 564725b8f222b97ae318f74ed5490283b9069cd4 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 14 Sep 2015 11:02:29 +0800 Subject: [PATCH 3/5] a space --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index cb04b5810b603..a23a8267300f0 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -235,7 +235,7 @@ private[spark] class ExecutorAllocationManager( /** * Change the value of numExecutorsTarget. */ - def reSetNumExecutorsTarget(): Unit ={ + def reSetNumExecutorsTarget(): Unit = { logDebug(s"Now reset the value of numExecutorsTarget.") numExecutorsTarget = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) } From 209f4da526fedbfeb2159f8a6920c7ae90989e68 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 14 Sep 2015 11:10:45 +0800 Subject: [PATCH 4/5] scala style, thanks --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index a23a8267300f0..1548583cca74a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -231,7 +231,7 @@ private[spark] class ExecutorAllocationManager( } executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) } - + /** * Change the value of numExecutorsTarget. */ From 258f146bcd25faddee91e4ac33179fafba22cee2 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Thu, 17 Sep 2015 12:17:28 +0800 Subject: [PATCH 5/5] change --- .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index fd282254f3231..fb3379318fbff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -91,7 +91,6 @@ private[spark] abstract class YarnSchedulerBackend( filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) } scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) } } - scheduler.sc.executorAllocationManager.foreach(_.reSetNumExecutorsTarget()) } override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { @@ -171,6 +170,7 @@ private[spark] abstract class YarnSchedulerBackend( case RegisterClusterManager(am) => logInfo(s"ApplicationMaster registered as $am") amEndpoint = Some(am) + scheduler.sc.executorAllocationManager.foreach(_.reSetNumExecutorsTarget()) case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase)