Changes from all commits
ClientArguments.scala

@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    if (isDynamicAllocationEnabled) {
-      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
-      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
-      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
-      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
-      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
-
-      // If defined, initial executors must be between min and max
-      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
-        throw new IllegalArgumentException(
-          s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
-      }
-
-      numExecutors = initialNumExecutors
-    } else {
-      val numExecutorsConf = "spark.executor.instances"
-      numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
-    }
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull
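Note on the removed validation: the exception message interpolated the max executor value ($maxNumExecutors) where the config key ($maxExecutorsConf) was presumably intended, so with the default cap it rendered as "spark.dynamicAllocation.initialExecutors must be between spark.dynamicAllocation.minExecutors and 2147483647!". The consolidated helper added to YarnSparkHadoopUtil below reports the actual numbers on both ends of the range.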
YarnAllocator.scala

@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
-    } else {
-      sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
-    }
+    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
 
   // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
   // list of requesters that should be responded to once we find out why the given executor
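Comparing this removed block with the ClientArguments hunk above shows the inconsistency the patch eliminates: ClientArguments defaulted spark.dynamicAllocation.initialExecutors to the configured minimum, while YarnAllocator defaulted it to 0, so with only minExecutors set the client and the allocator disagreed about the starting target. A minimal sketch of the divergence (the standalone object is illustrative, not part of the patch; it only needs spark-core on the classpath):

import org.apache.spark.SparkConf

object InitialExecutorsDivergence {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation on, minExecutors set, initialExecutors deliberately unset.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "5")

    // ClientArguments' removed logic: initialExecutors defaulted to minExecutors.
    val clientView = conf.getInt("spark.dynamicAllocation.initialExecutors",
      conf.getInt("spark.dynamicAllocation.minExecutors", 0))

    // YarnAllocator's removed logic: initialExecutors defaulted to 0.
    val allocatorView = conf.getInt("spark.dynamicAllocation.initialExecutors", 0)

    println(s"client requested $clientView, allocator targeted $allocatorView") // 5 vs 0
  }
}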
YarnSparkHadoopUtil.scala

@@ -314,5 +314,28 @@ object YarnSparkHadoopUtil {
   def getClassPathSeparator(): String = {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
+
+  /**
+   * Getting the initial target number of executors depends on whether dynamic allocation is
+   * enabled.
+   */
+  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+      val initialNumExecutors =
+        conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must be between min executor number " +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      val targetNumExecutors =
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+      // System property can override environment variable.
+      conf.getInt("spark.executor.instances", targetNumExecutors)
+    }
+  }
 }

Member (review comment on the new method): "The refactoring is nice. This is a reasonable place for this code now, though eventually it may not be YARN-specific?"
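For reference, a hedged usage sketch of the consolidated helper. The demo object and its package placement are illustrative, not part of the patch (it sits under org.apache.spark.deploy.yarn only so the Spark-internal object resolves without qualification), and it assumes Utils.isDynamicAllocationEnabled treats the first conf as dynamic, which holds when spark.executor.instances is left unset:

package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf

object InitialTargetDemo {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation path: initialExecutors is unset, so it falls back to
    // minExecutors; the require() enforces min <= initial <= max.
    val dynConf = new SparkConf(loadDefaults = false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(dynConf))    // 2

    // Static path: spark.executor.instances overrides SPARK_EXECUTOR_INSTANCES,
    // which in turn overrides DEFAULT_NUMBER_EXECUTORS (assuming the env var
    // is unset when this runs).
    val staticConf = new SparkConf(loadDefaults = false)
      .set("spark.executor.instances", "8")
    println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(staticConf)) // 8
  }
}

Centralizing the precedence here means all four call sites in this diff resolve the initial target the same way.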
YarnClusterSchedulerBackend.scala

@@ -17,21 +17,13 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.net.NetworkInterface
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
   }
 
   override def applicationId(): String =
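One subtle behavioral change hides in this hunk: the removed code parsed SPARK_EXECUTOR_INSTANCES through IntParam.unapply, which swallows NumberFormatException, so a malformed value silently fell back to the default; the helper's sys.env.get(...).map(_.toInt) propagates the exception instead, failing fast at startup. A small sketch of the difference (the package and object names are illustrative; the package lives under org.apache.spark so the private[spark] IntParam is visible):

package org.apache.spark.demo

import scala.util.Try

import org.apache.spark.util.IntParam

object EnvParsingDiff {
  def main(args: Array[String]): Unit = {
    // Old path: IntParam.unapply catches NumberFormatException and returns None,
    // so a value like "three" silently became the default executor count.
    println(IntParam.unapply("three")) // None
    // New path: _.toInt propagates the failure to the caller.
    println(Try("three".toInt))        // Failure(java.lang.NumberFormatException: ...)
  }
}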