Skip to content

Commit 3f90c3e

Browse files
committed
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2 parents 2c18513 + b8d2580 commit 3f90c3e

File tree

57 files changed

+737
-533
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+737
-533
lines changed

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
3838
sc.stop()
3939
sc = null
4040
}
41-
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
42-
System.clearProperty("spark.driver.port")
4341
}
4442

4543
test("halting by voting") {

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
672672

673673
/**
674674
* Return approximate number of distinct values for each key in this RDD.
675-
* The accuracy of approximation can be controlled through the relative standard deviation
676-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
677-
* more accurate counts but increase the memory footprint and vise versa. Uses the provided
678-
* Partitioner to partition the output RDD.
675+
*
676+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
677+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
678+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
679+
*
680+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
681+
* It must be greater than 0.000017.
682+
* @param partitioner partitioner of the resulting RDD.
679683
*/
680-
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
681-
rdd.countApproxDistinctByKey(relativeSD, partitioner)
684+
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
685+
{
686+
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
682687
}
683688

684689
/**
685-
* Return approximate number of distinct values for each key this RDD.
686-
* The accuracy of approximation can be controlled through the relative standard deviation
687-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
688-
* more accurate counts but increase the memory footprint and vise versa. The default value of
689-
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
690-
* level.
690+
* Return approximate number of distinct values for each key in this RDD.
691+
*
692+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
693+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
694+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
695+
*
696+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
697+
* It must be greater than 0.000017.
698+
* @param numPartitions number of partitions of the resulting RDD.
691699
*/
692-
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
693-
rdd.countApproxDistinctByKey(relativeSD)
700+
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
701+
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
694702
}
695703

696-
697704
/**
698705
* Return approximate number of distinct values for each key in this RDD.
699-
* The accuracy of approximation can be controlled through the relative standard deviation
700-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
701-
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
702-
* output RDD into numPartitions.
703706
*
707+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
708+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
709+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
710+
*
711+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
712+
* It must be greater than 0.000017.
704713
*/
705-
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
706-
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
714+
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
715+
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
707716
}
708717

709718
/** Assign a name to this RDD */

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
560560
/**
561561
* Return approximate number of distinct elements in the RDD.
562562
*
563-
* The accuracy of approximation can be controlled through the relative standard deviation
564-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
565-
* more accurate counts but increase the memory footprint and vise versa. The default value of
566-
* relativeSD is 0.05.
563+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
564+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
565+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
566+
*
567+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
568+
* It must be greater than 0.000017.
567569
*/
568-
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
570+
def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
569571

570572
def name(): String = rdd.name
571573

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.collection.mutable
2828
import scala.collection.mutable.ArrayBuffer
2929
import scala.reflect.ClassTag
3030

31-
import com.clearspring.analytics.stream.cardinality.HyperLogLog
31+
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
3232
import org.apache.hadoop.conf.{Configurable, Configuration}
3333
import org.apache.hadoop.fs.FileSystem
3434
import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
4646
import org.apache.spark.SparkContext._
4747
import org.apache.spark.partial.{BoundedDouble, PartialResult}
4848
import org.apache.spark.serializer.Serializer
49-
import org.apache.spark.util.SerializableHyperLogLog
5049

5150
/**
5251
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
214213
}
215214

216215
/**
216+
* :: Experimental ::
217+
*
217218
* Return approximate number of distinct values for each key in this RDD.
218-
* The accuracy of approximation can be controlled through the relative standard deviation
219-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
220-
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
221-
* Partitioner to partition the output RDD.
219+
*
220+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
221+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
222+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
223+
*
224+
* The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
225+
* would trigger sparse representation of registers, which may reduce the memory consumption
226+
* and increase accuracy when the cardinality is small.
227+
*
228+
* @param p The precision value for the normal set.
229+
* `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
230+
* @param sp The precision value for the sparse set, between 0 and 32.
231+
* If `sp` equals 0, the sparse representation is skipped.
232+
* @param partitioner Partitioner to use for the resulting RDD.
222233
*/
223-
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
224-
val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
225-
val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
226-
val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
234+
@Experimental
235+
def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
236+
require(p >= 4, s"p ($p) must be >= 4")
237+
require(sp <= 32, s"sp ($sp) must be <= 32")
238+
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
239+
val createHLL = (v: V) => {
240+
val hll = new HyperLogLogPlus(p, sp)
241+
hll.offer(v)
242+
hll
243+
}
244+
val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
245+
hll.offer(v)
246+
hll
247+
}
248+
val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
249+
h1.addAll(h2)
250+
h1
251+
}
252+
253+
combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
254+
}
227255

