Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
val initMaxRunningTasks: Int,
initWeight: Int)
extends Schedulable with Logging {

Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,24 @@ private[spark] trait Schedulable {
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
val initMaxRunningTasks: Int
def runningTasks: Int
def priority: Int
def stageId: Int
def name: String

/**
* How much space for new tasks is there in this Schedulable?
*/
def maxRunningTasks: Int = {
val myMaxRunningTasks = math.max(0, initMaxRunningTasks - runningTasks)
if (parent == null) {
myMaxRunningTasks
} else {
math.min(myMaxRunningTasks, parent.maxRunningTasks)
}
}

def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
val MAXIMUM_RUNNING_TASKS_PROPERTY = "maxRunningTasks"
val SCHEDULING_MODE_PROPERTY = "schedulingMode"
val PARENT_PROPERTY = "parent"
val WEIGHT_PROPERTY = "weight"
val POOL_NAME_PROPERTY = "@name"
val POOLS_PROPERTY = "pool"
val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
val DEFAULT_MINIMUM_SHARE = 0
val DEFAULT_MAXIMUM_RUNNING_TASKS = Int.MaxValue
val DEFAULT_WEIGHT = 1

override def buildPools() {
Expand All @@ -87,13 +90,20 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
buildDefaultPool()
}

private def logPoolCreated(pool: Pool) =
logInfo(s"""Created pool ${pool.name},
| parent: ${Option(pool.parent).map(_.name).getOrElse("(none)")},
| schedulingMode: ${pool.schedulingMode},
| minShare: ${pool.minShare},
| maxRunningTasks: ${pool.maxRunningTasks},
| weight: ${pool.weight}""".stripMargin)

private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
DEFAULT_MINIMUM_SHARE, DEFAULT_MAXIMUM_RUNNING_TASKS, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logPoolCreated(pool)
}
}

Expand All @@ -104,7 +114,9 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var maxRunningTasks = DEFAULT_MAXIMUM_RUNNING_TASKS
var weight = DEFAULT_WEIGHT
var parent = DEFAULT_POOL_NAME

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
Expand All @@ -122,15 +134,30 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
minShare = xmlMinShare.toInt
}

val xmlMaxShare = (poolNode \ MAXIMUM_RUNNING_TASKS_PROPERTY).text
if (xmlMaxShare != "") {
maxRunningTasks = xmlMaxShare.toInt
}

val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}

val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
val xmlParent = (poolNode \ PARENT_PROPERTY).text
if (xmlParent != "") {
parent = xmlParent
}

val pool = new Pool(poolName, schedulingMode, minShare, maxRunningTasks, weight)
val parentPool = rootPool.getSchedulableByName(parent)
if (parentPool == null) {
logWarning(s"couldn't find parent pool $parent, adding pool to the root")
rootPool.addSchedulable(pool)
} else {
parentPool.addSchedulable(pool)
}
logPoolCreated(pool)
}
}

