Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}

import scala.xml.XML
import scala.xml.{Node, XML}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -102,38 +103,57 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {

val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: NoSuchElementException =>
logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
s"using the default schedulingMode: $schedulingMode")
}
}

val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)

val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))

val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}

private def getSchedulingModeValue(
poolNode: Node,
poolName: String,
defaultValue: SchedulingMode): SchedulingMode = {

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
s"schedulingMode: $defaultValue for pool: $poolName"
try {
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
SchedulingMode.withName(xmlSchedulingMode)
} else {
logWarning(warningMessage)
defaultValue
}
} catch {
case e: NoSuchElementException =>
logWarning(warningMessage)
defaultValue
}
}

private def getIntValue(
poolNode: Node,
poolName: String,
propertyName: String, defaultValue: Int): Int = {

val data = (poolNode \ propertyName).text.trim
try {
data.toInt
} catch {
case e: NumberFormatException =>
logWarning(s"Error while loading scheduler allocation file. " +
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
s"$defaultValue for pool: $poolName")
defaultValue
}
}

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
Expand Down
80 changes: 80 additions & 0 deletions core/src/test/resources/fairscheduler-with-invalid-data.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<allocations>
<pool name="pool_with_invalid_min_share">
<minShare>INVALID_MIN_SHARE</minShare>
<weight>2</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="pool_with_invalid_weight">
<minShare>1</minShare>
<weight>INVALID_WEIGHT</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="pool_with_invalid_scheduling_mode">
<minShare>3</minShare>
<weight>2</weight>
<schedulingMode>INVALID_SCHEDULING_MODE</schedulingMode>
</pool>
<pool name="pool_with_non_uppercase_scheduling_mode">
<minShare>2</minShare>
<weight>1</weight>
<schedulingMode>fair</schedulingMode>
</pool>
<pool name="pool_with_NONE_scheduling_mode">
<minShare>1</minShare>
<weight>2</weight>
<schedulingMode>NONE</schedulingMode>
</pool>
<pool name="pool_with_whitespace_min_share">
<minShare> </minShare>
<weight>2</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="pool_with_whitespace_weight">
<minShare>1</minShare>
<weight> </weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="pool_with_whitespace_scheduling_mode">
<minShare>3</minShare>
<weight>2</weight>
<schedulingMode> </schedulingMode>
</pool>
<pool name="pool_with_empty_min_share">
<minShare></minShare>
<weight>3</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="pool_with_empty_weight">
<minShare>2</minShare>
<weight></weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="pool_with_empty_scheduling_mode">
<minShare>2</minShare>
<weight>2</weight>
<schedulingMode></schedulingMode>
</pool>
<pool name="pool_with_surrounded_whitespace">
<minShare> 3 </minShare>
<weight> 2 </weight>
<schedulingMode> FAIR </schedulingMode>
</pool>
</allocations>
83 changes: 57 additions & 26 deletions core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ package org.apache.spark.scheduler
import java.util.Properties

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.SchedulingMode._

/**
* Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
* correctly.
*/
class PoolSuite extends SparkFunSuite with LocalSparkContext {

val LOCAL = "local"
val APP_NAME = "PoolSuite"
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"

def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
: TaskSetManager = {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
Expand All @@ -45,12 +50,11 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
}

test("FIFO Scheduler Test") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
sc = new SparkContext(LOCAL, APP_NAME)
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
val rootPool = new Pool("", FIFO, 0, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()

val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
Expand All @@ -74,30 +78,24 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
*/
test("Fair Scheduler Test") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
sc = new SparkContext(LOCAL, APP_NAME, conf)
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val rootPool = new Pool("", FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()

// Ensure that the XML file was read in correctly.
assert(rootPool.getSchedulableByName("default") != null)
assert(rootPool.getSchedulableByName("1") != null)
assert(rootPool.getSchedulableByName("2") != null)
assert(rootPool.getSchedulableByName("3") != null)
assert(rootPool.getSchedulableByName("1").minShare === 2)
assert(rootPool.getSchedulableByName("1").weight === 1)
assert(rootPool.getSchedulableByName("2").minShare === 3)
assert(rootPool.getSchedulableByName("2").weight === 1)
assert(rootPool.getSchedulableByName("3").minShare === 0)
assert(rootPool.getSchedulableByName("3").weight === 1)
verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
verifyPool(rootPool, "1", 2, 1, FIFO)
verifyPool(rootPool, "2", 3, 1, FIFO)
verifyPool(rootPool, "3", 0, 1, FIFO)

val properties1 = new Properties()
properties1.setProperty("spark.scheduler.pool", "1")
properties1.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1")
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.pool", "2")
properties2.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "2")

val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
Expand Down Expand Up @@ -134,22 +132,22 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
}

test("Nested Pool Test") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
sc = new SparkContext(LOCAL, APP_NAME)
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
val rootPool = new Pool("", FAIR, 0, 0)
val pool0 = new Pool("0", FAIR, 3, 1)
val pool1 = new Pool("1", FAIR, 4, 1)
rootPool.addSchedulable(pool0)
rootPool.addSchedulable(pool1)

val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
val pool00 = new Pool("00", FAIR, 2, 2)
val pool01 = new Pool("01", FAIR, 1, 1)
pool0.addSchedulable(pool00)
pool0.addSchedulable(pool01)

val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
val pool10 = new Pool("10", FAIR, 2, 2)
val pool11 = new Pool("11", FAIR, 2, 1)
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)

Expand Down Expand Up @@ -178,4 +176,37 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
scheduleTaskAndVerifyId(2, rootPool, 6)
scheduleTaskAndVerifyId(3, rootPool, 2)
}

test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
.getFile()
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)

val rootPool = new Pool("", FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
schedulableBuilder.buildPools()

verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
verifyPool(rootPool, "pool_with_invalid_min_share", 0, 2, FAIR)
verifyPool(rootPool, "pool_with_invalid_weight", 1, 1, FAIR)
verifyPool(rootPool, "pool_with_invalid_scheduling_mode", 3, 2, FIFO)
verifyPool(rootPool, "pool_with_non_uppercase_scheduling_mode", 2, 1, FAIR)
verifyPool(rootPool, "pool_with_NONE_scheduling_mode", 1, 2, FIFO)
verifyPool(rootPool, "pool_with_whitespace_min_share", 0, 2, FAIR)
verifyPool(rootPool, "pool_with_whitespace_weight", 1, 1, FAIR)
verifyPool(rootPool, "pool_with_whitespace_scheduling_mode", 3, 2, FIFO)
verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR)
verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR)
verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with my suggestion on trim, you could also add a test case for a mode w/ surrounding whitespace.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed ;)

verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR)
}

private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
assert(rootPool.getSchedulableByName(poolName) != null)
assert(rootPool.getSchedulableByName(poolName).minShare === expectedInitMinShare)
assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight)
assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode)
}

}