Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4dcea38
Move withSystemProperty to TestUtils class.
JoshRosen Dec 19, 2014
9e3e0dd
Add ResetSystemProperties test fixture mixin; use it in SparkSubmitSuite
JoshRosen Dec 19, 2014
628f46c
Use ResetSystemProperties in DistributedSuite
JoshRosen Dec 19, 2014
14a92e4
Use withSystemProperty in FileServerSuite
JoshRosen Dec 19, 2014
60a63a1
Use ResetSystemProperties in JobCancellationSuite
JoshRosen Dec 19, 2014
51aa870
Use withSystemProperty in ShuffleSuite
JoshRosen Dec 19, 2014
c83ded8
Use ResetSystemProperties in SparkConfSuite
JoshRosen Dec 19, 2014
0995c4b
Use ResetSystemProperties in SparkContextSchedulerCreationSuite
JoshRosen Dec 19, 2014
5b3cb54
Use ResetSystemProperties in SparkListenerSuite
JoshRosen Dec 19, 2014
e9ded62
Use ResetSystemProperties in TaskSchedulerImplSuite
JoshRosen Dec 19, 2014
b0daff2
Use ResetSystemProperties in BlockManagerSuite
JoshRosen Dec 19, 2014
dd9492b
Use ResetSystemProperties in AkkaUtilsSuite
JoshRosen Dec 19, 2014
1d1aa5a
Use ResetSystemProperties in SizeEstimatorSuite
JoshRosen Dec 19, 2014
25bfce2
Use ResetSystemProperties in UtilsSuite
JoshRosen Dec 19, 2014
633a84a
Remove use of system properties in FileServerSuite
JoshRosen Dec 24, 2014
8783ab0
Remove TestUtils.setSystemProperty, since it is subsumed by the ResetSystemProperties trait
JoshRosen Dec 24, 2014
cfe9cce
Remove use of system properties in SparkContextSuite
JoshRosen Dec 24, 2014
3f2f955
Remove System.setProperty calls in DistributedSuite
JoshRosen Dec 24, 2014
655587c
Remove setProperty calls in JobCancellationSuite
JoshRosen Dec 24, 2014
bee20df
Remove setProperty calls in SparkContextSchedulerCreationSuite
JoshRosen Dec 24, 2014
3fdb554
Remove setProperty call in TaskSchedulerImplSuite
JoshRosen Dec 24, 2014
7a3d224
Fix trait ordering
JoshRosen Dec 24, 2014
0eaf0b6
Remove setProperty call in TaskResultGetterSuite.
JoshRosen Dec 24, 2014
4742a5b
Clarify ResetSystemProperties trait inheritance ordering.
JoshRosen Dec 25, 2014
4f4031d
Add note on why SparkSubmitSuite needs ResetSystemProperties
JoshRosen Dec 25, 2014
3888fe3
Remove setProperty use in LocalJavaStreamingContext
JoshRosen Dec 25, 2014
0236d66
Replace setProperty uses in two example programs / tools
JoshRosen Dec 25, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark

import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
Expand All @@ -29,16 +28,10 @@ class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}


class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
with LocalSparkContext {
class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {

val clusterUrl = "local-cluster[2,1,512]"

after {
System.clearProperty("spark.reducer.maxMbInFlight")
System.clearProperty("spark.storage.memoryFraction")
}

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
// this test will hang. Correct behavior is that executors don't crash but fail tasks
Expand Down Expand Up @@ -84,15 +77,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}

test("groupByKey where map output sizes exceed maxMbInFlight") {
System.setProperty("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext(clusterUrl, "test")
val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext(clusterUrl, "test", conf)
// This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
// file should be about 2.5 MB
val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
assert(groups.length === 16)
assert(groups.map(_._2).sum === 2000)
// Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
}

test("accumulators") {
Expand Down Expand Up @@ -210,28 +202,25 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}

test("compute without caching when no partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.0001")
sc = new SparkContext(clusterUrl, "test")
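// data will be 4 million * 4 bytes = 16 MB in size; the test relies on the cache being limited
// to only 50 KB (0.0001 of 512 MB), so that no partitions fit in memory
// NOTE(review): per the hunk header (-210,28 +202,25) this change removes the
// System.setProperty("spark.storage.memoryFraction", "0.0001") call but keeps the plain
// `new SparkContext(clusterUrl, "test")` with no SparkConf replacement (unlike the next test),
// so the 0.0001 fraction described above no longer appears to be applied -- confirm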
val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("compute when only some partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.01")
sc = new SparkContext(clusterUrl, "test")
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
sc = new SparkContext(clusterUrl, "test", conf)
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
// to make sure that *some* of them do fit though
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("passing environment variables to cluster") {
Expand Down
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
@transient var tmpJarUrl: String = _

def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")

override def beforeEach() {
super.beforeEach()
resetSparkContext()
System.setProperty("spark.authenticate", "false")
}

override def beforeAll() {
Expand All @@ -52,7 +53,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarFile = new File(testTempDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")

val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
Expand All @@ -74,7 +74,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test("Distributing files locally") {
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
Expand Down Expand Up @@ -108,7 +108,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {

test("Distributing files locally using URL as input") {
// addFile("file:///....")
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(new File(tmpFile.toString).toURI.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
Expand All @@ -122,7 +122,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
Expand All @@ -133,7 +133,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test("Distributing files on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
Expand All @@ -147,7 +147,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
Expand All @@ -158,7 +158,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
Expand Down
21 changes: 10 additions & 11 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,43 +40,42 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
override def afterEach() {
super.afterEach()
resetSparkContext()
System.clearProperty("spark.scheduler.mode")
}

test("local mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test")
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("local mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test")
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("cluster mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test")
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("cluster mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test")
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
Expand Down
22 changes: 9 additions & 13 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
conf.set("spark.test.noStageRetry", "true")

test("groupByKey without compression") {
try {
System.setProperty("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test", conf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
} finally {
System.setProperty("spark.shuffle.compress", "true")
}
val myConf = conf.clone().set("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test", myConf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}

test("shuffle non-zero block size") {
Expand Down
51 changes: 19 additions & 32 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,20 @@ package org.apache.spark

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext {
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("loading from system properties") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
}

test("initializing without loading defaults") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
}

test("named set methods") {
Expand Down Expand Up @@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {

test("nested property names") {
// This wasn't supported by some external conf parsing libraries
try {
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
} finally {
System.clearProperty("spark.test.a")
System.clearProperty("spark.test.a.b")
System.clearProperty("spark.test.a.b.c")
}
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}

test("register kryo classes through registerKryoClasses") {
Expand Down
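31 changes: 13 additions & 18 deletions core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
Original file line number Diff line number Diff line change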
Expand Up @@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {

def createTaskScheduler(master: String): TaskSchedulerImpl = {
def createTaskScheduler(master: String): TaskSchedulerImpl =
createTaskScheduler(master, new SparkConf())

def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test", conf)
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
Expand Down Expand Up @@ -102,19 +105,13 @@ class SparkContextSchedulerCreationSuite
}

test("local-default-parallelism") {
val defaultParallelism = System.getProperty("spark.default.parallelism")
System.setProperty("spark.default.parallelism", "16")
val sched = createTaskScheduler("local")
val conf = new SparkConf().set("spark.default.parallelism", "16")
val sched = createTaskScheduler("local", conf)

sched.backend match {
case s: LocalBackend => assert(s.defaultParallelism() === 16)
case _ => fail()
}

Option(defaultParallelism) match {
case Some(v) => System.setProperty("spark.default.parallelism", v)
case _ => System.clearProperty("spark.default.parallelism")
}
}

test("simr") {
Expand Down Expand Up @@ -155,9 +152,10 @@ class SparkContextSchedulerCreationSuite
testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
}

def testMesos(master: String, expectedClass: Class[_]) {
def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
try {
val sched = createTaskScheduler(master)
val sched = createTaskScheduler(master, conf)
assert(sched.backend.getClass === expectedClass)
} catch {
case e: UnsatisfiedLinkError =>
Expand All @@ -168,17 +166,14 @@ class SparkContextSchedulerCreationSuite
}

test("mesos fine-grained") {
System.setProperty("spark.mesos.coarse", "false")
testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
}

test("mesos coarse-grained") {
System.setProperty("spark.mesos.coarse", "true")
testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
}

test("mesos with zookeeper") {
System.setProperty("spark.mesos.coarse", "false")
testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
}
}
Loading