From 23b2ecd274b27ef4a61f74d8ae0fccebd5d158b4 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 28 Jul 2014 15:32:17 -0700 Subject: [PATCH 01/11] WIP --- .../mllib/api/python/PythonMLLibAPI.scala | 19 ++++++ .../mllib/random/RandomRDDGenerators.scala | 9 +++ python/pyspark/mllib/randomRDD.py | 64 +++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 python/pyspark/mllib/randomRDD.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 954621ee8b933..6895c4b8c652f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -24,6 +24,7 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.random.RandomRDDGenerators import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.util.MLUtils @@ -453,4 +454,22 @@ class PythonMLLibAPI extends Serializable { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + + def uniformRDD(jsc: JavaSparkContext, + size: Long, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { + RandomRDDGenerators.uniformRDD(jsc.sc, size, numPartitions, seed) + .map(serializeDouble).toJavaRDD() + } + + def normalRDD(jsc: JavaSparkContext, + size: Long, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { + RandomRDDGenerators.normalRDD(jsc.sc, size, numPartitions, seed) + .map(serializeDouble).toJavaRDD() + } + + } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index d7ee2d3f46846..1d413537e76c9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -79,6 +79,9 @@ object RandomRDDGenerators { * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * + * To transform the distribution in the generated RDD from standard normal to some other normal + * N(mean, sigma), use `RandomRDDs.normal(sc, n, p, seed).map(v => mean + sigma * v)` + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @param numPartitions Number of partitions in the RDD. @@ -95,6 +98,9 @@ object RandomRDDGenerators { * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * + * To transform the distribution in the generated RDD from standard normal to some other normal + * N(mean, sigma), use `RandomRDDs.normal(sc, n, p).map(v => mean + sigma * v)` + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @param numPartitions Number of partitions in the RDD. @@ -110,6 +116,9 @@ object RandomRDDGenerators { * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * sc.defaultParallelism used for the number of partitions in the RDD. 
* + * To transform the distribution in the generated RDD from standard normal to some other normal + * N(mean, sigma), use `RandomRDDs.normal(sc, n).map(v => mean + sigma * v)` + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). diff --git a/python/pyspark/mllib/randomRDD.py b/python/pyspark/mllib/randomRDD.py new file mode 100644 index 0000000000000..28e83e4e97ce1 --- /dev/null +++ b/python/pyspark/mllib/randomRDD.py @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.rdd import RDD +from pyspark.mllib._common import _deserialize_double +from pyspark.serializers import NoOpSerializer + + +class RandomRDDGenerators: + """ + Generator methods for creating RDDs comprised of i.i.d samples from + some distribution. + """ + + @staticmethod + def uniformRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the + uniform distribution on [0.0, 1.0]. + + >>> RandomRDDGenerators.uniformRDD(sc, 10, 1, 0).collect() + + """ + if not numPartitions: + if not seed: + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) + else: + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions) + else: + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size) + + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + +def _test(): + import doctest + from pyspark.context import SparkContext + globs = globals().copy() + # The small batch size here ensures that we see multiple batches, + # even in these small test examples: + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() From 07ddff20fc2a77c9a9d2a16d6094f8ccd36886b9 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 28 Jul 2014 18:59:56 -0700 Subject: [PATCH 02/11] units passed. 
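A rough sketch of the intended Python usage for the generators wired up in
this patch (illustration only; assumes a live SparkContext `sc`, with the
module at python/pyspark/mllib/randomRDD.py as laid out in this series):

    from pyspark.mllib.randomRDD import RandomRDDGenerators

    # 1000 i.i.d. samples ~ U[0.0, 1.0] in 2 partitions, with a fixed seed
    x = RandomRDDGenerators.uniformRDD(sc, 1000, 2, seed=1L)

    # only the standard distributions are generated directly;
    # shift/scale to U[a, b] with a plain map
    a, b = -1.0, 1.0
    y = x.map(lambda v: a + (b - a) * v)

    # likewise, standard normal samples rescale to N(mean, sigma)
    z = RandomRDDGenerators.normalRDD(sc, 1000, 2, seed=1L)
    w = z.map(lambda v: 5.0 + 2.0 * v)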
--- .../mllib/api/python/PythonMLLibAPI.scala | 58 +++++- .../mllib/random/RandomRDDGenerators.scala | 15 +- python/pyspark/mllib/randomRDD.py | 171 ++++++++++++++++-- 3 files changed, 227 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 6895c4b8c652f..0fe0a009bb530 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -455,6 +455,9 @@ class PythonMLLibAPI extends Serializable { ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + /** + * Java stub for Python mllib RandomRDDGenerators.uniformRDD() + */ def uniformRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int, @@ -463,13 +466,62 @@ class PythonMLLibAPI extends Serializable { .map(serializeDouble).toJavaRDD() } + /** + * Java stub for Python mllib RandomRDDGenerators.normalRDD() + */ def normalRDD(jsc: JavaSparkContext, - size: Long, - numPartitions: Int, - seed: Long): JavaRDD[Array[Byte]] = { + size: Long, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { RandomRDDGenerators.normalRDD(jsc.sc, size, numPartitions, seed) .map(serializeDouble).toJavaRDD() } + /** + * Java stub for Python mllib RandomRDDGenerators.poissonRDD() + */ + def poissonRDD(jsc: JavaSparkContext, + mean: Double, + size: Long, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { + RandomRDDGenerators.poissonRDD(jsc.sc, mean, size, numPartitions, seed) + .map(serializeDouble).toJavaRDD() + } + /** + * Java stub for Python mllib RandomRDDGenerators.uniformVectorRDD() + */ + def uniformVectorRDD(jsc: JavaSparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { + RandomRDDGenerators.uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed) + .map(serializeDoubleVector).toJavaRDD() + } + + /** + * Java stub for Python mllib RandomRDDGenerators.normalVectorRDD() + */ + def normalVectorRDD(jsc: JavaSparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { + RandomRDDGenerators.normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed) + .map(serializeDoubleVector).toJavaRDD() + } + + /** + * Java stub for Python mllib RandomRDDGenerators.poissonVectorRDD() + */ + def poissonVectorRDD(jsc: JavaSparkContext,mean: Double, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): JavaRDD[Array[Byte]] = { + RandomRDDGenerators.poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed) + .map(serializeDoubleVector).toJavaRDD() + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index 1d413537e76c9..5f89762dc8453 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -35,6 +35,9 @@ object RandomRDDGenerators { * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. * + * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use + * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => (b - a) * v)` + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. 
* @param numPartitions Number of partitions in the RDD. @@ -51,6 +54,9 @@ object RandomRDDGenerators { * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. * + * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use + * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => (b - a) * v)` + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @param numPartitions Number of partitions in the RDD. @@ -66,6 +72,9 @@ object RandomRDDGenerators { * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * + * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use + * `RandomRDDGenerators.uniformRDD(sc, n).map(v => (b - a) * v)` + * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. @@ -80,7 +89,7 @@ object RandomRDDGenerators { * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDs.normal(sc, n, p, seed).map(v => mean + sigma * v)` + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)` * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -99,7 +108,7 @@ object RandomRDDGenerators { * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDs.normal(sc, n, p).map(v => mean + sigma * v)` + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)` * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -117,7 +126,7 @@ object RandomRDDGenerators { * sc.defaultParallelism used for the number of partitions in the RDD. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDs.normal(sc, n).map(v => mean + sigma * v)` + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)` * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. diff --git a/python/pyspark/mllib/randomRDD.py b/python/pyspark/mllib/randomRDD.py index 28e83e4e97ce1..b2a2bc5aeb9d2 100644 --- a/python/pyspark/mllib/randomRDD.py +++ b/python/pyspark/mllib/randomRDD.py @@ -15,10 +15,12 @@ # limitations under the License. # +from random import getrandbits + from pyspark.rdd import RDD -from pyspark.mllib._common import _deserialize_double +from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector from pyspark.serializers import NoOpSerializer - +from pyspark.statcounter import StatCounter class RandomRDDGenerators: """ @@ -32,20 +34,167 @@ def uniformRDD(sc, size, numPartitions=None, seed=None): Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. 
- >>> RandomRDDGenerators.uniformRDD(sc, 10, 1, 0).collect() + To transform the distribution in the generated RDD from U[0.0, 1.0] + to U[a, b], use + C{RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(lambda v: (b - a) * v)} + >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect() + >>> len(x) + 100 + >>> max(x) <= 1.0 and min(x) >= 0.0 + True + >>> RandomRDDGenerators.uniformRDD(sc, 100, 4).getNumPartitions() + 4 + >>> parts = RandomRDDGenerators.uniformRDD(sc, 100, seed=4).getNumPartitions() + >>> parts == sc.defaultParallelism + True """ - if not numPartitions: - if not seed: - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) - else: - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions) - else: - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size) - + numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) + @staticmethod + def normalRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the standard normal + distribution. + + To transform the distribution in the generated RDD from standard normal + to some other normal N(mean, sigma), use + C{RandomRDDGenerators.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)} + + >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L).collect() + >>> stats = StatCounter(x) + >>> stats.count() + 1000L + >>> abs(stats.mean() - 0.0) < 0.1 + True + >>> abs(stats.stdev() - 1.0) < 0.1 + True + """ + numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def poissonRDD(sc, mean, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the Poisson + distribution with the input mean. + + >>> mean = 100.0 + >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L).collect() + >>> stats = StatCounter(x) + >>> stats.count() + 1000L + >>> abs(stats.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(stats.stdev() - sqrt(mean)) < 0.5 + True + """ + numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the uniform distribution on [0.0 1.0]. 
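+
+        To transform the distribution in the generated RDD from U[0.0, 1.0]
+        to U[a, b] per element, the same map as in the scalar case should
+        apply, e.g. (a sketch; the deserialized vectors are NumPy arrays,
+        so elementwise arithmetic works):
+        C{RandomRDDGenerators.uniformVectorRDD(sc, n, d).map(lambda v: a + (b - a) * v)}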
+ + >>> import numpy as np + >>> mat = np.matrix(RandomRDDGenerators.uniformVectorRDD(sc, 10, 10).collect()) + >>> mat.shape + (10, 10) + >>> mat.max() <= 1.0 and mat.min() >= 0.0 + True + >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() + 4 + """ + numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI()\ + .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the standard normal distribution. + + >>> import numpy as np + >>> mat = np.matrix(RandomRDDGenerators.normalVectorRDD(sc, 100, 100, seed=1L).collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - 0.0) < 0.1 + True + >>> abs(mat.std() - 1.0) < 0.1 + True + """ + numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI() \ + .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the Poisson distribution with the input mean. + + >>> import numpy as np + >>> mean = 100.0 + >>> rdd = RandomRDDGenerators.poissonVectorRDD(sc, mean, 100, 100, seed=1L) + >>> mat = np.mat(rdd.collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(mat.std() - sqrt(mean)) < 0.5 + True + """ + numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI() \ + .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def _getDefaultArgs(sc, numPartitions, seed): + """ + Use sc.defaultParallelism for numPartitions and + a randomly generated long for seed if either has a value of C{None} + + >>> RandomRDDGenerators._getDefaultArgs(sc, 3, 2) + (3, 2) + >>> RandomRDDGenerators._getDefaultArgs(sc, None, 2) == (sc.defaultParallelism, 2) + True + >>> from math import pow + >>> RandomRDDGenerators._getDefaultArgs(sc, None, None)[1] < pow(2, 63) + True + """ + if not numPartitions: + numPartitions = sc.defaultParallelism + if not seed: + seed = RandomRDDGenerators._nextLong() + return numPartitions, seed + + @staticmethod + def _nextLong(): + """ + Returns a random long to be used as RNG seed in the Java APIs. 
+ + Note: only 63 random bits are used here since Long.MAX_VALUE = 2 ^ 63 - 1 + """ + return long(getrandbits(63)) + def _test(): import doctest From bd2df136bfbfc13d5c6df365ba17b97e11c3c767 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Tue, 29 Jul 2014 12:26:38 -0700 Subject: [PATCH 03/11] typos --- .../mllib/api/python/PythonMLLibAPI.scala | 3 +- .../mllib/random/RandomRDDGenerators.scala | 84 +++++++++---------- python/pyspark/mllib/randomRDD.py | 7 +- 3 files changed, 48 insertions(+), 46 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 0fe0a009bb530..2839be3ddd35f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -516,7 +516,8 @@ class PythonMLLibAPI extends Serializable { /** * Java stub for Python mllib RandomRDDGenerators.poissonVectorRDD() */ - def poissonVectorRDD(jsc: JavaSparkContext,mean: Double, + def poissonVectorRDD(jsc: JavaSparkContext, + mean: Double, numRows: Long, numCols: Int, numPartitions: Int, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index 5f89762dc8453..021d651d4dbaa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -26,17 +26,17 @@ import org.apache.spark.util.Utils /** * :: Experimental :: - * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. + * Generator methods for creating RDDs comprised of i.i.d. samples from some distribution. */ @Experimental object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. * * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => (b - a) * v)` + * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -52,10 +52,10 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. * * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => (b - a) * v)` + * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -69,11 +69,11 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n).map(v => (b - a) * v)` + * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`. 
* * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -86,10 +86,10 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)` + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -105,10 +105,10 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)` + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -122,11 +122,11 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. * sc.defaultParallelism used for the number of partitions in the RDD. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)` + * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. @@ -139,7 +139,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. @@ -160,7 +160,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. @@ -175,7 +175,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. @@ -190,7 +190,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. * * @param sc SparkContext used to create the RDD. * @param generator DistributionGenerator used to populate the RDD. 
@@ -210,7 +210,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. * * @param sc SparkContext used to create the RDD. * @param generator DistributionGenerator used to populate the RDD. @@ -228,7 +228,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. @@ -247,7 +247,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * uniform distribution on [0.0 1.0]. * * @param sc SparkContext used to create the RDD. @@ -269,14 +269,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * uniform distribution on [0.0 1.0]. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. */ @Experimental def uniformVectorRDD(sc: SparkContext, @@ -288,14 +288,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * uniform distribution on [0.0 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. */ @Experimental def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { @@ -304,7 +304,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * * @param sc SparkContext used to create the RDD. @@ -312,7 +312,7 @@ object RandomRDDGenerators { * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). */ @Experimental def normalVectorRDD(sc: SparkContext, @@ -326,14 +326,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. 
samples drawn from the * standard normal distribution. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). */ @Experimental def normalVectorRDD(sc: SparkContext, @@ -345,14 +345,14 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). */ @Experimental def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { @@ -361,7 +361,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. @@ -370,7 +370,7 @@ object RandomRDDGenerators { * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ @Experimental def poissonVectorRDD(sc: SparkContext, @@ -385,7 +385,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. @@ -393,7 +393,7 @@ object RandomRDDGenerators { * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ @Experimental def poissonVectorRDD(sc: SparkContext, @@ -406,7 +406,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -414,7 +414,7 @@ object RandomRDDGenerators { * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). 
*/ @Experimental def poissonVectorRDD(sc: SparkContext, @@ -426,7 +426,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the * input DistributionGenerator. * * @param sc SparkContext used to create the RDD. @@ -435,7 +435,7 @@ object RandomRDDGenerators { * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ @Experimental def randomVectorRDD(sc: SparkContext, @@ -449,7 +449,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the * input DistributionGenerator. * * @param sc SparkContext used to create the RDD. @@ -457,7 +457,7 @@ object RandomRDDGenerators { * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ @Experimental def randomVectorRDD(sc: SparkContext, @@ -470,7 +470,7 @@ object RandomRDDGenerators { /** * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the * input DistributionGenerator. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -478,7 +478,7 @@ object RandomRDDGenerators { * @param generator DistributionGenerator used to populate the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ @Experimental def randomVectorRDD(sc: SparkContext, diff --git a/python/pyspark/mllib/randomRDD.py b/python/pyspark/mllib/randomRDD.py index b2a2bc5aeb9d2..ce66f01f0ea5f 100644 --- a/python/pyspark/mllib/randomRDD.py +++ b/python/pyspark/mllib/randomRDD.py @@ -20,7 +20,6 @@ from pyspark.rdd import RDD from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector from pyspark.serializers import NoOpSerializer -from pyspark.statcounter import StatCounter class RandomRDDGenerators: """ @@ -31,12 +30,12 @@ class RandomRDDGenerators: @staticmethod def uniformRDD(sc, size, numPartitions=None, seed=None): """ - Generates an RDD comprised of i.i.d samples from the + Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. 
To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - C{RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(lambda v: (b - a) * v)} + C{RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)} >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect() >>> len(x) @@ -65,6 +64,7 @@ def normalRDD(sc, size, numPartitions=None, seed=None): C{RandomRDDGenerators.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)} >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L).collect() + >>> from pyspark.statcounter import StatCounter >>> stats = StatCounter(x) >>> stats.count() 1000L @@ -86,6 +86,7 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None): >>> mean = 100.0 >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L).collect() + >>> from pyspark.statcounter import StatCounter >>> stats = StatCounter(x) >>> stats.count() 1000L From 29d205e1792f21b34501f4133e80d71acc5dac14 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Tue, 29 Jul 2014 13:42:11 -0700 Subject: [PATCH 04/11] created mllib.random package --- .../mllib/random/RandomRDDGenerators.py | 201 ++++++++++++++++ python/pyspark/mllib/random/__init__.py | 25 ++ python/pyspark/mllib/randomRDD.py | 214 ------------------ 3 files changed, 226 insertions(+), 214 deletions(-) create mode 100644 python/pyspark/mllib/random/RandomRDDGenerators.py create mode 100644 python/pyspark/mllib/random/__init__.py delete mode 100644 python/pyspark/mllib/randomRDD.py diff --git a/python/pyspark/mllib/random/RandomRDDGenerators.py b/python/pyspark/mllib/random/RandomRDDGenerators.py new file mode 100644 index 0000000000000..eeebef219a7bf --- /dev/null +++ b/python/pyspark/mllib/random/RandomRDDGenerators.py @@ -0,0 +1,201 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from random import getrandbits + +from pyspark.rdd import RDD +from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector +from pyspark.serializers import NoOpSerializer + + +def uniformRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d. samples from the + uniform distribution on [0.0, 1.0]. 
+ + To transform the distribution in the generated RDD from U[0.0, 1.0] + to U[a, b], use + C{uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)} + + >>> x = uniformRDD(sc, 100).collect() + >>> len(x) + 100 + >>> max(x) <= 1.0 and min(x) >= 0.0 + True + >>> uniformRDD(sc, 100, 4).getNumPartitions() + 4 + >>> parts = uniformRDD(sc, 100, seed=4).getNumPartitions() + >>> parts == sc.defaultParallelism + True + """ + numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) + +def normalRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the standard normal + distribution. + + To transform the distribution in the generated RDD from standard normal + to some other normal N(mean, sigma), use + C{normal(sc, n, p, seed).map(lambda v: mean + sigma * v)} + + >>> x = normalRDD(sc, 1000, seed=1L).collect() + >>> from pyspark.statcounter import StatCounter + >>> stats = StatCounter(x) + >>> stats.count() + 1000L + >>> abs(stats.mean() - 0.0) < 0.1 + True + >>> abs(stats.stdev() - 1.0) < 0.1 + True + """ + numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) + +def poissonRDD(sc, mean, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the Poisson + distribution with the input mean. + + >>> mean = 100.0 + >>> x = poissonRDD(sc, mean, 1000, seed=1L).collect() + >>> from pyspark.statcounter import StatCounter + >>> stats = StatCounter(x) + >>> stats.count() + 1000L + >>> abs(stats.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(stats.stdev() - sqrt(mean)) < 0.5 + True + """ + numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) + +def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the uniform distribution on [0.0 1.0]. + + >>> import numpy as np + >>> mat = np.matrix(uniformVectorRDD(sc, 10, 10).collect()) + >>> mat.shape + (10, 10) + >>> mat.max() <= 1.0 and mat.min() >= 0.0 + True + >>> uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() + 4 + """ + numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI()\ + .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + +def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the standard normal distribution. 
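+
+    To transform to vectors of some other normal N(mean, sigma), rescale
+    per element with a map, e.g. (a sketch mirroring the scalar normalRDD
+    note; the deserialized vectors are NumPy arrays, so elementwise
+    arithmetic works):
+    C{normalVectorRDD(sc, n, d).map(lambda v: mean + sigma * v)}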
+ + >>> import numpy as np + >>> mat = np.matrix(normalVectorRDD(sc, 100, 100, seed=1L).collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - 0.0) < 0.1 + True + >>> abs(mat.std() - 1.0) < 0.1 + True + """ + numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI() \ + .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + +def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the Poisson distribution with the input mean. + + >>> import numpy as np + >>> mean = 100.0 + >>> rdd = poissonVectorRDD(sc, mean, 100, 100, seed=1L) + >>> mat = np.mat(rdd.collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(mat.std() - sqrt(mean)) < 0.5 + True + """ + numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI() \ + .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + +def _getDefaultArgs(sc, numPartitions, seed): + """ + Use sc.defaultParallelism for numPartitions and + a randomly generated long for seed if either has a value of C{None} + + >>> _getDefaultArgs(sc, 3, 2) + (3, 2) + >>> _getDefaultArgs(sc, None, 2) == (sc.defaultParallelism, 2) + True + >>> from math import pow + >>> _getDefaultArgs(sc, None, None)[1] < pow(2, 63) + True + """ + if not numPartitions: + numPartitions = sc.defaultParallelism + if not seed: + seed = _nextLong() + return numPartitions, seed + +def _nextLong(): + """ + Returns a random long to be used as RNG seed in the Java APIs. + + Note: only 63 random bits are used here since Long.MAX_VALUE = 2 ^ 63 - 1 + """ + return long(getrandbits(63)) + + +def _test(): + import doctest + from pyspark.context import SparkContext + globs = globals().copy() + # The small batch size here ensures that we see multiple batches, + # even in these small test examples: + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/mllib/random/__init__.py b/python/pyspark/mllib/random/__init__.py new file mode 100644 index 0000000000000..669f4eb2904e2 --- /dev/null +++ b/python/pyspark/mllib/random/__init__.py @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" +Python package for mllib.random +""" + +from pyspark.mllib.random import RandomRDDGenerators + + +__all__ = ["RandomRDDGenerators"] diff --git a/python/pyspark/mllib/randomRDD.py b/python/pyspark/mllib/randomRDD.py deleted file mode 100644 index ce66f01f0ea5f..0000000000000 --- a/python/pyspark/mllib/randomRDD.py +++ /dev/null @@ -1,214 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from random import getrandbits - -from pyspark.rdd import RDD -from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector -from pyspark.serializers import NoOpSerializer - -class RandomRDDGenerators: - """ - Generator methods for creating RDDs comprised of i.i.d samples from - some distribution. - """ - - @staticmethod - def uniformRDD(sc, size, numPartitions=None, seed=None): - """ - Generates an RDD comprised of i.i.d. samples from the - uniform distribution on [0.0, 1.0]. - - To transform the distribution in the generated RDD from U[0.0, 1.0] - to U[a, b], use - C{RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)} - - >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect() - >>> len(x) - 100 - >>> max(x) <= 1.0 and min(x) >= 0.0 - True - >>> RandomRDDGenerators.uniformRDD(sc, 100, 4).getNumPartitions() - 4 - >>> parts = RandomRDDGenerators.uniformRDD(sc, 100, seed=4).getNumPartitions() - >>> parts == sc.defaultParallelism - True - """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) - return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) - - @staticmethod - def normalRDD(sc, size, numPartitions=None, seed=None): - """ - Generates an RDD comprised of i.i.d samples from the standard normal - distribution. - - To transform the distribution in the generated RDD from standard normal - to some other normal N(mean, sigma), use - C{RandomRDDGenerators.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)} - - >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L).collect() - >>> from pyspark.statcounter import StatCounter - >>> stats = StatCounter(x) - >>> stats.count() - 1000L - >>> abs(stats.mean() - 0.0) < 0.1 - True - >>> abs(stats.stdev() - 1.0) < 0.1 - True - """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) - return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) - - @staticmethod - def poissonRDD(sc, mean, size, numPartitions=None, seed=None): - """ - Generates an RDD comprised of i.i.d samples from the Poisson - distribution with the input mean. 
- - >>> mean = 100.0 - >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L).collect() - >>> from pyspark.statcounter import StatCounter - >>> stats = StatCounter(x) - >>> stats.count() - 1000L - >>> abs(stats.mean() - mean) < 0.5 - True - >>> from math import sqrt - >>> abs(stats.stdev() - sqrt(mean)) < 0.5 - True - """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) - return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) - - @staticmethod - def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): - """ - Generates an RDD comprised of vectors containing i.i.d samples drawn - from the uniform distribution on [0.0 1.0]. - - >>> import numpy as np - >>> mat = np.matrix(RandomRDDGenerators.uniformVectorRDD(sc, 10, 10).collect()) - >>> mat.shape - (10, 10) - >>> mat.max() <= 1.0 and mat.min() >= 0.0 - True - >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() - 4 - """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI()\ - .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) - return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) - - @staticmethod - def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): - """ - Generates an RDD comprised of vectors containing i.i.d samples drawn - from the standard normal distribution. - - >>> import numpy as np - >>> mat = np.matrix(RandomRDDGenerators.normalVectorRDD(sc, 100, 100, seed=1L).collect()) - >>> mat.shape - (100, 100) - >>> abs(mat.mean() - 0.0) < 0.1 - True - >>> abs(mat.std() - 1.0) < 0.1 - True - """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI() \ - .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) - return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) - - @staticmethod - def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): - """ - Generates an RDD comprised of vectors containing i.i.d samples drawn - from the Poisson distribution with the input mean. 
- - >>> import numpy as np - >>> mean = 100.0 - >>> rdd = RandomRDDGenerators.poissonVectorRDD(sc, mean, 100, 100, seed=1L) - >>> mat = np.mat(rdd.collect()) - >>> mat.shape - (100, 100) - >>> abs(mat.mean() - mean) < 0.5 - True - >>> from math import sqrt - >>> abs(mat.std() - sqrt(mean)) < 0.5 - True - """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI() \ - .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) - return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) - - @staticmethod - def _getDefaultArgs(sc, numPartitions, seed): - """ - Use sc.defaultParallelism for numPartitions and - a randomly generated long for seed if either has a value of C{None} - - >>> RandomRDDGenerators._getDefaultArgs(sc, 3, 2) - (3, 2) - >>> RandomRDDGenerators._getDefaultArgs(sc, None, 2) == (sc.defaultParallelism, 2) - True - >>> from math import pow - >>> RandomRDDGenerators._getDefaultArgs(sc, None, None)[1] < pow(2, 63) - True - """ - if not numPartitions: - numPartitions = sc.defaultParallelism - if not seed: - seed = RandomRDDGenerators._nextLong() - return numPartitions, seed - - @staticmethod - def _nextLong(): - """ - Returns a random long to be used as RNG seed in the Java APIs. - - Note: only 63 random bits are used here since Long.MAX_VALUE = 2 ^ 63 - 1 - """ - return long(getrandbits(63)) - - -def _test(): - import doctest - from pyspark.context import SparkContext - globs = globals().copy() - # The small batch size here ensures that we see multiple batches, - # even in these small test examples: - globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) - globs['sc'].stop() - if failure_count: - exit(-1) - - -if __name__ == "__main__": - _test() From 4338f40465d611a20171aa40761f64d9b5c60ec4 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Tue, 29 Jul 2014 18:58:28 -0700 Subject: [PATCH 05/11] renamed randomRDD to rand and import as random --- python/pyspark/mllib/__init__.py | 2 ++ python/pyspark/mllib/{random => rand}/RandomRDDGenerators.py | 0 python/pyspark/mllib/{random => rand}/__init__.py | 3 +-- 3 files changed, 3 insertions(+), 2 deletions(-) rename python/pyspark/mllib/{random => rand}/RandomRDDGenerators.py (100%) rename python/pyspark/mllib/{random => rand}/__init__.py (94%) diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py index 4149f54931d1f..6eba9c21a87c7 100644 --- a/python/pyspark/mllib/__init__.py +++ b/python/pyspark/mllib/__init__.py @@ -24,3 +24,5 @@ import numpy if numpy.version.version < '1.4': raise Exception("MLlib requires NumPy 1.4+") + +import rand as random diff --git a/python/pyspark/mllib/random/RandomRDDGenerators.py b/python/pyspark/mllib/rand/RandomRDDGenerators.py similarity index 100% rename from python/pyspark/mllib/random/RandomRDDGenerators.py rename to python/pyspark/mllib/rand/RandomRDDGenerators.py diff --git a/python/pyspark/mllib/random/__init__.py b/python/pyspark/mllib/rand/__init__.py similarity index 94% rename from python/pyspark/mllib/random/__init__.py rename to python/pyspark/mllib/rand/__init__.py index 669f4eb2904e2..ff917719262ab 100644 --- a/python/pyspark/mllib/random/__init__.py +++ b/python/pyspark/mllib/rand/__init__.py @@ -19,7 +19,6 @@ Python package for mllib.random """ -from pyspark.mllib.random import RandomRDDGenerators - +import 
RandomRDDGenerators __all__ = ["RandomRDDGenerators"] From 687aac081b6af06a2977e2676963c9e2661c57e3 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Tue, 29 Jul 2014 19:06:26 -0700 Subject: [PATCH 06/11] add RandomRDDGenerators.py to run-tests --- python/run-tests | 1 + 1 file changed, 1 insertion(+) diff --git a/python/run-tests b/python/run-tests index 29f755fc0dcd3..a938a6516cfce 100755 --- a/python/run-tests +++ b/python/run-tests @@ -70,6 +70,7 @@ run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" run_test "pyspark/mllib/tests.py" +run_test "pyspark/mllib/rand/RandomRDDGenerators.py" if [[ $FAILED == 0 ]]; then echo -en "\033[32m" # Green From f47c4810fe72807d4c801332304c6e4788a64f88 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Tue, 29 Jul 2014 19:20:49 -0700 Subject: [PATCH 07/11] docs update --- python/pyspark/mllib/rand/RandomRDDGenerators.py | 5 +++++ python/pyspark/mllib/rand/__init__.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/rand/RandomRDDGenerators.py b/python/pyspark/mllib/rand/RandomRDDGenerators.py index eeebef219a7bf..31f9992466d26 100644 --- a/python/pyspark/mllib/rand/RandomRDDGenerators.py +++ b/python/pyspark/mllib/rand/RandomRDDGenerators.py @@ -15,6 +15,11 @@ # limitations under the License. # +""" +Generator methods for creating RDDs comprised of i.i.d. samples from +some distribution. +""" + from random import getrandbits from pyspark.rdd import RDD diff --git a/python/pyspark/mllib/rand/__init__.py b/python/pyspark/mllib/rand/__init__.py index ff917719262ab..c5a9658a8a0ba 100644 --- a/python/pyspark/mllib/rand/__init__.py +++ b/python/pyspark/mllib/rand/__init__.py @@ -16,7 +16,7 @@ # """ -Python package for mllib.random +Python package for random data generation. """ import RandomRDDGenerators From 8663e6a18b70ef928e9475a0ae05b275b727f256 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Wed, 30 Jul 2014 13:47:00 -0700 Subject: [PATCH 08/11] reverting back to a single python file for random MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All units except mllib/linalg.py fails because of the `random` collision issue. I tried adding `globs = globals().copy()` to `_test()` inside `testmod` but it didn’t help. --- python/pyspark/__init__.py | 6 + python/pyspark/mllib/__init__.py | 2 - .../pyspark/mllib/rand/RandomRDDGenerators.py | 206 ---------------- python/pyspark/mllib/rand/__init__.py | 24 -- python/pyspark/mllib/random.py | 222 ++++++++++++++++++ python/run-tests | 2 +- 6 files changed, 229 insertions(+), 233 deletions(-) delete mode 100644 python/pyspark/mllib/rand/RandomRDDGenerators.py delete mode 100644 python/pyspark/mllib/rand/__init__.py create mode 100644 python/pyspark/mllib/random.py diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 312c75d112cbf..8abe67be034a1 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -49,6 +49,12 @@ Main entry point for accessing data stored in Apache Hive.. 
""" + +import sys +s = sys.path.pop(0) +import random +sys.path.insert(0, s) + from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql import SQLContext diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py index 6eba9c21a87c7..4149f54931d1f 100644 --- a/python/pyspark/mllib/__init__.py +++ b/python/pyspark/mllib/__init__.py @@ -24,5 +24,3 @@ import numpy if numpy.version.version < '1.4': raise Exception("MLlib requires NumPy 1.4+") - -import rand as random diff --git a/python/pyspark/mllib/rand/RandomRDDGenerators.py b/python/pyspark/mllib/rand/RandomRDDGenerators.py deleted file mode 100644 index 31f9992466d26..0000000000000 --- a/python/pyspark/mllib/rand/RandomRDDGenerators.py +++ /dev/null @@ -1,206 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Generator methods for creating RDDs comprised of i.i.d. samples from -some distribution. -""" - -from random import getrandbits - -from pyspark.rdd import RDD -from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector -from pyspark.serializers import NoOpSerializer - - -def uniformRDD(sc, size, numPartitions=None, seed=None): - """ - Generates an RDD comprised of i.i.d. samples from the - uniform distribution on [0.0, 1.0]. - - To transform the distribution in the generated RDD from U[0.0, 1.0] - to U[a, b], use - C{uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)} - - >>> x = uniformRDD(sc, 100).collect() - >>> len(x) - 100 - >>> max(x) <= 1.0 and min(x) >= 0.0 - True - >>> uniformRDD(sc, 100, 4).getNumPartitions() - 4 - >>> parts = uniformRDD(sc, 100, seed=4).getNumPartitions() - >>> parts == sc.defaultParallelism - True - """ - numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed) - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) - return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) - -def normalRDD(sc, size, numPartitions=None, seed=None): - """ - Generates an RDD comprised of i.i.d samples from the standard normal - distribution. 
-
-    To transform the distribution in the generated RDD from standard normal
-    to some other normal N(mean, sigma), use
-    C{normal(sc, n, p, seed).map(lambda v: mean + sigma * v)}
-
-    >>> x = normalRDD(sc, 1000, seed=1L).collect()
-    >>> from pyspark.statcounter import StatCounter
-    >>> stats = StatCounter(x)
-    >>> stats.count()
-    1000L
-    >>> abs(stats.mean() - 0.0) < 0.1
-    True
-    >>> abs(stats.stdev() - 1.0) < 0.1
-    True
-    """
-    numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed)
-    jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
-    normal = RDD(jrdd, sc, NoOpSerializer())
-    return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
-
-def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
-    """
-    Generates an RDD comprised of i.i.d samples from the Poisson
-    distribution with the input mean.
-
-    >>> mean = 100.0
-    >>> x = poissonRDD(sc, mean, 1000, seed=1L).collect()
-    >>> from pyspark.statcounter import StatCounter
-    >>> stats = StatCounter(x)
-    >>> stats.count()
-    1000L
-    >>> abs(stats.mean() - mean) < 0.5
-    True
-    >>> from math import sqrt
-    >>> abs(stats.stdev() - sqrt(mean)) < 0.5
-    True
-    """
-    numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed)
-    jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
-    poisson = RDD(jrdd, sc, NoOpSerializer())
-    return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
-
-def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
-    """
-    Generates an RDD comprised of vectors containing i.i.d samples drawn
-    from the uniform distribution on [0.0 1.0].
-
-    >>> import numpy as np
-    >>> mat = np.matrix(uniformVectorRDD(sc, 10, 10).collect())
-    >>> mat.shape
-    (10, 10)
-    >>> mat.max() <= 1.0 and mat.min() >= 0.0
-    True
-    >>> uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
-    4
-    """
-    numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed)
-    jrdd = sc._jvm.PythonMLLibAPI()\
-        .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
-    uniform = RDD(jrdd, sc, NoOpSerializer())
-    return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
-
-def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
-    """
-    Generates an RDD comprised of vectors containing i.i.d samples drawn
-    from the standard normal distribution.
-
-    >>> import numpy as np
-    >>> mat = np.matrix(normalVectorRDD(sc, 100, 100, seed=1L).collect())
-    >>> mat.shape
-    (100, 100)
-    >>> abs(mat.mean() - 0.0) < 0.1
-    True
-    >>> abs(mat.std() - 1.0) < 0.1
-    True
-    """
-    numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed)
-    jrdd = sc._jvm.PythonMLLibAPI() \
-        .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
-    normal = RDD(jrdd, sc, NoOpSerializer())
-    return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
-
-def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
-    """
-    Generates an RDD comprised of vectors containing i.i.d samples drawn
-    from the Poisson distribution with the input mean.
-
-    >>> import numpy as np
-    >>> mean = 100.0
-    >>> rdd = poissonVectorRDD(sc, mean, 100, 100, seed=1L)
-    >>> mat = np.mat(rdd.collect())
-    >>> mat.shape
-    (100, 100)
-    >>> abs(mat.mean() - mean) < 0.5
-    True
-    >>> from math import sqrt
-    >>> abs(mat.std() - sqrt(mean)) < 0.5
-    True
-    """
-    numPartitions, seed = _getDefaultArgs(sc, numPartitions, seed)
-    jrdd = sc._jvm.PythonMLLibAPI() \
-        .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
-    poisson = RDD(jrdd, sc, NoOpSerializer())
-    return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
-
-def _getDefaultArgs(sc, numPartitions, seed):
-    """
-    Use sc.defaultParallelism for numPartitions and
-    a randomly generated long for seed if either has a value of C{None}
-
-    >>> _getDefaultArgs(sc, 3, 2)
-    (3, 2)
-    >>> _getDefaultArgs(sc, None, 2) == (sc.defaultParallelism, 2)
-    True
-    >>> from math import pow
-    >>> _getDefaultArgs(sc, None, None)[1] < pow(2, 63)
-    True
-    """
-    if not numPartitions:
-        numPartitions = sc.defaultParallelism
-    if not seed:
-        seed = _nextLong()
-    return numPartitions, seed
-
-def _nextLong():
-    """
-    Returns a random long to be used as RNG seed in the Java APIs.
-
-    Note: only 63 random bits are used here since Long.MAX_VALUE = 2 ^ 63 - 1
-    """
-    return long(getrandbits(63))
-
-
-def _test():
-    import doctest
-    from pyspark.context import SparkContext
-    globs = globals().copy()
-    # The small batch size here ensures that we see multiple batches,
-    # even in these small test examples:
-    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
-    globs['sc'].stop()
-    if failure_count:
-        exit(-1)
-
-
-if __name__ == "__main__":
-    _test()
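The `_nextLong` helper just deleted (and re-created below) draws only 63 bits because the seed ultimately lands in a signed `java.lang.Long`, whose maximum is 2^63 - 1; a full 64-bit draw could produce a value the JVM cannot accept. A standalone restatement of that constraint (pure Python, no Spark needed):

    # seed_demo.py -- draw a JVM-safe random seed: non-negative and < 2**63.
    from random import getrandbits

    def next_java_long():
        # 63 bits keeps the value within java.lang.Long's positive range,
        # since Long.MAX_VALUE == 2**63 - 1.
        return getrandbits(63)

    seed = next_java_long()
    assert 0 <= seed < 2 ** 63
    print(seed)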
diff --git a/python/pyspark/mllib/rand/__init__.py b/python/pyspark/mllib/rand/__init__.py
deleted file mode 100644
index c5a9658a8a0ba..0000000000000
--- a/python/pyspark/mllib/rand/__init__.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Python package for random data generation.
-"""
-
-import RandomRDDGenerators
-
-__all__ = ["RandomRDDGenerators"]
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
new file mode 100644
index 0000000000000..6276209912bfd
--- /dev/null
+++ b/python/pyspark/mllib/random.py
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python package for random data generation.
+"""
+
+from os import urandom
+from binascii import hexlify
+
+from pyspark.rdd import RDD
+from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
+from pyspark.serializers import NoOpSerializer
+
+
+class RandomRDDGenerators:
+    """
+    Generator methods for creating RDDs comprised of i.i.d samples from
+    some distribution.
+    """
+
+    @staticmethod
+    def uniformRDD(sc, size, numPartitions=None, seed=None):
+        """
+        Generates an RDD comprised of i.i.d. samples from the
+        uniform distribution on [0.0, 1.0].
+
+        To transform the distribution in the generated RDD from U[0.0, 1.0]
+        to U[a, b], use
+        C{RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)}
+
+        >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect()
+        >>> len(x)
+        100
+        >>> max(x) <= 1.0 and min(x) >= 0.0
+        True
+        >>> RandomRDDGenerators.uniformRDD(sc, 100, 4).getNumPartitions()
+        4
+        >>> parts = RandomRDDGenerators.uniformRDD(sc, 100, seed=4).getNumPartitions()
+        >>> parts == sc.defaultParallelism
+        True
+        """
+        numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed)
+        jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
+        uniform = RDD(jrdd, sc, NoOpSerializer())
+        return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+
+    @staticmethod
+    def normalRDD(sc, size, numPartitions=None, seed=None):
+        """
+        Generates an RDD comprised of i.i.d samples from the standard normal
+        distribution.
+
+        To transform the distribution in the generated RDD from standard normal
+        to some other normal N(mean, sigma), use
+        C{RandomRDDGenerators.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)}
+
+        >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L).collect()
+        >>> from pyspark.statcounter import StatCounter
+        >>> stats = StatCounter(x)
+        >>> stats.count()
+        1000L
+        >>> abs(stats.mean() - 0.0) < 0.1
+        True
+        >>> abs(stats.stdev() - 1.0) < 0.1
+        True
+        """
+        numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed)
+        jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
+        normal = RDD(jrdd, sc, NoOpSerializer())
+        return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+
+    @staticmethod
+    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
+        """
+        Generates an RDD comprised of i.i.d samples from the Poisson
+        distribution with the input mean.
+
+        >>> mean = 100.0
+        >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L).collect()
+        >>> from pyspark.statcounter import StatCounter
+        >>> stats = StatCounter(x)
+        >>> stats.count()
+        1000L
+        >>> abs(stats.mean() - mean) < 0.5
+        True
+        >>> from math import sqrt
+        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
+        True
+        """
+        numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed)
+        jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
+        poisson = RDD(jrdd, sc, NoOpSerializer())
+        return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+
+    @staticmethod
+    def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
+        """
+        Generates an RDD comprised of vectors containing i.i.d samples drawn
+        from the uniform distribution on [0.0 1.0].
+
+        >>> import numpy as np
+        >>> mat = np.matrix(RandomRDDGenerators.uniformVectorRDD(sc, 10, 10).collect())
+        >>> mat.shape
+        (10, 10)
+        >>> mat.max() <= 1.0 and mat.min() >= 0.0
+        True
+        >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
+        4
+        """
+        numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed)
+        jrdd = sc._jvm.PythonMLLibAPI() \
+            .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
+        uniform = RDD(jrdd, sc, NoOpSerializer())
+        return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
+
+    @staticmethod
+    def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
+        """
+        Generates an RDD comprised of vectors containing i.i.d samples drawn
+        from the standard normal distribution.
+
+        >>> import numpy as np
+        >>> mat = np.matrix(RandomRDDGenerators.normalVectorRDD(sc, 100, 100, seed=1L).collect())
+        >>> mat.shape
+        (100, 100)
+        >>> abs(mat.mean() - 0.0) < 0.1
+        True
+        >>> abs(mat.std() - 1.0) < 0.1
+        True
+        """
+        numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed)
+        jrdd = sc._jvm.PythonMLLibAPI() \
+            .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
+        normal = RDD(jrdd, sc, NoOpSerializer())
+        return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
+
+    @staticmethod
+    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
+        """
+        Generates an RDD comprised of vectors containing i.i.d samples drawn
+        from the Poisson distribution with the input mean.
+
+        >>> import numpy as np
+        >>> mean = 100.0
+        >>> rdd = RandomRDDGenerators.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
+        >>> mat = np.mat(rdd.collect())
+        >>> mat.shape
+        (100, 100)
+        >>> abs(mat.mean() - mean) < 0.5
+        True
+        >>> from math import sqrt
+        >>> abs(mat.std() - sqrt(mean)) < 0.5
+        True
+        """
+        numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed)
+        jrdd = sc._jvm.PythonMLLibAPI() \
+            .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
+        poisson = RDD(jrdd, sc, NoOpSerializer())
+        return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
+
+    @staticmethod
+    def _getDefaultArgs(sc, numPartitions, seed):
+        """
+        Use sc.defaultParallelism for numPartitions and
+        a randomly generated long for seed if either has a value of C{None}
+
+        >>> RandomRDDGenerators._getDefaultArgs(sc, 3, 2)
+        (3, 2)
+        >>> RandomRDDGenerators._getDefaultArgs(sc, None, 2) == (sc.defaultParallelism, 2)
+        True
+        >>> from math import pow
+        >>> RandomRDDGenerators._getDefaultArgs(sc, None, None)[1] < pow(2, 63)
+        True
+        """
+        if not numPartitions:
+            numPartitions = sc.defaultParallelism
+        if not seed:
+            seed = RandomRDDGenerators._nextLong()
+        return numPartitions, seed
+
+    @staticmethod
+    def _nextLong():
+        """
+        Returns a random long to be used as RNG seed in the Java APIs.
+
+        Note: only 63 random bits are used here since Long.MAX_VALUE = 2 ^ 63 - 1
+        """
+        k = 63
+        numbytes = (k + 7) // 8
+        x = long(hexlify(urandom(numbytes)), 16)
+        return long(x >> (numbytes * 8 - k))
+
+
+def _test():
+    import doctest
+    from pyspark.context import SparkContext
+    globs = globals().copy()
+    # The small batch size here ensures that we see multiple batches,
+    # even in these small test examples:
+    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/run-tests b/python/run-tests
index a938a6516cfce..5049e15ce5f8a 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -67,10 +67,10 @@ run_test "pyspark/mllib/_common.py"
 run_test "pyspark/mllib/classification.py"
 run_test "pyspark/mllib/clustering.py"
 run_test "pyspark/mllib/linalg.py"
+run_test "pyspark/mllib/random.py"
 run_test "pyspark/mllib/recommendation.py"
 run_test "pyspark/mllib/regression.py"
 run_test "pyspark/mllib/tests.py"
-run_test "pyspark/mllib/rand/RandomRDDGenerators.py"
 
 if [[ $FAILED == 0 ]]; then
     echo -en "\033[32m" # Green
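At this point the module is importable as pyspark.mllib.random and the user-facing workflow matches the doctests above. A hedged usage sketch, assuming a PySpark shell where `sc` is already a SparkContext (the a, b, mean, and sigma values are illustrative):

    # usage_sketch.py -- generating and rescaling random RDDs with the class above.
    # Assumes a running PySpark shell where `sc` is a SparkContext.
    from pyspark.mllib.random import RandomRDDGenerators

    a, b = -1.0, 1.0
    u = RandomRDDGenerators.uniformRDD(sc, 1000, seed=42L)
    scaled = u.map(lambda v: a + (b - a) * v)    # U[0, 1] -> U[a, b]

    mean, sigma = 5.0, 2.0
    n = RandomRDDGenerators.normalRDD(sc, 1000, seed=42L)
    shifted = n.map(lambda v: mean + sigma * v)  # N(0, 1) -> N(mean, sigma)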
From 2d7391778e3fc72e58189e36df361bf818cbd067 Mon Sep 17 00:00:00 2001
From: Doris Xin
Date: Wed, 30 Jul 2014 15:37:51 -0700
Subject: [PATCH 09/11] fix for linalg.py

---
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 12 ++++++------
 python/pyspark/mllib/linalg.py                  |  2 ++
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 2839be3ddd35f..a86646bccb0cd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -463,7 +463,7 @@ class PythonMLLibAPI extends Serializable {
       numPartitions: Int,
       seed: Long): JavaRDD[Array[Byte]] = {
     RandomRDDGenerators.uniformRDD(jsc.sc, size, numPartitions, seed)
-      .map(serializeDouble).toJavaRDD()
+      .map(serializeDouble)
   }
 
   /**
@@ -474,7 +474,7 @@ class PythonMLLibAPI extends Serializable {
       numPartitions: Int,
       seed: Long): JavaRDD[Array[Byte]] = {
     RandomRDDGenerators.normalRDD(jsc.sc, size, numPartitions, seed)
-      .map(serializeDouble).toJavaRDD()
+      .map(serializeDouble)
   }
 
   /**
@@ -486,7 +486,7 @@ class PythonMLLibAPI extends Serializable {
       numPartitions: Int,
       seed: Long): JavaRDD[Array[Byte]] = {
     RandomRDDGenerators.poissonRDD(jsc.sc, mean, size, numPartitions, seed)
-      .map(serializeDouble).toJavaRDD()
+      .map(serializeDouble)
  }
 
   /**
@@ -498,7 +498,7 @@ class PythonMLLibAPI extends Serializable {
       numPartitions: Int,
       seed: Long): JavaRDD[Array[Byte]] = {
     RandomRDDGenerators.uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed)
-      .map(serializeDoubleVector).toJavaRDD()
+      .map(serializeDoubleVector)
   }
 
   /**
@@ -510,7 +510,7 @@ class PythonMLLibAPI extends Serializable {
       numPartitions: Int,
       seed: Long): JavaRDD[Array[Byte]] = {
     RandomRDDGenerators.normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed)
-      .map(serializeDoubleVector).toJavaRDD()
+      .map(serializeDoubleVector)
   }
 
   /**
@@ -523,6 +523,6 @@ class PythonMLLibAPI extends Serializable {
       numPartitions: Int,
       seed: Long): JavaRDD[Array[Byte]] = {
     RandomRDDGenerators.poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed)
-      .map(serializeDoubleVector).toJavaRDD()
+      .map(serializeDoubleVector)
   }
 }
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 71f4ad1a8d44e..fcfdd6096a1f6 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -255,4 +255,6 @@ def _test():
         exit(-1)
 
 if __name__ == "__main__":
+    import sys
+    sys.path.pop(0)
    _test()

From f831d9b1eef68c5121124ae5fb2822244b7f58f8 Mon Sep 17 00:00:00 2001
From: Doris Xin
Date: Thu, 31 Jul 2014 13:54:09 -0700
Subject: [PATCH 10/11] moved default args logic into PythonMLLibAPI and added
 docs for the hacks that allow us to keep the module name mllib.random.
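The key move in this patch is declaring the stub parameters as boxed java.lang.Integer / java.lang.Long so that Py4J can pass Java null when the Python caller supplies None, letting the JVM side resolve the defaults. A pure-Python sketch of the same resolve-or-default pattern (the function name and the default_parallelism value are illustrative, not from the patch):

    # defaults_sketch.py -- resolve optional arguments the way the JVM stubs do.
    def resolve(num_partitions, seed, default_parallelism=2):
        # Mirrors the Scala helpers: substitute a default only when the
        # caller passed nothing (None travels over Py4J as Java null).
        import random as stdlib_random
        if num_partitions is None:
            num_partitions = default_parallelism
        if seed is None:
            seed = stdlib_random.getrandbits(63)  # JVM-safe signed long
        return num_partitions, seed

    print(resolve(None, None))  # both defaulted
    print(resolve(4, 11))       # passed through untouched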
---
 .../mllib/api/python/PythonMLLibAPI.scala | 74 ++++++++++++-------
 python/pyspark/__init__.py                |  6 +-
 python/pyspark/mllib/linalg.py            |  2 +
 python/pyspark/mllib/random.py            | 40 ----------
 4 files changed, 56 insertions(+), 66 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index a86646bccb0cd..32546d20f1092 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -24,11 +24,12 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.random.RandomRDDGenerators
+import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -455,15 +456,33 @@ class PythonMLLibAPI extends Serializable {
     ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }
 
+  // Used by the *RDD methods to get default seed if not passed in from pyspark
+  private def getSeed(seed: java.lang.Long): Long = {
+    if (seed == null) Utils.random.nextLong else seed
+  }
+
+  // Used by *RDD methods to get default numPartitions if not passed in from pyspark
+  private def getNumParts(numPartitions: java.lang.Integer, jsc: JavaSparkContext): Int = {
+    if (numPartitions == null) {
+      jsc.sc.defaultParallelism
+    } else {
+      numPartitions
+    }
+  }
+
+  // Note: for the following methods, numPartitions and seed are boxed to allow nulls to be passed
+  // in for either argument from pyspark
+
   /**
+   * Java stub for Python mllib RandomRDDGenerators.uniformRDD()
+   */
   def uniformRDD(jsc: JavaSparkContext,
       size: Long,
-      numPartitions: Int,
-      seed: Long): JavaRDD[Array[Byte]] = {
-    RandomRDDGenerators.uniformRDD(jsc.sc, size, numPartitions, seed)
-      .map(serializeDouble)
+      numPartitions: java.lang.Integer,
+      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+    val parts = getNumParts(numPartitions, jsc)
+    val s = getSeed(seed)
+    RG.uniformRDD(jsc.sc, size, parts, s).map(serializeDouble)
   }
 
   /**
@@ -471,10 +490,11 @@ class PythonMLLibAPI extends Serializable {
    */
   def normalRDD(jsc: JavaSparkContext,
       size: Long,
-      numPartitions: Int,
-      seed: Long): JavaRDD[Array[Byte]] = {
-    RandomRDDGenerators.normalRDD(jsc.sc, size, numPartitions, seed)
-      .map(serializeDouble)
+      numPartitions: java.lang.Integer,
+      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+    val parts = getNumParts(numPartitions, jsc)
+    val s = getSeed(seed)
+    RG.normalRDD(jsc.sc, size, parts, s).map(serializeDouble)
   }
 
   /**
@@ -483,10 +503,11 @@ class PythonMLLibAPI extends Serializable {
    */
   def poissonRDD(jsc: JavaSparkContext,
       mean: Double,
       size: Long,
-      numPartitions: Int,
-      seed: Long): JavaRDD[Array[Byte]] = {
-    RandomRDDGenerators.poissonRDD(jsc.sc, mean, size, numPartitions, seed)
-      .map(serializeDouble)
+      numPartitions: java.lang.Integer,
+      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+    val parts = getNumParts(numPartitions, jsc)
+    val s = getSeed(seed)
+    RG.poissonRDD(jsc.sc, mean, size, parts, s).map(serializeDouble)
   }
 
   /**
@@ -495,10 +516,11 @@ class PythonMLLibAPI extends Serializable {
    */
   def uniformVectorRDD(jsc: JavaSparkContext,
      numRows: Long,
       numCols: Int,
-      numPartitions: Int,
-      seed: Long): JavaRDD[Array[Byte]] = {
-    RandomRDDGenerators.uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed)
-      .map(serializeDoubleVector)
+      numPartitions: java.lang.Integer,
+      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+    val parts = getNumParts(numPartitions, jsc)
+    val s = getSeed(seed)
+    RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector)
   }
 
   /**
@@ -507,10 +529,11 @@ class PythonMLLibAPI extends Serializable {
    */
   def normalVectorRDD(jsc: JavaSparkContext,
       numRows: Long,
       numCols: Int,
-      numPartitions: Int,
-      seed: Long): JavaRDD[Array[Byte]] = {
-    RandomRDDGenerators.normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed)
-      .map(serializeDoubleVector)
+      numPartitions: java.lang.Integer,
+      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+    val parts = getNumParts(numPartitions, jsc)
+    val s = getSeed(seed)
+    RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector)
   }
 
   /**
@@ -520,9 +543,10 @@ class PythonMLLibAPI extends Serializable {
    */
   def poissonVectorRDD(jsc: JavaSparkContext,
       mean: Double,
       numRows: Long,
       numCols: Int,
-      numPartitions: Int,
-      seed: Long): JavaRDD[Array[Byte]] = {
-    RandomRDDGenerators.poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed)
-      .map(serializeDoubleVector)
+      numPartitions: java.lang.Integer,
+      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+    val parts = getNumParts(numPartitions, jsc)
+    val s = getSeed(seed)
+    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(serializeDoubleVector)
   }
 }
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 8abe67be034a1..c58555fc9d2c5 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -49,7 +49,11 @@ Main entry point for accessing data stored in Apache Hive..
 """
-
+# The following block allows us to import python's random instead of mllib.random for scripts in
+# mllib that depend on top level pyspark packages, which transitively depend on python's random.
+# Since Python's import logic looks for modules in the current package first, we eliminate
+# mllib.random as a candidate for C{import random} by removing the first search path, the script's
+# location, in order to force the loader to look in Python's top-level modules for C{random}.
 import sys
 s = sys.path.pop(0)
 import random
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index fcfdd6096a1f6..54720c2324ca6 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -255,6 +255,8 @@ def _test():
         exit(-1)
 
 if __name__ == "__main__":
+    # remove current path from list of search paths to avoid importing mllib.random
+    # for C{import random}, which is done in an external dependency of pyspark during doctests.
     import sys
     sys.path.pop(0)
     _test()
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 6276209912bfd..699b9c26e4101 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -19,8 +19,6 @@
 Python package for random data generation.
""" -from os import urandom -from binascii import hexlify from pyspark.rdd import RDD from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector @@ -53,7 +51,6 @@ def uniformRDD(sc, size, numPartitions=None, seed=None): >>> parts == sc.defaultParallelism True """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) @@ -78,7 +75,6 @@ def normalRDD(sc, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - 1.0) < 0.1 True """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) normal = RDD(jrdd, sc, NoOpSerializer()) return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) @@ -101,7 +97,6 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - sqrt(mean)) < 0.5 True """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) @@ -121,7 +116,6 @@ def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() 4 """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) jrdd = sc._jvm.PythonMLLibAPI() \ .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) uniform = RDD(jrdd, sc, NoOpSerializer()) @@ -142,7 +136,6 @@ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): >>> abs(mat.std() - 1.0) < 0.1 True """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) jrdd = sc._jvm.PythonMLLibAPI() \ .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) normal = RDD(jrdd, sc, NoOpSerializer()) @@ -166,44 +159,11 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): >>> abs(mat.std() - sqrt(mean)) < 0.5 True """ - numPartitions, seed = RandomRDDGenerators._getDefaultArgs(sc, numPartitions, seed) jrdd = sc._jvm.PythonMLLibAPI() \ .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) - @staticmethod - def _getDefaultArgs(sc, numPartitions, seed): - """ - Use sc.defaultParallelism for numPartitions and - a randomly generated long for seed if either has a value of C{None} - - >>> RandomRDDGenerators._getDefaultArgs(sc, 3, 2) - (3, 2) - >>> RandomRDDGenerators._getDefaultArgs(sc, None, 2) == (sc.defaultParallelism, 2) - True - >>> from math import pow - >>> RandomRDDGenerators._getDefaultArgs(sc, None, None)[1] < pow(2, 63) - True - """ - if not numPartitions: - numPartitions = sc.defaultParallelism - if not seed: - seed = RandomRDDGenerators._nextLong() - return numPartitions, seed - - @staticmethod - def _nextLong(): - """ - Returns a random long to be used as RNG seed in the Java APIs. 
-
-        Note: only 63 random bits are used here since Long.MAX_VALUE = 2 ^ 63 - 1
-        """
-        k = 63
-        numbytes = (k + 7) // 8
-        x = long(hexlify(urandom(numbytes)), 16)
-        return long(x >> (numbytes * 8 - k))
-
 
 def _test():
     import doctest
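Throughout the series the doctests validate the generators statistically, asserting that sample moments land within loose tolerances of the target distribution's parameters rather than comparing exact values. A standalone sketch of the same tolerance-based check using only the Python standard library (sample size and thresholds are illustrative):

    # moment_check.py -- tolerance-based validation of generated samples,
    # mirroring the doctests' approach.
    import random
    from math import sqrt

    n = 10000
    xs = [random.gauss(0.0, 1.0) for _ in range(n)]

    mean = sum(xs) / n
    stdev = sqrt(sum((x - mean) ** 2 for x in xs) / n)

    # Loose bounds, like `abs(stats.mean() - 0.0) < 0.1` in the doctests:
    assert abs(mean - 0.0) < 0.1
    assert abs(stdev - 1.0) < 0.1
    print(mean, stdev)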
From 55c6de86369ea675db737bde480af2453c7da408 Mon Sep 17 00:00:00 2001
From: Doris Xin
Date: Thu, 31 Jul 2014 14:35:59 -0700
Subject: [PATCH 11/11] addressed review comments; all python unit tests pass.

---
 .../mllib/api/python/PythonMLLibAPI.scala | 29 ++++++++++---------
 python/pyspark/mllib/random.py            | 16 +++++-----
 2 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 32546d20f1092..d2e8ccf208970 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -457,12 +457,13 @@ class PythonMLLibAPI extends Serializable {
   }
 
   // Used by the *RDD methods to get default seed if not passed in from pyspark
-  private def getSeed(seed: java.lang.Long): Long = {
+  private def getSeedOrDefault(seed: java.lang.Long): Long = {
     if (seed == null) Utils.random.nextLong else seed
   }
 
   // Used by *RDD methods to get default numPartitions if not passed in from pyspark
-  private def getNumParts(numPartitions: java.lang.Integer, jsc: JavaSparkContext): Int = {
+  private def getNumPartitionsOrDefault(numPartitions: java.lang.Integer,
+      jsc: JavaSparkContext): Int = {
     if (numPartitions == null) {
       jsc.sc.defaultParallelism
     } else {
@@ -480,8 +481,8 @@ class PythonMLLibAPI extends Serializable {
       size: Long,
       numPartitions: java.lang.Integer,
       seed: java.lang.Long): JavaRDD[Array[Byte]] = {
-    val parts = getNumParts(numPartitions, jsc)
-    val s = getSeed(seed)
+    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+    val s = getSeedOrDefault(seed)
     RG.uniformRDD(jsc.sc, size, parts, s).map(serializeDouble)
   }
 
@@ -492,8 +493,8 @@ class PythonMLLibAPI extends Serializable {
       size: Long,
       numPartitions: java.lang.Integer,
       seed: java.lang.Long): JavaRDD[Array[Byte]] = {
-    val parts = getNumParts(numPartitions, jsc)
-    val s = getSeed(seed)
+    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+    val s = getSeedOrDefault(seed)
     RG.normalRDD(jsc.sc, size, parts, s).map(serializeDouble)
   }
 
@@ -505,8 +506,8 @@ class PythonMLLibAPI extends Serializable {
       size: Long,
       numPartitions: java.lang.Integer,
       seed: java.lang.Long): JavaRDD[Array[Byte]] = {
-    val parts = getNumParts(numPartitions, jsc)
-    val s = getSeed(seed)
+    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+    val s = getSeedOrDefault(seed)
     RG.poissonRDD(jsc.sc, mean, size, parts, s).map(serializeDouble)
   }
 
@@ -518,8 +519,8 @@ class PythonMLLibAPI extends Serializable {
       numCols: Int,
       numPartitions: java.lang.Integer,
       seed: java.lang.Long): JavaRDD[Array[Byte]] = {
-    val parts = getNumParts(numPartitions, jsc)
-    val s = getSeed(seed)
+    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+    val s = getSeedOrDefault(seed)
     RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector)
   }
 
@@ -531,8 +532,8 @@ class PythonMLLibAPI extends Serializable {
       numCols: Int,
       numPartitions: java.lang.Integer,
       seed: java.lang.Long): JavaRDD[Array[Byte]] = {
-    val parts = getNumParts(numPartitions, jsc)
-    val s = getSeed(seed)
+    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+    val s = getSeedOrDefault(seed)
     RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector)
   }
 
@@ -545,8 +546,8 @@ class PythonMLLibAPI extends Serializable {
       numCols: Int,
       numPartitions: java.lang.Integer,
       seed: java.lang.Long): JavaRDD[Array[Byte]] = {
-    val parts = getNumParts(numPartitions, jsc)
-    val s = getSeed(seed)
+    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+    val s = getSeedOrDefault(seed)
     RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(serializeDoubleVector)
   }
 }
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 699b9c26e4101..36e710dbae7a8 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -38,7 +38,8 @@ def uniformRDD(sc, size, numPartitions=None, seed=None):
 
         To transform the distribution in the generated RDD from U[0.0, 1.0]
         to U[a, b], use
-        C{RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)}
+        C{RandomRDDGenerators.uniformRDD(sc, n, p, seed)\
+          .map(lambda v: a + (b - a) * v)}
 
         >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect()
         >>> len(x)
@@ -63,11 +64,11 @@ def normalRDD(sc, size, numPartitions=None, seed=None):
 
         To transform the distribution in the generated RDD from standard normal
         to some other normal N(mean, sigma), use
-        C{RandomRDDGenerators.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)}
+        C{RandomRDDGenerators.normal(sc, n, p, seed)\
+          .map(lambda v: mean + sigma * v)}
 
-        >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L).collect()
-        >>> from pyspark.statcounter import StatCounter
-        >>> stats = StatCounter(x)
+        >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L)
+        >>> stats = x.stats()
         >>> stats.count()
         1000L
         >>> abs(stats.mean() - 0.0) < 0.1
@@ -86,9 +87,8 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
 
         >>> mean = 100.0
-        >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L).collect()
-        >>> from pyspark.statcounter import StatCounter
-        >>> stats = StatCounter(x)
+        >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L)
+        >>> stats = x.stats()
         >>> stats.count()
         1000L
         >>> abs(stats.mean() - mean) < 0.5
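After the review pass, optional arguments flow through untouched and the doctests read statistics straight from RDD.stats(). A short usage sketch of the finished API, again assuming a PySpark shell with a live SparkContext named `sc`:

    # final_usage.py -- the API as it stands at the end of the series.
    from pyspark.mllib.random import RandomRDDGenerators

    # numPartitions and seed may be omitted: None crosses Py4J as Java null,
    # and the Scala stubs substitute defaultParallelism / a random seed.
    u = RandomRDDGenerators.uniformRDD(sc, 1000)
    m = RandomRDDGenerators.normalVectorRDD(sc, 100, 10, seed=1L)

    print(u.stats())             # count, mean, stdev, min, max of the samples
    print(m.getNumPartitions())  # == sc.defaultParallelism when not specified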