From 07d720c4b3883c397cc903ea61d3fc871e883429 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 13 May 2014 16:16:21 -0700
Subject: [PATCH 1/5] Synchronize Schedulable data structures

---
 .../scala/org/apache/spark/SparkContext.scala  | 18 ++++++----
 .../org/apache/spark/scheduler/Pool.scala      | 34 +++++++++----------
 .../apache/spark/scheduler/Schedulable.scala   |  6 ++--
 .../spark/scheduler/TaskSchedulerImpl.scala    |  2 +-
 .../spark/ui/jobs/JobProgressPage.scala        |  2 +-
 .../scheduler/TaskSchedulerImplSuite.scala     |  2 +-
 6 files changed, 36 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c43b4fd6d926..0f43eb5dcbe7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -17,15 +17,17 @@
 package org.apache.spark
 
+import scala.language.implicitConversions
+
 import java.io._
 import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConverters._
 import scala.collection.generic.Growable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.language.implicitConversions
 import scala.reflect.{ClassTag, classTag}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Return pools for fair scheduler
-   * TODO(xiajunluan): We should take nested pools into account
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
+   * TODO(xiajunluan): We should take nested pools into account
    */
-  def getAllPools: ArrayBuffer[Schedulable] = {
-    taskScheduler.rootPool.schedulableQueue
+  @DeveloperApi
+  def getAllPools: Array[Schedulable] = {
+    taskScheduler.rootPool.schedulableQueue.asScala.toArray
   }
 
   /**
+   * :: DeveloperApi ::
    * Return the pool associated with the given name, if one exists
    */
+  @DeveloperApi
   def getPoolForName(pool: String): Option[Schedulable] = {
-    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
   }
 
   /**
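Note on the getPoolForName hunk above: scala.collection.mutable.HashMap#get returns an Option, but java.util.Map#get (and so ConcurrentHashMap#get) returns null when the key is absent. Wrapping the result in Option(...) preserves the method's Option[Schedulable] signature. A minimal standalone sketch of the idiom (hypothetical names, JDK and Scala standard library only):

    import java.util.concurrent.ConcurrentHashMap

    object OptionIdiom extends App {
      val poolsByName = new ConcurrentHashMap[String, String]()
      poolsByName.put("default", "FIFO")

      // java.util.Map#get returns null for a missing key, unlike the
      // Option returned by scala.collection.Map#get.
      assert(poolsByName.get("missing") == null)

      // Option(...) maps null to None and any other value to Some(value),
      // restoring the Option-returning contract of the old signature.
      assert(Option(poolsByName.get("default")) == Some("FIFO"))
      assert(Option(poolsByName.get("missing")).isEmpty)
    }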
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 187672c4e19e..ed3fc7655664 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -17,8 +17,10 @@
 package org.apache.spark.scheduler
 
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -35,18 +37,15 @@ private[spark] class Pool(
   extends Schedulable
   with Logging {
 
-  var schedulableQueue = new ArrayBuffer[Schedulable]
-  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
-
+  var schedulableQueue = new LinkedBlockingQueue[Schedulable]
+  var schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
   var runningTasks = 0
-
   var priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-
   var name = poolName
   var parent: Pool = null
@@ -60,21 +59,21 @@ private[spark] class Pool(
   }
 
   override def addSchedulable(schedulable: Schedulable) {
-    schedulableQueue += schedulable
-    schedulableNameToSchedulable(schedulable.name) = schedulable
+    schedulableQueue.offer(schedulable)
+    schedulableNameToSchedulable.put(schedulable.name, schedulable)
     schedulable.parent = this
   }
 
   override def removeSchedulable(schedulable: Schedulable) {
-    schedulableQueue -= schedulable
-    schedulableNameToSchedulable -= schedulable.name
+    schedulableQueue.remove(schedulable)
+    schedulableNameToSchedulable.remove(schedulable.name)
   }
 
   override def getSchedulableByName(schedulableName: String): Schedulable = {
     if (schedulableNameToSchedulable.contains(schedulableName)) {
-      return schedulableNameToSchedulable(schedulableName)
+      return schedulableNameToSchedulable.get(schedulableName)
     }
-    for (schedulable <- schedulableQueue) {
+    for (schedulable <- schedulableQueue.asScala) {
       val sched = schedulable.getSchedulableByName(schedulableName)
       if (sched != null) {
         return sched
@@ -84,22 +83,23 @@ private[spark] class Pool(
   }
 
   override def executorLost(executorId: String, host: String) {
-    schedulableQueue.foreach(_.executorLost(executorId, host))
+    schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
   }
 
   override def checkSpeculatableTasks(): Boolean = {
     var shouldRevive = false
-    for (schedulable <- schedulableQueue) {
+    for (schedulable <- schedulableQueue.asScala) {
       shouldRevive |= schedulable.checkSpeculatableTasks()
     }
     shouldRevive
   }
 
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+    val sortedSchedulableQueue =
+      schedulableQueue.asScala.toArray.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
-      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
     sortedTaskSetQueue
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index ed24eb6a549d..1b4198dba659 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.scheduler
 
+import java.util.concurrent.LinkedBlockingQueue
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -28,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 private[spark] trait Schedulable {
   var parent: Pool
   // child queues
-  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulableQueue: LinkedBlockingQueue[Schedulable]
   def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
@@ -42,5 +44,5 @@
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
   def checkSpeculatableTasks(): Boolean
-  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
+  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5a68f38bc584..ffd1d9432682 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -222,7 +222,7 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index 0da62892118d..588d25fe8bff 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -49,7 +49,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
       new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
     // For now, pool information is only accessible in live UIs
-    val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+    val pools = if (live) sc.getAllPools else Array[Schedulable]()
     val poolTable = new PoolTable(pools, parent)
 
     val summary: NodeSeq =
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a8b605c5b212..7532da88c606 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -117,7 +117,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue()
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
     /* Just for Test*/
    for (manager <- taskSetQueue) {
       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
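Why Patch 1 matters: the Pool structures are mutated by the scheduler while other threads (for example the web UI, via SparkContext.getAllPools) iterate over them, and ArrayBuffer and mutable.HashMap are not safe under concurrent access. LinkedBlockingQueue and ConcurrentHashMap have weakly consistent iterators that never throw ConcurrentModificationException. A standalone sketch of the pattern the patch adopts (hypothetical names, not code from the patch):

    import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
    import scala.collection.JavaConverters._

    object SafeTraversal extends App {
      val queue = new LinkedBlockingQueue[String]()
      val byName = new ConcurrentHashMap[String, String]()

      // A writer thread appends while the main thread takes a snapshot.
      val writer = new Thread {
        override def run(): Unit = (1 to 1000).foreach { i =>
          queue.offer(s"pool-$i")
          byName.put(s"pool-$i", "FAIR")
        }
      }
      writer.start()

      // Iterators over these collections are weakly consistent: the snapshot
      // may miss in-flight additions, but it never throws and never observes
      // corrupt internal state, unlike iterating a mutating ArrayBuffer.
      val snapshot = queue.asScala.toArray
      writer.join()
      println(s"snapshot saw ${snapshot.length} of ${queue.size} pools")
    }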
From 0921ea02327f53b53d63a057f86f7bf46d80f74d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 13 May 2014 16:38:35 -0700
Subject: [PATCH 2/5] var -> val

---
 core/src/main/scala/org/apache/spark/scheduler/Pool.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index ed3fc7655664..643039985fa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -37,8 +37,8 @@ private[spark] class Pool(
   extends Schedulable
   with Logging {
 
-  var schedulableQueue = new LinkedBlockingQueue[Schedulable]
-  var schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
+  val schedulableQueue = new LinkedBlockingQueue[Schedulable]
+  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
   var runningTasks = 0

From 05ad9e9cd1028d2d074c7220aa891817776c609f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 13 May 2014 18:13:28 -0700
Subject: [PATCH 3/5] Fix test - contains is not the same as containsKey

---
 core/src/main/scala/org/apache/spark/scheduler/Pool.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 643039985fa4..3583f6a16a63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -70,7 +70,7 @@ private[spark] class Pool(
   }
 
   override def getSchedulableByName(schedulableName: String): Schedulable = {
-    if (schedulableNameToSchedulable.contains(schedulableName)) {
+    if (schedulableNameToSchedulable.containsKey(schedulableName)) {
       return schedulableNameToSchedulable.get(schedulableName)
     }
     for (schedulable <- schedulableQueue.asScala) {
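Patch 3 fixes a subtle trap: ConcurrentHashMap#contains(Object) is a legacy java.util.Hashtable-era method that tests values, not keys (it is equivalent to containsValue). The call compiled, but the membership test always returned false for a pool name, so the direct lookup was skipped and lookups for direct children failed, which is what broke the test. A minimal demonstration (hypothetical names, JDK only):

    import java.util.concurrent.ConcurrentHashMap

    object ContainsPitfall extends App {
      val pools = new ConcurrentHashMap[String, String]()
      pools.put("default", "FIFO")

      // contains(Object) checks VALUES, exactly like containsValue.
      assert(!pools.contains("default")) // "default" is a key, so false
      assert(pools.contains("FIFO"))     // "FIFO" is a value, so true

      // containsKey is the intended membership test on keys.
      assert(pools.containsKey("default"))
    }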
From 769be19195e92f21403a85763e9621ddeb6e5eb3 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 13 May 2014 23:05:44 -0700
Subject: [PATCH 4/5] Assorted minor changes

---
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../scala/org/apache/spark/scheduler/Pool.scala | 17 +++++++++--------
 .../apache/spark/scheduler/Schedulable.scala    |  4 ++--
 3 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0f43eb5dcbe7..16d83c38b342 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -840,10 +840,10 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * :: DeveloperApi ::
    * Return pools for fair scheduler
-   * TODO(xiajunluan): We should take nested pools into account
    */
   @DeveloperApi
   def getAllPools: Array[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
     taskScheduler.rootPool.schedulableQueue.asScala.toArray
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 3583f6a16a63..174b73221afc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -17,9 +17,9 @@
 package org.apache.spark.scheduler
 
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
@@ -37,7 +37,7 @@ private[spark] class Pool(
   extends Schedulable
   with Logging {
 
-  val schedulableQueue = new LinkedBlockingQueue[Schedulable]
+  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
@@ -59,7 +59,8 @@ private[spark] class Pool(
   }
 
   override def addSchedulable(schedulable: Schedulable) {
-    schedulableQueue.offer(schedulable)
+    require(schedulable != null)
+    schedulableQueue.add(schedulable)
     schedulableNameToSchedulable.put(schedulable.name, schedulable)
     schedulable.parent = this
   }
@@ -73,7 +74,7 @@ private[spark] class Pool(
     if (schedulableNameToSchedulable.containsKey(schedulableName)) {
       return schedulableNameToSchedulable.get(schedulableName)
     }
-    for (schedulable <- schedulableQueue.asScala) {
+    for (schedulable <- schedulableQueue) {
       val sched = schedulable.getSchedulableByName(schedulableName)
       if (sched != null) {
         return sched
@@ -83,12 +84,12 @@ private[spark] class Pool(
   }
 
   override def executorLost(executorId: String, host: String) {
-    schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
+    schedulableQueue.foreach(_.executorLost(executorId, host))
   }
 
   override def checkSpeculatableTasks(): Boolean = {
     var shouldRevive = false
-    for (schedulable <- schedulableQueue.asScala) {
+    for (schedulable <- schedulableQueue) {
       shouldRevive |= schedulable.checkSpeculatableTasks()
     }
     shouldRevive
@@ -97,7 +98,7 @@ private[spark] class Pool(
   override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
     val sortedSchedulableQueue =
-      schedulableQueue.asScala.toArray.sortWith(taskSetSchedulingAlgorithm.comparator)
+      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
       sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index 1b4198dba659..a87ef030e69c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -30,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 private[spark] trait Schedulable {
   var parent: Pool
   // child queues
-  def schedulableQueue: LinkedBlockingQueue[Schedulable]
+  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
   def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
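Two choices in Patch 4 are worth noting. ConcurrentLinkedQueue replaces LinkedBlockingQueue: the pool never blocks waiting for elements, so a non-blocking, CAS-based queue fits this add/remove/iterate pattern better than a lock-based blocking queue. And scala.collection.JavaConversions replaces JavaConverters, so the Java collections convert implicitly at call sites instead of requiring an explicit .asScala (JavaConversions was deprecated in later Scala versions in favor of explicit conversions, but it was idiomatic at the time). A standalone sketch of how the converted call sites read (hypothetical names, mirroring the toSeq.sortWith pattern in getSortedTaskSetQueue):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConversions._

    object ImplicitQueue extends App {
      val queue = new ConcurrentLinkedQueue[String]()
      queue.add("pool-b")
      queue.add("pool-a")

      // With JavaConversions in scope, the Java queue participates in
      // for-comprehensions and Scala collection methods implicitly.
      for (name <- queue) println(name)

      // The implicit wrapper also provides Scala conversions like toList.
      val sorted = queue.toList.sortWith(_ < _)
      println(sorted) // List(pool-a, pool-b)
    }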
From 383e73987205f193a2a11a610f462c7cfeacb960 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 14 May 2014 00:10:43 -0700
Subject: [PATCH 5/5] JavaConverters -> JavaConversions

---
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 8 ++++----
 .../scala/org/apache/spark/ui/jobs/JobProgressPage.scala | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 16d83c38b342..032b3d744c61 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,9 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
-import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -842,9 +842,9 @@ class SparkContext(config: SparkConf) extends Logging {
    * Return pools for fair scheduler
    */
   @DeveloperApi
-  def getAllPools: Array[Schedulable] = {
+  def getAllPools: Seq[Schedulable] = {
     // TODO(xiajunluan): We should take nested pools into account
-    taskScheduler.rootPool.schedulableQueue.asScala.toArray
+    taskScheduler.rootPool.schedulableQueue.toSeq
   }
 
   /**
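With this final signature, getAllPools returns a Seq[Schedulable] built from the queue via the implicit conversion, and getPoolForName returns Option. A hypothetical usage sketch against a live SparkContext sc (the pool name "production" is illustrative, and assumes spark.scheduler.mode=FAIR):

    // List every top-level fair-scheduler pool with its parameters.
    val pools: Seq[Schedulable] = sc.getAllPools
    pools.foreach { pool =>
      println(s"${pool.name}: weight=${pool.weight}, minShare=${pool.minShare}")
    }

    // A missing pool is now a None, not a null or a thrown exception.
    sc.getPoolForName("production") match {
      case Some(pool) => println(s"production mode: ${pool.schedulingMode}")
      case None       => println("no such pool")
    }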
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index 588d25fe8bff..0da62892118d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -49,7 +49,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
       new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
     // For now, pool information is only accessible in live UIs
-    val pools = if (live) sc.getAllPools else Array[Schedulable]()
+    val pools = if (live) sc.getAllPools else Seq[Schedulable]()
     val poolTable = new PoolTable(pools, parent)
 
     val summary: NodeSeq =