From 2ed8cada04201192d7e1f10c10fded7b20913327 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 5 Feb 2017 20:59:40 +0000 Subject: [PATCH 1/5] Improve Fair Scheduler Logging --- .../spark/scheduler/SchedulableBuilder.scala | 30 ++++++++--- .../fairscheduler-with-valid-data.xml | 35 ++++++++++++ .../apache/spark/scheduler/PoolSuite.scala | 54 +++++++++++++++++-- 3 files changed, 106 insertions(+), 13 deletions(-) create mode 100644 core/src/test/resources/fairscheduler-with-valid-data.xml diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index f8bee3eea5ce2..14e05f98ea2cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} +import scala.util.control.NonFatal import scala.xml.{Node, XML} import org.apache.spark.SparkConf @@ -55,6 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder with Logging { + private case class FileData(inputStream: InputStream, fileName: String) + val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file") val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" @@ -69,19 +72,29 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) val DEFAULT_WEIGHT = 1 override def buildPools() { - var is: Option[InputStream] = None + var fileData: Option[FileData] = None try { - is = Option { - schedulerAllocFile.map { f => - new FileInputStream(f) - }.getOrElse { - Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) + fileData = schedulerAllocFile.map { f => + Some(FileData(new FileInputStream(f), f)) + }.getOrElse { + val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) + if(is != null) Some(FileData(is, DEFAULT_SCHEDULER_FILE)) + else { + logWarning(s"No Fair Scheduler file found.") + None } } - is.foreach { i => buildFairSchedulerPool(i) } + fileData.foreach { data => + logInfo(s"Fair Scheduler file: ${data.fileName} is found successfully and will be parsed.") + buildFairSchedulerPool(data.inputStream) + } + } catch { + case NonFatal(t) => + logError("Fair Scheduler can not be built.", t) + throw t } finally { - is.foreach(_.close()) + fileData.foreach(_.inputStream.close()) } // finally create "default" pool @@ -173,4 +186,5 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) parentPool.addSchedulable(manager) logInfo("Added task set " + manager.name + " tasks to pool " + poolName) } + } diff --git a/core/src/test/resources/fairscheduler-with-valid-data.xml b/core/src/test/resources/fairscheduler-with-valid-data.xml new file mode 100644 index 0000000000000..700cf1c5117a8 --- /dev/null +++ b/core/src/test/resources/fairscheduler-with-valid-data.xml @@ -0,0 +1,35 @@ + + + + + + 3 + 1 + FIFO + + + 4 + 2 + FAIR + + + 2 + 3 + FAIR + + diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 520736ab64270..b857a15cb2108 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import java.io.FileNotFoundException import java.util.Properties import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} @@ -28,11 +29,11 @@ import org.apache.spark.scheduler.SchedulingMode._ */ class PoolSuite extends SparkFunSuite with LocalSparkContext { - val LOCAL = "local" - val APP_NAME = "PoolSuite" - val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" + private val LOCAL = "local" + private val APP_NAME = "PoolSuite" + private val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" - def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) + private def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) : TaskSetManager = { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) @@ -40,7 +41,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0) } - def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) { + private def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) { val taskSetQueue = rootPool.getSortedTaskSetQueue val nextTaskSetToSchedule = taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks) @@ -201,6 +202,49 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR) } + test("SPARK-19466: Fair Scheduler should build fair scheduler when " + + "valid spark.scheduler.allocation.file property is set") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + sc = new SparkContext(LOCAL, APP_NAME, conf) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) + verifyPool(rootPool, "pool1", 3, 1, FIFO) + verifyPool(rootPool, "pool2", 4, 2, FAIR) + verifyPool(rootPool, "pool3", 2, 3, FAIR) + } + + test("SPARK-19466: Fair Scheduler should use default file(fairscheduler.xml) if it exists " + + "in classpath and spark.scheduler.allocation.file property is not set") { + val conf = new SparkConf() + sc = new SparkContext(LOCAL, APP_NAME, conf) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) + verifyPool(rootPool, "1", 2, 1, FIFO) + verifyPool(rootPool, "2", 3, 1, FIFO) + verifyPool(rootPool, "3", 0, 1, FIFO) + } + + test("SPARK-19466: Fair Scheduler should throw FileNotFoundException " + + "when invalid spark.scheduler.allocation.file property is set") { + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, "INVALID_FILE_PATH") + sc = new SparkContext(LOCAL, APP_NAME, conf) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + intercept[FileNotFoundException] { + schedulableBuilder.buildPools() + } + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { assert(rootPool.getSchedulableByName(poolName) != null) From 64a88af322215b92d71615bf6f9da63a4e344f67 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Tue, 7 Feb 2017 20:00:37 +0000 Subject: [PATCH 2/5] Review comments are addressed. --- .../spark/scheduler/SchedulableBuilder.scala | 72 +++++++++++-------- .../fairscheduler-with-valid-data.xml | 35 --------- .../apache/spark/scheduler/PoolSuite.scala | 54 ++------------ 3 files changed, 47 insertions(+), 114 deletions(-) delete mode 100644 core/src/test/resources/fairscheduler-with-valid-data.xml diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 14e05f98ea2cd..7282279362c73 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.{FileInputStream, InputStream} +import java.io.{File, FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} import scala.util.control.NonFatal @@ -74,24 +74,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) override def buildPools() { var fileData: Option[FileData] = None try { - fileData = schedulerAllocFile.map { f => - Some(FileData(new FileInputStream(f), f)) - }.getOrElse { - val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) - if(is != null) Some(FileData(is, DEFAULT_SCHEDULER_FILE)) - else { - logWarning(s"No Fair Scheduler file found.") - None - } - } - - fileData.foreach { data => - logInfo(s"Fair Scheduler file: ${data.fileName} is found successfully and will be parsed.") - buildFairSchedulerPool(data.inputStream) - } + fileData = getFileData() + fileData.foreach { data => buildFairSchedulerPool(data) } } catch { case NonFatal(t) => - logError("Fair Scheduler can not be built.", t) + logError("Error while building the fair scheduler pools: ", t) throw t } finally { fileData.foreach(_.inputStream.close()) @@ -101,29 +88,52 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) buildDefaultPool() } + private def getFileData(): Option[FileData] = { + schedulerAllocFile.map { f => + val file = new File(f) + val fis = new FileInputStream(file) + logInfo(s"Creating Fair Scheduler pools from ${file.getName}") + Some(FileData(fis, file.getName)) + }.getOrElse { + val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) + if(is != null) { + logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE") + Some(FileData(is, DEFAULT_SCHEDULER_FILE)) + } + else { + logWarning("Fair Scheduler configuration file not found so jobs will be scheduled " + + "in FIFO order") + None + } + } + } + private def buildDefaultPool() { if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(pool) - logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) } } - private def buildFairSchedulerPool(is: InputStream) { - val xml = XML.load(is) + private def buildFairSchedulerPool(fileData: FileData) { + val xml = XML.load(fileData.inputStream) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text - val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE) - val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE) - val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT) + val schedulingMode = getSchedulingModeValue(poolNode, poolName, + DEFAULT_SCHEDULING_MODE, fileData.fileName) + val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, + DEFAULT_MINIMUM_SHARE, fileData.fileName) + val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, + DEFAULT_WEIGHT, fileData.fileName) rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } } @@ -131,11 +141,12 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) private def getSchedulingModeValue( poolNode: Node, poolName: String, - defaultValue: SchedulingMode): SchedulingMode = { + defaultValue: SchedulingMode, fileName: String): SchedulingMode = { val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase - val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " + - s"schedulingMode: $defaultValue for pool: $poolName" + val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " + + s"Fair Scheduler configuration file: $fileName, using " + + s"the default schedulingMode: $defaultValue for pool: $poolName" try { if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) { SchedulingMode.withName(xmlSchedulingMode) @@ -153,14 +164,15 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) private def getIntValue( poolNode: Node, poolName: String, - propertyName: String, defaultValue: Int): Int = { + propertyName: String, + defaultValue: Int, fileName: String): Int = { val data = (poolNode \ propertyName).text.trim try { data.toInt } catch { case e: NumberFormatException => - logWarning(s"Error while loading scheduler allocation file. " + + logWarning(s"Error while loading Fair Scheduler configuration file: $fileName, " + s"$propertyName is blank or invalid: $data, using the default $propertyName: " + s"$defaultValue for pool: $poolName") defaultValue @@ -179,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) } } diff --git a/core/src/test/resources/fairscheduler-with-valid-data.xml b/core/src/test/resources/fairscheduler-with-valid-data.xml deleted file mode 100644 index 700cf1c5117a8..0000000000000 --- a/core/src/test/resources/fairscheduler-with-valid-data.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - - - - 3 - 1 - FIFO - - - 4 - 2 - FAIR - - - 2 - 3 - FAIR - - diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index b857a15cb2108..520736ab64270 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import java.io.FileNotFoundException import java.util.Properties import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} @@ -29,11 +28,11 @@ import org.apache.spark.scheduler.SchedulingMode._ */ class PoolSuite extends SparkFunSuite with LocalSparkContext { - private val LOCAL = "local" - private val APP_NAME = "PoolSuite" - private val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" + val LOCAL = "local" + val APP_NAME = "PoolSuite" + val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" - private def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) + def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) : TaskSetManager = { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) @@ -41,7 +40,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0) } - private def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) { + def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) { val taskSetQueue = rootPool.getSortedTaskSetQueue val nextTaskSetToSchedule = taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks) @@ -202,49 +201,6 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR) } - test("SPARK-19466: Fair Scheduler should build fair scheduler when " + - "valid spark.scheduler.allocation.file property is set") { - val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) - sc = new SparkContext(LOCAL, APP_NAME, conf) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) - verifyPool(rootPool, "pool1", 3, 1, FIFO) - verifyPool(rootPool, "pool2", 4, 2, FAIR) - verifyPool(rootPool, "pool3", 2, 3, FAIR) - } - - test("SPARK-19466: Fair Scheduler should use default file(fairscheduler.xml) if it exists " + - "in classpath and spark.scheduler.allocation.file property is not set") { - val conf = new SparkConf() - sc = new SparkContext(LOCAL, APP_NAME, conf) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) - verifyPool(rootPool, "1", 2, 1, FIFO) - verifyPool(rootPool, "2", 3, 1, FIFO) - verifyPool(rootPool, "3", 0, 1, FIFO) - } - - test("SPARK-19466: Fair Scheduler should throw FileNotFoundException " + - "when invalid spark.scheduler.allocation.file property is set") { - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, "INVALID_FILE_PATH") - sc = new SparkContext(LOCAL, APP_NAME, conf) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - intercept[FileNotFoundException] { - schedulableBuilder.buildPools() - } - } - private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { assert(rootPool.getSchedulableByName(poolName) != null) From 790097e67f6f14659fcd627d8589c6e0808c907d Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Tue, 7 Feb 2017 23:41:38 +0000 Subject: [PATCH 3/5] Latest review comments are addressed. --- .../spark/scheduler/SchedulableBuilder.scala | 60 ++++++++----------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 7282279362c73..b895f4bdbacdd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.{File, FileInputStream, InputStream} +import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} import scala.util.control.NonFatal @@ -56,8 +56,6 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder with Logging { - private case class FileData(inputStream: InputStream, fileName: String) - val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file") val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" @@ -72,42 +70,37 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) val DEFAULT_WEIGHT = 1 override def buildPools() { - var fileData: Option[FileData] = None + var fileData: Option[(InputStream, String)] = None try { - fileData = getFileData() - fileData.foreach { data => buildFairSchedulerPool(data) } + fileData = schedulerAllocFile.map { f => + val fis = new FileInputStream(f) + logInfo(s"Creating Fair Scheduler pools from $f") + Some((fis, f)) + }.getOrElse { + val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) + if (is != null) { + logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE") + Some((is, DEFAULT_SCHEDULER_FILE)) + } else { + logWarning("Fair Scheduler configuration file not found so jobs will be scheduled " + + "in FIFO order") + None + } + } + + fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) } } catch { case NonFatal(t) => logError("Error while building the fair scheduler pools: ", t) throw t } finally { - fileData.foreach(_.inputStream.close()) + fileData.foreach { case (is, fileName) => is.close() } } // finally create "default" pool buildDefaultPool() } - private def getFileData(): Option[FileData] = { - schedulerAllocFile.map { f => - val file = new File(f) - val fis = new FileInputStream(file) - logInfo(s"Creating Fair Scheduler pools from ${file.getName}") - Some(FileData(fis, file.getName)) - }.getOrElse { - val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) - if(is != null) { - logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE") - Some(FileData(is, DEFAULT_SCHEDULER_FILE)) - } - else { - logWarning("Fair Scheduler configuration file not found so jobs will be scheduled " + - "in FIFO order") - None - } - } - } - private def buildDefaultPool() { if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, @@ -118,18 +111,18 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } } - private def buildFairSchedulerPool(fileData: FileData) { - val xml = XML.load(fileData.inputStream) + private def buildFairSchedulerPool(is: InputStream, fileName: String) { + val xml = XML.load(is) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text val schedulingMode = getSchedulingModeValue(poolNode, poolName, - DEFAULT_SCHEDULING_MODE, fileData.fileName) + DEFAULT_SCHEDULING_MODE, fileName) val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, - DEFAULT_MINIMUM_SHARE, fileData.fileName) + DEFAULT_MINIMUM_SHARE, fileName) val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, - DEFAULT_WEIGHT, fileData.fileName) + DEFAULT_WEIGHT, fileName) rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) @@ -172,7 +165,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) data.toInt } catch { case e: NumberFormatException => - logWarning(s"Error while loading Fair Scheduler configuration file: $fileName, " + + logWarning(s"Error while loading fair scheduler configuration from $fileName: " + s"$propertyName is blank or invalid: $data, using the default $propertyName: " + s"$defaultValue for pool: $poolName") defaultValue @@ -198,5 +191,4 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) parentPool.addSchedulable(manager) logInfo("Added task set " + manager.name + " tasks to pool " + poolName) } - } From 3da98c7782a450ccf26b26c1c8b83478b63e1f83 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Thu, 9 Feb 2017 21:44:55 +0000 Subject: [PATCH 4/5] Review comments are addressed. --- .../spark/scheduler/SchedulableBuilder.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index b895f4bdbacdd..582d367c5f858 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -83,7 +83,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) Some((is, DEFAULT_SCHEDULER_FILE)) } else { logWarning("Fair Scheduler configuration file not found so jobs will be scheduled " + - "in FIFO order") + s"in FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE " + + "or set spark.scheduler.allocation.file to a file that contains the configuration.") None } } @@ -91,7 +92,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) } } catch { case NonFatal(t) => - logError("Error while building the fair scheduler pools: ", t) + val defaultMessage = "Error while building the fair scheduler pools" + val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" } + .getOrElse(defaultMessage) + logError(message, t) throw t } finally { fileData.foreach { case (is, fileName) => is.close() } @@ -134,7 +138,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) private def getSchedulingModeValue( poolNode: Node, poolName: String, - defaultValue: SchedulingMode, fileName: String): SchedulingMode = { + defaultValue: SchedulingMode, + fileName: String): SchedulingMode = { val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " + @@ -158,7 +163,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) poolNode: Node, poolName: String, propertyName: String, - defaultValue: Int, fileName: String): Int = { + defaultValue: Int, + fileName: String): Int = { val data = (poolNode \ propertyName).text.trim try { From d508d92238f82cdffd6947659f7f7596c2b685f9 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Thu, 9 Feb 2017 21:56:12 +0000 Subject: [PATCH 5/5] Review comments are addressed. --- .../org/apache/spark/scheduler/SchedulableBuilder.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 582d367c5f858..e53c4fb5b4778 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -56,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder with Logging { - val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file") + val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" + val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY) val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" @@ -82,9 +83,9 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE") Some((is, DEFAULT_SCHEDULER_FILE)) } else { - logWarning("Fair Scheduler configuration file not found so jobs will be scheduled " + - s"in FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE " + - "or set spark.scheduler.allocation.file to a file that contains the configuration.") + logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " + + s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " + + s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.") None } }