@@ -193,10 +193,10 @@ private[spark] class ExecutorAllocationManager(
if (executorIdleTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
- // Require external shuffle service for dynamic allocation
+ // Require external shuffle service for elastic scaling
// Otherwise, we may lose shuffle files when killing executors
if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
throw new SparkException("Elastic scaling of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
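For reference, the check above means an application cannot start elastic scaling without the external shuffle service. A minimal sketch of a conforming configuration, using only the two property keys named in the hunk (the app name is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: satisfy the validation above by enabling the external
// shuffle service alongside elastic scaling. Without the second key,
// ExecutorAllocationManager throws the SparkException shown in the hunk.
val conf = new SparkConf()
  .setAppName("elastic-scaling-demo") // placeholder
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
val sc = new SparkContext(conf)
```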
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -562,7 +562,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
logWarning("Elastic Scaling and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
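A sketch of the conflicting configuration this warning is about; per the hunk above (and the YARN docs later in this diff), the explicit executor count wins:

```scala
import org.apache.spark.SparkConf

// Sketch of the conflict the warning above guards against: both elastic
// scaling and a fixed executor count are set, so Spark keeps the fixed
// count and disables elastic scaling (SPARK-9092), logging the warning.
val conflicting = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.executor.instances", "6") // the explicit count takes precedence
```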
@@ -45,7 +45,7 @@ private[spark] class ApplicationInfo(

// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
- // application will this be set to a finite value. This is used for dynamic allocation.
+ // application will this be set to a finite value. This is used for elastic scaling.
@transient private[master] var executorLimit: Int = _

@transient private var nextExecutorId: Int = _
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1513,7 +1513,7 @@ abstract class RDD[T: ClassTag](
* fault-tolerant storage. The effect is that if an executor fails during the computation,
* the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
*
- * This is NOT safe to use with dynamic allocation, which removes executors along
+ * This is NOT safe to use with elastic scaling, which removes executors along
* with their cached blocks. If you must use both features, you are advised to set
* `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
*
@@ -1522,7 +1522,7 @@
def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
logWarning("Local checkpointing is NOT safe to use with elastic scaling, " +
"which removes executors along with their cached blocks. If you must use both " +
"features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
"to a high value. E.g. If you plan to use the RDD for 1 hour, set the timeout to " +
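A short sketch of the mitigation the warning recommends, with an illustrative timeout for an RDD that will be used for about an hour:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: keep executors holding cached blocks alive long enough for the
// locally checkpointed RDD to be used. "2h" is an illustrative value.
val conf = new SparkConf()
  .setAppName("local-checkpoint-demo") // placeholder
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "2h")
val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 1000000).map(_ * 2)
data.localCheckpoint() // truncates lineage; blocks live only on executors
```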
@@ -78,7 +78,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]

/**
- * The total number of executors we aim to have. Undefined when not using dynamic allocation
+ * The total number of executors we aim to have. Undefined when not using elastic scaling
* and before the ExecutorAllocationManager calls [[doRequestTotalExecutors]].
*/
private var executorLimitOption: Option[Int] = None
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2176,9 +2176,9 @@ private[spark] object Utils extends Logging {
}

/**
- * Return whether dynamic allocation is enabled in the given conf
- * Dynamic allocation and explicitly setting the number of executors are inherently
- * incompatible. In environments where dynamic allocation is turned on by default,
+ * Return whether elastic scaling is enabled in the given conf
+ * Elastic scaling and explicitly setting the number of executors are inherently
+ * incompatible. In environments where elastic scaling is turned on by default,
* the latter should override the former (SPARK-9092).
*/
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
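The precedence rule in the comment can be stated compactly. The method body is elided from this diff, so this is a hedged sketch of the documented behavior rather than the actual implementation:

```scala
import org.apache.spark.SparkConf

// Hedged sketch of the documented behavior (not the elided method body):
// elastic scaling counts as enabled only when the flag is on AND no explicit
// executor count is set, so spark.executor.instances overrides the flag
// (SPARK-9092).
def isDynamicAllocationEnabledSketch(conf: SparkConf): Boolean =
  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    conf.getInt("spark.executor.instances", 0) == 0
```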
@@ -247,7 +247,7 @@ class HeartbeatReceiverSuite
}
}

- // TODO: use these classes to add end-to-end tests for dynamic allocation!
+ // TODO: use these classes to add end-to-end tests for elastic scaling!

/**
* Dummy RPC endpoint to simulate executors.
@@ -285,7 +285,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("No exception when both num-executors and dynamic allocation set.") {
test("No exception when both num-executors and elastic scaling set.") {
noException should be thrownBy {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
.set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor

/**
- * End-to-end tests for dynamic allocation in standalone mode.
+ * End-to-end tests for elastic scaling in standalone mode.
*/
class StandaloneDynamicAllocationSuite
extends SparkFunSuite
@@ -82,7 +82,7 @@ class StandaloneDynamicAllocationSuite
super.afterAll()
}

test("dynamic allocation default behavior") {
test("elastic scaling default behavior") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
@@ -129,7 +129,7 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1000)
}

test("dynamic allocation with max cores <= cores per worker") {
test("elastic scaling with max cores <= cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "8"))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
@@ -152,7 +152,7 @@
assert(apps.head.executors.values.head.cores === 8)
assert(apps.head.getExecutorLimit === 1)
// request 1 more; this one won't go through because we're already at max cores.
- // This highlights a limitation of using dynamic allocation with max cores WITHOUT
+ // This highlights a limitation of using elastic scaling with max cores WITHOUT
// setting cores per executor: once an application scales down and then scales back
// up, its executors may not be spread out anymore!
assert(sc.requestExecutors(1))
@@ -184,7 +184,7 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1000)
}

test("dynamic allocation with max cores > cores per worker") {
test("elastic scaling with max cores > cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "16"))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
@@ -237,7 +237,7 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1000)
}

test("dynamic allocation with cores per executor") {
test("elastic scaling with cores per executor") {
sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
@@ -289,7 +289,7 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1000)
}

test("dynamic allocation with cores per executor AND max cores") {
test("elastic scaling with cores per executor AND max cores") {
sc = new SparkContext(appConf
.set("spark.executor.cores", "2")
.set("spark.cores.max", "8"))
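The suite drives scaling through SparkContext's developer API. A hedged sketch of the calls being exercised, where `sc` is the suite's SparkContext and the executor id is a placeholder:

```scala
// Hedged sketch of the API the assertions above exercise.
assert(sc.requestExecutors(2))     // ask the master to raise the executor limit by 2
assert(sc.killExecutors(Seq("1"))) // "1" is a placeholder executor id
```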
16 changes: 8 additions & 8 deletions docs/configuration.md
@@ -459,7 +459,7 @@ Apart from these, the following properties are also available, and may be useful
executors so the executors can be safely removed. This must be enabled if
<code>spark.dynamicAllocation.enabled</code> is "true". The external shuffle service
must be set up in order to enable it. See
- <a href="job-scheduling.html#configuration-and-setup">dynamic allocation
+ <a href="job-scheduling.html#configuration-and-setup">elastic scaling
configuration and setup documentation</a> for more information.
</td>
</tr>
@@ -1242,7 +1242,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>

- #### Dynamic Allocation
+ #### Elastic Scaling
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
@@ -1265,7 +1265,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
<td>60s</td>
<td>
- If dynamic allocation is enabled and an executor has been idle for more than this duration,
+ If elastic scaling is enabled and an executor has been idle for more than this duration,
the executor will be removed. For more detail, see this
<a href="job-scheduling.html#resource-allocation-policy">description</a>.
</td>
@@ -1274,7 +1274,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
<td>infinity</td>
<td>
- If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration,
+ If elastic scaling is enabled and an executor which has cached data blocks has been idle for more than this duration,
the executor will be removed. For more details, see this
<a href="job-scheduling.html#resource-allocation-policy">description</a>.
</td>
@@ -1283,28 +1283,28 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.dynamicAllocation.initialExecutors</code></td>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>
- Initial number of executors to run if dynamic allocation is enabled.
+ Initial number of executors to run if elastic scaling is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.maxExecutors</code></td>
<td>infinity</td>
<td>
- Upper bound for the number of executors if dynamic allocation is enabled.
+ Upper bound for the number of executors if elastic scaling is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>0</td>
<td>
- Lower bound for the number of executors if dynamic allocation is enabled.
+ Lower bound for the number of executors if elastic scaling is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
<td>1s</td>
<td>
- If dynamic allocation is enabled and there have been pending tasks backlogged for more than
+ If elastic scaling is enabled and there have been pending tasks backlogged for more than
this duration, new executors will be requested. For more detail, see this
<a href="job-scheduling.html#resource-allocation-policy">description</a>.
</td>
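Taken together, a configuration wiring these properties might look like the sketch below; all keys come from the table above, and the values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: bound the pool to [2, 20] executors, start at 4,
// and reclaim any executor idle for more than two minutes.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required, per the table above
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.initialExecutors", "4")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "120s")
```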
8 changes: 4 additions & 4 deletions docs/job-scheduling.md
@@ -108,7 +108,7 @@ and request executors.

#### Request Policy

- A Spark application with dynamic allocation enabled requests additional executors when it has
+ A Spark application with elastic scaling enabled requests additional executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
of executors is insufficient to simultaneously saturate all tasks that have been submitted but
not yet finished.
@@ -135,9 +135,9 @@ an executor should not be idle if there are still pending tasks to be scheduled.

### Graceful Decommission of Executors

- Before dynamic allocation, a Spark executor exits either on failure or when the associated
+ Before elastic scaling, a Spark executor exits either on failure or when the associated
application has also exited. In both scenarios, all state associated with the executor is no
- longer needed and can be safely discarded. With dynamic allocation, however, the application
+ longer needed and can be safely discarded. With elastic scaling, however, the application
is still running when an executor is explicitly removed. If the application attempts to access
state stored in or written by the executor, it will have to recompute that state. Thus,
Spark needs a mechanism to decommission an executor gracefully by preserving its state before
Expand All @@ -146,7 +146,7 @@ removing it.
This requirement is especially important for shuffles. During a shuffle, the Spark executor first
writes its own map outputs locally to disk, and then acts as the server for those files when other
executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
- longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
+ longer than their peers, elastic scaling may remove an executor before the shuffle completes,
in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced
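The Request Policy section above describes requests that ramp up from a backlogged application round by round. A toy model of that ramp, assuming the documented exponential growth (this is a reading of the docs, not Spark's scheduler code):

```scala
// Toy model: after spark.dynamicAllocation.schedulerBacklogTimeout expires,
// a persistently backlogged app requests 1 executor, then doubles the
// request in each subsequent sustained-backlog round.
def requestedPerRound(rounds: Int): Seq[Int] =
  (0 until rounds).map(r => 1 << r)

println(requestedPerRound(5).mkString(", ")) // 1, 2, 4, 8, 16
```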
4 changes: 2 additions & 2 deletions docs/running-on-mesos.md
@@ -243,8 +243,8 @@ In either case, HDFS runs separately from Hadoop MapReduce, without being schedu

# Dynamic Resource Allocation with Mesos

- Mesos supports dynamic allocation only with coarse grain mode, which can resize the number of executors based on statistics
- of the application. While dynamic allocation supports both scaling up and scaling down the number of executors, the coarse grain scheduler only supports scaling down
+ Mesos supports elastic scaling only with coarse grain mode, which can resize the number of executors based on statistics
+ of the application. While elastic scaling supports both scaling up and scaling down the number of executors, the coarse grain scheduler only supports scaling down
since it is already designed to run one executor per slave with the configured amount of resources. However, after scaling down the number of executors the coarse grain scheduler
can scale back up to the same amount of executors when Spark signals more executors are needed.

2 changes: 1 addition & 1 deletion docs/running-on-yarn.md
@@ -206,7 +206,7 @@ If you need a reference to the proper location to put log files in the YARN so t
<td><code>spark.executor.instances</code></td>
<td><code>2</code></td>
<td>
- The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
+ The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, elastic scaling is turned off and the specified number of <code>spark.executor.instances</code> is used.
</td>
</tr>
<tr>
2 changes: 1 addition & 1 deletion project/MimaExcludes.scala
@@ -310,7 +310,7 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
) ++ Seq(
- // SPARK-4751 Dynamic allocation for standalone mode
+ // SPARK-4751 Elastic scaling for standalone mode
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkContext.supportDynamicAllocation")
) ++ Seq(
@@ -563,8 +563,8 @@ class StreamingContext private[streaming] (
}

if (Utils.isDynamicAllocationEnabled(sc.conf)) {
logWarning("Dynamic Allocation is enabled for this application. " +
"Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
logWarning("Elastic Scaling is enabled for this application. " +
"Enabling Elastic scaling for Spark Streaming applications can cause data loss if " +
"Write Ahead Log is not enabled for non-replayable sources like Flume. " +
"See the programming guide for details on how to enable the Write Ahead Log")
}
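The write ahead log mentioned in this warning has its own switch. A hedged sketch, assuming the standard receiver WAL property (a checkpoint directory must also be set on the StreamingContext so the log has durable storage):

```scala
import org.apache.spark.SparkConf

// Sketch, assuming the standard receiver WAL key: persist received blocks
// to fault-tolerant storage so removing an executor that hosts a receiver
// under elastic scaling cannot lose un-replayable data.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
```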
@@ -79,7 +79,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
- // If dynamic allocation is enabled, start at the configured initial number of executors.
+ // If elastic scaling is enabled, start at the configured initial number of executors.
// Default to minExecutors if no initialExecutors is set.
numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
principal = Option(principal)
@@ -439,9 +439,9 @@ object YarnSparkHadoopUtil {
}

/**
- * Getting the initial target number of executors depends on whether dynamic allocation is
+ * Getting the initial target number of executors depends on whether elastic scaling is
* enabled.
- * If not using dynamic allocation it gets the number of executors reqeusted by the user.
+ * If not using elastic scaling, it gets the number of executors requested by the user.
*/
def getInitialTargetExecutorNumber(
conf: SparkConf,
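Since the method body is elided from this diff, here is a hedged sketch of the behavior its doc comment describes:

```scala
import org.apache.spark.SparkConf

// Hedged sketch (the real body is not shown in this diff): with elastic
// scaling on, start from initialExecutors, defaulting to minExecutors;
// otherwise use the user's requested count, falling back to the caller's
// default.
def getInitialTargetExecutorNumberSketch(conf: SparkConf, numExecutors: Int = 2): Int = {
  val elasticScaling =
    conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
      conf.getInt("spark.executor.instances", 0) == 0
  if (elasticScaling) {
    val minExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    conf.getInt("spark.dynamicAllocation.initialExecutors", minExecutors)
  } else {
    conf.getInt("spark.executor.instances", numExecutors)
  }
}
```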