From 8ace686dd0dd4638d71a3525aeea43b4d1f68b23 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 25 Sep 2016 22:42:20 +0100 Subject: [PATCH 1/4] SchedulableBuilder should handle invalid data access via scheduler.allocation.file --- .../spark/scheduler/SchedulableBuilder.scala | 60 ++++++++----- .../fairscheduler-with-invalid-data.xml | 75 ++++++++++++++++ .../apache/spark/scheduler/PoolSuite.scala | 85 +++++++++++++------ 3 files changed, 173 insertions(+), 47 deletions(-) create mode 100644 core/src/test/resources/fairscheduler-with-invalid-data.xml diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 96325a0329f89..641b7bd903afd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -22,8 +22,11 @@ import java.util.{NoSuchElementException, Properties} import scala.xml.XML +import org.apache.commons.lang3.StringUtils + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** @@ -102,38 +105,55 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: NoSuchElementException => - logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " + - s"using the default schedulingMode: $schedulingMode") - } - } + val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, DEFAULT_SCHEDULING_MODE) val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } + val minShare = getIntValue(MINIMUM_SHARES_PROPERTY, xmlMinShare, DEFAULT_MINIMUM_SHARE) val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } + val weight = getIntValue(WEIGHT_PROPERTY, xmlWeight, DEFAULT_WEIGHT) + + rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } } + private def getSchedulingModeValue(data: String, defaultValue: SchedulingMode): SchedulingMode = { + if (StringUtils.isNotBlank(data) + && checkType(SchedulingMode.withName(data.toUpperCase)) + && SchedulingMode.withName(data.toUpperCase) != SchedulingMode.NONE) { + SchedulingMode.withName(data.toUpperCase) + } else { + logWarning(s"Unsupported schedulingMode: $data, using the default schedulingMode: " + + s"$defaultValue") + defaultValue + } + } + + private def getIntValue(propertyName: String, data: String, defaultValue: Int): Int = { + if (StringUtils.isNotBlank(data) && checkType(data.toInt)) { + data.toInt + } else { + logWarning(s"$propertyName is blank or invalid: $data, using the default $propertyName: " + + s"$defaultValue") + defaultValue + } + } + + private def checkType(fun: => Any): Boolean = { + try { + fun + true + } catch { + case e: NumberFormatException => false + case e: NoSuchElementException => false + } + } + override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME var parentPool = rootPool.getSchedulableByName(poolName) diff --git a/core/src/test/resources/fairscheduler-with-invalid-data.xml b/core/src/test/resources/fairscheduler-with-invalid-data.xml new file mode 100644 index 0000000000000..3084025f2c0b2 --- /dev/null +++ b/core/src/test/resources/fairscheduler-with-invalid-data.xml @@ -0,0 +1,75 @@ + + + + + + INVALID_MIN_SHARE + 2 + FAIR + + + 1 + INVALID_WEIGHT + FAIR + + + 3 + 2 + INVALID_SCHEDULING_MODE + + + 2 + 1 + fair + + + 1 + 2 + NONE + + + + 2 + FAIR + + + 1 + + FAIR + + + 3 + 2 + + + + + 3 + FAIR + + + 2 + + FAIR + + + 2 + 2 + + + diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 00e1c447ccbef..fa3029e07ad6e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.util.Properties import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.scheduler.SchedulingMode._ /** * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work @@ -27,8 +28,13 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui */ class PoolSuite extends SparkFunSuite with LocalSparkContext { + val LOCAL = "local" + val APP_NAME = "PoolSuite" + val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" + val SCHEDULER_POOL_PROPERTY = "spark.scheduler.pool" + def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) - : TaskSetManager = { + : TaskSetManager = { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) } @@ -45,12 +51,11 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } test("FIFO Scheduler Test") { - sc = new SparkContext("local", "TaskSchedulerImplSuite") + sc = new SparkContext(LOCAL, APP_NAME) val taskScheduler = new TaskSchedulerImpl(sc) - val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) + val rootPool = new Pool("", FIFO, 0, 0) val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) - schedulableBuilder.buildPools() val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) @@ -74,30 +79,24 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { */ test("Fair Scheduler Test") { val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath) - sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + sc = new SparkContext(LOCAL, APP_NAME, conf) val taskScheduler = new TaskSchedulerImpl(sc) - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val rootPool = new Pool("", FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) schedulableBuilder.buildPools() // Ensure that the XML file was read in correctly. - assert(rootPool.getSchedulableByName("default") != null) - assert(rootPool.getSchedulableByName("1") != null) - assert(rootPool.getSchedulableByName("2") != null) - assert(rootPool.getSchedulableByName("3") != null) - assert(rootPool.getSchedulableByName("1").minShare === 2) - assert(rootPool.getSchedulableByName("1").weight === 1) - assert(rootPool.getSchedulableByName("2").minShare === 3) - assert(rootPool.getSchedulableByName("2").weight === 1) - assert(rootPool.getSchedulableByName("3").minShare === 0) - assert(rootPool.getSchedulableByName("3").weight === 1) + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) + verifyPool(rootPool, "1", 2, 1, FIFO) + verifyPool(rootPool, "2", 3, 1, FIFO) + verifyPool(rootPool, "3", 0, 1, FIFO) val properties1 = new Properties() - properties1.setProperty("spark.scheduler.pool", "1") + properties1.setProperty(SCHEDULER_POOL_PROPERTY, "1") val properties2 = new Properties() - properties2.setProperty("spark.scheduler.pool", "2") + properties2.setProperty(SCHEDULER_POOL_PROPERTY, "2") val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler) val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler) @@ -134,22 +133,22 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } test("Nested Pool Test") { - sc = new SparkContext("local", "TaskSchedulerImplSuite") + sc = new SparkContext(LOCAL, APP_NAME) val taskScheduler = new TaskSchedulerImpl(sc) - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1) - val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1) + val rootPool = new Pool("", FAIR, 0, 0) + val pool0 = new Pool("0", FAIR, 3, 1) + val pool1 = new Pool("1", FAIR, 4, 1) rootPool.addSchedulable(pool0) rootPool.addSchedulable(pool1) - val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2) - val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1) + val pool00 = new Pool("00", FAIR, 2, 2) + val pool01 = new Pool("01", FAIR, 1, 1) pool0.addSchedulable(pool00) pool0.addSchedulable(pool01) - val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2) - val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1) + val pool10 = new Pool("10", FAIR, 2, 2) + val pool11 = new Pool("11", FAIR, 2, 1) pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) @@ -178,4 +177,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(2, rootPool, 6) scheduleTaskAndVerifyId(3, rootPool, 2) } + + test("FairSchedulableBuilder sets default values for blank or invalid datas") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml") + .getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + + val rootPool = new Pool("", FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf) + schedulableBuilder.buildPools() + + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) + verifyPool(rootPool, "pool_with_invalid_min_share", 0, 2, FAIR) + verifyPool(rootPool, "pool_with_invalid_weight", 1, 1, FAIR) + verifyPool(rootPool, "pool_with_invalid_scheduling_mode", 3, 2, FIFO) + verifyPool(rootPool, "pool_with_non_uppercase_scheduling_mode", 2, 1, FAIR) + verifyPool(rootPool, "pool_with_NONE_scheduling_mode", 1, 2, FIFO) + verifyPool(rootPool, "pool_with_whitespace_min_share", 0, 2, FAIR) + verifyPool(rootPool, "pool_with_whitespace_weight", 1, 1, FAIR) + verifyPool(rootPool, "pool_with_whitespace_scheduling_mode", 3, 2, FIFO) + verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR) + verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR) + verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO) + } + + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { + assert(rootPool.getSchedulableByName(poolName) != null) + assert(rootPool.getSchedulableByName(poolName).minShare === expectedInitMinShare) + assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight) + assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode) + } + } From 399bf9ab2acc1a143d538bf6e11f8020e8f55230 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sat, 24 Dec 2016 16:00:17 +0000 Subject: [PATCH 2/4] Review comments are addressed. --- .../spark/scheduler/SchedulableBuilder.scala | 49 +++++++++---------- .../fairscheduler-with-invalid-data.xml | 15 ++++++ .../apache/spark/scheduler/PoolSuite.scala | 12 +++-- 3 files changed, 44 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 641b7bd903afd..ef23f60504354 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -22,8 +22,6 @@ import java.util.{NoSuchElementException, Properties} import scala.xml.XML -import org.apache.commons.lang3.StringUtils - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -106,13 +104,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) val poolName = (poolNode \ POOL_NAME_PROPERTY).text - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, DEFAULT_SCHEDULING_MODE) - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text.trim val minShare = getIntValue(MINIMUM_SHARES_PROPERTY, xmlMinShare, DEFAULT_MINIMUM_SHARE) - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text.trim val weight = getIntValue(WEIGHT_PROPERTY, xmlWeight, DEFAULT_WEIGHT) rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) @@ -123,34 +121,31 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } private def getSchedulingModeValue(data: String, defaultValue: SchedulingMode): SchedulingMode = { - if (StringUtils.isNotBlank(data) - && checkType(SchedulingMode.withName(data.toUpperCase)) - && SchedulingMode.withName(data.toUpperCase) != SchedulingMode.NONE) { - SchedulingMode.withName(data.toUpperCase) - } else { - logWarning(s"Unsupported schedulingMode: $data, using the default schedulingMode: " + - s"$defaultValue") - defaultValue + val warningMessage = s"Unsupported schedulingMode: $data, using the default schedulingMode: " + + s"$defaultValue" + try { + if (SchedulingMode.withName(data) != SchedulingMode.NONE) { + SchedulingMode.withName(data) + } else { + logWarning(warningMessage) + defaultValue + } + } catch { + case e: NoSuchElementException => + logWarning(warningMessage) + defaultValue } } private def getIntValue(propertyName: String, data: String, defaultValue: Int): Int = { - if (StringUtils.isNotBlank(data) && checkType(data.toInt)) { - data.toInt - } else { - logWarning(s"$propertyName is blank or invalid: $data, using the default $propertyName: " + - s"$defaultValue") - defaultValue - } - } - - private def checkType(fun: => Any): Boolean = { try { - fun - true + data.toInt } catch { - case e: NumberFormatException => false - case e: NoSuchElementException => false + case e: NumberFormatException => + logWarning(s"Error while loading scheduler allocation file at $schedulerAllocFile. " + + s"$propertyName is blank or invalid: $data, using the default $propertyName: " + + s"$defaultValue") + defaultValue } } diff --git a/core/src/test/resources/fairscheduler-with-invalid-data.xml b/core/src/test/resources/fairscheduler-with-invalid-data.xml index 3084025f2c0b2..9d05fbc76283d 100644 --- a/core/src/test/resources/fairscheduler-with-invalid-data.xml +++ b/core/src/test/resources/fairscheduler-with-invalid-data.xml @@ -72,4 +72,19 @@ 2 + + 3 + 2 + FAIR + + + 1 + 2 + FAIR + + + 3 + 2 + FAIR + diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index fa3029e07ad6e..8fc0a3adb10b4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -31,10 +31,9 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val LOCAL = "local" val APP_NAME = "PoolSuite" val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" - val SCHEDULER_POOL_PROPERTY = "spark.scheduler.pool" def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) - : TaskSetManager = { + : TaskSetManager = { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) } @@ -94,9 +93,9 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "3", 0, 1, FIFO) val properties1 = new Properties() - properties1.setProperty(SCHEDULER_POOL_PROPERTY, "1") + properties1.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1") val properties2 = new Properties() - properties2.setProperty(SCHEDULER_POOL_PROPERTY, "2") + properties2.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "2") val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler) val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler) @@ -178,7 +177,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(3, rootPool, 2) } - test("FairSchedulableBuilder sets default values for blank or invalid datas") { + test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") { val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml") .getFile() val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) @@ -199,6 +198,9 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR) verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR) verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO) + verifyPool(rootPool, "pool_with_min_share_surrounded_whitespace", 3, 2, FAIR) + verifyPool(rootPool, "pool_with_weight_surrounded_whitespace", 1, 2, FAIR) + verifyPool(rootPool, "pool_with_scheduling_mode_surrounded_whitespace", 3, 2, FAIR) } private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, From 558f8b1ceed207f20ac92a833a8bdadfff5de046 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Wed, 25 Jan 2017 23:41:58 +0000 Subject: [PATCH 3/4] Review comments are addressed. --- .../spark/scheduler/SchedulableBuilder.scala | 33 +++++++++---------- .../fairscheduler-with-invalid-data.xml | 12 +------ .../apache/spark/scheduler/PoolSuite.scala | 4 +-- 3 files changed, 18 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index ef23f60504354..98efb5c84d320 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} -import scala.xml.XML +import scala.xml.{Node, XML} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging @@ -104,14 +104,9 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) val poolName = (poolNode \ POOL_NAME_PROPERTY).text - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase - val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, DEFAULT_SCHEDULING_MODE) - - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text.trim - val minShare = getIntValue(MINIMUM_SHARES_PROPERTY, xmlMinShare, DEFAULT_MINIMUM_SHARE) - - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text.trim - val weight = getIntValue(WEIGHT_PROPERTY, xmlWeight, DEFAULT_WEIGHT) + val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE) + val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE) + val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT) rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) @@ -120,12 +115,14 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } } - private def getSchedulingModeValue(data: String, defaultValue: SchedulingMode): SchedulingMode = { - val warningMessage = s"Unsupported schedulingMode: $data, using the default schedulingMode: " + - s"$defaultValue" + private def getSchedulingModeValue(poolNode: Node, poolName: String, defaultValue: SchedulingMode) + : SchedulingMode = { + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase + val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " + + s"schedulingMode: $defaultValue for pool: $poolName" try { - if (SchedulingMode.withName(data) != SchedulingMode.NONE) { - SchedulingMode.withName(data) + if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) { + SchedulingMode.withName(xmlSchedulingMode) } else { logWarning(warningMessage) defaultValue @@ -137,14 +134,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } } - private def getIntValue(propertyName: String, data: String, defaultValue: Int): Int = { + private def getIntValue(poolNode: Node, poolName: String, propertyName: String, defaultValue: Int) + : Int = { + val data = (poolNode \ propertyName).text.trim try { data.toInt } catch { case e: NumberFormatException => - logWarning(s"Error while loading scheduler allocation file at $schedulerAllocFile. " + + logWarning(s"Error while loading scheduler allocation file. " + s"$propertyName is blank or invalid: $data, using the default $propertyName: " + - s"$defaultValue") + s"$defaultValue for pool: $poolName") defaultValue } } diff --git a/core/src/test/resources/fairscheduler-with-invalid-data.xml b/core/src/test/resources/fairscheduler-with-invalid-data.xml index 9d05fbc76283d..a4d8d07b67ce4 100644 --- a/core/src/test/resources/fairscheduler-with-invalid-data.xml +++ b/core/src/test/resources/fairscheduler-with-invalid-data.xml @@ -72,19 +72,9 @@ 2 - + 3 - 2 - FAIR - - - 1 2 - FAIR - - - 3 - 2 FAIR diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 8fc0a3adb10b4..520736ab64270 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -198,9 +198,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR) verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR) verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO) - verifyPool(rootPool, "pool_with_min_share_surrounded_whitespace", 3, 2, FAIR) - verifyPool(rootPool, "pool_with_weight_surrounded_whitespace", 1, 2, FAIR) - verifyPool(rootPool, "pool_with_scheduling_mode_surrounded_whitespace", 3, 2, FAIR) + verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR) } private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, From 7551cbd4b0d75cbd78ac655813fc956052c41d5c Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Mon, 30 Jan 2017 20:21:17 +0000 Subject: [PATCH 4/4] Review comments are addressed. --- .../spark/scheduler/SchedulableBuilder.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 98efb5c84d320..f8bee3eea5ce2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -115,8 +115,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } } - private def getSchedulingModeValue(poolNode: Node, poolName: String, defaultValue: SchedulingMode) - : SchedulingMode = { + private def getSchedulingModeValue( + poolNode: Node, + poolName: String, + defaultValue: SchedulingMode): SchedulingMode = { + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " + s"schedulingMode: $defaultValue for pool: $poolName" @@ -134,8 +137,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } } - private def getIntValue(poolNode: Node, poolName: String, propertyName: String, defaultValue: Int) - : Int = { + private def getIntValue( + poolNode: Node, + poolName: String, + propertyName: String, defaultValue: Int): Int = { + val data = (poolNode \ propertyName).text.trim try { data.toInt