
Commit add8416

Author: Marcelo Vanzin
[SPARK-2778] [yarn] Add yarn integration tests.
This patch adds a couple of (currently very simple) integration tests to make sure both client and cluster modes are working. The tests don't do much yet other than run a simple job, but the plan is to enhance them after we get the framework in.

The cluster tests are noisy, so redirect all log output to a file like other tests do. Copying the conf around sucks but it's less work than messing with maven/sbt and having to clean up other projects.

Note the test is only added for yarn-stable. The code compiles against yarn-alpha but there are two issues I ran into that I could not overcome:
- an old netty dependency kept creeping into the classpath and causing akka to not work when using sbt; the old netty was correctly suppressed under maven.
- MiniYARNCluster kept failing to execute containers because it did not create the NM's local dir itself; this is apparently a known behavior, but I'm not sure how to work around it.

None of those issues are present with the stable Yarn.

Also, these tests are a little slow to run. Apparently Spark doesn't yet tag tests (so that these could be isolated in a "slow" batch), so this is something to keep in mind.
1 parent f2b5b61 commit add8416
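
On the commit message's last point: plain ScalaTest already supports tags, so a "slow" batch could be carved out without new framework. A minimal, hedged sketch (the SlowTest tag name is hypothetical and not part of this commit):

import org.scalatest.{FunSuite, Tag}

// Hypothetical tag object; Spark did not define one at the time of this commit.
object SlowTest extends Tag("org.apache.spark.SlowTest")

class ExampleSuite extends FunSuite {
  // Runners can exclude this test with ScalaTest's "-l org.apache.spark.SlowTest" option.
  test("a slow integration test", SlowTest) {
    assert(1 + 1 === 2)
  }
}

Both sbt and the scalatest-maven-plugin expose include/exclude options for tags, so suites like the one added here could be moved into a separate slow batch.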

File tree: 5 files changed, +209 -3 lines


pom.xml

Lines changed: 30 additions & 1 deletion
@@ -698,6 +698,35 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${yarn.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
@@ -1170,7 +1199,7 @@
       <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
-        <version>3.4.5-mapr-1406</version>
+        <version>3.4.5-mapr-1406</version>
       </dependency>
     </dependencies>
   </profile>
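
Two notes on this hunk, neither stated in the diff itself: the org.jboss.netty exclusion is the Maven-side guard against the stale Netty artifact that the commit message says broke Akka (under sbt with yarn-alpha it still leaked through, which is part of why the tests are stable-only), and the zookeeper line in the mapr profile is apparently a whitespace-only change, since the version string is identical on both sides. If an old Netty still shows up at runtime, running mvn dependency:tree -Dincludes=org.jboss.netty (a standard maven-dependency-plugin invocation, not part of this change) shows which dependency is dragging it in.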

yarn/pom.xml

Lines changed: 1 addition & 2 deletions
@@ -126,15 +126,14 @@
         <configuration>
           <environmentVariables>
             <SPARK_HOME>${basedir}/../..</SPARK_HOME>
-            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
          </environmentVariables>
         </configuration>
       </plugin>
     </plugins>

     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-
+
     <resources>
       <resource>
         <directory>../common/src/main/resources</directory>
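
The dropped SPARK_CLASSPATH environment variable looks superseded rather than lost: the new YarnClusterSuite below injects the JVM's classpath through the spark.driver.extraClassPath and spark.executor.extraClassPath properties instead. That reading is an inference from the diff, not something the commit message states.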

yarn/stable/pom.xml

Lines changed: 9 additions & 0 deletions
@@ -32,4 +32,13 @@
   <packaging>jar</packaging>
   <name>Spark Project YARN Stable API</name>

+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
 </project>
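
Note that this module-level dependency omits the version and the exclusion list; both are inherited from the dependencyManagement entry added to the root pom.xml above, which is standard Maven behavior and keeps the two declarations from drifting apart.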
yarn/stable/src/test/resources/log4j.properties

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
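
A couple of observations on this config: append=false means each test run truncates target/unit-tests.log, so the file holds only the latest run's output, which is how the noisy MiniYARNCluster logs are kept off the console as the commit message describes. The final org.eclipse.jetty.LEVEL=WARN line is not log4j syntax; it appears to target Jetty's own logging layer, mirroring the log4j.logger line above it.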
yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 141 additions & 0 deletions

@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
+
+  private val oldConf = new HashMap[String, String]()
+  private var yarnCluster: MiniYARNCluster = _
+  private var tempDir: File = _
+  private var fakeSparkJar: File = _
+
+  override def beforeAll() {
+    tempDir = Utils.createTempDir()
+
+    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1, 1, false)
+    yarnCluster.init(new YarnConfiguration())
+    yarnCluster.start()
+
+    // Save and clear any existing "spark.*" system properties; iterate over a copy,
+    // since sys.props is mutated inside the loop.
+    val sysProps = sys.props.map { case (k, v) => (k, v) }
+    sysProps.foreach { case (k, v) =>
+      if (k.startsWith("spark.")) {
+        oldConf += (k -> v)
+        sys.props -= k
+      }
+    }
+
+    // Expose the mini cluster's configuration to the SparkContexts created by the tests.
+    yarnCluster.getConfig().foreach { e =>
+      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
+    }
+
+    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
+    sys.props += ("spark.executor.instances" -> "1")
+    sys.props += ("spark.driver.extraClassPath" -> sys.props("java.class.path"))
+    sys.props += ("spark.executor.extraClassPath" -> sys.props("java.class.path"))
+
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    yarnCluster.stop()
+
+    // Clear the test's "spark.*" properties and restore the ones saved in beforeAll().
+    val sysProps = sys.props.map { case (k, v) => (k, v) }
+    sysProps.foreach { case (k, v) =>
+      if (k.startsWith("spark.")) {
+        sys.props -= k
+      }
+    }
+
+    oldConf.foreach { case (k, v) => sys.props += (k -> v) }
+
+    super.afterAll()
+  }
+
+  test("run Spark in yarn-client mode") {
+    val result = File.createTempFile("result", null, tempDir)
+    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+    val result = File.createTempFile("result", null, tempDir)
+
+    // The Client object will call System.exit() after the job is done, and we don't want
+    // that because it messes up the scalatest monitoring. So replicate some of what main()
+    // does here.
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--num-executors", "4")
+    val sparkConf = new SparkConf()
+    val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val clientArgs = new ClientArguments(args, sparkConf)
+    new Client(clientArgs, yarnConf, sparkConf).run()
+    checkResult(result)
+  }
+
+  /**
+   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
+   * any sort of error when the job process finishes successfully, but the job itself fails. So
+   * the tests enforce that something is written to a file after everything is ok to indicate
+   * that the job succeeded.
+   */
+  private def checkResult(result: File) = {
+    val resultString = Files.toString(result, Charsets.UTF_8)
+    resultString should be ("success")
+  }
+
+}
+
+private object YarnClusterDriver extends Logging with Matchers {
+
+  def main(args: Array[String]) = {
+    val sc = new SparkContext(new SparkConf().setMaster(args(0))
+      .setAppName("yarn \"test app\" 'with quotes'"))
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      val data = sc.parallelize(1 to 4).map(i => i).collect().toSet
+      data should be (Set(1, 2, 3, 4))
+      result = "success"
+    } finally {
+      sc.stop()
+      // Write the outcome even when the job fails, so checkResult() always has something to read.
+      Files.write(result, status, Charsets.UTF_8)
+    }
+  }
+
+}
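
A possible refinement to the result-file handshake, sketched below and not part of this commit: since yarn-cluster mode reports no error when the job itself fails (the reason checkResult() exists), the driver could record the failure cause instead of a bare "failure". ResultFile and record are hypothetical names; the Guava calls match the ones the suite already uses.

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files

private object ResultFile {
  /** Run `body`, then write "success" or the failure cause to `status` for the test to read. */
  def record(status: File)(body: => Unit): Unit = {
    var result = "failure"
    try {
      body
      result = "success"
    } catch {
      case e: Throwable =>
        result = "failure: " + e.getMessage()
        throw e
    } finally {
      Files.write(result, status, Charsets.UTF_8)
    }
  }
}

YarnClusterDriver.main() could then wrap its job body in ResultFile.record(status) { ... }, and a failed run would surface the cause in the test assertion instead of just "failure".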
