Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}

import scala.util.control.NonFatal
import scala.xml.{Node, XML}

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {

val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
Expand All @@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val DEFAULT_WEIGHT = 1

override def buildPools() {
var is: Option[InputStream] = None
var fileData: Option[(InputStream, String)] = None
try {
is = Option {
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
fileData = schedulerAllocFile.map { f =>
val fis = new FileInputStream(f)
logInfo(s"Creating Fair Scheduler pools from $f")
Some((fis, f))
}.getOrElse {
val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
if (is != null) {
logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
Some((is, DEFAULT_SCHEDULER_FILE))
} else {
logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
None
}
}

is.foreach { i => buildFairSchedulerPool(i) }
fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
} catch {
case NonFatal(t) =>
val defaultMessage = "Error while building the fair scheduler pools"
val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
.getOrElse(defaultMessage)
logError(message, t)
throw t
} finally {
is.foreach(_.close())
fileData.foreach { case (is, fileName) => is.close() }
}

// finally create "default" pool
Expand All @@ -93,36 +111,41 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}

private def buildFairSchedulerPool(is: InputStream) {
private def buildFairSchedulerPool(is: InputStream, fileName: String) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {

val poolName = (poolNode \ POOL_NAME_PROPERTY).text

val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
val schedulingMode = getSchedulingModeValue(poolNode, poolName,
DEFAULT_SCHEDULING_MODE, fileName)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
DEFAULT_MINIMUM_SHARE, fileName)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
DEFAULT_WEIGHT, fileName)

rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))

logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}

private def getSchedulingModeValue(
poolNode: Node,
poolName: String,
defaultValue: SchedulingMode): SchedulingMode = {
defaultValue: SchedulingMode,
fileName: String): SchedulingMode = {

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
s"schedulingMode: $defaultValue for pool: $poolName"
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
s"Fair Scheduler configuration file: $fileName, using " +
s"the default schedulingMode: $defaultValue for pool: $poolName"
try {
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
SchedulingMode.withName(xmlSchedulingMode)
Expand All @@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getIntValue(
poolNode: Node,
poolName: String,
propertyName: String, defaultValue: Int): Int = {
propertyName: String,
defaultValue: Int,
fileName: String): Int = {

val data = (poolNode \ propertyName).text.trim
try {
data.toInt
} catch {
case e: NumberFormatException =>
logWarning(s"Error while loading scheduler allocation file. " +
logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
s"$defaultValue for pool: $poolName")
defaultValue
Expand All @@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
Expand Down