diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index f4b0ab10155a2..bf9ee161886d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -61,8 +61,12 @@ private[spark] class Pool( override def addSchedulable(schedulable: Schedulable) { require(schedulable != null) - schedulableQueue.add(schedulable) - schedulableNameToSchedulable.put(schedulable.name, schedulable) + if (schedulableNameToSchedulable.containsKey(schedulable.name)) { + logWarning(s"Schedulable: ${schedulable.name} already exists so duplicate is not created.") + } else { + schedulableQueue.add(schedulable) + schedulableNameToSchedulable.put(schedulable.name, schedulable) + } schedulable.parent = this } diff --git a/core/src/test/resources/fairscheduler-duplicate-pools.xml b/core/src/test/resources/fairscheduler-duplicate-pools.xml new file mode 100644 index 0000000000000..b4ffb930758a1 --- /dev/null +++ b/core/src/test/resources/fairscheduler-duplicate-pools.xml @@ -0,0 +1,42 @@ + + + + + + + 0 + 1 + FIFO + + + 0 + 1 + FIFO + + + 1 + 1 + FAIR + + + 2 + 2 + FAIR + + + diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index b953add9d58cb..9cb8acfbd2725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -79,9 +79,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { * algorithm properly orders the two scheduling pools. */ test("Fair Scheduler Test") { - val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) - sc = new SparkContext(LOCAL, APP_NAME, conf) + sc = createSparkContext("fairscheduler.xml") val taskScheduler = new TaskSchedulerImpl(sc) val rootPool = new Pool("", FAIR, 0, 0) @@ -295,9 +293,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { test("Fair Scheduler should build fair scheduler when " + "valid spark.scheduler.allocation.file property is set") { - val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) - sc = new SparkContext(LOCAL, APP_NAME, conf) + sc = createSparkContext("fairscheduler-with-valid-data.xml") val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) @@ -336,6 +332,29 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } + test("Fair Scheduler should not create duplicate pool") { + // Load the scheduler pools from fairscheduler-duplicate-pools, which has 4 entries, + // but two are duplicates, and make sure that the duplicates are ignored. + sc = createSparkContext("fairscheduler-duplicate-pools.xml") + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + assert(rootPool.schedulableQueue.size === 2) + assert(rootPool.schedulableNameToSchedulable.size === 2) + + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, SchedulingMode.FIFO) + // The first pool specified is used if duplicate pools exist + verifyPool(rootPool, "duplicate_pool1", 1, 1, SchedulingMode.FAIR) + } + + private def createSparkContext(fileName: String): SparkContext = { + val xmlPath = getClass.getClassLoader.getResource(fileName).getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) + new SparkContext(LOCAL, APP_NAME, conf) + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName)