
Commit 69f7502

andrewor14 authored and aarondav committed
[SPARK-1769] Executor loss causes NPE race condition
This PR replaces the `Schedulable` data structures in Pool.scala with thread-safe ones from Java. Note that Scala's `with SynchronizedBuffer` trait is soon to be deprecated in 2.11 because it is ["inherently unreliable"](http://www.scala-lang.org/api/2.11.0/index.html#scala.collection.mutable.SynchronizedBuffer). We should gradually move away from `SynchronizedBuffer` in other places too.

Note that this PR introduces an API-breaking change: `sc.getAllPools` now returns an immutable `Seq` rather than a mutable `ArrayBuffer`. We want this method to return an immutable copy, because handing out the mutable structure could confuse users: modifying the copy has no effect on the original data structure.

Author: Andrew Or <[email protected]>

Closes #762 from andrewor14/pool-npe and squashes the following commits:

383e739 [Andrew Or] JavaConverters -> JavaConversions
3f32981 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pool-npe
769be19 [Andrew Or] Assorted minor changes
2189247 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pool-npe
05ad9e9 [Andrew Or] Fix test - contains is not the same as containsKey
0921ea0 [Andrew Or] var -> val
07d720c [Andrew Or] Synchronize Schedulable data structures
1 parent 54ae832 commit 69f7502
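
For context, here is a minimal sketch (hypothetical names, not Spark code) of the pattern this commit adopts: `ConcurrentLinkedQueue`'s weakly consistent iterator can be traversed and snapshotted while another thread removes elements, whereas a plain `mutable.ArrayBuffer` mutated concurrently can throw or yield corrupt entries — the NPE race of SPARK-1769.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConversions._

object PoolRaceSketch extends App {
  val queue = new ConcurrentLinkedQueue[String]()
  (1 to 1000).foreach(i => queue.add("taskset-" + i))

  // Simulates executor loss: another thread removes entries mid-iteration.
  val remover = new Thread(new Runnable {
    def run() { (1 to 1000).foreach(i => queue.remove("taskset-" + i)) }
  })
  remover.start()

  // The queue's iterator is weakly consistent: it never throws
  // ConcurrentModificationException, and toSeq gives the sort a stable
  // view of the elements, just as Pool.getSortedTaskSetQueue now does.
  val snapshot = queue.toSeq.sortWith(_ < _)
  println("snapshot size: " + snapshot.size)
  remover.join()
}
```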

5 files changed: +35 −26 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 13 additions & 7 deletions
```diff
@@ -17,15 +17,17 @@
 
 package org.apache.spark
 
+import scala.language.implicitConversions
+
 import java.io._
 import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.language.implicitConversions
+import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Return pools for fair scheduler
-   * TODO(xiajunluan): We should take nested pools into account
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
    */
-  def getAllPools: ArrayBuffer[Schedulable] = {
-    taskScheduler.rootPool.schedulableQueue
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
   }
 
   /**
+   * :: DeveloperApi ::
    * Return the pool associated with the given name, if one exists
    */
+  @DeveloperApi
   def getPoolForName(pool: String): Option[Schedulable] = {
-    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
   }
 
   /**
```

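A note on the `Option(...)` wrapper above: `scala.collection.mutable.HashMap#get` already returned an `Option`, but `java.util.concurrent.ConcurrentHashMap#get` returns the value or `null`, so the wrapper restores the old contract (the same mismatch behind the "contains is not the same as containsKey" test fix). A small standalone sketch with illustrative names:

```scala
import java.util.concurrent.ConcurrentHashMap

object PoolLookupSketch extends App {
  val pools = new ConcurrentHashMap[String, String]()
  pools.put("default", "rootPool")

  // java.util.Map#get returns the value or null, never an Option, so the
  // commit wraps the lookup: Option(null) == None, Option(v) == Some(v).
  assert(Option(pools.get("default")) == Some("rootPool"))
  assert(Option(pools.get("missing")).isEmpty)
  println("lookups behave like the old HashMap#get API")
}
```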
core/src/main/scala/org/apache/spark/scheduler/Pool.scala

Lines changed: 16 additions & 15 deletions
```diff
@@ -17,8 +17,10 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -35,18 +37,15 @@ private[spark] class Pool(
   extends Schedulable
   with Logging {
 
-  var schedulableQueue = new ArrayBuffer[Schedulable]
-  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
-
+  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
+  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
   var runningTasks = 0
-
   var priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-
   var name = poolName
   var parent: Pool = null
 
@@ -60,19 +59,20 @@ private[spark] class Pool(
   }
 
   override def addSchedulable(schedulable: Schedulable) {
-    schedulableQueue += schedulable
-    schedulableNameToSchedulable(schedulable.name) = schedulable
+    require(schedulable != null)
+    schedulableQueue.add(schedulable)
+    schedulableNameToSchedulable.put(schedulable.name, schedulable)
     schedulable.parent = this
   }
 
   override def removeSchedulable(schedulable: Schedulable) {
-    schedulableQueue -= schedulable
-    schedulableNameToSchedulable -= schedulable.name
+    schedulableQueue.remove(schedulable)
+    schedulableNameToSchedulable.remove(schedulable.name)
  }
 
   override def getSchedulableByName(schedulableName: String): Schedulable = {
-    if (schedulableNameToSchedulable.contains(schedulableName)) {
-      return schedulableNameToSchedulable(schedulableName)
+    if (schedulableNameToSchedulable.containsKey(schedulableName)) {
+      return schedulableNameToSchedulable.get(schedulableName)
     }
     for (schedulable <- schedulableQueue) {
       val sched = schedulable.getSchedulableByName(schedulableName)
@@ -95,11 +95,12 @@ private[spark] class Pool(
     shouldRevive
   }
 
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+    val sortedSchedulableQueue =
+      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
-      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
     sortedTaskSetQueue
   }
```

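Two details worth noting in the Pool changes above: `import scala.collection.JavaConversions._` is what lets the Java `ConcurrentLinkedQueue` still drive a Scala for-comprehension and convert via `toSeq`, and sorting a `toSeq` snapshot means concurrent adds and removes cannot disturb an in-progress sort. A minimal illustration (illustrative values, not Spark's comparator):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConversions._

object SnapshotSortSketch extends App {
  val queue = new ConcurrentLinkedQueue[Int]()
  queue.add(3); queue.add(1); queue.add(2)

  // JavaConversions lets the Java queue drive a Scala for-comprehension...
  for (x <- queue) println(x)

  // ...and toSeq materializes a Scala Seq, so sortWith runs over a stable
  // view even if another thread mutates the queue at the same time.
  val sorted = queue.toSeq.sortWith(_ < _)
  assert(sorted == Seq(1, 2, 3))
}
```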
core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -28,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 private[spark] trait Schedulable {
   var parent: Pool
   // child queues
-  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
   def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
@@ -42,5 +44,5 @@ private[spark] trait Schedulable {
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
   def checkSpeculatableTasks(): Boolean
-  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
+  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
```

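The `getSortedTaskSetQueue()` to `getSortedTaskSetQueue` rename follows Scala's uniform access convention: side-effect-free, accessor-like methods drop the empty parameter list, and once declared without parentheses they cannot be called with them. A hypothetical sketch:

```scala
object UniformAccessSketch extends App {
  trait Example {
    def snapshot: Seq[Int] // declared without (): reads like a field
  }
  class Impl extends Example {
    def snapshot: Seq[Int] = Seq(1, 2, 3)
  }
  val e: Example = new Impl
  println(e.snapshot) // compiles; e.snapshot() would not
}
```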
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -222,7 +222,7 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
```

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -117,7 +117,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue()
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
     /* Just for Test*/
     for (manager <- taskSetQueue) {
       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
```
