30 changes: 15 additions & 15 deletions core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -27,19 +27,20 @@ import org.scalatest.Matchers
class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {


implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
t1 ++= t2
t1
}
def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
t1 += t2
t1
}
def zero(t: mutable.Set[A]) : mutable.Set[A] = {
new mutable.HashSet[A]()
implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
new AccumulableParam[mutable.Set[A], A] {
def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
t1 ++= t2
t1
}
def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
t1 += t2
t1
}
def zero(t: mutable.Set[A]) : mutable.Set[A] = {
new mutable.HashSet[A]()
}
}
}

test ("basic accumulation"){
sc = new SparkContext("local", "test")
@@ -49,11 +50,10 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
d.foreach{x => acc += x}
acc.value should be (210)


val longAcc = sc.accumulator(0l)
val longAcc = sc.accumulator(0L)
val maxInt = Integer.MAX_VALUE.toLong
d.foreach{x => longAcc += maxInt + x}
longAcc.value should be (210l + maxInt * 20)
longAcc.value should be (210L + maxInt * 20)
}

test ("value not assignable from tasks") {
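The AccumulatorSuite hunk above mainly gives the implicit setAccum an explicit result type. Below is a minimal, self-contained sketch of the same pattern; the Combiner trait and setCombiner name are stand-ins invented for illustration and are not part of Spark's API.

import scala.collection.mutable

// Hypothetical stand-in for AccumulableParam, used only to illustrate the style rule.
trait Combiner[R, T] {
  def zero(initial: R): R
  def addAccumulator(acc: R, elem: T): R
  def addInPlace(r1: R, r2: R): R
}

object Combiners {
  // Annotating the result type documents the public contract, keeps the signature
  // stable if the body changes, and is required for implicit definitions in Scala 3.
  implicit def setCombiner[A]: Combiner[mutable.Set[A], A] =
    new Combiner[mutable.Set[A], A] {
      def zero(initial: mutable.Set[A]): mutable.Set[A] = new mutable.HashSet[A]()
      def addAccumulator(acc: mutable.Set[A], elem: A): mutable.Set[A] = { acc += elem; acc }
      def addInPlace(r1: mutable.Set[A], r2: mutable.Set[A]): mutable.Set[A] = { r1 ++= r2; r1 }
    }
}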
7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -45,16 +45,17 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(split)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
override def compute(split: Partition, context: TaskContext): Iterator[Int] =
Array(1, 2, 3, 4).iterator
}
rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
override def getPartitions: Array[Partition] = firstParent[Int].partitions
override def compute(split: Partition, context: TaskContext) =
override def compute(split: Partition, context: TaskContext): Iterator[Int] =
firstParent[Int].iterator(split, context)
}.cache()
rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
override def getPartitions: Array[Partition] = firstParent[Int].partitions
override def compute(split: Partition, context: TaskContext) =
override def compute(split: Partition, context: TaskContext): Iterator[Int] =
firstParent[Int].iterator(split, context)
}.cache()
}
15 changes: 9 additions & 6 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -75,7 +75,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
assert(parCollection.partitions.length === numPartitions)
assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
assert(parCollection.partitions.toList ===
parCollection.checkpointData.get.getPartitions.toList)
assert(parCollection.collect() === result)
}

@@ -102,13 +103,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}

test("UnionRDD") {
def otherRDD = sc.makeRDD(1 to 10, 1)
def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
testRDD(_.union(otherRDD))
testRDDPartitions(_.union(otherRDD))
}

test("CartesianRDD") {
def otherRDD = sc.makeRDD(1 to 10, 1)
def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
testRDD(new CartesianRDD(sc, _, otherRDD))
testRDDPartitions(new CartesianRDD(sc, _, otherRDD))

@@ -223,7 +224,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val partitionAfterCheckpoint = serializeDeserialize(
unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
assert(
partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
partitionBeforeCheckpoint.parents.head.getClass !=
partitionAfterCheckpoint.parents.head.getClass,
"PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
)
}
@@ -358,7 +360,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* Generate an pair RDD (with partitioner) such that both the RDD and its partitions
* have large size.
*/
def generateFatPairRDD() = {
def generateFatPairRDD(): RDD[(Int, Int)] = {
new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
}

@@ -445,7 +447,8 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
: RDD[(K, Array[Iterable[V]])] = {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
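A recurring theme in the CheckpointSuite changes is annotating helpers such as generateFatPairRDD() and CheckpointSuite.cogroup(...), whose bodies return implementation subtypes; the annotation pins the public signature to the intended abstraction. A generic sketch of that effect, with names invented for illustration:

object ReturnTypes {
  sealed trait Shape
  final case class Circle(radius: Double) extends Shape

  // Without an annotation the inferred result type is Circle, leaking the concrete subtype.
  def unitCircleInferred() = Circle(1.0)

  // Annotating ": Shape" keeps callers depending on the abstraction, even though
  // the body still constructs a Circle.
  def unitCircle(): Shape = Circle(1.0)
}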
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -64,7 +64,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
}
}

//------ Helper functions ------
// ------ Helper functions ------

protected def newRDD() = sc.makeRDD(1 to 10)
protected def newPairRDD() = newRDD().map(_ -> 1)
@@ -370,7 +370,7 @@ class CleanerTester(
val cleanerListener = new CleanerListener {
def rddCleaned(rddId: Int): Unit = {
toBeCleanedRDDIds -= rddId
logInfo("RDD "+ rddId + " cleaned")
logInfo("RDD " + rddId + " cleaned")
}

def shuffleCleaned(shuffleId: Int): Unit = {
26 changes: 17 additions & 9 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -222,7 +222,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsSequenceFile(outputDir)
val output =
sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}

@@ -451,16 +451,19 @@ class FileSuite extends FunSuite with LocalSparkContext {

test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
}
}

test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
@@ -471,16 +474,20 @@ class FileSuite extends FunSuite with LocalSparkContext {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
}

test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
@@ -492,7 +499,8 @@ class FileSuite extends FunSuite with LocalSparkContext {

test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
@@ -51,7 +51,7 @@ private object ImplicitOrderingSuite {
override def compare(o: OrderedClass): Int = ???
}

def basicMapExpectations(rdd: RDD[Int]) = {
def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
List((rdd.map(x => (x, x)).keyOrdering.isDefined,
"rdd.map(x => (x, x)).keyOrdering.isDefined"),
(rdd.map(x => (1, x)).keyOrdering.isDefined,
@@ -68,7 +68,7 @@ private object ImplicitOrderingSuite {
"rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined"))
}

def otherRDDMethodExpectations(rdd: RDD[Int]) = {
def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
List((rdd.groupBy(x => x).keyOrdering.isDefined,
"rdd.groupBy(x => x).keyOrdering.isDefined"),
(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty,
@@ -82,4 +82,4 @@ private object ImplicitOrderingSuite {
(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined,
"rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined"))
}
}
}
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -188,7 +188,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
val rdd = sc.parallelize(1 to 10, 2).map { i =>
JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
}.reduceByKey(_+_)
}.reduceByKey(_ + _)
val f1 = rdd.collectAsync()
val f2 = rdd.countAsync()

4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -37,7 +37,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
super.afterEach()
}

def resetSparkContext() = {
def resetSparkContext(): Unit = {
LocalSparkContext.stop(sc)
sc = null
}
@@ -54,7 +54,7 @@ object LocalSparkContext {
}

/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
try {
f(sc)
} finally {
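The LocalSparkContext changes follow the same rule: public helpers get explicit result types (Unit for resetSparkContext, T for withSpark). A small generic sketch of why this matters for a loan-pattern helper like withSpark; the withResource name is invented for illustration and is not a Spark method.

object Loans {
  // Annotating ": T" makes the public contract explicit and keeps the signature
  // from silently changing if the body is edited later; behaviour is unchanged.
  def withResource[R <: AutoCloseable, T](resource: R)(f: R => T): T = {
    try {
      f(resource)
    } finally {
      resource.close()
    }
  }
}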
30 changes: 17 additions & 13 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -92,7 +92,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
// Row does not extend Comparable, but has an implicit Ordering defined.
implicit object RowOrdering extends Ordering[Row] {
override def compare(x: Row, y: Row) = x.value - y.value
override def compare(x: Row, y: Row): Int = x.value - y.value
}

val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
@@ -212,20 +212,24 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
val arrPairs: RDD[(Array[Int], Int)] =
sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))

assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
def verify(testFun: => Unit): Unit = {
intercept[SparkException](testFun).getMessage.contains("array")
}

verify(arrs.distinct())
// We can't catch all usages of arrays, since they might occur inside other collections:
// assert(fails { arrPairs.distinct() })
assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.cogroup(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
verify(arrPairs.partitionBy(new HashPartitioner(2)))
verify(arrPairs.join(arrPairs))
verify(arrPairs.leftOuterJoin(arrPairs))
verify(arrPairs.rightOuterJoin(arrPairs))
verify(arrPairs.fullOuterJoin(arrPairs))
verify(arrPairs.groupByKey())
verify(arrPairs.countByKey())
verify(arrPairs.countByKeyApprox(1))
verify(arrPairs.cogroup(arrPairs))
verify(arrPairs.reduceByKeyLocally(_ + _))
verify(arrPairs.reduceByKey(_ + _))
}

test("zero-length partitions should be correctly handled") {
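One behavioural note on the PartitioningSuite refactor above: as written, verify discards the Boolean returned by getMessage.contains("array"), so only the intercept[SparkException] part can fail the test, and the original inline assert(...contains("array")) checks are effectively weakened. A sketch of a stricter helper follows; it is a suggestion that assumes the surrounding FunSuite/ScalaTest context, not what the diff contains.

def verify(testFun: => Unit): Unit = {
  val e = intercept[SparkException](testFun)
  // Keep the message check inside an assert so a wrong message still fails the test.
  assert(e.getMessage.contains("array"))
}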
15 changes: 10 additions & 5 deletions core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -36,7 +36,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")

val opts = SSLOptions.parse(conf, "spark.ssl")
@@ -52,7 +53,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
assert(opts.enabledAlgorithms ===
Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
}

test("test resolving property with defaults specified ") {
@@ -66,7 +68,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")

val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
@@ -83,7 +86,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
assert(opts.enabledAlgorithms ===
Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
}

test("test whether defaults can be overridden ") {
@@ -99,7 +103,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
conf.set("spark.ssl.protocol", "SSLv3")

7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -21,10 +21,11 @@ import java.io.File

object SSLSampleConfigs {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
val untrustedKeyStorePath = new File(this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
val untrustedKeyStorePath = new File(
this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath

def sparkSSLConfig() = {
def sparkSSLConfig(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
@@ -38,7 +39,7 @@ object SSLSampleConfigs {
conf
}

def sparkSSLConfigUntrusted() = {
def sparkSSLConfigUntrusted(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", untrustedKeyStorePath)