From f44e4845119b994ea15f4ffdbccd389b8fd5d854 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 24 Mar 2023 11:06:27 +0800 Subject: [PATCH 1/5] Support eagerly kill redundant executors --- docs/extensions/engines/spark/rules.md | 2 + .../kyuubi/sql/KyuubiSparkSQLExtension.scala | 3 + .../spark/FinalStageResourceManager.scala | 199 ++++++++++++++++++ .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 16 ++ .../sql/KyuubiSparkSQLExtensionTest.scala | 2 + 5 files changed, 222 insertions(+) create mode 100644 extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md index 5c8c0486920..5ba154ce262 100644 --- a/docs/extensions/engines/spark/rules.md +++ b/docs/extensions/engines/spark/rules.md @@ -83,4 +83,6 @@ Kyuubi provides some configs to make these feature easy to use. | spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max columns of inferred columns. | 1.7.0 | | spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 | | spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 | +| spark.sql.finalWriteStageEagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 | +| spark.sql.finalWriteStageNumPartitionFactor | 1.2 | If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile. | 1.8.0 | diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index ef9da41be13..0d034c26cf1 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.sql +import org.apache.spark.FinalStageResourceManager import org.apache.spark.sql.SparkSessionExtensions import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy} @@ -39,5 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { // watchdog extension extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) extensions.injectPlannerStrategy(MaxPartitionStrategy) + + extensions.injectQueryStagePrepRule(FinalStageResourceManager) } } diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala new file mode 100644 index 00000000000..b0a438c04b7 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
+
+import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule}
+
+/**
+ * This rule assumes the final write stage requires fewer cores than the previous stages,
+ * otherwise this rule takes no effect.
+ *
+ * It provides a feature:
+ *   1. Kill redundant executors before running the final write stage
+ */
+case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED)) {
+      return plan
+    }
+
+    if (!MarkNumOutputColumnsRule.isWrite(session, plan)) {
+      return plan
+    }
+
+    val sc = session.sparkContext
+    val executorCores = sc.getConf.getInt("spark.executor.cores", 1)
+    val minExecutors = sc.getConf.getInt("spark.dynamicAllocation.minExecutors", 0)
+    val maxExecutors = sc.getConf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+    val factor = conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_PARTITION_FACTOR)
+    val hasImprovementRoom = maxExecutors - 1 > minExecutors * factor
+    // Fast fail if:
+    //  1. only work with yarn and k8s
+    //  2. maxExecutors is bigger than minExecutors * factor
+    if (!sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] || hasImprovementRoom) {
+      return plan
+    }
+
+    val stage = findFinalRebalanceStage(plan)
+    if (stage.isEmpty) {
+      return plan
+    }
+
+    // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
+    // So we need to apply it ourselves.
+    val shuffleRead = queryStageOptimizerRules.foldLeft(stage.get.asInstanceOf[SparkPlan]) {
+      case (latest, rule) => rule.apply(latest)
+    }
+    shuffleRead match {
+      case AQEShuffleReadExec(stage: ShuffleQueryStageExec, partitionSpecs) =>
+        // The conditions for killing executors or injecting a custom resource profile:
+        // - target executors < active executors
+        // - target executors > min executors
+        val numActiveExecutors = sc.getExecutorIds().length
+        val expectedCores = partitionSpecs.length
+        val targetExecutors = (((expectedCores / executorCores) + 1) * factor).toInt
+        val hasBenefits = targetExecutors < numActiveExecutors && targetExecutors > minExecutors
+        if (hasBenefits) {
+          val shuffleId = stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleDependency.shuffleId
+          val numReduce = stage.plan.asInstanceOf[ShuffleExchangeExec].numPartitions
+          // Now, only the final rebalance stage is waiting to execute and all tasks of the
+          // previous stage have finished. Kill redundant existing executors eagerly so the
+          // tasks of the final stage can be scheduled in a centralized way.
+          killExecutors(sc, targetExecutors, shuffleId, numReduce)
+        } else {
+          logInfo(s"No benefit to kill executors or inject a custom resource profile, " +
+            s"active executors: $numActiveExecutors, min executors: $minExecutors, " +
+            s"target executors: $targetExecutors.")
+        }
+
+      case _ =>
+    }
+
+    plan
+  }
+
+  /**
+   * The priority of killing executors is as follows:
+   *   1. kill executors that are younger than others (the older, the better the JIT works)
+   *   2. kill executors that produced less shuffle data first
+   */
+  private def findExecutorToKill(
+      sc: SparkContext,
+      targetExecutors: Int,
+      shuffleId: Int,
+      numReduce: Int): Seq[String] = {
+    val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    val shuffleStatus = tracker.shuffleStatuses(shuffleId)
+    val executorToBlockSize = new mutable.HashMap[String, Long]
+    shuffleStatus.withMapStatuses { mapStatus =>
+      mapStatus.foreach { status =>
+        var i = 0
+        var sum = 0L
+        while (i < numReduce) {
+          sum += status.getSizeForBlock(i)
+          i += 1
+        }
+        executorToBlockSize.getOrElseUpdate(status.location.executorId, sum)
+      }
+    }
+
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val executorsWithRegistrationTs = backend.getExecutorsWithRegistrationTs()
+    val existedExecutors = executorsWithRegistrationTs.keys.toSet
+    val expectedNumExecutorToKill = existedExecutors.size - targetExecutors
+    if (expectedNumExecutorToKill < 1) {
+      return Seq.empty
+    }
+
+    val executorIdsToKill = new ArrayBuffer[String]()
+    // We first kill executors that do not hold any shuffle block. This can happen when
+    // the last stage ran fast and finished in a short time, while the existing executors
+    // were left over from previous stages and have not been killed by DRA, so they can not
+    // be found by tracking the shuffle status.
+    // We evict these executors by their alive time first and retain all of the executors
+    // that have better locality for shuffle blocks.
+ val numExecutorToKillWithNoShuffle = expectedNumExecutorToKill - executorToBlockSize.size + executorsWithRegistrationTs.toSeq.sortBy(_._2).foreach { case (id, _) => + if (executorIdsToKill.length < numExecutorToKillWithNoShuffle && + !executorToBlockSize.contains(id)) { + executorIdsToKill.append(id) + } + } + + // Evict the rest executors according to the shuffle block size + executorToBlockSize.toSeq.sortBy(_._2).foreach { case (id, _) => + if (executorIdsToKill.length < expectedNumExecutorToKill) { + executorIdsToKill.append(id) + } + } + + executorIdsToKill.toSeq + } + + private def killExecutors( + sc: SparkContext, + targetExecutors: Int, + shuffleId: Int, + numReduce: Int): Unit = { + val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient] + + val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) + logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " + + s"[${executorsToKill.mkString(", ")}].") + + // It is a little hack to kill executors with DRA enabled. + // It may cause the status in `ExecutorAllocationManager` inconsistent with + // `CoarseGrainedSchedulerBackend` for a while. But it should be synchronous finally. + executorAllocationClient.killExecutors( + executorIds = executorsToKill, + adjustTargetNumExecutors = false, + countFailures = false, + force = false) + } + + @tailrec + private def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = { + plan match { + case p: ProjectExec => findFinalRebalanceStage(p.child) + case f: FilterExec => findFinalRebalanceStage(f.child) + case s: SortExec if !s.global => findFinalRebalanceStage(s.child) + case stage: ShuffleQueryStageExec + if stage.isMaterialized && + stage.plan.isInstanceOf[ShuffleExchangeExec] && + stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS => + Some(stage) + case _ => None + } + } + + @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + OptimizeSkewInRebalancePartitions, + CoalesceShufflePartitions(session), + OptimizeShuffleWithLocalRead) +} diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index 0fe9f649eaa..a4541faff15 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -190,4 +190,20 @@ object KyuubiSQLConf { .version("1.7.0") .booleanConf .createWithDefault(true) + + val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED = + buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled") + .doc("When true, eagerly kill redundant executors before running final write stage.") + .version("1.8.0") + .booleanConf + .createWithDefault(false) + + val FINAL_WRITE_STAGE_PARTITION_FACTOR = + buildConf("spark.sql.finalWriteStageRetainExecutorsFactor") + .doc("If the target executors * factor < active executors, and " + + "target executors * factor > min executors, then kill redundant executors.") + .version("1.8.0") + .doubleConf + .checkValue(_ >= 1, "must be bigger than or equal to 1") + .createWithDefault(1.2) } diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala 
b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala index fd81948c61a..e58ac726c13 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala @@ -29,6 +29,8 @@ import org.apache.kyuubi.sql.KyuubiSQLConf trait KyuubiSparkSQLExtensionTest extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper { + sys.props.put("spark.testing", "1") + private var _spark: Option[SparkSession] = None protected def spark: SparkSession = _spark.getOrElse { throw new RuntimeException("test spark session don't initial before using it.") From f2492cec6c34e03459e32f2b3470aace49cd4755 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 24 Mar 2023 11:08:11 +0800 Subject: [PATCH 2/5] nit --- .../org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala index e58ac726c13..fd81948c61a 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala @@ -29,8 +29,6 @@ import org.apache.kyuubi.sql.KyuubiSQLConf trait KyuubiSparkSQLExtensionTest extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper { - sys.props.put("spark.testing", "1") - private var _spark: Option[SparkSession] = None protected def spark: SparkSession = _spark.getOrElse { throw new RuntimeException("test spark session don't initial before using it.") From 28d4230f83d013bd243be9efea926dadcf5e227b Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 24 Mar 2023 15:25:35 +0800 Subject: [PATCH 3/5] address comments --- docs/extensions/engines/spark/rules.md | 4 ++-- .../apache/spark/FinalStageResourceManager.scala | 15 ++++++++++----- .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md index 5ba154ce262..f06ecf256eb 100644 --- a/docs/extensions/engines/spark/rules.md +++ b/docs/extensions/engines/spark/rules.md @@ -83,6 +83,6 @@ Kyuubi provides some configs to make these feature easy to use. | spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max columns of inferred columns. | 1.7.0 | | spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 | | spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 | -| spark.sql.finalWriteStageEagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 | -| spark.sql.finalWriteStageNumPartitionFactor | 1.2 | If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile. 
| 1.8.0 |
+| spark.sql.finalWriteStage.eagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 |
+| spark.sql.finalWriteStage.retainExecutorsFactor | 1.2 | If the target executors * factor < active executors, and target executors * factor > min executors, then kill redundant executors or inject a custom resource profile. | 1.8.0 |
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala
index b0a438c04b7..9d3c5fc70d7 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala
@@ -48,15 +48,18 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl
     }
 
     val sc = session.sparkContext
+    val dra = sc.getConf.getBoolean("spark.dynamicAllocation.enabled", false)
     val executorCores = sc.getConf.getInt("spark.executor.cores", 1)
     val minExecutors = sc.getConf.getInt("spark.dynamicAllocation.minExecutors", 0)
     val maxExecutors = sc.getConf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
     val factor = conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_PARTITION_FACTOR)
     val hasImprovementRoom = maxExecutors - 1 > minExecutors * factor
     // Fast fail if:
-    //  1. only work with yarn and k8s
-    //  2. maxExecutors is bigger than minExecutors * factor
-    if (!sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] || hasImprovementRoom) {
+    //  1. DRA off
+    //  2. only work with yarn and k8s
+    //  3. maxExecutors is bigger than minExecutors * factor
+    if (!dra || !sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] ||
+      hasImprovementRoom) {
       return plan
     }
 
@@ -77,7 +80,8 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl
         // - target executors > min executors
         val numActiveExecutors = sc.getExecutorIds().length
         val expectedCores = partitionSpecs.length
-        val targetExecutors = (((expectedCores / executorCores) + 1) * factor).toInt
+        val targetExecutors = (math.ceil(expectedCores.toFloat / executorCores) * factor).toInt
+          .max(1)
         val hasBenefits = targetExecutors < numActiveExecutors && targetExecutors > minExecutors
         if (hasBenefits) {
           val shuffleId = stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleDependency.shuffleId
@@ -167,7 +171,8 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl
     logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
       s"[${executorsToKill.mkString(", ")}].")
 
-    // It is a little hack to kill executors with DRA enabled.
+    // Note, `SparkContext#killExecutors` is not allowed when DRA is enabled,
+    // see `https://github.com/apache/spark/pull/20604`.
     // It may cause the status in `ExecutorAllocationManager` inconsistent with
     // `CoarseGrainedSchedulerBackend` for a while. But it should be synchronous finally.
executorAllocationClient.killExecutors( diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index a4541faff15..15634ee355f 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -192,14 +192,14 @@ object KyuubiSQLConf { .createWithDefault(true) val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED = - buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled") + buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.enabled") .doc("When true, eagerly kill redundant executors before running final write stage.") .version("1.8.0") .booleanConf .createWithDefault(false) val FINAL_WRITE_STAGE_PARTITION_FACTOR = - buildConf("spark.sql.finalWriteStageRetainExecutorsFactor") + buildConf("spark.sql.finalWriteStage.retainExecutorsFactor") .doc("If the target executors * factor < active executors, and " + "target executors * factor > min executors, then kill redundant executors.") .version("1.8.0") From ec627ee4f8246b69b442b37ec01794d2fa8728cd Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 24 Mar 2023 17:32:37 +0800 Subject: [PATCH 4/5] nit --- .../org/apache/spark/FinalStageResourceManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala index 9d3c5fc70d7..5348c6192cc 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala @@ -49,7 +49,7 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl val sc = session.sparkContext val dra = sc.getConf.getBoolean("spark.dynamicAllocation.enabled", false) - val executorCores = sc.getConf.getInt("spark.executor.cores", 1) + val coresPerExecutor = sc.getConf.getInt("spark.executor.cores", 1) val minExecutors = sc.getConf.getInt("spark.dynamicAllocation.minExecutors", 0) val maxExecutors = sc.getConf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue) val factor = conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_PARTITION_FACTOR) @@ -59,7 +59,7 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl // 2. only work with yarn and k8s // 3. 
maxExecutors is bigger than minExecutors * factor if (!dra || !sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] || - hasImprovementRoom) { + !hasImprovementRoom) { return plan } @@ -79,8 +79,8 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl // - target executors < active executors // - target executors > min executors val numActiveExecutors = sc.getExecutorIds().length - val expectedCores = partitionSpecs.length - val targetExecutors = (math.ceil(expectedCores.toFloat / executorCores) * factor).toInt + val targetCores = partitionSpecs.length + val targetExecutors = (math.ceil(targetCores.toFloat / coresPerExecutor) * factor).toInt .max(1) val hasBenefits = targetExecutors < numActiveExecutors && targetExecutors > minExecutors if (hasBenefits) { From f35208bfd0bf0f44f8bd056aec4ffc82414f7117 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 24 Mar 2023 17:40:33 +0800 Subject: [PATCH 5/5] nit --- .../main/scala/org/apache/spark/FinalStageResourceManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala index 5348c6192cc..8eae5d0e2ab 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala @@ -57,7 +57,7 @@ case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPl // Fast fail if: // 1. DRA off // 2. only work with yarn and k8s - // 3. maxExecutors is bigger than minExecutors * factor + // 3. maxExecutors is not bigger than minExecutors * factor if (!dra || !sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] || !hasImprovementRoom) { return plan
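
---

A minimal usage sketch for the feature added by this patch series, assuming the final config names from PATCH 3 and that the Kyuubi Spark SQL extension jar is on the application classpath. The app name, table names, and dynamic allocation values below are illustrative only, not part of the patches:

```scala
// Illustrative sketch: enable eagerly killing redundant executors before the
// final write stage. FinalStageResourceManager fast-fails unless dynamic
// allocation is enabled and the scheduler backend is a coarse-grained one
// (YARN or K8s), so DRA must be on for the rule to do anything.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("final-write-stage-demo") // hypothetical app name
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")   // illustrative value
  .config("spark.dynamicAllocation.maxExecutors", "100") // illustrative value
  .config("spark.sql.finalWriteStage.eagerlyKillExecutors.enabled", "true")
  .config("spark.sql.finalWriteStage.retainExecutorsFactor", "1.2")
  .getOrCreate()

// A rebalance before the write (e.g. from the insertRepartitionBeforeWrite rules
// or an explicit hint) is what produces the final rebalance stage this rule
// looks for; table names here are hypothetical.
spark.sql("INSERT OVERWRITE TABLE target_table SELECT /*+ REBALANCE */ * FROM source_table")
```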