
Commit 799fc20

Author: Marcelo Vanzin

Merge branch 'master' into SPARK-4924

Conflicts:
    bin/compute-classpath.cmd
    bin/compute-classpath.sh
    make-distribution.sh

2 parents: bb5d324 + b6aa557

30 files changed: +1027 -652 lines

assembly/pom.xml

Lines changed: 20 additions & 0 deletions
@@ -354,5 +354,25 @@
         </dependency>
       </dependencies>
     </profile>
+
+    <!-- Profiles that disable inclusion of certain dependencies. -->
+    <profile>
+      <id>hadoop-provided</id>
+      <properties>
+        <hadoop.deps.scope>provided</hadoop.deps.scope>
+      </properties>
+    </profile>
+    <profile>
+      <id>hive-provided</id>
+      <properties>
+        <hive.deps.scope>provided</hive.deps.scope>
+      </properties>
+    </profile>
+    <profile>
+      <id>parquet-provided</id>
+      <properties>
+        <parquet.deps.scope>provided</parquet.deps.scope>
+      </properties>
+    </profile>
   </profiles>
 </project>

bagel/pom.xml

Lines changed: 0 additions & 4 deletions
@@ -40,10 +40,6 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-server</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 14 additions & 7 deletions
@@ -55,19 +55,26 @@ private[spark] class SparkDeploySchedulerBackend(
       "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
-      cp.split(java.io.File.pathSeparator)
-    }
-    val libraryPathEntries =
-      sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
-        cp.split(java.io.File.pathSeparator)
+    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
+      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
+    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
+      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
+
+    // When testing, expose the parent class path to the child. This is processed by
+    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
+    // when the assembly is built with the "*-provided" profiles enabled.
+    val testingClassPath =
+      if (sys.props.contains("spark.testing")) {
+        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
+      } else {
+        Nil
       }

     // Start executors with a few necessary configs for registering with the scheduler
     val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
+      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       appUIAddress, sc.eventLogDir)
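
The hunk above rewrites the class-path and library-path parsing as Option.map(...).getOrElse(Nil) and, when spark.testing is set, forwards the parent JVM's class path to the executor command so child processes still find every jar when the assembly was built with the "*-provided" profiles. Below is a minimal, self-contained sketch of that pattern; the object name and the parsePath helper are illustrative, not part of Spark:

object ClassPathOptionExample {
  // Split an optional path-separated setting into entries; Nil when the setting is absent.
  // Mirrors the pattern used above for spark.executor.extraClassPath / extraLibraryPath.
  def parsePath(value: Option[String]): Seq[String] =
    value.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  def main(args: Array[String]): Unit = {
    val sep = java.io.File.pathSeparator
    println(parsePath(Some(s"/opt/a.jar${sep}/opt/b.jar")))  // two entries: /opt/a.jar and /opt/b.jar
    println(parsePath(None))                                 // List()

    // With "spark.testing" set, the parent JVM's class path would be forwarded so that
    // child processes still see every jar when the "*-provided" profiles were used.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(sep).toSeq
      } else {
        Nil
      }
    println(s"testing class path entries: ${testingClassPath.size}")
  }
}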

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 3 additions & 2 deletions
@@ -990,11 +990,12 @@ private[spark] object Utils extends Logging {
     for ((key, value) <- extraEnvironment) {
       environment.put(key, value)
     }
+
     val process = builder.start()
     new Thread("read stderr for " + command(0)) {
       override def run() {
         for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
-          System.err.println(line)
+          logInfo(line)
         }
       }
     }.start()
@@ -1089,7 +1090,7 @@ private[spark] object Utils extends Logging {
     var firstUserLine = 0
     var insideSpark = true
     var callStack = new ArrayBuffer[String]() :+ "<unknown>"
-
+
     Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
       // When running under some profilers, the current stack trace might contain some bogus
       // frames. This is intended to ensure that we don't crash in these situations by
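
The first hunk changes executeAndGetOutput so that the child process's stderr, drained on a dedicated thread, goes to logInfo rather than System.err. A small standalone sketch of the same pattern follows; it assumes a Unix-like system with an ls command and uses println where Spark uses its logger:

import scala.io.Source

object ProcessStderrExample {
  def main(args: Array[String]): Unit = {
    // Start a child process. Draining stderr on its own thread keeps the child from
    // blocking if it writes more than the OS pipe buffer can hold.
    val builder = new ProcessBuilder("ls", "does-not-exist")
    val process = builder.start()
    new Thread("read stderr for ls") {
      override def run(): Unit = {
        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
          // Spark's Utils.executeAndGetOutput now routes these lines to logInfo
          // instead of printing them to System.err.
          println(s"[child stderr] $line")
        }
      }
    }.start()
    println(s"exit code: ${process.waitFor()}")
  }
}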

core/src/test/scala/org/apache/spark/DriverSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ class DriverSuite extends FunSuite with Timeouts {
     forAll(masters) { (master: String) =>
       failAfter(60 seconds) {
         Utils.executeAndGetOutput(
-          Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+          Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
           new File(sparkHome),
           Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
       }

core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala

Lines changed: 183 additions & 0 deletions
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
+
+/**
+ * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
+ * correctly.
+ */
+class PoolSuite extends FunSuite with LocalSparkContext {
+
+  def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
+    : TaskSetManager = {
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new FakeTask(i, Nil)
+    }
+    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
+  }
+
+  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) {
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
+    val nextTaskSetToSchedule =
+      taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
+    assert(nextTaskSetToSchedule.isDefined)
+    nextTaskSetToSchedule.get.addRunningTask(taskId)
+    assert(nextTaskSetToSchedule.get.stageId === expectedStageId)
+  }
+
+  test("FIFO Scheduler Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+    schedulableBuilder.buildPools()
+
+    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
+    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
+    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
+    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
+
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    scheduleTaskAndVerifyId(1, rootPool, 0)
+    scheduleTaskAndVerifyId(2, rootPool, 1)
+    scheduleTaskAndVerifyId(3, rootPool, 1)
+    scheduleTaskAndVerifyId(4, rootPool, 2)
+    scheduleTaskAndVerifyId(5, rootPool, 2)
+  }
+
+  /**
+   * This test creates three scheduling pools, and creates task set managers in the first
+   * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling
+   * algorithm properly orders the two scheduling pools.
+   */
+  test("Fair Scheduler Test") {
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    schedulableBuilder.buildPools()
+
+    // Ensure that the XML file was read in correctly.
+    assert(rootPool.getSchedulableByName("default") != null)
+    assert(rootPool.getSchedulableByName("1") != null)
+    assert(rootPool.getSchedulableByName("2") != null)
+    assert(rootPool.getSchedulableByName("3") != null)
+    assert(rootPool.getSchedulableByName("1").minShare === 2)
+    assert(rootPool.getSchedulableByName("1").weight === 1)
+    assert(rootPool.getSchedulableByName("2").minShare === 3)
+    assert(rootPool.getSchedulableByName("2").weight === 1)
+    assert(rootPool.getSchedulableByName("3").minShare === 0)
+    assert(rootPool.getSchedulableByName("3").weight === 1)
+
+    val properties1 = new Properties()
+    properties1.setProperty("spark.scheduler.pool","1")
+    val properties2 = new Properties()
+    properties2.setProperty("spark.scheduler.pool","2")
+
+    val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
+    val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
+    val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler)
+    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
+
+    val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler)
+    val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler)
+    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
+    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
+
+    // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names.
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(1, rootPool, 3)
+    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(2, rootPool, 3)
+    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(3, rootPool, 1)
+    // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(4, rootPool, 4)
+    // Neither pool is needy so ordering is based on number of running tasks.
+    // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running
+    // tasks.
+    scheduleTaskAndVerifyId(5, rootPool, 2)
+    // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming
+    // ordering.
+    scheduleTaskAndVerifyId(6, rootPool, 2)
+    // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running
+    // tasks.
+    scheduleTaskAndVerifyId(7, rootPool, 4)
+  }
+
+  test("Nested Pool Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
+    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+    rootPool.addSchedulable(pool0)
+    rootPool.addSchedulable(pool1)
+
+    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
+    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+    pool0.addSchedulable(pool00)
+    pool0.addSchedulable(pool01)
+
+    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
+    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+    pool1.addSchedulable(pool10)
+    pool1.addSchedulable(pool11)
+
+    val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
+    val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
+    pool00.addSchedulable(taskSetManager000)
+    pool00.addSchedulable(taskSetManager001)
+
+    val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler)
+    val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler)
+    pool01.addSchedulable(taskSetManager010)
+    pool01.addSchedulable(taskSetManager011)
+
+    val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler)
+    val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler)
+    pool10.addSchedulable(taskSetManager100)
+    pool10.addSchedulable(taskSetManager101)
+
+    val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler)
+    val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler)
+    pool11.addSchedulable(taskSetManager110)
+    pool11.addSchedulable(taskSetManager111)
+
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    scheduleTaskAndVerifyId(1, rootPool, 4)
+    scheduleTaskAndVerifyId(2, rootPool, 6)
+    scheduleTaskAndVerifyId(3, rootPool, 2)
+  }
+}
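
The comments in "Fair Scheduler Test" reason in terms of minShare ratios, running-task counts, and name ordering. The sketch below is a rough, self-contained approximation of that comparison written for illustration (it is not Spark's actual FairSchedulingAlgorithm) and replays the second scheduling step from the test:

object FairOrderingSketch {
  // A pared-down stand-in for a schedulable entity (a pool or a task set manager).
  case class Entry(name: String, runningTasks: Int, minShare: Int, weight: Int)

  // Roughly the ordering the test comments rely on: an entry running fewer tasks than its
  // minShare is "needy" and wins; otherwise compare minShare ratios, then tasks-to-weight
  // ratios, then names. This is an approximation, not Spark's FairSchedulingAlgorithm.
  def schedulesBefore(s1: Entry, s2: Entry): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1)
    val weightRatio1 = s1.runningTasks.toDouble / s1.weight
    val weightRatio2 = s2.runningTasks.toDouble / s2.weight

    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else if (s1Needy && s2Needy && minShareRatio1 != minShareRatio2) minShareRatio1 < minShareRatio2
    else if (!s1Needy && !s2Needy && weightRatio1 != weightRatio2) weightRatio1 < weightRatio2
    else s1.name < s2.name
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the second step of "Fair Scheduler Test": pool 1 already runs one task
    // (ratio 1/2) while pool 2 runs none (ratio 0), so pool 2 should go first.
    val pool1 = Entry("1", runningTasks = 1, minShare = 2, weight = 1)
    val pool2 = Entry("2", runningTasks = 0, minShare = 3, weight = 1)
    println(if (schedulesBefore(pool2, pool1)) "pool 2 goes first" else "pool 1 goes first")
  }
}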
