From 8685ae983a76ef2946eccd538feca818f035a84c Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 2 Oct 2016 13:04:05 +0100 Subject: [PATCH 1/5] SchedulableBuilder should avoid to create duplicate fair scheduler-pools. --- .../fairscheduler-duplicate-pools.xml | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 core/src/test/resources/fairscheduler-duplicate-pools.xml diff --git a/core/src/test/resources/fairscheduler-duplicate-pools.xml b/core/src/test/resources/fairscheduler-duplicate-pools.xml new file mode 100644 index 0000000000000..b4ffb930758a1 --- /dev/null +++ b/core/src/test/resources/fairscheduler-duplicate-pools.xml @@ -0,0 +1,42 @@ + + + + + + + 0 + 1 + FIFO + + + 0 + 1 + FIFO + + + 1 + 1 + FAIR + + + 2 + 2 + FAIR + + + From 8dc728fe917fff2405c0ab5f90b7d655a23769e2 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Wed, 12 Oct 2016 13:14:16 +0300 Subject: [PATCH 2/5] FairSchedulableBuilder should avoid to create duplicate schedulable. --- .../org/apache/spark/scheduler/Pool.scala | 12 ++- .../apache/spark/scheduler/PoolSuite.scala | 97 +++++++++++++++++++ 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index f4b0ab10155a2..29a9c0459e969 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -61,8 +61,16 @@ private[spark] class Pool( override def addSchedulable(schedulable: Schedulable) { require(schedulable != null) - schedulableQueue.add(schedulable) - schedulableNameToSchedulable.put(schedulable.name, schedulable) + val previousSchedulable = schedulableNameToSchedulable.put(schedulable.name, schedulable) + if (previousSchedulable == null) { + schedulableQueue.add(schedulable) + } else { + logWarning(s"Duplicate Schedulable added: ${schedulable.name}") + schedulableQueue.remove(previousSchedulable) + if (!schedulableQueue.contains(schedulable)) { + schedulableQueue.add(schedulable) + } + } schedulable.parent = this } diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index b953add9d58cb..965cd76723693 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -336,7 +336,104 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } +<<<<<<< HEAD private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, +======= + test("FIFO Scheduler should not add duplicate TaskSetManager") { + sc = new SparkContext(LOCAL, APP_NAME) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) + val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager1, null) + + assert(rootPool.schedulableQueue.size == 2) + assert(rootPool.schedulableNameToSchedulable.size == 2) + + assert(rootPool.getSchedulableByName(taskSetManager0.name) == taskSetManager0) + assert(rootPool.getSchedulableByName(taskSetManager1.name) == taskSetManager1) + } + + test("Fair Scheduler should not create duplicate pool") { + sc = createSparkContext("fairscheduler-duplicate-pools.xml") + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + assert(rootPool.schedulableQueue.size == 2) + assert(rootPool.schedulableNameToSchedulable.size == 2) + + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, SchedulingMode.FIFO) + verifyPool(rootPool, "duplicate_pool1", 2, 2, SchedulingMode.FAIR) + } + + test("Fair Scheduler should not add duplicate TaskSetManager via default pool") { + sc = new SparkContext(LOCAL, APP_NAME) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager1, null) + + rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) match { + case defaultPool: Pool => + assert(defaultPool.schedulableQueue.size == 2) + assert(defaultPool.schedulableNameToSchedulable.size == 2) + assert(defaultPool.getSchedulableByName(taskSetManager0.name) == taskSetManager0) + assert(defaultPool.getSchedulableByName(taskSetManager1.name) == taskSetManager1) + + case _ => fail("Default Pool can not be found for Fair Scheduler") + } + } + + test("Fair Scheduler should not add duplicate TaskSetManager via custom pool") { + val CUSTOM_POOL = "customPool" + sc = new SparkContext(LOCAL, APP_NAME) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, CUSTOM_POOL) + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, properties) + schedulableBuilder.addTaskSetManager(taskSetManager0, properties) + schedulableBuilder.addTaskSetManager(taskSetManager1, properties) + + rootPool.getSchedulableByName(CUSTOM_POOL) match { + case customPool: Pool => + assert(customPool.schedulableQueue.size == 2) + assert(customPool.schedulableNameToSchedulable.size == 2) + assert(customPool.getSchedulableByName(taskSetManager0.name) == taskSetManager0) + assert(customPool.getSchedulableByName(taskSetManager1.name) == taskSetManager1) + + case _ => fail(s"$CUSTOM_POOL Pool can not be found for Fair Scheduler") + } + } + + def createSparkContext(fileName: String): SparkContext = { + val xmlPath = getClass.getClassLoader.getResource(fileName).getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + new SparkContext(LOCAL, APP_NAME, conf) + } + + def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, +>>>>>>> FairSchedulableBuilder should avoid to create duplicate schedulable. expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) assert(selectedPool !== null) From 04e06fc61ff5624aacc2f6a4f191517a00adc830 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Wed, 29 Mar 2017 23:30:25 +0100 Subject: [PATCH 3/5] Review comments are addressed. --- .../org/apache/spark/scheduler/Pool.scala | 12 +-- .../apache/spark/scheduler/PoolSuite.scala | 97 ------------------- 2 files changed, 4 insertions(+), 105 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 29a9c0459e969..bf9ee161886d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -61,15 +61,11 @@ private[spark] class Pool( override def addSchedulable(schedulable: Schedulable) { require(schedulable != null) - val previousSchedulable = schedulableNameToSchedulable.put(schedulable.name, schedulable) - if (previousSchedulable == null) { - schedulableQueue.add(schedulable) + if (schedulableNameToSchedulable.containsKey(schedulable.name)) { + logWarning(s"Schedulable: ${schedulable.name} already exists so duplicate is not created.") } else { - logWarning(s"Duplicate Schedulable added: ${schedulable.name}") - schedulableQueue.remove(previousSchedulable) - if (!schedulableQueue.contains(schedulable)) { - schedulableQueue.add(schedulable) - } + schedulableQueue.add(schedulable) + schedulableNameToSchedulable.put(schedulable.name, schedulable) } schedulable.parent = this } diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 965cd76723693..b953add9d58cb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -336,104 +336,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } -<<<<<<< HEAD private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, -======= - test("FIFO Scheduler should not add duplicate TaskSetManager") { - sc = new SparkContext(LOCAL, APP_NAME) - val taskScheduler = new TaskSchedulerImpl(sc) - - val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) - val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) - - val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) - val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager1, null) - - assert(rootPool.schedulableQueue.size == 2) - assert(rootPool.schedulableNameToSchedulable.size == 2) - - assert(rootPool.getSchedulableByName(taskSetManager0.name) == taskSetManager0) - assert(rootPool.getSchedulableByName(taskSetManager1.name) == taskSetManager1) - } - - test("Fair Scheduler should not create duplicate pool") { - sc = createSparkContext("fairscheduler-duplicate-pools.xml") - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - assert(rootPool.schedulableQueue.size == 2) - assert(rootPool.schedulableNameToSchedulable.size == 2) - - verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, SchedulingMode.FIFO) - verifyPool(rootPool, "duplicate_pool1", 2, 2, SchedulingMode.FAIR) - } - - test("Fair Scheduler should not add duplicate TaskSetManager via default pool") { - sc = new SparkContext(LOCAL, APP_NAME) - val taskScheduler = new TaskSchedulerImpl(sc) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) - val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager1, null) - - rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) match { - case defaultPool: Pool => - assert(defaultPool.schedulableQueue.size == 2) - assert(defaultPool.schedulableNameToSchedulable.size == 2) - assert(defaultPool.getSchedulableByName(taskSetManager0.name) == taskSetManager0) - assert(defaultPool.getSchedulableByName(taskSetManager1.name) == taskSetManager1) - - case _ => fail("Default Pool can not be found for Fair Scheduler") - } - } - - test("Fair Scheduler should not add duplicate TaskSetManager via custom pool") { - val CUSTOM_POOL = "customPool" - sc = new SparkContext(LOCAL, APP_NAME) - val taskScheduler = new TaskSchedulerImpl(sc) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - val properties = new Properties() - properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, CUSTOM_POOL) - val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) - val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, properties) - schedulableBuilder.addTaskSetManager(taskSetManager0, properties) - schedulableBuilder.addTaskSetManager(taskSetManager1, properties) - - rootPool.getSchedulableByName(CUSTOM_POOL) match { - case customPool: Pool => - assert(customPool.schedulableQueue.size == 2) - assert(customPool.schedulableNameToSchedulable.size == 2) - assert(customPool.getSchedulableByName(taskSetManager0.name) == taskSetManager0) - assert(customPool.getSchedulableByName(taskSetManager1.name) == taskSetManager1) - - case _ => fail(s"$CUSTOM_POOL Pool can not be found for Fair Scheduler") - } - } - - def createSparkContext(fileName: String): SparkContext = { - val xmlPath = getClass.getClassLoader.getResource(fileName).getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) - new SparkContext(LOCAL, APP_NAME, conf) - } - - def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, ->>>>>>> FairSchedulableBuilder should avoid to create duplicate schedulable. expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) assert(selectedPool !== null) From 7db682544e3b0e6a11587d7fbfdfeb1d7bc5a5f4 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Wed, 7 Jun 2017 23:02:01 -0700 Subject: [PATCH 4/5] Review comments are addressed. --- .../apache/spark/scheduler/PoolSuite.scala | 98 ++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index b953add9d58cb..741882f2022e6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -336,7 +336,103 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + test("FIFO Scheduler should not add duplicate TaskSetManager") { + sc = new SparkContext(LOCAL, APP_NAME) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) + val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager1, null) + + assert(rootPool.schedulableQueue.size === 2) + assert(rootPool.schedulableNameToSchedulable.size === 2) + + assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("Fair Scheduler should not create duplicate pool") { + // Load the scheduler pools from fairscheduler-duplicate-pools, which has 4 entries, + // but two are duplicates, and make sure that the duplicates are ignored. + sc = createSparkContext("fairscheduler-duplicate-pools.xml") + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + assert(rootPool.schedulableQueue.size === 2) + assert(rootPool.schedulableNameToSchedulable.size === 2) + + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, SchedulingMode.FIFO) + // The first pool specified is used if duplicate pools exist + verifyPool(rootPool, "duplicate_pool1", 1, 1, SchedulingMode.FAIR) + } + + test("Fair Scheduler should not add duplicate TaskSetManager via default pool") { + sc = new SparkContext(LOCAL, APP_NAME) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager1, null) + + rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) match { + case defaultPool: Pool => + assert(defaultPool.schedulableQueue.size === 2) + assert(defaultPool.schedulableNameToSchedulable.size === 2) + assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + + case null => fail("Default Pool can not be found for Fair Scheduler") + } + } + + test("Fair Scheduler should not add duplicate TaskSetManager via custom pool") { + val CUSTOM_POOL = "customPool" + sc = new SparkContext(LOCAL, APP_NAME) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, CUSTOM_POOL) + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, properties) + schedulableBuilder.addTaskSetManager(taskSetManager0, properties) + schedulableBuilder.addTaskSetManager(taskSetManager1, properties) + + rootPool.getSchedulableByName(CUSTOM_POOL) match { + case customPool: Pool => + assert(customPool.schedulableQueue.size === 2) + assert(customPool.schedulableNameToSchedulable.size === 2) + assert(customPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + assert(customPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + + case null => fail(s"$CUSTOM_POOL Pool can not be found for Fair Scheduler") + } + } + + def createSparkContext(fileName: String): SparkContext = { + val xmlPath = getClass.getClassLoader.getResource(fileName).getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) + new SparkContext(LOCAL, APP_NAME, conf) + } + + def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) assert(selectedPool !== null) From 5d44ed81f773fa7ddb030e996010246571ba389e Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Sat, 1 Jun 2019 11:00:34 +0100 Subject: [PATCH 5/5] Review comments are addressed and merge conflict has been fixed. --- .../apache/spark/scheduler/PoolSuite.scala | 85 +------------------ 1 file changed, 4 insertions(+), 81 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 741882f2022e6..9cb8acfbd2725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -79,9 +79,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { * algorithm properly orders the two scheduling pools. */ test("Fair Scheduler Test") { - val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) - sc = new SparkContext(LOCAL, APP_NAME, conf) + sc = createSparkContext("fairscheduler.xml") val taskScheduler = new TaskSchedulerImpl(sc) val rootPool = new Pool("", FAIR, 0, 0) @@ -295,9 +293,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { test("Fair Scheduler should build fair scheduler when " + "valid spark.scheduler.allocation.file property is set") { - val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) - sc = new SparkContext(LOCAL, APP_NAME, conf) + sc = createSparkContext("fairscheduler-with-valid-data.xml") val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) @@ -336,26 +332,6 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - test("FIFO Scheduler should not add duplicate TaskSetManager") { - sc = new SparkContext(LOCAL, APP_NAME) - val taskScheduler = new TaskSchedulerImpl(sc) - - val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) - val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) - - val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) - val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager1, null) - - assert(rootPool.schedulableQueue.size === 2) - assert(rootPool.schedulableNameToSchedulable.size === 2) - - assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) - assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) - } - test("Fair Scheduler should not create duplicate pool") { // Load the scheduler pools from fairscheduler-duplicate-pools, which has 4 entries, // but two are duplicates, and make sure that the duplicates are ignored. @@ -373,66 +349,13 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "duplicate_pool1", 1, 1, SchedulingMode.FAIR) } - test("Fair Scheduler should not add duplicate TaskSetManager via default pool") { - sc = new SparkContext(LOCAL, APP_NAME) - val taskScheduler = new TaskSchedulerImpl(sc) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) - val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager1, null) - - rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) match { - case defaultPool: Pool => - assert(defaultPool.schedulableQueue.size === 2) - assert(defaultPool.schedulableNameToSchedulable.size === 2) - assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) - assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) - - case null => fail("Default Pool can not be found for Fair Scheduler") - } - } - - test("Fair Scheduler should not add duplicate TaskSetManager via custom pool") { - val CUSTOM_POOL = "customPool" - sc = new SparkContext(LOCAL, APP_NAME) - val taskScheduler = new TaskSchedulerImpl(sc) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - val properties = new Properties() - properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, CUSTOM_POOL) - val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) - val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, properties) - schedulableBuilder.addTaskSetManager(taskSetManager0, properties) - schedulableBuilder.addTaskSetManager(taskSetManager1, properties) - - rootPool.getSchedulableByName(CUSTOM_POOL) match { - case customPool: Pool => - assert(customPool.schedulableQueue.size === 2) - assert(customPool.schedulableNameToSchedulable.size === 2) - assert(customPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) - assert(customPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) - - case null => fail(s"$CUSTOM_POOL Pool can not be found for Fair Scheduler") - } - } - - def createSparkContext(fileName: String): SparkContext = { + private def createSparkContext(fileName: String): SparkContext = { val xmlPath = getClass.getClassLoader.getResource(fileName).getFile() val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) new SparkContext(LOCAL, APP_NAME, conf) } - def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) assert(selectedPool !== null)