Expand All @@ -143,11 +170,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
if (parentPool == null) {
// we will create a new pool that user has configured in app
// instead of being defined in xml file
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
val pool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_MAXIMUM_RUNNING_TASKS, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logPoolCreated(pool)
parentPool = pool
}
}
parentPool.addSchedulable(manager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private[spark] class TaskSchedulerImpl(
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
rootPool = new Pool("", schedulingMode, 0, Int.MaxValue, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private[spark] class TaskSetManager(
var parent: Pool = null
var totalResultSize = 0L
var calculatedTasks = 0
val initMaxRunningTasks = Int.MaxValue

val runningTasksSet = new HashSet[Long]

Expand Down Expand Up @@ -421,7 +422,7 @@ private[spark] class TaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie) {
if (!isZombie && maxRunningTasks > 0) {
val curTime = clock.getTimeMillis()

var allowedLocality = maxLocality
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/resources/fairscheduler.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
<allocations>
<pool name="1">
<minShare>2</minShare>
<maxRunningTasks>512</maxRunningTasks>
<weight>1</weight>
<schedulingMode>FIFO</schedulingMode>
</pool>
<pool name="2">
<minShare>3</minShare>
<maxRunningTasks>256</maxRunningTasks>
<weight>1</weight>
<schedulingMode>FIFO</schedulingMode>
</pool>
Expand Down
60 changes: 60 additions & 0 deletions core/src/test/resources/nestedpool.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<allocations>
<pool name="0">
<minShare>3</minShare>
<maxRunningTasks>1024</maxRunningTasks>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="1">
<minShare>4</minShare>
<maxRunningTasks>128</maxRunningTasks>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="00">
<parent>0</parent>
<minShare>2</minShare>
<maxRunningTasks>512</maxRunningTasks>
<weight>2</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="01">
<parent>0</parent>
<minShare>1</minShare>
<maxRunningTasks>128</maxRunningTasks>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="10">
<parent>1</parent>
<minShare>2</minShare>
<maxRunningTasks>256</maxRunningTasks>
<weight>2</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="11">
<parent>1</parent>
<minShare>2</minShare>
<maxRunningTasks>64</maxRunningTasks>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
</allocations>
72 changes: 54 additions & 18 deletions core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
val rootPool = new Pool("", SchedulingMode.FIFO, 0, Int.MaxValue, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()

Expand Down Expand Up @@ -78,7 +78,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, Int.MaxValue, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()

Expand All @@ -88,10 +88,13 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
assert(rootPool.getSchedulableByName("2") != null)
assert(rootPool.getSchedulableByName("3") != null)
assert(rootPool.getSchedulableByName("1").minShare === 2)
assert(rootPool.getSchedulableByName("1").maxRunningTasks === 512)
assert(rootPool.getSchedulableByName("1").weight === 1)
assert(rootPool.getSchedulableByName("2").minShare === 3)
assert(rootPool.getSchedulableByName("2").maxRunningTasks === 256)
assert(rootPool.getSchedulableByName("2").weight === 1)
assert(rootPool.getSchedulableByName("3").minShare === 0)
assert(rootPool.getSchedulableByName("3").maxRunningTasks === Int.MaxValue)
assert(rootPool.getSchedulableByName("3").weight === 1)

val properties1 = new Properties()
Expand Down Expand Up @@ -134,24 +137,57 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
}

test("Nested Pool Test") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val xmlPath = getClass.getClassLoader.getResource("nestedpool.xml").getFile()
val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
rootPool.addSchedulable(pool0)
rootPool.addSchedulable(pool1)

val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
pool0.addSchedulable(pool00)
pool0.addSchedulable(pool01)

val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, Int.MaxValue, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()

val pool0 = rootPool.getSchedulableByName("0")
val pool1 = rootPool.getSchedulableByName("1")
val pool00 = rootPool.getSchedulableByName("00")
val pool01 = rootPool.getSchedulableByName("01")
val pool10 = rootPool.getSchedulableByName("10")
val pool11 = rootPool.getSchedulableByName("11")

// Ensure that the XML file was read in correctly.
assert(pool0 != null)
assert(pool1 != null)
assert(pool00 != null)
assert(pool01 != null)
assert(pool10 != null)
assert(pool11 != null)

assert(pool0.minShare === 3)
assert(pool0.maxRunningTasks === 1024)
assert(pool0.weight === 1)

assert(pool1.minShare === 4)
assert(pool1.maxRunningTasks === 128)
assert(pool1.weight === 1)

assert(pool00.minShare === 2)
assert(pool00.maxRunningTasks === 512)
assert(pool00.weight === 2)
assert(pool00.parent === pool0)

assert(pool01.minShare === 1)
assert(pool01.maxRunningTasks === 128)
assert(pool01.weight === 1)
assert(pool01.parent === pool0)

assert(pool10.minShare === 2)
assert(pool10.maxRunningTasks === 128) // not 256 due to parent
assert(pool10.weight === 2)
assert(pool10.parent === pool1)

assert(pool11.minShare === 2)
assert(pool11.maxRunningTasks === 64)
assert(pool11.weight === 1)
assert(pool11.parent === pool1)

val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
Expand Down
Loading