From 1c32dd5f22ce734293574a4c0ec926aff8abb361 Mon Sep 17 00:00:00 2001
From: somideshmukh
Date: Tue, 19 Jan 2016 18:47:47 +0530
Subject: [PATCH] [SPARK-12485] Rename "dynamic allocation" to "elastic scaling"

---
 .../apache/spark/ExecutorAllocationManager.scala   |  4 ++--
 .../scala/org/apache/spark/SparkContext.scala      |  2 +-
 .../spark/deploy/master/ApplicationInfo.scala      |  2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala      |  4 ++--
 .../mesos/CoarseMesosSchedulerBackend.scala        |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  6 +++---
 .../apache/spark/HeartbeatReceiverSuite.scala      |  2 +-
 .../org/apache/spark/SparkContextSuite.scala       |  2 +-
 .../StandaloneDynamicAllocationSuite.scala         | 14 +++++++-------
 docs/configuration.md                              | 16 ++++++++--------
 docs/job-scheduling.md                             |  8 ++++----
 docs/running-on-mesos.md                           |  4 ++--
 docs/running-on-yarn.md                            |  2 +-
 project/MimaExcludes.scala                         |  2 +-
 .../spark/streaming/StreamingContext.scala         |  4 ++--
 .../spark/deploy/yarn/ClientArguments.scala        |  2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |  4 ++--
 17 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6176e258989d..8dc6c45d2252 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -193,10 +193,10 @@ private[spark] class ExecutorAllocationManager(
     if (executorIdleTimeoutS <= 0) {
       throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
     }
-    // Require external shuffle service for dynamic allocation
+    // Require external shuffle service for elastic scaling
     // Otherwise, we may lose shuffle files when killing executors
     if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
-      throw new SparkException("Dynamic allocation of executors requires the external " +
+      throw new SparkException("Elastic scaling of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
     if (tasksPerExecutor == 0) {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 194ecc0a0434..bec770d7b5bf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -562,7 +562,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Optionally scale number of executors dynamically based on workload. Exposed for testing.
     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
     if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+      logWarning("Elastic scaling and num executors both set, thus elastic scaling disabled.")
     }

     _executorAllocationManager =
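For reviewers who want to exercise the rename end to end, here is a minimal sketch (not part of the patch) of turning the feature on. The configuration keys keep their `spark.dynamicAllocation.*` names; only the user-facing wording changes. The app name is a placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: enable elastic scaling (config keys are unchanged by this patch).
val conf = new SparkConf()
  .setAppName("elastic-scaling-demo") // placeholder app name
  .set("spark.dynamicAllocation.enabled", "true")
  // Required by the ExecutorAllocationManager check above: without the external
  // shuffle service, shuffle files are lost when an executor is removed.
  .set("spark.shuffle.service.enabled", "true")
// Caveat from the SparkContext hunk: also setting spark.executor.instances
// (or --num-executors) disables elastic scaling and logs the warning above.

val sc = new SparkContext(conf)
```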
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 7e2cf956c725..e5be3aedab31 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -45,7 +45,7 @@ private[spark] class ApplicationInfo(
   // A cap on the number of executors this application can have at any given time.
   // By default, this is infinite. Only after the first allocation request is issued by the
-  // application will this be set to a finite value. This is used for dynamic allocation.
+  // application will this be set to a finite value. This is used for elastic scaling.
   @transient private[master] var executorLimit: Int = _

   @transient private var nextExecutorId: Int = _
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9fe9d83a705b..60ce1c6d48c1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1513,7 +1513,7 @@ abstract class RDD[T: ClassTag](
    * fault-tolerant storage. The effect is that if an executor fails during the computation,
    * the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
    *
-   * This is NOT safe to use with dynamic allocation, which removes executors along
+   * This is NOT safe to use with elastic scaling, which removes executors along
    * with their cached blocks. If you must use both features, you are advised to set
    * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
    *
@@ -1522,7 +1522,7 @@ abstract class RDD[T: ClassTag](
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
     if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
         conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
-      logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
+      logWarning("Local checkpointing is NOT safe to use with elastic scaling, " +
         "which removes executors along with their cached blocks. If you must use both " +
         "features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
         "to a high value. E.g. If you plan to use the RDD for 1 hour, set the timeout to " +
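The RDD warning above is easier to follow with a concrete usage. A hypothetical sketch, assuming (per the warning's own example) the RDD is used for about an hour:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job combining localCheckpoint() with elastic scaling.
val conf = new SparkConf()
  .setAppName("local-checkpoint-demo") // placeholder
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  // Keep executors holding cached blocks around for the RDD's expected lifetime,
  // so locally checkpointed partitions are not lost with their executor.
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1h")

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
rdd.localCheckpoint() // truncates lineage; data lives only on executor storage
rdd.count()           // first action materializes the checkpoint
```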
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 7d08eae0b487..3f7d4d94230d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -78,7 +78,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]

   /**
-   * The total number of executors we aim to have. Undefined when not using dynamic allocation
+   * The total number of executors we aim to have. Undefined when not using elastic scaling
    * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
    */
   private var executorLimitOption: Option[Int] = None
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9dbe66e7eefb..39c527a1d75f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2176,9 +2176,9 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Return whether dynamic allocation is enabled in the given conf
-   * Dynamic allocation and explicitly setting the number of executors are inherently
-   * incompatible. In environments where dynamic allocation is turned on by default,
+   * Return whether elastic scaling is enabled in the given conf
+   * Elastic scaling and explicitly setting the number of executors are inherently
+   * incompatible. In environments where elastic scaling is turned on by default,
    * the latter should override the former (SPARK-9092).
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 3cd80c0f7d17..8cde7f9c1b05 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -247,7 +247,7 @@ class HeartbeatReceiverSuite
   }
 }

-// TODO: use these classes to add end-to-end tests for dynamic allocation!
+// TODO: use these classes to add end-to-end tests for elastic scaling!

 /**
  * Dummy RPC endpoint to simulate executors.
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index d4f2ea87650a..9a92851ac188 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,7 +285,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

-  test("No exception when both num-executors and dynamic allocation set.") {
+  test("No exception when both num-executors and elastic scaling set.") {
     noException should be thrownBy {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
         .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2fa795f84666..2f06d7e800de 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor

 /**
- * End-to-end tests for dynamic allocation in standalone mode.
+ * End-to-end tests for elastic scaling in standalone mode.
  */
 class StandaloneDynamicAllocationSuite
   extends SparkFunSuite
@@ -82,7 +82,7 @@ class StandaloneDynamicAllocationSuite
     super.afterAll()
   }

-  test("dynamic allocation default behavior") {
+  test("elastic scaling default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
@@ -129,7 +129,7 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1000)
   }

-  test("dynamic allocation with max cores <= cores per worker") {
+  test("elastic scaling with max cores <= cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "8"))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
@@ -152,7 +152,7 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.executors.values.head.cores === 8)
     assert(apps.head.getExecutorLimit === 1)
     // request 1 more; this one won't go through because we're already at max cores.
-    // This highlights a limitation of using dynamic allocation with max cores WITHOUT
+    // This highlights a limitation of using elastic scaling with max cores WITHOUT
     // setting cores per executor: once an application scales down and then scales back
     // up, its executors may not be spread out anymore!
     assert(sc.requestExecutors(1))
@@ -184,7 +184,7 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1000)
   }

-  test("dynamic allocation with max cores > cores per worker") {
+  test("elastic scaling with max cores > cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "16"))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
@@ -237,7 +237,7 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1000)
   }

-  test("dynamic allocation with cores per executor") {
+  test("elastic scaling with cores per executor") {
     sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
@@ -289,7 +289,7 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1000)
   }

-  test("dynamic allocation with cores per executor AND max cores") {
+  test("elastic scaling with cores per executor AND max cores") {
     sc = new SparkContext(appConf
       .set("spark.executor.cores", "2")
       .set("spark.cores.max", "8"))
diff --git a/docs/configuration.md b/docs/configuration.md
index 38d3d059f9d3..99afa5077992 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -459,7 +459,7 @@ Apart from these, the following properties are also available, and may be useful
     executors so the executors can be safely removed. This must be enabled if
     <code>spark.dynamicAllocation.enabled</code> is "true". The external shuffle service
     must be set up in order to enable it. See
-    <a href="job-scheduling.html#configuration-and-setup">dynamic allocation
+    <a href="job-scheduling.html#configuration-and-setup">elastic scaling
     configuration and setup documentation</a> for more information.
   </td>
 </tr>
@@ -1242,7 +1242,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 </table>

-#### Dynamic Allocation
+#### Elastic Scaling
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
@@ -1265,7 +1265,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
   <td>60s</td>
   <td>
-    If dynamic allocation is enabled and an executor has been idle for more than this duration,
+    If elastic scaling is enabled and an executor has been idle for more than this duration,
     the executor will be removed. For more detail, see this
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
@@ -1274,7 +1274,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
   <td>infinity</td>
   <td>
-    If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration,
+    If elastic scaling is enabled and an executor which has cached data blocks has been idle for more than this duration,
     the executor will be removed. For more details, see this
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
@@ -1283,28 +1283,28 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.dynamicAllocation.initialExecutors</code></td>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
-    Initial number of executors to run if dynamic allocation is enabled.
+    Initial number of executors to run if elastic scaling is enabled.
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.maxExecutors</code></td>
   <td>infinity</td>
   <td>
-    Upper bound for the number of executors if dynamic allocation is enabled.
+    Upper bound for the number of executors if elastic scaling is enabled.
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>0</td>
   <td>
-    Lower bound for the number of executors if dynamic allocation is enabled.
+    Lower bound for the number of executors if elastic scaling is enabled.
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
   <td>1s</td>
   <td>
-    If dynamic allocation is enabled and there have been pending tasks backlogged for more than
+    If elastic scaling is enabled and there have been pending tasks backlogged for more than
     this duration, new executors will be requested. For more detail, see this
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
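Since the hunks above only rename prose around these properties, a compact sketch of how the sizing knobs fit together may help; the values are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Illustrative sizing configuration for elastic scaling.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")     // lower bound
  .set("spark.dynamicAllocation.maxExecutors", "50")    // upper bound
  .set("spark.dynamicAllocation.initialExecutors", "5") // defaults to minExecutors
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")    // scale-down trigger
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") // scale-up trigger
```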
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 36327c6efeaf..f8eb2c52e7da 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -108,7 +108,7 @@ and request executors.

 #### Request Policy

-A Spark application with dynamic allocation enabled requests additional executors when it has
+A Spark application with elastic scaling enabled requests additional executors when it has
 pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
 of executors is insufficient to simultaneously saturate all tasks that have been submitted but
 not yet finished.
@@ -135,9 +135,9 @@ an executor should not be idle if there are still pending tasks to be scheduled.

 ### Graceful Decommission of Executors

-Before dynamic allocation, a Spark executor exits either on failure or when the associated
+Before elastic scaling, a Spark executor exits either on failure or when the associated
 application has also exited. In both scenarios, all state associated with the executor is no
-longer needed and can be safely discarded. With dynamic allocation, however, the application
+longer needed and can be safely discarded. With elastic scaling, however, the application
 is still running when an executor is explicitly removed. If the application attempts to access
 state stored in or written by the executor, it will have to recompute the state. Thus,
 Spark needs a mechanism to decommission an executor gracefully by preserving its state before
@@ -146,7 +146,7 @@ removing it.

 This requirement is especially important for shuffles. During a shuffle, the Spark executor first
 writes its own map outputs locally to disk, and then acts as the server for those files when other
 executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
-longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
+longer than their peers, elastic scaling may remove an executor before the shuffle completes,
 in which case the shuffle files written by that executor must be recomputed unnecessarily.

 The solution for preserving shuffle files is to use an external shuffle service, also introduced
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 3193e1785348..f9031595a772 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -243,8 +243,8 @@ In either case, HDFS runs separately from Hadoop MapReduce, without being schedu

 # Dynamic Resource Allocation with Mesos

-Mesos supports dynamic allocation only with coarse grain mode, which can resize the number of executors based on statistics
-of the application. While dynamic allocation supports both scaling up and scaling down the number of executors, the coarse grain scheduler only supports scaling down
+Mesos supports elastic scaling only with coarse-grained mode, which can resize the number of executors based on statistics
+of the application. While elastic scaling supports both scaling up and scaling down the number of executors, the coarse-grained scheduler only supports scaling down
 since it is already designed to run one executor per slave with the configured amount of resources. However, after scaling down the number of executors the coarse-grained scheduler
 can scale back up to the same number of executors when Spark signals more executors are needed.
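A hedged sketch of the Mesos setup the paragraph above describes: coarse-grained mode plus the external shuffle service, which is assumed to already be running on each slave. The ZooKeeper master URL is a placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: elastic scaling on Mesos (coarse-grained mode only, per the docs above).
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos") // placeholder URL
  .setAppName("mesos-elastic-scaling")
  .set("spark.mesos.coarse", "true") // coarse-grained mode
  .set("spark.dynamicAllocation.enabled", "true")
  // Assumes the Mesos external shuffle service has been started on each slave.
  .set("spark.shuffle.service.enabled", "true")
val sc = new SparkContext(conf)
```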
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 06413f83c3a7..e36c5f0c905f 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -206,7 +206,7 @@ If you need a reference to the proper location to put log files in the YARN so t
   <td><code>spark.executor.instances</code></td>
   <td>2</td>
   <td>
-    The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
+    The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, elastic scaling is turned off and the specified number of <code>spark.executor.instances</code> is used.
   </td>
 </tr>
 <tr>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index edae59d88266..8390f78baf00 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -310,7 +310,7 @@ object MimaExcludes {
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
     ) ++ Seq(
-      // SPARK-4751 Dynamic allocation for standalone mode
+      // SPARK-4751 Elastic scaling for standalone mode
       ProblemFilters.exclude[MissingMethodProblem](
         "org.apache.spark.SparkContext.supportDynamicAllocation")
     ) ++ Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b24c0d067bb0..531a35e5412c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -563,8 +563,8 @@ class StreamingContext private[streaming] (
     }

     if (Utils.isDynamicAllocationEnabled(sc.conf)) {
-      logWarning("Dynamic Allocation is enabled for this application. " +
-        "Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
+      logWarning("Elastic scaling is enabled for this application. " +
+        "Enabling elastic scaling for Spark Streaming applications can cause data loss if " +
         "Write Ahead Log is not enabled for non-replayable sources like Flume. " +
         "See the programming guide for details on how to enable the Write Ahead Log")
     }
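The renamed warning above points at the Write Ahead Log; a sketch of a streaming application that heeds it (checkpoint path and batch interval are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: elastic scaling plus the WAL, so receiver-based input survives
// executor removal (the scenario the warning above describes).
val conf = new SparkConf()
  .setAppName("streaming-elastic-scaling")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval
ssc.checkpoint("/tmp/streaming-checkpoint")       // the WAL needs a checkpoint dir
```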
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a9f437435735..b6243281ce85 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -79,7 +79,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
     .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
     .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
     .orNull
-  // If dynamic allocation is enabled, start at the configured initial number of executors.
+  // If elastic scaling is enabled, start at the configured initial number of executors.
   // Default to minExecutors if no initialExecutors is set.
   numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
   principal = Option(principal)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 36a2d6142988..f71e32330eb8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -439,9 +439,9 @@ object YarnSparkHadoopUtil {
   }

   /**
-   * Getting the initial target number of executors depends on whether dynamic allocation is
+   * Getting the initial target number of executors depends on whether elastic scaling is
    * enabled.
-   * If not using dynamic allocation it gets the number of executors reqeusted by the user.
+   * If not using elastic scaling it gets the number of executors requested by the user.
    */
   def getInitialTargetExecutorNumber(
       conf: SparkConf,
       numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
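For context on the hunk above, a standalone sketch of the precedence rule it documents. This is an assumption-laden paraphrase written for illustration, not the actual `getInitialTargetExecutorNumber` body:

```scala
import org.apache.spark.SparkConf

// Sketch: with elastic scaling, start from initialExecutors (falling back to
// minExecutors); an explicit executor count wins outright (SPARK-9092).
def initialTargetExecutors(conf: SparkConf, requested: Int): Int = {
  val elastic = conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    conf.getInt("spark.executor.instances", 0) == 0
  if (elastic) {
    val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    conf.getInt("spark.dynamicAllocation.initialExecutors", min)
  } else {
    requested // the number of executors requested by the user
  }
}
```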