
Commit 07d720c

Synchronize Schedulable data structures
1 parent 52d9052 commit 07d720c

6 files changed: 36 additions, 28 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 12 additions & 6 deletions
@@ -17,15 +17,17 @@
 
 package org.apache.spark
 
+import scala.language.implicitConversions
+
 import java.io._
 import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConverters._
 import scala.collection.generic.Growable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.language.implicitConversions
 import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Return pools for fair scheduler
-   * TODO(xiajunluan): We should take nested pools into account
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
+   * TODO(xiajunluan): We should take nested pools into account
    */
-  def getAllPools: ArrayBuffer[Schedulable] = {
-    taskScheduler.rootPool.schedulableQueue
+  @DeveloperApi
+  def getAllPools: Array[Schedulable] = {
+    taskScheduler.rootPool.schedulableQueue.asScala.toArray
   }
 
   /**
+   * :: DeveloperApi ::
    * Return the pool associated with the given name, if one exists
    */
+  @DeveloperApi
   def getPoolForName(pool: String): Option[Schedulable] = {
-    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
   }
 
   /**
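Two observations on the SparkContext hunk above: getAllPools now copies the concurrent queue into an Array via asScala.toArray instead of exposing the live buffer, and getPoolForName wraps ConcurrentHashMap.get, which returns null for a missing key, in Option so the method still yields Option[Schedulable]. A minimal standalone sketch of the Option-wrapping pattern (object name and map contents are invented for illustration):

    import java.util.concurrent.ConcurrentHashMap

    object OptionWrapSketch {
      def main(args: Array[String]): Unit = {
        val pools = new ConcurrentHashMap[String, String]()
        pools.put("default", "FIFO")   // hypothetical entry, for illustration only

        // ConcurrentHashMap.get returns null when the key is absent;
        // Option(...) maps that null to None and a hit to Some(value).
        println(Option(pools.get("default")))   // Some(FIFO)
        println(Option(pools.get("missing")))   // None
      }
    }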

core/src/main/scala/org/apache/spark/scheduler/Pool.scala

Lines changed: 17 additions & 17 deletions
@@ -17,8 +17,10 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -35,18 +37,15 @@ private[spark] class Pool(
   extends Schedulable
   with Logging {
 
-  var schedulableQueue = new ArrayBuffer[Schedulable]
-  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
-
+  var schedulableQueue = new LinkedBlockingQueue[Schedulable]
+  var schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
   var runningTasks = 0
-
   var priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-
   var name = poolName
   var parent: Pool = null
 
@@ -60,21 +59,21 @@ private[spark] class Pool(
   }
 
   override def addSchedulable(schedulable: Schedulable) {
-    schedulableQueue += schedulable
-    schedulableNameToSchedulable(schedulable.name) = schedulable
+    schedulableQueue.offer(schedulable)
+    schedulableNameToSchedulable.put(schedulable.name, schedulable)
     schedulable.parent = this
   }
 
   override def removeSchedulable(schedulable: Schedulable) {
-    schedulableQueue -= schedulable
-    schedulableNameToSchedulable -= schedulable.name
+    schedulableQueue.remove(schedulable)
+    schedulableNameToSchedulable.remove(schedulable.name)
  }
 
   override def getSchedulableByName(schedulableName: String): Schedulable = {
     if (schedulableNameToSchedulable.contains(schedulableName)) {
-      return schedulableNameToSchedulable(schedulableName)
+      return schedulableNameToSchedulable.get(schedulableName)
     }
-    for (schedulable <- schedulableQueue) {
+    for (schedulable <- schedulableQueue.asScala) {
       val sched = schedulable.getSchedulableByName(schedulableName)
       if (sched != null) {
         return sched
@@ -84,22 +83,23 @@ private[spark] class Pool(
   }
 
   override def executorLost(executorId: String, host: String) {
-    schedulableQueue.foreach(_.executorLost(executorId, host))
+    schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
   }
 
   override def checkSpeculatableTasks(): Boolean = {
     var shouldRevive = false
-    for (schedulable <- schedulableQueue) {
+    for (schedulable <- schedulableQueue.asScala) {
       shouldRevive |= schedulable.checkSpeculatableTasks()
     }
     shouldRevive
   }
 
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+    val sortedSchedulableQueue =
+      schedulableQueue.asScala.toArray.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
-      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
     sortedTaskSetQueue
   }
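The net effect of the Pool changes: the child-pool queue and the name lookup table become java.util.concurrent collections, mutation goes through offer/put/remove, and every traversal converts through JavaConverters' asScala, with toArray taking a point-in-time snapshot before sorting. A small self-contained sketch of that access pattern, using invented Int elements rather than real Schedulables:

    import java.util.concurrent.LinkedBlockingQueue
    import scala.collection.JavaConverters._

    object ConcurrentQueueSketch {
      def main(args: Array[String]): Unit = {
        val queue = new LinkedBlockingQueue[Int]()
        Seq(3, 1, 2).foreach(queue.offer(_))   // offer is the thread-safe append

        // asScala wraps the Java queue so Scala's for/foreach work on it;
        // toArray copies the elements into a snapshot, so sorting cannot race
        // with concurrent offer/remove calls on the live queue.
        val sorted = queue.asScala.toArray.sortWith(_ < _)
        println(sorted.mkString(", "))          // 1, 2, 3
        println(queue.asScala.mkString(", "))   // 3, 1, 2 (insertion order, unchanged)
      }
    }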

core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala

Lines changed: 4 additions & 2 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.LinkedBlockingQueue
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -28,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 private[spark] trait Schedulable {
   var parent: Pool
   // child queues
-  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulableQueue: LinkedBlockingQueue[Schedulable]
   def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
@@ -42,5 +44,5 @@
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
   def checkSpeculatableTasks(): Boolean
-  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
+  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
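Besides the queue type, the trait drops the empty parameter list from getSortedTaskSetQueue. By the usual Scala convention, a no-argument method with no side effects is declared and called without parentheses, which is why the call sites in TaskSchedulerImpl and the test suite below change in lockstep. A tiny illustrative sketch (names invented):

    import java.util.concurrent.LinkedBlockingQueue

    // Invented, trimmed-down analogue of the trait above.
    trait MiniSchedulable {
      def children: LinkedBlockingQueue[MiniSchedulable]   // thread-safe child queue
      def sortedNames: Seq[String]         // pure accessor: declared without parentheses
      def executorLost(id: String): Unit   // side-effecting method keeps its parameter list
    }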

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -222,7 +222,7 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
       new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
     // For now, pool information is only accessible in live UIs
-    val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+    val pools = if (live) sc.getAllPools else Array[Schedulable]()
     val poolTable = new PoolTable(pools, parent)
 
     val summary: NodeSeq =

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
   }
 
   def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue()
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
     /* Just for Test*/
     for (manager <- taskSetQueue) {
       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