228-
combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
256+
/**
257+
* Return approximate number of distinct values for each key in this RDD.
258+
*
259+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
260+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
261+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
262+
*
263+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
264+
* It must be greater than 0.000017.
265+
* @param partitioner partitioner of the resulting RDD
266+
*/
267+
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
268+
require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
269+
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
270+
assert(p <= 32)
271+
countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
229272
}
230273

231274
/**
232275
* Return approximate number of distinct values for each key in this RDD.
233-
* The accuracy of approximation can be controlled through the relative standard deviation
234-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
235-
* more accurate counts but increase the memory footprint and vice versa. HashPartitions the
236-
* output RDD into numPartitions.
237276
*
277+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
278+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
279+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
280+
*
281+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
282+
* It must be greater than 0.000017.
283+
* @param numPartitions number of partitions of the resulting RDD
238284
*/
239285
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
240286
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
241287
}
242288

243289
/**
244-
* Return approximate number of distinct values for each key this RDD.
245-
* The accuracy of approximation can be controlled through the relative standard deviation
246-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
247-
* more accurate counts but increase the memory footprint and vice versa. The default value of
248-
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
249-
* level.
290+
* Return approximate number of distinct values for each key in this RDD.
291+
*
292+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
293+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
294+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
295+
*
296+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
297+
* It must be greater than 0.000017.
250298
*/
251299
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
252300
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ package org.apache.spark.rdd
1919

2020
import java.util.Random
2121

22-
import scala.collection.Map
23-
import scala.collection.mutable
22+
import scala.collection.{mutable, Map}
2423
import scala.collection.mutable.ArrayBuffer
2524
import scala.reflect.{classTag, ClassTag}
2625

27-
import com.clearspring.analytics.stream.cardinality.HyperLogLog
26+
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
2827
import org.apache.hadoop.io.BytesWritable
2928
import org.apache.hadoop.io.compress.CompressionCodec
3029
import org.apache.hadoop.io.NullWritable
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
4140
import org.apache.spark.partial.GroupedCountEvaluator
4241
import org.apache.spark.partial.PartialResult
4342
import org.apache.spark.storage.StorageLevel
44-
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
43+
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
4544
import org.apache.spark.util.collection.OpenHashMap
4645
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
4746

@@ -655,7 +654,19 @@ abstract class RDD[T: ClassTag](
655654
* partitions* and the *same number of elements in each partition* (e.g. one was made through
656655
* a map on the other).
657656
*/
658-
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
657+
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
658+
zipPartitions(other, true) { (thisIter, otherIter) =>
659+
new Iterator[(T, U)] {
660+
def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
661+
case (true, true) => true
662+
case (false, false) => false
663+
case _ => throw new SparkException("Can only zip RDDs with " +
664+
"same number of elements in each partition")
665+
}
666+
def next = (thisIter.next, otherIter.next)
667+
}
668+
}
669+
}
659670

660671
/**
661672
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
@@ -921,15 +932,49 @@ abstract class RDD[T: ClassTag](
921932
* :: Experimental ::
922933
* Return approximate number of distinct elements in the RDD.
923934
*
924-
* The accuracy of approximation can be controlled through the relative standard deviation
925-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
926-
* more accurate counts but increase the memory footprint and vise versa. The default value of
927-
* relativeSD is 0.05.
935+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
936+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
937+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
938+
*
939+
* The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
940+
* would trigger sparse representation of registers, which may reduce the memory consumption
941+
* and increase accuracy when the cardinality is small.
942+
*
943+
* @param p The precision value for the normal set.
944+
* `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
945+
* @param sp The precision value for the sparse set, between 0 and 32.
946+
* If `sp` equals 0, the sparse representation is skipped.
928947
*/
929948
@Experimental
949+
def countApproxDistinct(p: Int, sp: Int): Long = {
950+
require(p >= 4, s"p ($p) must be greater than 0")
951+
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
952+
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
953+
val zeroCounter = new HyperLogLogPlus(p, sp)
954+
aggregate(zeroCounter)(
955+
(hll: HyperLogLogPlus, v: T) => {
956+
hll.offer(v)
957+
hll
958+
},
959+
(h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
960+
h1.addAll(h2)
961+
h2
962+
}).cardinality()
963+
}
964+
965+
/**
966+
* Return approximate number of distinct elements in the RDD.
967+
*
968+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
969+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
970+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
971+
*
972+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
973+
* It must be greater than 0.000017.
974+
*/
930975
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
931-
val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
932-
aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
976+
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
977+
countApproxDistinct(p, 0)
933978
}
934979

935980
/**

0 commit comments

Comments
 (0)