From 0c247dba6084313873b539bcf230371c903f04b3 Mon Sep 17 00:00:00 2001 From: Arun Ramakrishnan Date: Mon, 21 Apr 2014 00:41:09 -0700 Subject: [PATCH 1/5] SPARK-1438 RDD language apis to support optional seed in RDD methods sample/takeSample --- .../apache/spark/api/java/JavaDoubleRDD.scala | 8 +++++++- .../apache/spark/api/java/JavaPairRDD.scala | 8 +++++++- .../org/apache/spark/api/java/JavaRDD.scala | 8 +++++++- .../apache/spark/api/java/JavaRDDLike.scala | 5 ++++- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- .../scala/org/apache/spark/rdd/RDDSuite.scala | 19 +++++++++++++++++++ python/pyspark/rdd.py | 13 ++++++------- python/pyspark/rddsampler.py | 2 +- .../plans/logical/basicOperators.scala | 2 +- .../org/apache/spark/sql/SchemaRDD.scala | 5 +++-- .../spark/sql/execution/basicOperators.scala | 2 +- 11 files changed, 58 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 4330cef3965ee..868ed141effae 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -133,7 +133,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja /** * Return a sampled subset of this RDD. */ - def sample(withReplacement: Boolean, fraction: JDouble, seed: Int): JavaDoubleRDD = + def sample(withReplacement: Boolean, fraction: JDouble): JavaDoubleRDD = + sample(withReplacement, fraction, System.nanoTime) + + /** + * Return a sampled subset of this RDD. + */ + def sample(withReplacement: Boolean, fraction: JDouble, seed: Long): JavaDoubleRDD = fromRDD(srdd.sample(withReplacement, fraction, seed)) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index b3ec270281ae4..02a5e20073f85 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -119,7 +119,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return a sampled subset of this RDD. */ - def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] = + def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] = + sample(withReplacement, fraction, System.nanoTime) + + /** + * Return a sampled subset of this RDD. + */ + def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed)) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 327c1552dc941..374ff7808c215 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -98,7 +98,13 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) /** * Return a sampled subset of this RDD. */ - def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = + def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] = + sample(withReplacement, fraction, System.nanoTime) + + /** + * Return a sampled subset of this RDD. 
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 725c423a53e35..bbf84ee7ffb0f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -394,7 +394,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
-  def takeSample(withReplacement: Boolean, num: Int, seed: Int): JList[T] = {
+  def takeSample(withReplacement: Boolean, num: Int): JList[T] =
+    takeSample(withReplacement, num, System.nanoTime)
+
+  def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = {
     import scala.collection.JavaConversions._
     val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, seed).toSeq
     new java.util.ArrayList(arr)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 891efccf23b6a..125f1da220fda 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -321,7 +321,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a sampled subset of this RDD.
    */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = {
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long = System.nanoTime): RDD[T] = {
     require(fraction >= 0.0, "Invalid fraction value: " + fraction)
     if (withReplacement) {
       new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
@@ -346,7 +346,7 @@ abstract class RDD[T: ClassTag](
     }.toArray
   }
 
-  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
+  def takeSample(withReplacement: Boolean, num: Int, seed: Long = System.nanoTime): Array[T] = {
     var fraction = 0.0
     var total = 0
     val multiplier = 3.0
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 1901330d8b188..6bbaaf87cd4b4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -466,6 +466,12 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("takeSample") {
     val data = sc.parallelize(1 to 100, 2)
 
+    for (num <- List(5,20,100)) {
+      val sample = data.takeSample(withReplacement=false, num=num)
+      assert(sample.size === num) // Got exactly num elements
+      assert(sample.toSet.size === num) // Elements are distinct
+      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
+    }
     for (seed <- 1 to 5) {
       val sample = data.takeSample(withReplacement=false, 20, seed)
       assert(sample.size === 20) // Got exactly 20 elements
@@ -483,6 +489,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       assert(sample.size === 20) // Got exactly 20 elements
       assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
     }
+    {
+      val sample = data.takeSample(withReplacement=true, num=20)
+      assert(sample.size === 20) // Got exactly 20 elements
+      assert(sample.toSet.size <= 20, "sampling with replacement returned all distinct elements")
+      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
+    }
+    {
+      val sample = data.takeSample(withReplacement=true, num=100)
+      assert(sample.size === 100) // Got exactly 100 elements
+      // Chance of getting all distinct elements is astronomically low, so test we got < 100
+      assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
+      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
+    }
     for (seed <- 1 to 5) {
       val sample = data.takeSample(withReplacement=true, 100, seed)
       assert(sample.size === 100) // Got exactly 100 elements
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 91fc7e637e2c6..70a78eda8c019 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -30,6 +30,7 @@
 from threading import Thread
 import warnings
 import heapq
+import random
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -332,7 +333,7 @@ def distinct(self):
             .reduceByKey(lambda x, _: x) \
             .map(lambda (x, _): x)
 
-    def sample(self, withReplacement, fraction, seed):
+    def sample(self, withReplacement, fraction, seed=None):
         """
         Return a sampled subset of this RDD (relies on numpy and falls back
         on default random generator if numpy is unavailable).
@@ -344,7 +345,7 @@ def sample(self, withReplacement, fraction, seed=None):
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
 
     # this is ported from scala/spark/RDD.scala
-    def takeSample(self, withReplacement, num, seed):
+    def takeSample(self, withReplacement, num, seed=None):
         """
         Return a fixed-size sampled subset of this RDD (currently requires numpy).
 
@@ -381,13 +382,11 @@ def takeSample(self, withReplacement, num, seed=None):
         # If the first sample didn't turn out large enough, keep trying to take samples;
         # this shouldn't happen often because we use a big multiplier for their initial size.
         # See: scala/spark/RDD.scala
+        random.seed(seed)
         while len(samples) < total:
-            if seed > sys.maxint - 2:
-                seed = -1
-            seed += 1
-            samples = self.sample(withReplacement, fraction, seed).collect()
+            samples = self.sample(withReplacement, fraction, random.randint(0,sys.maxint)).collect()
 
-        sampler = RDDSampler(withReplacement, fraction, seed+1)
+        sampler = RDDSampler(withReplacement, fraction, random.randint(0,sys.maxint))
         sampler.shuffle(samples)
         return samples[0:total]
 
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index aca2ef3b51e98..5ce330ee239be 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -19,7 +19,7 @@
 import random
 
 class RDDSampler(object):
-    def __init__(self, withReplacement, fraction, seed):
+    def __init__(self, withReplacement, fraction, seed=None):
         try:
             import numpy
             self._use_numpy = True
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 397473e178867..732708e146b04 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -168,7 +168,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
   def references = Set.empty
 }
 
-case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
+case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: LogicalPlan)
   extends UnaryNode {
 
   def output = child.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index f2ae5b0fe612f..6658d306c987e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -256,10 +256,11 @@ class SchemaRDD(
    * @group Query
    */
   @Experimental
+  override
   def sample(
-      fraction: Double,
       withReplacement: Boolean = true,
-      seed: Int = (math.random * 1000).toInt) =
+      fraction: Double,
+      seed: Long) =
     new SchemaRDD(sqlContext, Sample(fraction, withReplacement, seed, logicalPlan))
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index eedcc7dda02d7..288094f97f9ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -44,7 +44,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   }
 }
 
-case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: SparkPlan)
+case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
   extends UnaryNode {
 
   override def output = child.output

From 69619c6686cc7ff7113f8ef031f3ed3698bafa25 Mon Sep 17 00:00:00 2001
From: Arun Ramakrishnan
Date: Mon, 21 Apr 2014 21:37:22 -0700
Subject: [PATCH 2/5] SPARK-1438 fix spacing issue

---
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6bbaaf87cd4b4..1f97bd867b338 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -465,8 +465,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
   test("takeSample") {
     val data = sc.parallelize(1 to 100, 2)
-    
-    for (num <- List(5,20,100)) { 
+
+    for (num <- List(5,20,100)) {
       val sample = data.takeSample(withReplacement=false, num=num)
       assert(sample.size === num) // Got exactly num elements
       assert(sample.toSet.size === num) // Elements are distinct

From 8d05b1a7719d0fdf262f130ed5a71f1030b153e4 Mon Sep 17 00:00:00 2001
From: Arun Ramakrishnan
Date: Tue, 22 Apr 2014 21:59:40 -0700
Subject: [PATCH 3/5] SPARK-1438 RDD: replace System.nanoTime with a
 Random-generated number. Python: use a separate Random instance instead of
 seeding the language API's global Random instance.
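
For illustration, the intended call pattern after this change looks like the
following (a minimal sketch, assuming a live SparkContext `sc`; this snippet
is illustrative and not part of the patch itself):

    val rdd = sc.parallelize(1 to 100)
    // seed omitted: a Long is drawn internally from the shared Utils.random
    val s1 = rdd.sample(withReplacement = false, fraction = 0.1)
    // seed supplied: the same seed always yields the same sample
    val s2 = rdd.sample(withReplacement = false, fraction = 0.1, seed = 42L)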
--- .../apache/spark/api/java/JavaDoubleRDD.scala | 3 +- .../apache/spark/api/java/JavaPairRDD.scala | 3 +- .../org/apache/spark/api/java/JavaRDD.scala | 3 +- .../apache/spark/api/java/JavaRDDLike.scala | 3 +- .../spark/rdd/PartitionwiseSampledRDD.scala | 6 ++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 11 +++++--- .../scala/org/apache/spark/util/Utils.scala | 2 ++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 4 +-- python/pyspark/rdd.py | 8 +++--- python/pyspark/rddsampler.py | 28 +++++++++---------- 10 files changed, 40 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 868ed141effae..a6123bd108c11 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -30,6 +30,7 @@ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.StatCounter +import org.apache.spark.util.Utils class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] { @@ -134,7 +135,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: JDouble): JavaDoubleRDD = - sample(withReplacement, fraction, System.nanoTime) + sample(withReplacement, fraction, Utils.random.nextLong) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 02a5e20073f85..554c065358648 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.{OrderedRDDFunctions, RDD} import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V]) @@ -120,7 +121,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] = - sample(withReplacement, fraction, System.nanoTime) + sample(withReplacement, fraction, Utils.random.nextLong) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 374ff7808c215..dc698dea75e43 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -24,6 +24,7 @@ import org.apache.spark._ import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends JavaRDDLike[T, JavaRDD[T]] { @@ -99,7 +100,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) * Return a sampled subset of this RDD. 
*/ def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] = - sample(withReplacement, fraction, System.nanoTime) + sample(withReplacement, fraction, Utils.random.nextLong) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index bbf84ee7ffb0f..574a98636a619 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -34,6 +34,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def wrapRDD(rdd: RDD[T]): This @@ -395,7 +396,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { } def takeSample(withReplacement: Boolean, num: Int): JList[T] = - takeSample(withReplacement, num, System.nanoTime) + takeSample(withReplacement, num, Utils.random.nextLong) def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = { import scala.collection.JavaConversions._ diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala index b4e3bb5d75e17..2fc98b4498794 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala @@ -23,6 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.util.random.RandomSampler +import org.apache.spark.util.Utils private[spark] class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long) @@ -38,14 +39,15 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long) * * @param prev RDD to be sampled * @param sampler a random sampler - * @param seed random seed, default to System.nanoTime + * @param seed random seed, default to a Long value generated by an instance of + * java.util.Random shared within the library code * @tparam T input RDD item type * @tparam U sampled RDD item type */ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag]( prev: RDD[T], sampler: RandomSampler[T, U], - @transient seed: Long = System.nanoTime) + @transient seed: Long = Utils.random.nextLong) extends RDD[U](prev) { override def getPartitions: Array[Partition] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 125f1da220fda..180d7e179730d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -321,7 +321,9 @@ abstract class RDD[T: ClassTag]( /** * Return a sampled subset of this RDD. */ - def sample(withReplacement: Boolean, fraction: Double, seed: Long = System.nanoTime): RDD[T] = { + def sample(withReplacement: Boolean, + fraction: Double, + seed: Long = Utils.random.nextLong): RDD[T] = { require(fraction >= 0.0, "Invalid fraction value: " + fraction) if (withReplacement) { new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed) @@ -334,11 +336,11 @@ abstract class RDD[T: ClassTag]( * Randomly splits this RDD with the provided weights. 
* * @param weights weights for splits, will be normalized if they don't sum to 1 - * @param seed random seed, default to System.nanoTime + * @param seed random seed, default to rand.nextLong * * @return split RDDs in an array */ - def randomSplit(weights: Array[Double], seed: Long = System.nanoTime): Array[RDD[T]] = { + def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = { val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x => @@ -346,7 +348,8 @@ abstract class RDD[T: ClassTag]( }.toArray } - def takeSample(withReplacement: Boolean, num: Int, seed: Long = System.nanoTime): Array[T] = { + def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] = + { var fraction = 0.0 var total = 0 val multiplier = 3.0 diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a3af4e7b91692..3f8396d327f3c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -46,6 +46,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, private[spark] object Utils extends Logging { val osName = System.getProperty("os.name") + + val random = new Random() /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 1f97bd867b338..abf5e8972e184 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -467,11 +467,11 @@ class RDDSuite extends FunSuite with SharedSparkContext { val data = sc.parallelize(1 to 100, 2) for (num <- List(5,20,100)) { - val sample = data.takeSample(withReplacement=false, num=num) + val sample = data.takeSample(withReplacement=false, num=num) assert(sample.size === num) // Got exactly num elements assert(sample.toSet.size === num) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]") - } + } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement=false, 20, seed) assert(sample.size === 20) // Got exactly 20 elements diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 70a78eda8c019..a46de27021857 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -30,7 +30,7 @@ from threading import Thread import warnings import heapq -import random +from random import Random from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long @@ -382,11 +382,11 @@ def takeSample(self, withReplacement, num, seed=None): # If the first sample didn't turn out large enough, keep trying to take samples; # this shouldn't happen often because we use a big multiplier for their initial size. 
         # See: scala/spark/RDD.scala
-        random.seed(seed)
+        rand = Random(seed)
         while len(samples) < total:
-            samples = self.sample(withReplacement, fraction, random.randint(0,sys.maxint)).collect()
+            samples = self.sample(withReplacement, fraction, rand.randint(0,sys.maxint)).collect()
 
-        sampler = RDDSampler(withReplacement, fraction, random.randint(0,sys.maxint))
+        sampler = RDDSampler(withReplacement, fraction, rand.randint(0,sys.maxint))
         sampler.shuffle(samples)
         return samples[0:total]
 
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 5ce330ee239be..78564379b145e 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -27,7 +27,7 @@ def __init__(self, withReplacement, fraction, seed=None):
             print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling."
             self._use_numpy = False
 
-        self._seed = seed
+        self._seed = seed if seed is not None else random.randint(0,sys.maxint)
         self._withReplacement = withReplacement
         self._fraction = fraction
         self._random = None
@@ -38,17 +38,15 @@ def initRandomGenerator(self, split):
         if self._use_numpy:
             import numpy
             self._random = numpy.random.RandomState(self._seed)
-            for _ in range(0, split):
-                # discard the next few values in the sequence to have a
-                # different seed for the different splits
-                self._random.randint(sys.maxint)
         else:
             import random
-            random.seed(self._seed)
-            for _ in range(0, split):
-                # discard the next few values in the sequence to have a
-                # different seed for the different splits
-                random.randint(0, sys.maxint)
+            self._random = random.Random(self._seed)
+
+        for _ in range(0, split):
+            # discard the next few values in the sequence to have a
+            # different seed for the different splits
+            self._random.randint(0, sys.maxint)
+
         self._split = split
         self._rand_initialized = True
 
@@ -59,7 +57,7 @@ def getUniformSample(self, split):
         if self._use_numpy:
             return self._random.random_sample()
         else:
-            return random.uniform(0.0, 1.0)
+            return self._random.uniform(0.0, 1.0)
 
     def getPoissonSample(self, split, mean):
         if not self._rand_initialized or split != self._split:
@@ -73,26 +71,26 @@ def getPoissonSample(self, split, mean):
 
             num_arrivals = 1
             cur_time = 0.0
 
-            cur_time += random.expovariate(mean)
+            cur_time += self._random.expovariate(mean)
 
             if cur_time > 1.0:
                 return 0
 
             while(cur_time <= 1.0):
-                cur_time += random.expovariate(mean)
+                cur_time += self._random.expovariate(mean)
                 num_arrivals += 1
 
             return (num_arrivals - 1)
 
     def shuffle(self, vals):
-        if self._random == None or split != self._split:
+        if self._random == None:
             self.initRandomGenerator(0)  # this should only ever be called on the master so
                                          # the split does not matter
         if self._use_numpy:
             self._random.shuffle(vals)
         else:
-            random.shuffle(vals, self._random)
+            self._random.shuffle(vals, self._random.random)
 
     def func(self, split, iterator):
         if self._withReplacement:

From b9ebfe2baf5be05a31a55b9200b470f2ce0e3311 Mon Sep 17 00:00:00 2001
From: Arun Ramakrishnan
Date: Wed, 23 Apr 2014 23:31:03 -0700
Subject: [PATCH 4/5] SPARK-1438 removing redundant import of random in python
 rddsampler

---
 python/pyspark/rddsampler.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 78564379b145e..eb352de4d24b4 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -39,7 +39,6 @@ def initRandomGenerator(self, split):
             import numpy
             self._random = numpy.random.RandomState(self._seed)
         else:
-            import random
             self._random = random.Random(self._seed)
 
         for _ in range(0, split):

From 07bb06e9f15406567c4907d964a959728b8bf77b Mon Sep 17 00:00:00 2001
From: Arun Ramakrishnan
Date: Thu, 24 Apr 2014 03:08:19 -0700
Subject: [PATCH 5/5] SPARK-1438 fixing more space formatting issues

---
 .../scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala | 3 +--
 core/src/main/scala/org/apache/spark/rdd/RDD.scala           | 2 +-
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala      | 2 +-
 python/pyspark/rdd.py                                        | 4 ++--
 python/pyspark/rddsampler.py                                 | 2 +-
 5 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index 2fc98b4498794..b5b8a5706deb3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -39,8 +39,7 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
  *
  * @param prev RDD to be sampled
  * @param sampler a random sampler
- * @param seed random seed, default to a Long value generated by an instance of
- *             java.util.Random shared within the library code
+ * @param seed random seed
  * @tparam T input RDD item type
 * @tparam U sampled RDD item type
  */
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 180d7e179730d..8dee014e1b49d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -336,7 +336,7 @@ abstract class RDD[T: ClassTag](
    * Randomly splits this RDD with the provided weights.
    *
    * @param weights weights for splits, will be normalized if they don't sum to 1
-   * @param seed random seed, default to rand.nextLong
+   * @param seed random seed
    *
    * @return split RDDs in an array
    */
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index abf5e8972e184..512bb07365ad1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -466,7 +466,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("takeSample") {
     val data = sc.parallelize(1 to 100, 2)
 
-    for (num <- List(5,20,100)) {
+    for (num <- List(5, 20, 100)) {
       val sample = data.takeSample(withReplacement=false, num=num)
       assert(sample.size === num) // Got exactly num elements
       assert(sample.toSet.size === num) // Elements are distinct
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a46de27021857..d73ab7006e9c7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -384,9 +384,9 @@ def takeSample(self, withReplacement, num, seed=None):
         # See: scala/spark/RDD.scala
         rand = Random(seed)
         while len(samples) < total:
-            samples = self.sample(withReplacement, fraction, rand.randint(0,sys.maxint)).collect()
+            samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect()
 
-        sampler = RDDSampler(withReplacement, fraction, rand.randint(0,sys.maxint))
+        sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint))
         sampler.shuffle(samples)
         return samples[0:total]
 
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index eb352de4d24b4..845a267e311c5 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -27,7 +27,7 @@ def __init__(self, withReplacement, fraction, seed=None):
             print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling."
             self._use_numpy = False
 
-        self._seed = seed if seed is not None else random.randint(0,sys.maxint)
+        self._seed = seed if seed is not None else random.randint(0, sys.maxint)
         self._withReplacement = withReplacement
         self._fraction = fraction
         self._random = None
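
Note: taken together, the series makes the seed optional across the Scala,
Java, and Python APIs while keeping explicit seeding for reproducibility.
A minimal sketch of the intended contract (assuming a live SparkContext
`sc`; not taken from the patch itself):

    val rdd = sc.parallelize(1 to 100)
    val a = rdd.takeSample(withReplacement = false, num = 10, seed = 7L)
    val b = rdd.takeSample(withReplacement = false, num = 10, seed = 7L)
    assert(a.sameElements(b)) // same seed, same sample
    val c = rdd.takeSample(withReplacement = false, num = 10) // fresh internal seed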