Skip to content

Commit b6aa557

Browse files
kayousterhout authored and pwendell committed
[SPARK-1143] Separate pool tests into their own suite.
The current TaskSchedulerImplSuite includes some tests that are actually for the TaskSchedulerImpl, but the remainder of the tests avoid using the TaskSchedulerImpl entirely, and actually test the pool and scheduling algorithm mechanisms. This commit separates the pool/scheduling algorithm tests into their own suite, and also simplifies those tests. The pull request replaces #339. Author: Kay Ousterhout <[email protected]> Closes #3967 from kayousterhout/SPARK-1143 and squashes the following commits: 8a898c4 [Kay Ousterhout] [SPARK-1143] Separate pool tests into their own suite.
1 parent 1790b38 commit b6aa557

File tree

2 files changed

+183
-230
lines changed

2 files changed

+183
-230
lines changed
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler
19+
20+
import java.util.Properties
21+
22+
import org.scalatest.FunSuite
23+
24+
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
25+
26+
/**
 * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
 * correctly.
 *
 * Each test builds a pool hierarchy, registers task set managers with it, and then checks which
 * stage the root pool's sorted task set queue offers for each successive task.
 */
class PoolSuite extends FunSuite with LocalSparkContext {

  /**
   * Creates a task set manager for `stageId` that holds `numTasks` fake tasks.
   *
   * NOTE(review): the trailing `0, 0, null` passed to TaskSet and the final `0` passed to
   * TaskSetManager are presumably attempt, priority, properties and the maximum number of task
   * failures -- confirm against those constructors.
   */
  def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
    : TaskSetManager = {
    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
      new FakeTask(i, Nil)
    }
    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
  }

  /**
   * Marks `taskId` as running on the first task set in the root pool's sorted queue whose
   * running plus successful tasks are still below its total number of tasks, and verifies that
   * this task set belongs to `expectedStageId`.
   */
  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
    val taskSetQueue = rootPool.getSortedTaskSetQueue
    // Fail with a descriptive message (rather than a bare Option.get) when every task set in the
    // queue is already fully scheduled.
    val nextTaskSetToSchedule = taskSetQueue
      .find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
      .getOrElse(fail(s"No task set with remaining tasks was found when scheduling task $taskId"))
    // The task is registered before the stage is checked, matching the order in which a
    // scheduler would hand out the task.
    nextTaskSetToSchedule.addRunningTask(taskId)
    assert(nextTaskSetToSchedule.stageId === expectedStageId)
  }

  /**
   * This test adds three task set managers with two tasks each to a FIFO root pool and verifies
   * that tasks are handed out stage by stage, in the order the task sets were added.
   */
  test("FIFO Scheduler Test") {
    sc = new SparkContext("local", "PoolSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
    schedulableBuilder.buildPools()

    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
    schedulableBuilder.addTaskSetManager(taskSetManager2, null)

    // Each stage has two tasks, so two consecutive tasks land on each stage in turn.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 0)
    scheduleTaskAndVerifyId(2, rootPool, 1)
    scheduleTaskAndVerifyId(3, rootPool, 1)
    scheduleTaskAndVerifyId(4, rootPool, 2)
    scheduleTaskAndVerifyId(5, rootPool, 2)
  }

  /**
   * This test creates three scheduling pools, and creates task set managers in the first
   * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling
   * algorithm properly orders the two scheduling pools.
   */
  test("Fair Scheduler Test") {
    // ClassLoader.getResource returns null when the resource is missing; surface that as a clear
    // test failure instead of a NullPointerException.
    val xmlUrl = Option(getClass.getClassLoader.getResource("fairscheduler.xml"))
      .getOrElse(fail("fairscheduler.xml was not found on the test classpath"))
    val xmlPath = xmlUrl.getFile()
    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
    sc = new SparkContext("local", "PoolSuite", conf)
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
    schedulableBuilder.buildPools()

    // Ensure that the XML file was read in correctly.
    assert(rootPool.getSchedulableByName("default") != null)
    assert(rootPool.getSchedulableByName("1") != null)
    assert(rootPool.getSchedulableByName("2") != null)
    assert(rootPool.getSchedulableByName("3") != null)
    assert(rootPool.getSchedulableByName("1").minShare === 2)
    assert(rootPool.getSchedulableByName("1").weight === 1)
    assert(rootPool.getSchedulableByName("2").minShare === 3)
    assert(rootPool.getSchedulableByName("2").weight === 1)
    assert(rootPool.getSchedulableByName("3").minShare === 0)
    assert(rootPool.getSchedulableByName("3").weight === 1)

    // Properties that route a task set manager to pool "1" or pool "2" respectively.
    val properties1 = new Properties()
    properties1.setProperty("spark.scheduler.pool", "1")
    val properties2 = new Properties()
    properties2.setProperty("spark.scheduler.pool", "2")

    // Pool 1 holds stages 0 (1 task), 1 (1 task) and 2 (2 tasks).
    val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
    val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
    val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)

    // Pool 2 holds stages 3 and 4 (2 tasks each).
    val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler)
    val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)

    // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(1, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(2, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(3, rootPool, 1)
    // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(4, rootPool, 4)
    // Neither pool is needy so ordering is based on number of running tasks.
    // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(5, rootPool, 2)
    // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming
    // ordering.
    scheduleTaskAndVerifyId(6, rootPool, 2)
    // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(7, rootPool, 4)
  }

  /**
   * This test builds a two-level hierarchy of fair scheduling pools under the root pool, places
   * two task set managers with five tasks each in every leaf pool, and verifies which leaf pool
   * supplies each of the first four tasks.
   */
  test("Nested Pool Test") {
    sc = new SparkContext("local", "PoolSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)

    // NOTE(review): the last two Pool constructor arguments are presumably the pool's minimum
    // share and weight, in that order -- confirm against Pool.
    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
    rootPool.addSchedulable(pool0)
    rootPool.addSchedulable(pool1)

    // Leaf pools nested under pool 0.
    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
    pool0.addSchedulable(pool00)
    pool0.addSchedulable(pool01)

    // Leaf pools nested under pool 1.
    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
    pool1.addSchedulable(pool10)
    pool1.addSchedulable(pool11)

    // Stages 0 and 1 live in pool 00.
    val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
    val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
    pool00.addSchedulable(taskSetManager000)
    pool00.addSchedulable(taskSetManager001)

    // Stages 2 and 3 live in pool 01.
    val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler)
    val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler)
    pool01.addSchedulable(taskSetManager010)
    pool01.addSchedulable(taskSetManager011)

    // Stages 4 and 5 live in pool 10.
    val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler)
    val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler)
    pool10.addSchedulable(taskSetManager100)
    pool10.addSchedulable(taskSetManager101)

    // Stages 6 and 7 live in pool 11.
    val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler)
    val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler)
    pool11.addSchedulable(taskSetManager110)
    pool11.addSchedulable(taskSetManager111)

    // NOTE(review): the expected order below presumably follows from applying the fair ordering
    // (lowest running-tasks to min-share ratio first, ties broken by pool name) at every level
    // of the hierarchy -- confirm against the fair scheduling algorithm.
    // Stage 0 is in pool 00 (under pool 0).
    scheduleTaskAndVerifyId(0, rootPool, 0)
    // Stage 4 is in pool 10 (under pool 1).
    scheduleTaskAndVerifyId(1, rootPool, 4)
    // Stage 6 is in pool 11 (under pool 1).
    scheduleTaskAndVerifyId(2, rootPool, 6)
    // Stage 2 is in pool 01 (under pool 0).
    scheduleTaskAndVerifyId(3, rootPool, 2)
  }
}

0 commit comments

Comments
 (0)