From c0ef0c2f5f312e5f0ebb6d0660ecd717a73f4e34 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 27 May 2014 17:11:05 -0700 Subject: [PATCH 01/12] SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog. --- .../apache/spark/rdd/PairRDDFunctions.scala | 28 +++++++--- .../main/scala/org/apache/spark/rdd/RDD.scala | 21 ++++++-- .../spark/util/SerializableHyperLogLog.scala | 52 ------------------- .../spark/rdd/PairRDDFunctionsSuite.scala | 8 +-- pom.xml | 2 +- project/SparkBuild.scala | 2 +- 6 files changed, 42 insertions(+), 71 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 223fef79261d0..0f3eedb0a8aac 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -28,7 +28,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import com.clearspring.analytics.stream.cardinality.HyperLogLog +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType @@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer -import org.apache.spark.util.SerializableHyperLogLog /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -218,14 +217,29 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vice versa. Uses the provided - * Partitioner to partition the output RDD. + * [[Partitioner]] to partition the output RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. 
*/ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) - val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) - val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) + val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt + val createHLL = (v: V) => { + val hll = new HyperLogLogPlus(precision) + hll.offer(v) + hll + } + val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => { + hll.offer(v) + hll + } + val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { + h1.addAll(h2) + h1 + } - combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality()) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index aa03e9276fb34..94307aad14acd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} -import com.clearspring.analytics.stream.cardinality.HyperLogLog +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable @@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} @@ -925,11 +925,24 @@ abstract class RDD[T: ClassTag]( * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. The default value of * relativeSD is 0.05. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. 
*/ @Experimental def countApproxDistinct(relativeSD: Double = 0.05): Long = { - val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() + val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt + val zeroCounter = new HyperLogLogPlus(precision) + aggregate(zeroCounter)( + (hll: HyperLogLogPlus, v: T) => { + hll.offer(v) + hll + }, + (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { + h1.addAll(h2) + h2 + }).cardinality() } /** diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala deleted file mode 100644 index 21a88eea3bbc2..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.io.{Externalizable, ObjectInput, ObjectOutput} - -import com.clearspring.analytics.stream.cardinality.{HyperLogLog, ICardinality} - -/** - * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is - * serializable. 
- */ -private[spark] -class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { - - def this() = this(null) // For deserialization - - def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) - - def add[T](elem: T) = { - this.value.offer(elem) - this - } - - def readExternal(in: ObjectInput) { - val byteLength = in.readInt() - val bytes = new Array[Byte](byteLength) - in.readFully(bytes) - value = HyperLogLog.Builder.build(bytes) - } - - def writeExternal(out: ObjectOutput) { - val bytes = value.getBytes() - out.writeInt(bytes.length) - out.write(bytes) - } -} diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 1230565ea5b7e..010ee04109225 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -126,9 +126,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) val rdd1 = sc.parallelize(stacked) val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect() - counted1.foreach{ - case(k, count) => assert(error(count, k) < relativeSD) - } + counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) } val rnd = new Random() @@ -139,9 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { } val rdd2 = sc.parallelize(randStacked) val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect() - counted2.foreach{ - case(k, count) => assert(error(count, k) < relativeSD) - } + counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) } } test("join") { diff --git a/pom.xml b/pom.xml index 86264d1132ec4..a4d83545c13af 100644 --- a/pom.xml +++ b/pom.xml @@ -284,7 +284,7 @@ com.clearspring.analytics stream - 2.5.1 + 2.7.0 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b9d92340ff75b..f1e2f9f668e27 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -358,7 +358,7 @@ object SparkBuild extends Build { "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm), "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), - "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil), + "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), "org.spark-project" % "pyrolite" % "2.0.1", "net.sf.py4j" % "py4j" % "0.8.1" ), From 1294be6ea6b57b5476ef174c69fd9134b2ad45fe Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 31 May 2014 00:20:32 -0700 Subject: [PATCH 02/12] Updated HLL+. 
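The updated API exposes stream-lib's two native precision knobs instead of a single relativeSD value: p fixes the number of registers (2^p) used by the dense representation, and a non-zero sp turns on the sparse representation that HyperLogLogPlus uses for small cardinalities. The deprecated relativeSD overloads keep working by converting to p. The following standalone sketch (not taken from Spark; inputs chosen only for illustration) shows the same HyperLogLogPlus calls that the new createHLL/mergeValueHLL/mergeHLL closures rely on, together with the relativeSD-to-p conversion used by the deprecated overloads:

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    object HllPlusSketch {
      def main(args: Array[String]): Unit = {
        // Same conversion the deprecated relativeSD overloads perform.
        val relativeSD = 0.05
        val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt

        // sp = 0 skips the sparse representation entirely.
        val hll = new HyperLogLogPlus(p, 0)
        (1 to 100000).foreach(i => hll.offer(i % 1000))   // 1000 distinct values

        // Merging partial counters is what mergeHLL does inside combineByKey/aggregate.
        val other = new HyperLogLogPlus(p, 0)
        (500 until 1500).foreach(j => other.offer(j))     // 1000 distinct values, half overlapping
        hll.addAll(other)

        println(hll.cardinality())   // estimate of the ~1500 distinct values in the union
      }
    }

The per-key code path is identical: createHLL builds one counter per value, mergeValueHLL offers further values into it, and mergeHLL combines counters with addAll before cardinality() is read off in mapValues.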
--- .../apache/spark/api/java/JavaPairRDD.scala | 90 ++++++++++++++++--- .../apache/spark/api/java/JavaRDDLike.scala | 27 +++++- .../apache/spark/rdd/PairRDDFunctions.scala | 84 +++++++++++++++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 32 +++++-- .../java/org/apache/spark/JavaAPISuite.java | 10 +-- .../spark/rdd/PairRDDFunctionsSuite.scala | 8 +- .../scala/org/apache/spark/rdd/RDDSuite.scala | 6 +- 7 files changed, 214 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 4c8f9ed6fbc02..b1058c3c35f5f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -672,38 +672,102 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. + * * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. Uses the provided * Partitioner to partition the output RDD. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + * @param partitioner Partitioner to use for the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, partitioner) + def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner)) } /** - * Return approximate number of distinct values for each key this RDD. + * Return approximate number of distinct values for each key in this RDD. + * * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism - * level. + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + * @param numPartitions The number of partitions in the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD) + def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions)) } - /** * Return approximate number of distinct values for each key in this RDD. + * + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. 
Uses the provided + * Partitioner to partition the output RDD. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + */ + def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(p, sp)) + } + + /** + * Return approximate number of distinct values for each key in this RDD. This is deprecated. + * Use the variant with p and sp parameters instead. + * * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. HashPartitions the - * output RDD into numPartitions. + * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * Partitioner to partition the output RDD. + */ + @Deprecated + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = + { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) + } + + /** + * Return approximate number of distinct values for each key in this RDD. This is deprecated. + * Use the variant with p and sp parameters instead. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param relativeSD The relative standard deviation for the counter. + * Smaller values create counters that require more space. + */ + @Deprecated + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) + } + + /** + * Return approximate number of distinct values for each key in this RDD. This is deprecated. + * Use the variant with p and sp parameters instead. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. * + * @param relativeSD The relative standard deviation for the counter. + * Smaller values create counters that require more space. */ - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, numPartitions) + @Deprecated + def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD)) } /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 619bfd75be8eb..080bb6ea677a4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -564,8 +564,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. The default value of * relativeSD is 0.05. 
+ * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp. + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + */ + def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp) + + /** + * Return approximate number of distinct elements in the RDD. This is deprecated. Use the + * variant with p and sp parameters instead. + * + * The accuracy of approximation can be controlled through the relative standard deviation + * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in + * more accurate counts but increase the memory footprint and vise versa. The default value of + * relativeSD is 0.05. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. */ - def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + @Deprecated + def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 0f3eedb0a8aac..1d95d5bdd45b4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -213,20 +213,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** + * :: Experimental :: + * * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. Uses the provided - * [[Partitioner]] to partition the output RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + * @param partitioner Partitioner to use for the resulting RDD. 
*/ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt + @Experimental + def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { val createHLL = (v: V) => { - val hll = new HyperLogLogPlus(precision) + val hll = new HyperLogLogPlus(p, sp) hll.offer(v) hll } @@ -242,6 +246,63 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality()) } + /** + * :: Experimental :: + * + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + * @param numPartitions Number of partitions in the resulting RDD. + */ + @Experimental + def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = { + countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions)) + } + + /** + * :: Experimental :: + * + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. + */ + @Experimental + def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = { + countApproxDistinctByKey(p, sp, defaultPartitioner(self)) + } + + /** + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param relativeSD The relative standard deviation for the counter. + * Smaller values create counters that require more space. + * @param partitioner Partitioner to use for the resulting RDD + */ + @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p. + val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt + countApproxDistinctByKey(p, 0, partitioner) + } + /** * Return approximate number of distinct values for each key in this RDD. * The accuracy of approximation can be controlled through the relative standard deviation @@ -249,7 +310,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * more accurate counts but increase the memory footprint and vice versa. HashPartitions the * output RDD into numPartitions. 
* + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. */ + @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } @@ -261,7 +326,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * more accurate counts but increase the memory footprint and vice versa. The default value of * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism * level. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. */ + @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 94307aad14acd..f418c80db08d4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -921,19 +921,21 @@ abstract class RDD[T: ClassTag]( * :: Experimental :: * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. - * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at * [[http://research.google.com/pubs/pub40671.html]]. + * + * @param p The precision value for the normal set. + * p must be a value between 4 and sp. + * @param sp The precision value for the sparse set, between 0 and 32. + * If sp equals 0, the sparse representation is skipped. */ @Experimental - def countApproxDistinct(relativeSD: Double = 0.05): Long = { - val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt - val zeroCounter = new HyperLogLogPlus(precision) + def countApproxDistinct(p: Int, sp: Int): Long = { + require(p >= 4, s"p ($p) must be greater than 0") + require(sp <= 32, s"sp ($sp) cannot be greater than 32") + require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") + val zeroCounter = new HyperLogLogPlus(p, sp) aggregate(zeroCounter)( (hll: HyperLogLogPlus, v: T) => { hll.offer(v) @@ -945,6 +947,20 @@ abstract class RDD[T: ClassTag]( }).cardinality() } + /** + * Return approximate number of distinct elements in the RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at + * [[http://research.google.com/pubs/pub40671.html]]. 
+ */ + @deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1") + def countApproxDistinct(relativeSD: Double = 0.05): Long = { + // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p. + val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt + countApproxDistinct(p, 0) + } + /** * Zips this RDD with its element indices. The ordering is first based on the partition index * and then the ordering of items within each partition. So the first item in the first diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 3dd79243ab5bd..21ba8ed5eb3de 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1031,27 +1031,23 @@ public void countApproxDistinct() { arrayData.add(i % size); } JavaRDD simpleRdd = sc.parallelize(arrayData, 10); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(8, 0) - size) / (size * 1.0)) <= 0.1); } @Test public void countApproxDistinctByKey() { - double relativeSD = 0.001; - List> arrayData = new ArrayList>(); for (int i = 10; i < 100; i++) for (int j = 0; j < i; j++) arrayData.add(new Tuple2(i, j)); JavaPairRDD pairRdd = sc.parallelizePairs(arrayData); - List> res = pairRdd.countApproxDistinctByKey(relativeSD).collect(); + List> res = pairRdd.countApproxDistinctByKey(8, 0).collect(); for (Tuple2 resItem : res) { double count = (double)resItem._1(); Long resCount = (Long)resItem._2(); Double error = Math.abs((resCount - count) / count); - Assert.assertTrue(error < relativeSD); + Assert.assertTrue(error < 0.1); } } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 010ee04109225..937ddfcc2b6b1 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -119,13 +119,15 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { * relatively tight error bounds to check correctness of functionality rather than checking * whether the approximation conforms with the requested bound. */ - val relativeSD = 0.001 + val p = 20 + val sp = 0 + val relativeSD = 0.01 // For each value i, there are i tuples with first element equal to i. // Therefore, the expected count for key i would be i. 
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) val rdd1 = sc.parallelize(stacked) - val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect() + val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect() counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) } val rnd = new Random() @@ -136,7 +138,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { (1 to num).map(j => (num, j)) } val rdd2 = sc.parallelize(randStacked) - val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect() + val counted2 = rdd2.countApproxDistinctByKey(p, sp, 4).collect() counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) } } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index e686068f7a99a..bbd0c14178368 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -73,10 +73,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val size = 100 val uniformDistro = for (i <- 1 to 100000) yield i % size val simpleRdd = sc.makeRDD(uniformDistro) - assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2) - assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05) - assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01) - assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001) + assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4) + assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1) } test("SparkContext.union") { From 88cfe77bc7b357f1b694ba89eeec142890f60b0e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 1 Jun 2014 17:29:19 -0700 Subject: [PATCH 03/12] Updated documentation and restored the old incorrect API to maintain API compatibility. --- .../apache/spark/api/java/JavaPairRDD.scala | 68 +++++++++---------- .../apache/spark/api/java/JavaRDDLike.scala | 24 ++----- .../apache/spark/rdd/PairRDDFunctions.scala | 51 ++++++-------- .../main/scala/org/apache/spark/rdd/RDD.scala | 11 ++- 4 files changed, 64 insertions(+), 90 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index b1058c3c35f5f..b53602c484129 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -673,15 +673,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided - * Partitioner to partition the output RDD. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp (32 max). + * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. 
* @param partitioner Partitioner to use for the resulting RDD. */ def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = { @@ -691,15 +690,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided - * Partitioner to partition the output RDD. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp (32 max). + * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. * @param numPartitions The number of partitions in the resulting RDD. */ def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = { @@ -709,15 +707,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided - * Partitioner to partition the output RDD. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp (32 max). + * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. */ def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = { fromRDD(rdd.countApproxDistinctByKey(p, sp)) @@ -725,33 +722,34 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. This is deprecated. - * Use the variant with p and sp parameters instead. + * Use the variant with `p` and `sp` parameters instead. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided - * Partitioner to partition the output RDD. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD The relative standard deviation for the counter. + * Smaller values create counters that require more space. 
*/ @Deprecated - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = - { - fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD, partitioner) } /** * Return approximate number of distinct values for each key in this RDD. This is deprecated. - * Use the variant with p and sp parameters instead. + * Use the variant with `p` and `sp` parameters instead. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param relativeSD The relative standard deviation for the counter. * Smaller values create counters that require more space. */ @Deprecated - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD, numPartitions) } /** @@ -759,15 +757,15 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Use the variant with p and sp parameters instead. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param relativeSD The relative standard deviation for the counter. * Smaller values create counters that require more space. */ @Deprecated - def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(relativeSD)) + def countApproxDistinctByKey(relativeSD: Double): JavaRDD[(K, Long)] = { + rdd.countApproxDistinctByKey(relativeSD) } /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 080bb6ea677a4..ea579c4468da2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -560,34 +560,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. - * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp. 
+ * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. */ def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp) /** * Return approximate number of distinct elements in the RDD. This is deprecated. Use the - * variant with p and sp parameters instead. - * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. + * variant with `p` and `sp` parameters instead. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. */ @Deprecated def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1d95d5bdd45b4..e9c10ae44ac71 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -218,13 +218,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp (32 max). + * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. * @param partitioner Partitioner to use for the resulting RDD. */ @Experimental @@ -252,13 +252,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp (32 max). + * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. * @param numPartitions Number of partitions in the resulting RDD. 
*/ @Experimental @@ -272,13 +272,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp (32 max). + * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. */ @Experimental def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = { @@ -289,12 +289,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. - * - * @param relativeSD The relative standard deviation for the counter. - * Smaller values create counters that require more space. - * @param partitioner Partitioner to use for the resulting RDD + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. */ @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { @@ -305,14 +301,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. HashPartitions the - * output RDD into numPartitions. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. */ @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { @@ -320,16 +312,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** - * Return approximate number of distinct values for each key this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. The default value of - * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism - * level. + * Return approximate number of distinct values for each key in this RDD. 
* * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. */ @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f418c80db08d4..ab15052f519f2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -19,8 +19,7 @@ package org.apache.spark.rdd import java.util.Random -import scala.collection.Map -import scala.collection.mutable +import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} @@ -922,8 +921,8 @@ abstract class RDD[T: ClassTag]( * Return approximate number of distinct elements in the RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. * * @param p The precision value for the normal set. * p must be a value between 4 and sp. @@ -951,8 +950,8 @@ abstract class RDD[T: ClassTag]( * Return approximate number of distinct elements in the RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at - * [[http://research.google.com/pubs/pub40671.html]]. + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. */ @deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1") def countApproxDistinct(relativeSD: Double = 0.05): Long = { From 1db1522d8062ce312f6223af43d97919163a6d3b Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 1 Jun 2014 17:31:34 -0700 Subject: [PATCH 04/12] Excluded util.SerializableHyperLogLog from MIMA check. --- project/MimaExcludes.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ecb389de5558f..be0455039d045 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -35,7 +35,9 @@ object MimaExcludes { val excludes = SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.1") => - Seq() + Seq( + MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") + ) case v if v.startsWith("1.0") => Seq( MimaBuild.excludeSparkPackage("api.java"), From 6555bfe87e58fc8628c889d546381ba719741087 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 1 Jun 2014 17:47:35 -0700 Subject: [PATCH 05/12] Added a default method and re-arranged MimaExcludes. 
--- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 10 ++++++++++ project/MimaExcludes.scala | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index ea579c4468da2..28c3e05f3e15c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -571,6 +571,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp) + /** + * Return approximate number of distinct elements in the RDD. This method uses 20 as p value + * and 0 as sp value (i.e. skipping sparse representation). + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + */ + def countApproxDistinct(): Long = rdd.countApproxDistinct(20, 0) + /** * Return approximate number of distinct elements in the RDD. This is deprecated. Use the * variant with `p` and `sp` parameters instead. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index be0455039d045..61ab13b4eeef1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,8 +36,8 @@ object MimaExcludes { SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.1") => Seq( - MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") - ) + ) ++ + MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") case v if v.startsWith("1.0") => Seq( MimaBuild.excludeSparkPackage("api.java"), From acaa5247359680866fe472288be1d7263195bcba Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 1 Jun 2014 22:10:47 -0700 Subject: [PATCH 06/12] Added the right exclude rules in MimaExcludes. --- project/MimaExcludes.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 61ab13b4eeef1..66b17831f3a03 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -16,7 +16,6 @@ */ import com.typesafe.tools.mima.core._ -import com.typesafe.tools.mima.core.ProblemFilters._ /** * Additional excludes for checking of Spark's binary compatibility. @@ -36,6 +35,18 @@ object MimaExcludes { SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.1") => Seq( + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct") ) ++ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") case v if v.startsWith("1.0") => From 354deb88b1ebe331c80dd49183dbfc76f32bee81 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 1 Jun 2014 22:27:41 -0700 Subject: [PATCH 07/12] Added comment on the Mima exclude rules. 
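The comment records why the exclusions exist at all: default parameter values on the Java-facing methods compile into synthetic methodName$default$N accessors that Java callers can never use, and removing the defaults deletes those accessors, which MiMa then reports as MissingMethodProblem. A minimal illustration of the mechanism (hypothetical class, not Spark code):

    // Compiling a method with a default argument also generates a synthetic
    // method holding that default value.
    class Counter {
      def countApprox(relativeSD: Double = 0.05): Long = (1 / relativeSD).toLong
    }
    // `javap -p Counter` lists, besides countApprox(double), a method named
    // roughly countApprox$default$1() that returns the default. Dropping the
    // default later removes that synthetic method, which MiMa flags as a
    // MissingMethodProblem unless it is excluded, as done in the previous commit.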
--- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 66b17831f3a03..72a5d33ab9ad9 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -35,6 +35,9 @@ object MimaExcludes { SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.1") => Seq( + // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values + // for countApproxDistinct* functions, which does not work in Java. We later removed + // them, and use the following to tell Mima to not care about them. ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( From 9e320c84f512ef106bcd5544fab2ea782a521c66 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 Jun 2014 16:35:31 -0700 Subject: [PATCH 08/12] Incorporate code review feedback. --- .../apache/spark/api/java/JavaPairRDD.scala | 89 ++++--------------- .../apache/spark/api/java/JavaRDDLike.scala | 29 +----- .../apache/spark/rdd/PairRDDFunctions.scala | 75 ++++++---------- .../main/scala/org/apache/spark/rdd/RDD.scala | 14 +-- .../java/org/apache/spark/JavaAPISuite.java | 2 +- .../spark/rdd/PairRDDFunctionsSuite.scala | 2 +- pom.xml | 2 +- project/SparkBuild.scala | 2 +- 8 files changed, 60 insertions(+), 155 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index b53602c484129..9419c3f5989be 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -675,16 +675,15 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. * - * @param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). - * @param sp The precision value for the sparse set, between 0 and 32. - * If `sp` equals 0, the sparse representation is skipped. - * @param partitioner Partitioner to use for the resulting RDD. + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It should be greater than 0.000017. + * @param partitioner partitioner of the resulting RDD. */ - def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(p, sp, partitioner)) + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = + { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) } /** @@ -692,16 +691,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. * - * @param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). - * @param sp The precision value for the sparse set, between 0 and 32. - * If `sp` equals 0, the sparse representation is skipped. - * @param numPartitions The number of partitions in the resulting RDD. + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. 
+ * It should be greater than 0.000017. + * @param numPartitions number of partitions of the resulting RDD. */ - def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(p, sp, numPartitions)) + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) } /** @@ -709,63 +706,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. * - * @param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). - * @param sp The precision value for the sparse set, between 0 and 32. - * If `sp` equals 0, the sparse representation is skipped. + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It should be greater than 0.000017. */ - def countApproxDistinctByKey(p: Int, sp: Int): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(p, sp)) - } - - /** - * Return approximate number of distinct values for each key in this RDD. This is deprecated. - * Use the variant with `p` and `sp` parameters instead. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - * - * @param relativeSD The relative standard deviation for the counter. - * Smaller values create counters that require more space. - */ - @Deprecated - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, partitioner) - } - - /** - * Return approximate number of distinct values for each key in this RDD. This is deprecated. - * Use the variant with `p` and `sp` parameters instead. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - * - * @param relativeSD The relative standard deviation for the counter. - * Smaller values create counters that require more space. - */ - @Deprecated - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, numPartitions) - } - - /** - * Return approximate number of distinct values for each key in this RDD. This is deprecated. - * Use the variant with p and sp parameters instead. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - * - * @param relativeSD The relative standard deviation for the counter. - * Smaller values create counters that require more space. 
- */ - @Deprecated - def countApproxDistinctByKey(relativeSD: Double): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD) + def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD)) } /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 28c3e05f3e15c..2741532732c27 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -562,34 +562,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. * - * @param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). - * @param sp The precision value for the sparse set, between 0 and 32. - * If `sp` equals 0, the sparse representation is skipped. + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It should be greater than 0.000017. */ - def countApproxDistinct(p: Int, sp: Int): Long = rdd.countApproxDistinct(p, sp) - - /** - * Return approximate number of distinct elements in the RDD. This method uses 20 as p value - * and 0 as sp value (i.e. skipping sparse representation). - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - */ - def countApproxDistinct(): Long = rdd.countApproxDistinct(20, 0) - - /** - * Return approximate number of distinct elements in the RDD. This is deprecated. Use the - * variant with `p` and `sp` parameters instead. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - */ - @Deprecated def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index e9c10ae44ac71..d459815ae7cbe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -219,9 +219,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. * - * @param p The precision value for the normal set. + * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` + * would trigger sparse representation of registers, which may reduce the memory consumption + * and increase accuracy when the cardinality is small. + * + *@param p The precision value for the normal set. * `p` must be a value between 4 and `sp` (32 max). * @param sp The precision value for the sparse set, between 0 and 32. * If `sp` equals 0, the sparse representation is skipped. 
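The accuracy relation documented above (roughly `1.054 / sqrt(2^p)`) can be sanity-checked directly against streamlib. A minimal standalone sketch, not part of the patch, using only the HyperLogLogPlus calls already introduced in this series (the object name and values are illustrative):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    object HllPlusSketch {
      def main(args: Array[String]): Unit = {
        val relativeSD = 0.05
        // Invert relativeSD ~= 1.054 / sqrt(2^p) to choose a precision p (at least 4).
        val p = math.max(4, math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt)
        val hll = new HyperLogLogPlus(p, 0) // sp = 0: skip the sparse representation
        (1 to 100000).foreach(i => hll.offer(i))
        // With p derived from relativeSD = 0.05, the estimate is typically within a few percent.
        println(s"p = $p, estimate = ${hll.cardinality()}")
      }
    }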
@@ -229,6 +233,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ @Experimental def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { + require(p >= 4, s"p ($p) should be >= 4") + require(sp <= 32, s"sp ($sp) should be <= 32") + require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") val createHLL = (v: V) => { val hll = new HyperLogLogPlus(p, sp) hll.offer(v) @@ -247,56 +254,21 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** - * :: Experimental :: - * - * Return approximate number of distinct values for each key in this RDD. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - * - * @param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). - * @param sp The precision value for the sparse set, between 0 and 32. - * If `sp` equals 0, the sparse representation is skipped. - * @param numPartitions Number of partitions in the resulting RDD. - */ - @Experimental - def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = { - countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions)) - } - - /** - * :: Experimental :: - * * Return approximate number of distinct values for each key in this RDD. * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - * - * @param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). - * @param sp The precision value for the sparse set, between 0 and 32. - * If `sp` equals 0, the sparse representation is skipped. - */ - @Experimental - def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = { - countApproxDistinctByKey(p, sp, defaultPartitioner(self)) - } - - /** - * Return approximate number of distinct values for each key in this RDD. + * here. * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It should be greater than 0.000017. + * @param partitioner partitioner of the resulting RDD */ - @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p. - val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt - countApproxDistinctByKey(p, 0, partitioner) + require(relativeSD > 0.000017, s"accuracy ($relativeSD) should be greater than 0.000017") + val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt + assert(p <= 32) + countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner) } /** @@ -304,9 +276,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It should be greater than 0.000017. 
+ * @param numPartitions number of partitions of the resulting RDD */ - @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } @@ -316,9 +291,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It should be greater than 0.000017. */ - @deprecated("Use countApproxDistinctByKey with parameter p and sp", "1.0.1") def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ab15052f519f2..58375b9b07c32 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -922,7 +922,11 @@ abstract class RDD[T: ClassTag]( * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. + * + * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` + * would trigger sparse representation of registers, which may reduce the memory consumption + * and increase accuracy when the cardinality is small. * * @param p The precision value for the normal set. * p must be a value between 4 and sp. @@ -951,12 +955,12 @@ abstract class RDD[T: ClassTag]( * * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. */ - @deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1") def countApproxDistinct(relativeSD: Double = 0.05): Long = { - // See stream-lib's HyperLogLog implementation on the conversion from relativeSD to p. 
- val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt + val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt countApproxDistinct(p, 0) } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 68cf183c91c48..b78309f81cb8c 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1028,7 +1028,7 @@ public void countApproxDistinct() { arrayData.add(i % size); } JavaRDD simpleRdd = sc.parallelize(arrayData, 10); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(8, 0) - size) / (size * 1.0)) <= 0.1); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1); } @Test diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 937ddfcc2b6b1..461ffeddbff2a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -138,7 +138,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { (1 to num).map(j => (num, j)) } val rdd2 = sc.parallelize(randStacked) - val counted2 = rdd2.countApproxDistinctByKey(p, sp, 4).collect() + val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect() counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) } } diff --git a/pom.xml b/pom.xml index 63701a1e40f49..fcd6f66b4414a 100644 --- a/pom.xml +++ b/pom.xml @@ -302,7 +302,7 @@ stream 2.7.0 - + it.unimi.dsi fastutil diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e136461d380d5..efb0b9319be13 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -362,7 +362,7 @@ object SparkBuild extends Build { "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm), "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), - "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), + "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil. "org.spark-project" % "pyrolite" % "2.0.1", "net.sf.py4j" % "py4j" % "0.8.1" ), From 41e649a3934e5569c08ff1ccb1825a9bba049b95 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 Jun 2014 16:40:29 -0700 Subject: [PATCH 09/12] Update final mima list. --- project/MimaExcludes.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index bb6d67cc70938..fadf6a4d8bb4a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -39,10 +39,12 @@ object MimaExcludes { // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values // for countApproxDistinct* functions, which does not work in Java. We later removed // them, and use the following to tell Mima to not care about them. 
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"), - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"), ProblemFilters.exclude[MissingMethodProblem]( @@ -50,7 +52,7 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct") + "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1") ) ++ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") case v if v.startsWith("1.0") => From e36752705a8db5ccaa5d853a61e6a97dcb45725c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 Jun 2014 17:21:54 -0700 Subject: [PATCH 10/12] One more round of code review. --- .../org/apache/spark/api/java/JavaPairRDD.scala | 6 +++--- .../org/apache/spark/api/java/JavaRDDLike.scala | 2 +- .../org/apache/spark/rdd/PairRDDFunctions.scala | 16 ++++++++-------- .../main/scala/org/apache/spark/rdd/RDD.scala | 5 +++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9419c3f5989be..7dcfbf741c4f1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -678,7 +678,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. * @param partitioner partitioner of the resulting RDD. */ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = @@ -694,7 +694,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. * @param numPartitions number of partitions of the resulting RDD. */ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { @@ -709,7 +709,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. */ def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { fromRDD(rdd.countApproxDistinctByKey(relativeSD)) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 2741532732c27..330569a8d8837 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -565,7 +565,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * here. 
* * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. */ def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index d459815ae7cbe..f2ce3cbd47f93 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -225,16 +225,16 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * would trigger sparse representation of registers, which may reduce the memory consumption * and increase accuracy when the cardinality is small. * - *@param p The precision value for the normal set. - * `p` must be a value between 4 and `sp` (32 max). + * @param p The precision value for the normal set. + * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max). * @param sp The precision value for the sparse set, between 0 and 32. * If `sp` equals 0, the sparse representation is skipped. * @param partitioner Partitioner to use for the resulting RDD. */ @Experimental def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { - require(p >= 4, s"p ($p) should be >= 4") - require(sp <= 32, s"sp ($sp) should be <= 32") + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) must be <= 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") val createHLL = (v: V) => { val hll = new HyperLogLogPlus(p, sp) @@ -261,11 +261,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. * @param partitioner partitioner of the resulting RDD */ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - require(relativeSD > 0.000017, s"accuracy ($relativeSD) should be greater than 0.000017") + require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt assert(p <= 32) countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner) @@ -279,7 +279,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. * @param numPartitions number of partitions of the resulting RDD */ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { @@ -294,7 +294,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It should be greater than 0.000017. + * It must be greater than 0.000017. 
*/ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 58375b9b07c32..585b2f76afa65 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -929,9 +929,9 @@ abstract class RDD[T: ClassTag]( * and increase accuracy when the cardinality is small. * * @param p The precision value for the normal set. - * p must be a value between 4 and sp. + * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max). * @param sp The precision value for the sparse set, between 0 and 32. - * If sp equals 0, the sparse representation is skipped. + * If `sp` equals 0, the sparse representation is skipped. */ @Experimental def countApproxDistinct(p: Int, sp: Int): Long = { @@ -958,6 +958,7 @@ abstract class RDD[T: ClassTag]( * here. * * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. */ def countApproxDistinct(relativeSD: Double = 0.05): Long = { val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt From f154ea021b1502a0045d7690ac9d9f1ee903b183 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 Jun 2014 17:37:50 -0700 Subject: [PATCH 11/12] Added a comment on the value bound for testing. --- .../test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 461ffeddbff2a..b9bcc7c1321c3 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -121,6 +121,8 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { */ val p = 20 val sp = 0 + // When p = 20, the relative accuracy is about 0.001. So with high probability, the + // relative error should be smaller than the threshold 0.005 we use here. val relativeSD = 0.01 // For each value i, there are i tuples with first element equal to i. From 4d83f41c1c85d32b138d5ee854a1062ae91585a6 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 Jun 2014 18:00:22 -0700 Subject: [PATCH 12/12] New error bound and non-randomness. --- .../org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index b9bcc7c1321c3..9ddafc451878d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -122,7 +122,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val p = 20 val sp = 0 // When p = 20, the relative accuracy is about 0.001. So with high probability, the - // relative error should be smaller than the threshold 0.005 we use here. + // relative error should be smaller than the threshold 0.01 we use here. val relativeSD = 0.01 // For each value i, there are i tuples with first element equal to i. 
@@ -132,16 +132,18 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect() counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) } - val rnd = new Random() + val rnd = new Random(42) // The expected count for key num would be num val randStacked = (1 to 100).flatMap { i => - val num = rnd.nextInt % 500 + val num = rnd.nextInt() % 500 (1 to num).map(j => (num, j)) } val rdd2 = sc.parallelize(randStacked) val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect() - counted2.foreach { case(k, count) => assert(error(count, k) < relativeSD) } + counted2.foreach { case (k, count) => + assert(error(count, k) < relativeSD, s"${error(count, k)} < $relativeSD") + } } test("join") {
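Taken together, the series leaves the relativeSD-based methods as the stable API and keeps the (p, sp) variants as experimental expert knobs. A usage sketch against the resulting API, assuming a running SparkContext `sc` (values are illustrative):

    import org.apache.spark.SparkContext._

    val rdd = sc.parallelize(1 to 100000, 10)

    // Default form: relative standard deviation; must be greater than 0.000017.
    val approx = rdd.countApproxDistinct(relativeSD = 0.05)

    // Experimental form: set the HyperLogLog++ precision directly. p must be at least 4
    // and, when sp is nonzero, at most sp; sp must be at most 32 (sp = 0 skips the sparse set).
    val approxExpert = rdd.countApproxDistinct(p = 14, sp = 20)

    // Per-key variant, available on RDDs of pairs via the PairRDDFunctions implicit.
    val perKeyCounts = rdd.map(i => (i % 100, i)).countApproxDistinctByKey(0.05).collect()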