From 41fce544cadce5ed314b75f368abf79ee7fcd2da Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 10 Nov 2014 15:54:20 -0800
Subject: [PATCH 1/8] randomSplit()

---
 .../apache/spark/api/python/PythonRDD.scala | 13 +++++++++
 python/pyspark/rdd.py                       | 28 +++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 45beb8fc8c925..78a5794bd557b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -757,6 +757,19 @@ private[spark] object PythonRDD extends Logging {
       converted.saveAsHadoopDataset(new JobConf(conf))
     }
   }
+
+  /**
+   * A helper to convert java.util.List[Double] into Array[Double]
+   * @param list
+   * @return
+   */
+  def listToArrayDouble(list: JList[Double]): Array[Double] = {
+    val r = new Array[Double](list.size)
+    list.zipWithIndex.foreach {
+      case (v, i) => r(i) = v
+    }
+    r
+  }
 }
 
 private
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 08d047402625f..f29af793737f8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -316,6 +316,34 @@ def sample(self, withReplacement, fraction, seed=None):
         assert fraction >= 0.0, "Negative fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
 
+    def randomSplit(self, weights, seed=None):
+        """
+        Randomly splits this RDD with the provided weights.
+
+        :param weights: weights for splits, will be normalized if they don't sum to 1
+        :param seed: random seed
+        :return: split RDDs in an list
+
+        >>> rdd = sc.parallelize(range(10), 1)
+        >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)
+        >>> rdd1.collect()
+        [3, 6]
+        >>> rdd2.collect()
+        [0, 5, 7]
+        >>> rdd3.collect()
+        [1, 2, 4, 8, 9]
+        """
+        ser = BatchedSerializer(PickleSerializer(), 1)
+        rdd = self._reserialize(ser)
+        jweights = ListConverter().convert([float(w) for w in weights],
+                                           self.ctx._gateway._gateway_client)
+        jweights = self.ctx._jvm.PythonRDD.listToArrayDouble(jweights)
+        if seed is None:
+            jrdds = rdd._jrdd.randomSplit(jweights)
+        else:
+            jrdds = rdd._jrdd.randomSplit(jweights, seed)
+        return [RDD(jrdd, self.ctx, ser) for jrdd in jrdds]
+
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed=None):
         """
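Patch 1 routes the new API through Py4J: the Python weights are copied into a java.util.List, the new listToArrayDouble helper turns them into the Array[Double] that the JVM-side RDD.randomSplit expects, and the returned Java RDDs are wrapped back into Python RDDs. A minimal usage sketch, not part of the patch (it assumes a live SparkContext bound to sc; the 70/30 weights are illustrative):

    # hypothetical train/test split with the API added above
    rdd = sc.parallelize(range(100))
    train, test = rdd.randomSplit([0.7, 0.3], seed=42)
    # each element lands in exactly one split
    assert train.count() + test.count() == rdd.count()
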
From 1715ee38b619868c284454d5058376e9e0ca09a7 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 12 Nov 2014 14:15:47 -0800
Subject: [PATCH 2/8] address comments

---
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 --
 python/pyspark/rdd.py                                           | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 78a5794bd557b..a36f4a1a221c8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -760,8 +760,6 @@ private[spark] object PythonRDD extends Logging {
 
   /**
    * A helper to convert java.util.List[Double] into Array[Double]
-   * @param list
-   * @return
    */
   def listToArrayDouble(list: JList[Double]): Array[Double] = {
     val r = new Array[Double](list.size)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f29af793737f8..a4cf1ead5bfa9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -322,7 +322,7 @@ def randomSplit(self, weights, seed=None):
 
         :param weights: weights for splits, will be normalized if they don't sum to 1
         :param seed: random seed
-        :return: split RDDs in an list
+        :return: split RDDs in a list
 
         >>> rdd = sc.parallelize(range(10), 1)
         >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)

From 0d9b256a9b7aa52c37c6a952ffd68bf4441d46e5 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 12 Nov 2014 15:00:17 -0800
Subject: [PATCH 3/8] refactor

---
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a36f4a1a221c8..6702baa9e12fb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import org.apache.spark.input.PortableDataStream
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
@@ -762,11 +763,7 @@ private[spark] object PythonRDD extends Logging {
    * A helper to convert java.util.List[Double] into Array[Double]
    */
   def listToArrayDouble(list: JList[Double]): Array[Double] = {
-    val r = new Array[Double](list.size)
-    list.zipWithIndex.foreach {
-      case (v, i) => r(i) = v
-    }
-    r
+    list.asScala.toArray
   }
 }
 
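The docstring's promise that the weights "will be normalized if they don't sum to 1" is why the doctest can pass [0.4, 0.6, 1.0]: the JVM side rescales them before splitting. A pure-Python illustration of that normalization (a sketch of the intended semantics, not code from the patch):

    weights = [0.4, 0.6, 1.0]
    s = sum(weights)                      # 2.0
    fractions = [w / s for w in weights]
    print(fractions)                      # [0.2, 0.3, 0.5], each split's expected share
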
From c7a2007245752480dfe96316d22dc8f19ea1b1a2 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 13 Nov 2014 11:29:43 -0800
Subject: [PATCH 4/8] switch to python implementation

---
 .../apache/spark/api/python/PythonRDD.scala |  7 -----
 python/pyspark/rdd.py                       | 30 ++++++++-----------
 python/pyspark/rddsampler.py                |  5 ++--
 3 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 6702baa9e12fb..ffac314d50219 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -758,13 +758,6 @@ private[spark] object PythonRDD extends Logging {
       converted.saveAsHadoopDataset(new JobConf(conf))
     }
   }
-
-  /**
-   * A helper to convert java.util.List[Double] into Array[Double]
-   */
-  def listToArrayDouble(list: JList[Double]): Array[Double] = {
-    list.asScala.toArray
-  }
 }
 
 private
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a4cf1ead5bfa9..bb03adce2463e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -28,7 +28,7 @@
 import warnings
 import heapq
 import bisect
-from random import Random
+import random
 from math import sqrt, log, isinf, isnan
 
 from pyspark.accumulators import PStatsParam
@@ -324,25 +324,21 @@ def randomSplit(self, weights, seed=None):
         :param seed: random seed
         :return: split RDDs in a list
 
-        >>> rdd = sc.parallelize(range(10), 1)
-        >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)
+        >>> rdd = sc.parallelize(range(5), 1)
+        >>> rdd1, rdd2 = rdd.randomSplit([2.0, 3.0], 101)
         >>> rdd1.collect()
-        [3, 6]
+        [2, 3]
         >>> rdd2.collect()
-        [0, 5, 7]
-        >>> rdd3.collect()
-        [1, 2, 4, 8, 9]
+        [0, 1, 4]
         """
-        ser = BatchedSerializer(PickleSerializer(), 1)
-        rdd = self._reserialize(ser)
-        jweights = ListConverter().convert([float(w) for w in weights],
-                                           self.ctx._gateway._gateway_client)
-        jweights = self.ctx._jvm.PythonRDD.listToArrayDouble(jweights)
+        s = sum(weights)
+        cweights = [0.0]
+        for w in weights:
+            cweights.append(cweights[-1] + w / s)
         if seed is None:
-            jrdds = rdd._jrdd.randomSplit(jweights)
-        else:
-            jrdds = rdd._jrdd.randomSplit(jweights, seed)
-        return [RDD(jrdd, self.ctx, ser) for jrdd in jrdds]
+            seed = random.randint(0, 2 ** 32 - 1)
+        return [self.mapPartitionsWithIndex(RDDSampler(False, ub, seed, lb).func, True)
+                for lb, ub in zip(cweights, cweights[1:])]
 
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed=None):
@@ -369,7 +365,7 @@ def takeSample(self, withReplacement, num, seed=None):
         if initialCount == 0:
             return []
 
-        rand = Random(seed)
+        rand = random.Random(seed)
 
         if (not withReplacement) and num >= initialCount:
             # shuffle current RDD and return
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index f5c3cfd259a5b..b35fb64570762 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -96,9 +96,10 @@ def shuffle(self, vals):
 
 class RDDSampler(RDDSamplerBase):
 
-    def __init__(self, withReplacement, fraction, seed=None):
+    def __init__(self, withReplacement, fraction, seed=None, lowbound=0.0):
         RDDSamplerBase.__init__(self, withReplacement, seed)
         self._fraction = fraction
+        self._lowbound = lowbound
 
     def func(self, split, iterator):
         if self._withReplacement:
@@ -111,7 +112,7 @@ def func(self, split, iterator):
                 yield obj
         else:
             for obj in iterator:
-                if self.getUniformSample(split) <= self._fraction:
+                if self._lowbound <= self.getUniformSample(split) < self._fraction:
                     yield obj
 
 

From f866bcf7e23a6d888d2113df4da3031bfe91400e Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 13 Nov 2014 11:31:09 -0800
Subject: [PATCH 5/8] remove unneeded change

---
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index ffac314d50219..45beb8fc8c925 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,7 +24,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import org.apache.spark.input.PortableDataStream
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
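Patch 4 is the pivotal change: instead of round-tripping through the JVM, the weights are folded into cumulative bounds, and every split samples the parent RDD with the same seed, so each element draws the same uniform value and falls into exactly one [lowbound, fraction) range. A driver-side sketch of the same scheme in plain Python (illustrative names, no Spark required):

    import random

    def random_split_local(items, weights, seed):
        # cumulative normalized bounds: [0.0, w1/s, (w1+w2)/s, ..., 1.0]
        s = float(sum(weights))
        cweights = [0.0]
        for w in weights:
            cweights.append(cweights[-1] + w / s)
        # one seeded draw per element, shared by every split
        rng = random.Random(seed)
        draws = [(x, rng.random()) for x in items]
        # split i keeps the elements whose draw falls in [cweights[i], cweights[i+1])
        return [[x for x, u in draws if lb <= u < ub]
                for lb, ub in zip(cweights, cweights[1:])]

    parts = random_split_local(range(10), [2, 3], seed=17)
    assert sorted(parts[0] + parts[1]) == list(range(10))  # disjoint and exhaustive

Because the ranges are adjacent and cover [0, 1), the splits are guaranteed to be disjoint and to cover the whole RDD, which the earlier JVM-backed version could only promise by delegating to Scala's randomSplit.
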
From 4dfa2cdce6a2eaae5f5e24321e324bd0498ea49f Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 13 Nov 2014 12:20:56 -0800
Subject: [PATCH 6/8] refactor

---
 python/pyspark/rdd.py        |  4 ++--
 python/pyspark/rddsampler.py | 18 +++++++++++++++---
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bb03adce2463e..4b2043ff9b465 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -38,7 +38,7 @@
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
-from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
+from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
@@ -337,7 +337,7 @@ def randomSplit(self, weights, seed=None):
             cweights.append(cweights[-1] + w / s)
         if seed is None:
             seed = random.randint(0, 2 ** 32 - 1)
-        return [self.mapPartitionsWithIndex(RDDSampler(False, ub, seed, lb).func, True)
+        return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True)
                 for lb, ub in zip(cweights, cweights[1:])]
 
     # this is ported from scala/spark/RDD.scala
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index b35fb64570762..4365640040116 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -96,10 +96,9 @@ def shuffle(self, vals):
 
 class RDDSampler(RDDSamplerBase):
 
-    def __init__(self, withReplacement, fraction, seed=None, lowbound=0.0):
+    def __init__(self, withReplacement, fraction, seed=None):
         RDDSamplerBase.__init__(self, withReplacement, seed)
         self._fraction = fraction
-        self._lowbound = lowbound
 
     def func(self, split, iterator):
         if self._withReplacement:
@@ -112,10 +111,23 @@ def func(self, split, iterator):
                 yield obj
         else:
             for obj in iterator:
-                if self._lowbound <= self.getUniformSample(split) < self._fraction:
+                if self.getUniformSample(split) <= self._fraction:
                     yield obj
 
 
+class RDDRangeSampler(RDDSamplerBase):
+
+    def __init__(self, lowerBound, upperBound, seed=None):
+        RDDSamplerBase.__init__(self, False, seed)
+        self._lowerBound = lowerBound
+        self._upperBound = upperBound
+
+    def func(self, split, iterator):
+        for obj in iterator:
+            if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
+                yield obj
+
+
 class RDDStratifiedSampler(RDDSamplerBase):
 
     def __init__(self, withReplacement, fractions, seed=None):

From f5fdf63fe0d0ea091c01a4144585276a7db63625 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 13 Nov 2014 12:54:14 -0800
Subject: [PATCH 7/8] fix bug with int in weights

---
 python/pyspark/rdd.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4b2043ff9b465..0e8920281e842 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -325,13 +325,13 @@ def randomSplit(self, weights, seed=None):
         :return: split RDDs in a list
 
         >>> rdd = sc.parallelize(range(5), 1)
-        >>> rdd1, rdd2 = rdd.randomSplit([2.0, 3.0], 101)
+        >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 101)
         >>> rdd1.collect()
         [2, 3]
         >>> rdd2.collect()
         [0, 1, 4]
         """
-        s = sum(weights)
+        s = float(sum(weights))
         cweights = [0.0]
         for w in weights:
             cweights.append(cweights[-1] + w / s)
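Patch 6 extracts the range check into a dedicated RDDRangeSampler and reverts the intrusive lowbound parameter on RDDSampler; patch 7 then fixes a genuine bug rather than style. PySpark at this time ran on Python 2, where dividing two ints is floor division, so integer weights made every cumulative bound collapse to zero and all splits came back empty. A sketch of the two behaviors (the comments describe Python 2 semantics):

    weights = [2, 3]
    s = sum(weights)              # 5, an int
    # Python 2: 2 / 5 == 0 and 3 / 5 == 0, so the cumulative bounds
    # become [0.0, 0.0, 0.0] and every [lb, ub) range is empty
    s = float(sum(weights))       # 5.0
    # now 2 / 5.0 == 0.4 and 3 / 5.0 == 0.6, giving bounds [0.0, 0.4, 1.0]

Changing the doctest weights from [2.0, 3.0] to [2, 3] is what exercises the integer path and keeps the regression covered.
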
From 78bf997f13c6f08129671a9d6a3484620d5b37a2 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 13 Nov 2014 13:08:10 -0800
Subject: [PATCH 8/8] fix tests, do not use numpy in randomSplit, no performance gain

---
 python/pyspark/rdd.py        | 6 +++---
 python/pyspark/rddsampler.py | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0e8920281e842..50535d2711708 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -325,11 +325,11 @@ def randomSplit(self, weights, seed=None):
         :return: split RDDs in a list
 
         >>> rdd = sc.parallelize(range(5), 1)
-        >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 101)
+        >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
         >>> rdd1.collect()
-        [2, 3]
+        [1, 3]
         >>> rdd2.collect()
-        [0, 1, 4]
+        [0, 2, 4]
         """
         s = float(sum(weights))
         cweights = [0.0]
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 4365640040116..558dcfd12d46f 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -119,6 +119,7 @@ class RDDRangeSampler(RDDSamplerBase):
 
     def __init__(self, lowerBound, upperBound, seed=None):
         RDDSamplerBase.__init__(self, False, seed)
+        self._use_numpy = False  # no performance gain from numpy
         self._lowerBound = lowerBound
         self._upperBound = upperBound
 
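The _use_numpy = False line pins RDDRangeSampler to Python's built-in generator; in this vintage of rddsampler.py the base class would otherwise switch to numpy's RNG whenever numpy was importable, so the doctest output would depend on which libraries the test machine happened to have (hence "fix tests" alongside the new expected values for seed 17). What correctness actually requires is only that the stream be reproducible per seed, as this standalone sketch illustrates (not code from the patch):

    import random

    def draws(seed, n):
        rng = random.Random(seed)
        return [rng.random() for _ in range(n)]

    # identical seed -> identical stream: every RDDRangeSampler built for the
    # same randomSplit() call replays the same uniforms, so each element lands
    # in exactly one [lowerBound, upperBound) range
    assert draws(17, 5) == draws(17, 5)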