1717
1818package org .apache .spark .scheduler
1919
20- import java .util .concurrent .{ConcurrentHashMap , LinkedBlockingQueue }
20+ import java .util .concurrent .{ConcurrentHashMap , ConcurrentLinkedQueue }
2121
22- import scala .collection .JavaConverters ._
22+ import scala .collection .JavaConversions ._
2323import scala .collection .mutable .ArrayBuffer
2424
2525import org .apache .spark .Logging
@@ -37,7 +37,7 @@ private[spark] class Pool(
3737 extends Schedulable
3838 with Logging {
3939
40- val schedulableQueue = new LinkedBlockingQueue [Schedulable ]
40+ val schedulableQueue = new ConcurrentLinkedQueue [Schedulable ]
4141 val schedulableNameToSchedulable = new ConcurrentHashMap [String , Schedulable ]
4242 var weight = initWeight
4343 var minShare = initMinShare
@@ -59,7 +59,8 @@ private[spark] class Pool(
5959 }
6060
6161 override def addSchedulable (schedulable : Schedulable ) {
62- schedulableQueue.offer(schedulable)
62+ require(schedulable != null )
63+ schedulableQueue.add(schedulable)
6364 schedulableNameToSchedulable.put(schedulable.name, schedulable)
6465 schedulable.parent = this
6566 }
@@ -73,7 +74,7 @@ private[spark] class Pool(
7374 if (schedulableNameToSchedulable.containsKey(schedulableName)) {
7475 return schedulableNameToSchedulable.get(schedulableName)
7576 }
76- for (schedulable <- schedulableQueue.asScala ) {
77+ for (schedulable <- schedulableQueue) {
7778 val sched = schedulable.getSchedulableByName(schedulableName)
7879 if (sched != null ) {
7980 return sched
@@ -83,12 +84,12 @@ private[spark] class Pool(
8384 }
8485
8586 override def executorLost (executorId : String , host : String ) {
86- schedulableQueue.asScala. foreach(_.executorLost(executorId, host))
87+ schedulableQueue.foreach(_.executorLost(executorId, host))
8788 }
8889
8990 override def checkSpeculatableTasks (): Boolean = {
9091 var shouldRevive = false
91- for (schedulable <- schedulableQueue.asScala ) {
92+ for (schedulable <- schedulableQueue) {
9293 shouldRevive |= schedulable.checkSpeculatableTasks()
9394 }
9495 shouldRevive
@@ -97,7 +98,7 @@ private[spark] class Pool(
9798 override def getSortedTaskSetQueue : ArrayBuffer [TaskSetManager ] = {
9899 var sortedTaskSetQueue = new ArrayBuffer [TaskSetManager ]
99100 val sortedSchedulableQueue =
100- schedulableQueue.asScala.toArray .sortWith(taskSetSchedulingAlgorithm.comparator)
101+ schedulableQueue.toSeq .sortWith(taskSetSchedulingAlgorithm.comparator)
101102 for (schedulable <- sortedSchedulableQueue) {
102103 sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
103104 }
0 commit comments