Skip to content

Commit 347a348

Browse files
author
Andrew Or
committed
Refactor SparkContext into ExecutorAllocationClient
This is so that the ExecutorAllocationManager does not take in the SparkContext, with all of its dependencies, as an argument. This prevents future developers of this class from tying it down further to the SparkContext, which has really become quite a monstrous object.
1 parent 15cf3b0 commit 347a348

File tree

4 files changed

+56
-15
lines changed

4 files changed

+56
-15
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark
19+
20+
/**
21+
* A client that communicates with the cluster manager to request or kill executors.
22+
*/
23+
private[spark] trait ExecutorAllocationClient {
24+
25+
/**
26+
* Request additional executors from the cluster manager; returns whether the request was acknowledged.
27+
*/
28+
def requestExecutors(numAdditionalExecutors: Int): Boolean
29+
30+
/**
31+
* Request that the cluster manager kill the specified executors; returns whether the request was acknowledged.
32+
*/
33+
def killExecutors(executorIds: Seq[String]): Boolean
34+
35+
/**
36+
* Request that the cluster manager kill the specified executor; delegates to killExecutors with a one-element Seq.
37+
*/
38+
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
39+
}

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,13 @@ import org.apache.spark.scheduler._
6060
* spark.dynamicAllocation.executorIdleTimeout (K) -
6161
* If an executor has been idle for this duration, remove it
6262
*/
63-
private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
63+
private[spark] class ExecutorAllocationManager(
64+
client: ExecutorAllocationClient,
65+
listenerBus: LiveListenerBus,
66+
conf: SparkConf)
67+
extends Logging {
6468
import ExecutorAllocationManager._
6569

66-
private val conf = sc.conf
67-
6870
// Lower and upper bounds on the number of executors. These are required.
6971
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
7072
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
@@ -168,7 +170,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
168170
* Register for scheduler callbacks to decide when to add and remove executors.
169171
*/
170172
def start(): Unit = {
171-
sc.addSparkListener(listener)
173+
listenerBus.addListener(listener)
172174
startPolling()
173175
}
174176

@@ -253,7 +255,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
253255
val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
254256

255257
val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
256-
val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
258+
val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
257259
if (addRequestAcknowledged) {
258260
logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
259261
s"tasks are backlogged (new desired total will be $newTotalExecutors)")
@@ -295,7 +297,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
295297
}
296298

297299
// Send a request to the backend to kill this executor
298-
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
300+
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
299301
if (removeRequestAcknowledged) {
300302
logInfo(s"Removing executor $executorId because it has been idle for " +
301303
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ import org.apache.spark.util._
6464
* @param config a Spark Config object describing the application configuration. Any settings in
6565
* this config overrides the default configs as well as system properties.
6666
*/
67-
class SparkContext(config: SparkConf) extends Logging {
67+
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
6868

6969
// The call site where this SparkContext was constructed.
7070
private val creationSite: CallSite = Utils.getCallSite()
@@ -359,7 +359,7 @@ class SparkContext(config: SparkConf) extends Logging {
359359
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
360360
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
361361
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
362-
Some(new ExecutorAllocationManager(this))
362+
Some(new ExecutorAllocationManager(this, listenerBus, conf))
363363
} else {
364364
None
365365
}
@@ -988,7 +988,7 @@ class SparkContext(config: SparkConf) extends Logging {
988988
* This is currently only supported in Yarn mode. Return whether the request is received.
989989
*/
990990
@DeveloperApi
991-
def requestExecutors(numAdditionalExecutors: Int): Boolean = {
991+
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
992992
schedulerBackend match {
993993
case b: CoarseGrainedSchedulerBackend =>
994994
b.requestExecutors(numAdditionalExecutors)
@@ -1004,7 +1004,7 @@ class SparkContext(config: SparkConf) extends Logging {
10041004
* This is currently only supported in Yarn mode. Return whether the request is received.
10051005
*/
10061006
@DeveloperApi
1007-
def killExecutors(executorIds: Seq[String]): Boolean = {
1007+
override def killExecutors(executorIds: Seq[String]): Boolean = {
10081008
schedulerBackend match {
10091009
case b: CoarseGrainedSchedulerBackend =>
10101010
b.killExecutors(executorIds)
@@ -1020,7 +1020,7 @@ class SparkContext(config: SparkConf) extends Logging {
10201020
* This is currently only supported in Yarn mode. Return whether the request is received.
10211021
*/
10221022
@DeveloperApi
1023-
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
1023+
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
10241024

10251025
/** The version of Spark on which this application is running. */
10261026
def version = SPARK_VERSION

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import akka.actor._
2727
import akka.pattern.ask
2828
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
2929

30-
import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
30+
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
3131
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
3232
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
3333
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
@@ -42,7 +42,7 @@ import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Ut
4242
*/
4343
private[spark]
4444
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
45-
extends SchedulerBackend with Logging
45+
extends ExecutorAllocationClient with SchedulerBackend with Logging
4646
{
4747
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
4848
var totalCoreCount = new AtomicInteger(0)
@@ -307,7 +307,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
307307
* Request an additional number of executors from the cluster manager.
308308
* Return whether the request is acknowledged.
309309
*/
310-
final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
310+
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
311311
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
312312
logDebug(s"Number of pending executors is now $numPendingExecutors")
313313
numPendingExecutors += numAdditionalExecutors
@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
334334
* Request that the cluster manager kill the specified executors.
335335
* Return whether the kill request is acknowledged.
336336
*/
337-
final def killExecutors(executorIds: Seq[String]): Boolean = {
337+
final override def killExecutors(executorIds: Seq[String]): Boolean = {
338338
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
339339
val filteredExecutorIds = new ArrayBuffer[String]
340340
executorIds.foreach { id =>

0 commit comments

Comments
 (0)