
Commit 414f99f

Author: José Hiram Soltren
BlacklistTracker takes a SchedulerBackend as input
1 parent 6032b9a commit 414f99f

File tree: 5 files changed (+25 lines, -16 lines)


core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala

Lines changed: 5 additions & 3 deletions
@@ -21,9 +21,10 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
@@ -50,10 +51,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 private[scheduler] class BlacklistTracker (
     private val listenerBus: LiveListenerBus,
     conf: SparkConf,
+    scheduler: Option[CoarseGrainedSchedulerBackend],
     clock: Clock = new SystemClock()) extends Logging {
 
-  def this(sc: SparkContext) = {
-    this(sc.listenerBus, sc.getConf)
+  def this(sched: CoarseGrainedSchedulerBackend) = {
+    this(sched.listenerBus, sched.conf, Some(sched))
   }
 
   BlacklistTracker.validateBlacklistConfs(conf)
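
For context, a minimal sketch of the two construction paths the new signature allows. The object and method names below are hypothetical illustrations, not part of this commit, and the code assumes it sits in the org.apache.spark.scheduler package, since BlacklistTracker and LiveListenerBus are package-private there:

package org.apache.spark.scheduler

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.SystemClock

// Hypothetical wiring sketch, only to illustrate the constructors changed above.
private[scheduler] object BlacklistWiringSketch {

  // Auxiliary constructor: the backend supplies its own listenerBus and conf,
  // and is kept around as Some(sched) in the new scheduler field.
  def fromBackend(sched: CoarseGrainedSchedulerBackend): BlacklistTracker =
    new BlacklistTracker(sched)

  // Primary constructor: callers that have no backend (for example, tests) pass None.
  def fromParts(bus: LiveListenerBus, conf: SparkConf): BlacklistTracker =
    new BlacklistTracker(bus, conf, None, new SystemClock())
}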

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 15 additions & 8 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
@@ -54,7 +55,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
 private[spark] class TaskSchedulerImpl private[scheduler](
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    blacklistTrackerOpt: Option[BlacklistTracker],
+    var blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
@@ -63,14 +64,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     this(
       sc,
       sc.conf.get(config.MAX_TASK_FAILURES),
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
+      null)
   }
 
   def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
     this(
       sc,
       maxTaskFailures,
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
+      null,
       isLocal = isLocal)
   }
 
@@ -148,6 +149,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
+    blacklistTrackerOpt = TaskSchedulerImpl.maybeCreateBlacklistTracker(backend)
     // temporarily set rootPool name to empty
     rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
@@ -701,11 +703,16 @@ private[spark] object TaskSchedulerImpl {
     retval.toList
   }
 
-  private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
-    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
-      Some(new BlacklistTracker(sc))
-    } else {
-      None
+  private def maybeCreateBlacklistTracker(sched: SchedulerBackend): Option[BlacklistTracker] = {
+    sched match {
+      case backend: CoarseGrainedSchedulerBackend =>
+        if (BlacklistTracker.isBlacklistEnabled(backend.conf)) {
+          Some(new BlacklistTracker(backend))
+        } else {
+          None
+        }
+      case _ =>
+        None
     }
   }
 
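
Net effect: tracker creation moves out of the TaskSchedulerImpl constructors, where no backend exists yet, and into initialize(backend). The trade-off visible in the diff is that blacklistTrackerOpt becomes a mutable var and holds null rather than None until initialize runs. Below is a condensed sketch of the new creation rule, equivalent to the pattern match above but written with a guard; the object and method names are hypothetical and the code is assumed to live in the org.apache.spark.scheduler package:

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Hypothetical condensed form of TaskSchedulerImpl.maybeCreateBlacklistTracker above.
private[scheduler] object BlacklistCreationSketch {

  def maybeCreate(backend: SchedulerBackend): Option[BlacklistTracker] =
    backend match {
      // Only a coarse-grained backend with blacklisting enabled yields a tracker.
      case cg: CoarseGrainedSchedulerBackend if BlacklistTracker.isBlacklistEnabled(cg.conf) =>
        Some(new BlacklistTracker(cg))
      // Local and other non-coarse-grained backends now always get None,
      // even when blacklisting is enabled in the SparkConf.
      case _ =>
        None
    }
}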

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val totalCoreCount = new AtomicInteger(0)
   // Total number of executors that are currently registered
   protected val totalRegisteredExecutors = new AtomicInteger(0)
-  protected val conf = scheduler.sc.conf
+  val conf = scheduler.sc.conf
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
   private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
   // Submit tasks only after (registered resources / total expected resources)
@@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private var numPendingExecutors = 0
 
-  private val listenerBus = scheduler.sc.listenerBus
+  val listenerBus = scheduler.sc.listenerBus
 
   // Executors we have requested the cluster manager to kill that have not died yet; maps
   // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
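
Both visibility changes exist only so that the new BlacklistTracker(sched) auxiliary constructor can read sched.listenerBus and sched.conf from another class; protected and private no longer suffice. If fully public members were undesirable, a narrower option (a design sketch, not what this commit does) would be package-private visibility, which still covers the scheduler package:

  // Declaration-level sketch inside CoarseGrainedSchedulerBackend (not standalone code):
  // visible to BlacklistTracker and the rest of org.apache.spark, but not to user code.
  private[spark] val conf = scheduler.sc.conf
  private[spark] val listenerBus = scheduler.sc.listenerBus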

core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -41,7 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     scheduler = mockTaskSchedWithConf(conf)
 
     clock.setTime(0)
-    blacklist = new BlacklistTracker(null, conf, clock)
+    blacklist = new BlacklistTracker(null, conf, null, clock)
   }
 
   override def afterEach(): Unit = {
@@ -90,7 +90,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
 
     clock.setTime(0)
     listenerBusMock = mock[LiveListenerBus]
-    blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, null, clock)
  }
 
   test("Blacklisting individual tasks and checking for SparkListenerEvents") {
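
The only change in these fixtures is the extra argument for the new scheduler parameter. Passing null where an Option[CoarseGrainedSchedulerBackend] is expected compiles, and nothing in this commit's diff dereferences the new field. An equivalent, slightly more idiomatic construction would pass None, as in this sketch (conf, clock and listenerBusMock are the fixtures defined above):

    // Sketch: same constructions as the updated lines above, with None instead of
    // null in the unused scheduler slot.
    blacklist = new BlacklistTracker(null, conf, None, clock)
    blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock)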

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -426,7 +426,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val clock = new ManualClock
     // We don't directly use the application blacklist, but its presence triggers blacklisting
     // within the taskset.
-    val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock))
+    val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, null, clock))
     val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)
 
     {
