
Commit 9804a75

Andrew Or committed

[SPARK-4754] Refactor SparkContext into ExecutorAllocationClient

This is so that the `ExecutorAllocationManager` does not take in the `SparkContext`, with all of its dependencies, as an argument. This prevents future developers of this class from tying it down further to the `SparkContext`, which has really become quite a monstrous object. cc'ing pwendell, who originally suggested this, and JoshRosen, who may have thoughts about the trait mix-in style of `SparkContext`.

Author: Andrew Or <[email protected]>

Closes #3614 from andrewor14/dynamic-allocation-sc and squashes the following commits:

187070d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc
59baf6c [Andrew Or] Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc
347a348 [Andrew Or] Refactor SparkContext into ExecutorAllocationClient

1 parent 105293a commit 9804a75

File tree

4 files changed: +59 -15 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala (new)
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * A client that communicates with the cluster manager to request or kill executors.
+ */
+private[spark] trait ExecutorAllocationClient {
+
+  /**
+   * Request an additional number of executors from the cluster manager.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def requestExecutors(numAdditionalExecutors: Int): Boolean
+
+  /**
+   * Request that the cluster manager kill the specified executors.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutors(executorIds: Seq[String]): Boolean
+
+  /**
+   * Request that the cluster manager kill the specified executor.
+   * Return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+}

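The trait is deliberately small, which makes it easy to satisfy with a test double. Below is a minimal sketch (not part of this commit) of a stub client that records requests instead of contacting a cluster manager; the `StubAllocationClient` name and its recording fields are hypothetical:

// Hypothetical test stub. It must live under org.apache.spark (e.g. in a
// test source tree for that package) because the trait is private[spark].
package org.apache.spark

import scala.collection.mutable.ArrayBuffer

// Records every request instead of talking to a real cluster manager,
// and always reports the request as acknowledged.
class StubAllocationClient extends ExecutorAllocationClient {
  val requested = new ArrayBuffer[Int]
  val killed = new ArrayBuffer[String]

  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    requested += numAdditionalExecutors
    true
  }

  override def killExecutors(executorIds: Seq[String]): Boolean = {
    killed ++= executorIds
    true
  }
  // killExecutor(id) is inherited from the trait's default implementation,
  // which delegates to killExecutors(Seq(id)).
}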
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 8 additions & 6 deletions

@@ -60,11 +60,13 @@ import org.apache.spark.scheduler._
  * spark.dynamicAllocation.executorIdleTimeout (K) -
  *   If an executor has been idle for this duration, remove it
  */
-private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+private[spark] class ExecutorAllocationManager(
+    client: ExecutorAllocationClient,
+    listenerBus: LiveListenerBus,
+    conf: SparkConf)
+  extends Logging {
   import ExecutorAllocationManager._

-  private val conf = sc.conf
-
   // Lower and upper bounds on the number of executors. These are required.
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)

@@ -168,7 +170,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
    * Register for scheduler callbacks to decide when to add and remove executors.
    */
   def start(): Unit = {
-    sc.addSparkListener(listener)
+    listenerBus.addListener(listener)
     startPolling()
   }

@@ -253,7 +255,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)

     val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
+    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
     if (addRequestAcknowledged) {
       logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
         s"tasks are backlogged (new desired total will be $newTotalExecutors)")

@@ -295,7 +297,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     }

     // Send a request to the backend to kill this executor
-    val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+    val removeRequestAcknowledged = testing || client.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
         s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")

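With the `SparkContext` dependency gone, the manager needs only these three arguments, so a test can drive it without starting a real application. A minimal sketch, reusing the hypothetical `StubAllocationClient` from above and assuming `LiveListenerBus` still has a no-argument constructor (as it did at this point in Spark's history):

// Assumes this code lives under org.apache.spark, since both the manager
// and the trait are private[spark].
import org.apache.spark.scheduler.LiveListenerBus

val conf = new SparkConf()
  .set("spark.dynamicAllocation.minExecutors", "1")  // required bound (defaults to -1)
  .set("spark.dynamicAllocation.maxExecutors", "8")  // required bound (defaults to -1)

val client = new StubAllocationClient
val manager = new ExecutorAllocationManager(client, new LiveListenerBus, conf)
manager.start()  // registers its listener on the bus and begins polling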
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 5 deletions

@@ -64,7 +64,7 @@ import org.apache.spark.util._
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()

@@ -365,7 +365,7 @@ class SparkContext(config: SparkConf) extends Logging {
     if (dynamicAllocationEnabled) {
       assert(master.contains("yarn") || dynamicAllocationTesting,
         "Dynamic allocation of executors is currently only supported in YARN mode")
-      Some(new ExecutorAllocationManager(this))
+      Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
       None
     }

@@ -994,7 +994,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     assert(master.contains("yarn") || dynamicAllocationTesting,
       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {

@@ -1012,7 +1012,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutors(executorIds: Seq[String]): Boolean = {
+  override def killExecutors(executorIds: Seq[String]): Boolean = {
     assert(master.contains("yarn") || dynamicAllocationTesting,
       "Killing executors is currently only supported in YARN mode")
     schedulerBackend match {

@@ -1030,7 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+  override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)

   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION

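Since `SparkContext` itself is now an `ExecutorAllocationClient`, code that scales executors can accept the narrow trait instead of the whole context. A sketch, assuming a running YARN application (the master URL and executor id are illustrative):

val sc = new SparkContext(
  new SparkConf().setMaster("yarn-client").setAppName("demo"))

// Upcast to the trait: callers see only the allocation methods.
val client: ExecutorAllocationClient = sc

client.requestExecutors(2)  // dispatches to SparkContext's override
client.killExecutor("3")    // SparkContext overrides this only to call super,
                            // i.e. the trait default killExecutors(Seq("3"))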
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 4 deletions

@@ -27,7 +27,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

-import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
+import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}

@@ -42,7 +42,7 @@ import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Ut
  */
 private[spark]
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
-  extends SchedulerBackend with Logging
+  extends ExecutorAllocationClient with SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)

@@ -307,7 +307,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
    * Request an additional number of executors from the cluster manager.
    * Return whether the request is acknowledged.
    */
-  final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
     numPendingExecutors += numAdditionalExecutors

@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
    * Request that the cluster manager kill the specified executors.
    * Return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+  final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val filteredExecutorIds = new ArrayBuffer[String]
     executorIds.foreach { id =>

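After this change both `SparkContext` and `CoarseGrainedSchedulerBackend` implement the trait, so scaling logic can be written once against `ExecutorAllocationClient` and reused with either, or with a test stub. A hypothetical helper, not part of the commit:

// Move toward a desired executor count through whatever client is given:
// a SparkContext, a coarse-grained backend, or a stub in tests.
def scaleTo(
    client: ExecutorAllocationClient,
    current: Int,
    desired: Int,
    idleExecutorIds: Seq[String]): Boolean = {
  if (desired > current) {
    client.requestExecutors(desired - current)
  } else if (desired < current) {
    client.killExecutors(idleExecutorIds.take(current - desired))
  } else {
    true  // already at the desired total; nothing to do
  }
}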