@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc

 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.immutable
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
@@ -53,7 +54,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.resource.{ResourceID, ResourceInformation}
+import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
@@ -219,9 +220,10 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
-  private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
+  private var _resources: immutable.Map[String, ResourceInformation] = _
   private var _shuffleDriverComponents: ShuffleDriverComponents = _
   private var _plugins: Option[PluginContainer] = None
+  private var _resourceProfileManager: ResourceProfileManager = _

   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -343,6 +345,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
     _executorAllocationManager

+  private[spark] def resourceProfileManager: ResourceProfileManager = _resourceProfileManager
+
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner

   private[spark] var checkpointDir: Option[String] = None
@@ -451,6 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }

     _listenerBus = new LiveListenerBus(_conf)
+    _resourceProfileManager = new ResourceProfileManager(_conf)

     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
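
A minimal sketch of what the newly wired manager exposes to driver-side code, using only methods that appear elsewhere in this diff (`defaultResourceProfile`, `id`, `limitingResource`, `maxTasksPerExecutor`); the surrounding `SparkContext` members (`_conf`, `_resourceProfileManager`, `logInfo`) are assumed to be in scope as above. This snippet is illustrative and not part of the patch.

```scala
// Hedged sketch: inspect the default ResourceProfile that the manager builds from the
// spark.executor.* / spark.task.* configs of this SparkContext.
val defaultProf = _resourceProfileManager.defaultResourceProfile
val limiting = defaultProf.limitingResource(_conf)   // e.g. "cpus", or a custom resource name
val slots = defaultProf.maxTasksPerExecutor(_conf)   // tasks a single executor can run at once
logInfo(s"Default profile ${defaultProf.id}: limiting resource = $limiting, max tasks = $slots")
```
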
@@ -611,7 +616,7 @@ class SparkContext(config: SparkConf) extends Logging {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
               schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
-              cleaner = cleaner))
+              cleaner = cleaner, resourceProfileManager = resourceProfileManager))
           case _ =>
             None
         }
@@ -1622,7 +1627,7 @@ class SparkContext(config: SparkConf) extends Logging {

   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
-   * to help it make decisions.
+   * to help it make decisions. This applies to the default ResourceProfile.
    * @param numExecutors The total number of executors we'd like to have. The cluster manager
    *                     shouldn't kill any running executor to reach this number, but,
    *                     if all existing executors were to die, this is the number of executors
@@ -1638,11 +1643,16 @@ class SparkContext(config: SparkConf) extends Logging {
   def requestTotalExecutors(
       numExecutors: Int,
       localityAwareTasks: Int,
-      hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
+      hostToLocalTaskCount: immutable.Map[String, Int]
     ): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+        // This is applied to the default resource profile; an API would need to be added to
+        // support other profiles.
+        val defaultProfId = resourceProfileManager.defaultResourceProfile.id
+        b.requestTotalExecutors(immutable.Map(defaultProfId -> numExecutors),
+          immutable.Map(localityAwareTasks -> defaultProfId),
+          immutable.Map(defaultProfId -> hostToLocalTaskCount))
       case _ =>
         logWarning("Requesting executors is not supported by current scheduler.")
         false
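
For context, a hedged driver-side usage sketch of the method changed above. It assumes the developer API `SparkContext.requestTotalExecutors` is callable from application code and that the app runs against a scheduler backend implementing `ExecutorAllocationClient`; the `local[2]` master used here is only for a runnable example, and in local mode the call simply logs the warning shown above and returns false.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RequestTotalExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Assumed setup: local master so the example runs anywhere. On a real cluster manager
    // (standalone, YARN, K8s) the backend implements ExecutorAllocationClient and the
    // request is forwarded, keyed by the default ResourceProfile id as the hunk above shows.
    val conf = new SparkConf().setAppName("request-total-executors-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Ask for 10 executors total, noting 4 locality-aware tasks and where they would like to run.
    val accepted = sc.requestTotalExecutors(
      numExecutors = 10,
      localityAwareTasks = 4,
      hostToLocalTaskCount = Map("host-a" -> 3, "host-b" -> 1))

    println(s"request acknowledged by cluster manager: $accepted") // false in local mode
    sc.stop()
  }
}
```
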
@@ -2036,6 +2046,7 @@ class SparkContext(config: SparkConf) extends Logging {
     // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
     // `SparkContext` is stopped.
     localProperties.remove()
+    ResourceProfile.clearDefaultProfile()
     // Unset YARN mode system env variable, to allow switching between cluster types.
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
@@ -2771,109 +2782,34 @@ object SparkContext extends Logging {
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1

-    // Ensure that executor's resources satisfies one or more tasks requirement.
-    def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
+    // Ensure that the default executor's resources satisfy at least one task's requirements.
+    // This function is for cluster managers that don't set the executor cores config; for
+    // the others it is checked in ResourceProfile.
+    def checkResourcesPerTask(executorCores: Int): Unit = {
       val taskCores = sc.conf.get(CPUS_PER_TASK)
-      val execCores = if (clusterMode) {
-        executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
-      } else {
-        executorCores.get
-      }
-      // some cluster managers don't set the EXECUTOR_CORES config by default (standalone
-      // and mesos coarse grained), so we can't rely on that config for those.
-      val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
-        (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
-
-      // Number of cores per executor must meet at least one task requirement.
-      if (shouldCheckExecCores && execCores < taskCores) {
-        throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-          s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on $master.")
-      }
-
-      // Calculate the max slots each executor can provide based on resources available on each
-      // executor and resources required by each task.
-      val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
-      val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
-        .map(request => (request.id.resourceName, request.amount)).toMap
-
-      var (numSlots, limitingResourceName) = if (shouldCheckExecCores) {
-        (execCores / taskCores, "CPU")
-      } else {
-        (-1, "")
-      }
-
-      taskResourceRequirements.foreach { taskReq =>
-        // Make sure the executor resources were specified through config.
-        val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName,
-          throw new SparkException("The executor resource config: " +
-            new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
-            " needs to be specified since a task requirement config: " +
-            new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
-            " was specified")
-        )
-        // Make sure the executor resources are large enough to launch at least one task.
-        if (execAmount < taskReq.amount) {
-          throw new SparkException("The executor resource config: " +
-            new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
-            s" = $execAmount has to be >= the requested amount in task resource config: " +
-            new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
-            s" = ${taskReq.amount}")
-        }
-        // Compare and update the max slots each executor can provide.
-        // If the configured amount per task was < 1.0, a task is subdividing
-        // executor resources. If the amount per task was > 1.0, the task wants
-        // multiple executor resources.
-        val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
-        if (resourceNumSlots < numSlots) {
-          if (shouldCheckExecCores) {
-            throw new IllegalArgumentException("The number of slots on an executor has to be " +
-              "limited by the number of cores, otherwise you waste resources and " +
-              "dynamic allocation doesn't work properly. Your configuration has " +
-              s"core/task cpu slots = ${numSlots} and " +
-              s"${taskReq.resourceName} = ${resourceNumSlots}. " +
-              "Please adjust your configuration so that all resources require same number " +
-              "of executor slots.")
-          }
-          numSlots = resourceNumSlots
-          limitingResourceName = taskReq.resourceName
-        }
-      }
-      if (!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
-        // if we can't rely on the executor cores config throw a warning for user
-        logWarning("Please ensure that the number of slots available on your " +
-          "executors is limited by the number of cores to task cpus and not another " +
-          "custom resource. If cores is not the limiting resource then dynamic " +
-          "allocation will not work properly!")
-      }
-      // warn if we would waste any resources due to another resource limiting the number of
-      // slots on an executor
-      taskResourceRequirements.foreach { taskReq =>
-        val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
-        if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
-          val taskReqStr = if (taskReq.numParts > 1) {
-            s"${taskReq.amount}/${taskReq.numParts}"
-          } else {
-            s"${taskReq.amount}"
-          }
-          val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
-          val message = s"The configuration of resource: ${taskReq.resourceName} " +
-            s"(exec = ${execAmount}, task = ${taskReqStr}, " +
-            s"runnable tasks = ${resourceNumSlots}) will " +
-            s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
-            s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
-            s"your configuration."
-          if (Utils.isTesting) {
-            throw new SparkException(message)
-          } else {
-            logWarning(message)
-          }
-        }
+      validateTaskCpusLargeEnough(executorCores, taskCores)
+      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
+      // TODO: this is temporary until the full stage level scheduling feature is integrated.
+      // Fail if a resource other than CPU is the limiting one, since dynamic allocation and
+      // the scheduler currently compute slots based on cores.
+      val cpuSlots = executorCores / taskCores
+      val limitingResource = defaultProf.limitingResource(sc.conf)
+      if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS) &&
+          defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
+        throw new IllegalArgumentException("The number of slots on an executor has to be " +
+          "limited by the number of cores, otherwise you waste resources and " +
+          "dynamic allocation doesn't work properly. Your configuration has " +
+          s"core/task cpu slots = ${cpuSlots} and " +
+          s"${limitingResource} = " +
+          s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " +
+          "so that all resources require same number of executor slots.")
       }
+      ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
     }

     master match {
       case "local" =>
-        checkResourcesPerTask(clusterMode = false, Some(1))
+        checkResourcesPerTask(1)
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
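
To make the rewritten check concrete, here is a small self-contained sketch of the slot comparison it performs; the config values (8 executor cores, 1 CPU per task, 2 GPUs per executor, 1 GPU per task) are made up for illustration and do not come from the patch.

```scala
// Hypothetical numbers, mirroring the cpuSlots vs. maxTasksPerExecutor comparison above:
//   spark.executor.cores=8, spark.task.cpus=1,
//   spark.executor.resource.gpu.amount=2, spark.task.resource.gpu.amount=1
object SlotCheckSketch extends App {
  val executorCores = 8
  val taskCpus = 1
  val cpuSlots = executorCores / taskCpus   // 8 tasks fit per executor by CPU

  val executorGpus = 2
  val taskGpus = 1
  val gpuSlots = executorGpus / taskGpus    // only 2 tasks fit per executor by GPU

  // Here the limiting resource is the GPU, not CPU, and gpuSlots < cpuSlots, so the check
  // above would throw IllegalArgumentException rather than silently leave 6 of the 8 CPU
  // slots unused under dynamic allocation.
  println(s"cpuSlots=$cpuSlots, gpuSlots=$gpuSlots, limiting=gpu")
}
```
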
@@ -2886,7 +2822,7 @@ object SparkContext extends Logging {
         if (threadCount <= 0) {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
-        checkResourcesPerTask(clusterMode = false, Some(threadCount))
+        checkResourcesPerTask(threadCount)
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
@@ -2897,22 +2833,21 @@ object SparkContext extends Logging {
         // local[*, M] means the number of cores on the computer with M failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        checkResourcesPerTask(clusterMode = false, Some(threadCount))
+        checkResourcesPerTask(threadCount)
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)

       case SPARK_REGEX(sparkUrl) =>
-        checkResourcesPerTask(clusterMode = true, None)
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
         (backend, scheduler)

       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
+        checkResourcesPerTask(coresPerSlave.toInt)
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
         if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2941,7 +2876,6 @@ object SparkContext extends Logging {
         (backend, scheduler)

       case masterUrl =>
-        checkResourcesPerTask(clusterMode = true, None)
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
           case None => throw new SparkException("Could not parse Master URL: '" + master + "'")