From 653542bee0d264634030a95fdefc077566d0809e Mon Sep 17 00:00:00 2001 From: larvaboy Date: Sat, 10 May 2014 16:20:10 -0700 Subject: [PATCH 1/8] Fix a couple of minor typos. --- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 5efb4388f6c7..bc6d204434ad 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -217,7 +217,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * more accurate counts but increase the memory footprint and vice versa. Uses the provided * Partitioner to partition the output RDD. */ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { @@ -232,7 +232,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. HashPartitions the + * more accurate counts but increase the memory footprint and vice versa. HashPartitions the * output RDD into numPartitions. * */ @@ -244,7 +244,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of + * more accurate counts but increase the memory footprint and vice versa. The default value of * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism * level. */ From 1d9aacff2d32872e190cadeb7c7ea81c30ecc4a6 Mon Sep 17 00:00:00 2001 From: larvaboy Date: Sun, 11 May 2014 16:15:35 -0700 Subject: [PATCH 2/8] Fix a minor typo in the toString method of the Count case class. 
--- .../org/apache/spark/sql/catalyst/expressions/aggregates.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 7777d372903e..0722b1330245 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -146,7 +146,6 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr override def eval(input: Row): Any = currentMax } - case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false @@ -166,7 +165,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi override def references = expressions.flatMap(_.references).toSet override def nullable = false override def dataType = IntegerType - override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})" + override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")})" override def newInstance() = new CountDistinctFunction(expressions, this) } From 7ad273a46153bdd142ae55fd57c8c863d7d78de6 Mon Sep 17 00:00:00 2001 From: larvaboy Date: Mon, 12 May 2014 02:25:59 -0700 Subject: [PATCH 3/8] Add SparkSql serializer for HyperLogLog. --- .../sql/execution/SparkSqlSerializer.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 1c6e29b3cdee..94c2a249ef8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag +import com.clearspring.analytics.stream.cardinality.HyperLogLog import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Serializer, Kryo} @@ -44,6 +45,8 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co kryo.register(classOf[scala.collection.Map[_,_]], new MapSerializer) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) + kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog], + new HyperLogLogSerializer) kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer) kryo.setReferences(false) @@ -81,6 +84,20 @@ private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] { } } +private[sql] class HyperLogLogSerializer extends Serializer[HyperLogLog] { + def write(kryo: Kryo, output: Output, hyperLogLog: HyperLogLog) { + val bytes = hyperLogLog.getBytes() + output.writeInt(bytes.length) + output.writeBytes(bytes) + } + + def read(kryo: Kryo, input: Input, tpe: Class[HyperLogLog]): HyperLogLog = { + val length = input.readInt() + val bytes = input.readBytes(length) + HyperLogLog.Builder.build(bytes) + } +} + /** * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize * them as `Array[(k,v)]`. 
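The serializer added in patch 3 delegates entirely to stream-lib's own byte encoding: the write path stores a length-prefixed getBytes() array, and the read path rebuilds the sketch with HyperLogLog.Builder.build. A minimal sketch of that round trip outside Kryo, assuming stream-lib is on the classpath (HllRoundTrip is an illustrative name, not part of the patch):

    import com.clearspring.analytics.stream.cardinality.HyperLogLog

    object HllRoundTrip {
      def main(args: Array[String]): Unit = {
        // Same default relative standard deviation (0.05) that the
        // later patches use for ApproxCountDistinct.
        val hll = new HyperLogLog(0.05)
        (1 to 1000).foreach(i => hll.offer(i))

        // Write path of HyperLogLogSerializer: the raw byte form
        // (the Kryo serializer prefixes this with its length).
        val bytes = hll.getBytes()

        // Read path of HyperLogLogSerializer: rebuild from those bytes.
        val restored = HyperLogLog.Builder.build(bytes)
        assert(restored.cardinality() == hll.cardinality())
      }
    }
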
From a2d5d108162d0d154fede22f9cb9e2f085759d17 Mon Sep 17 00:00:00 2001 From: larvaboy Date: Mon, 12 May 2014 02:38:16 -0700 Subject: [PATCH 4/8] Add ApproximateCountDistinct aggregates and functions. We use stream-lib's HyperLogLog to approximately count the number of distinct elements in each partition, and merge the HyperLogLogs to compute the final result. If the expressions can not be successfully broken apart, we fall back to the exact CountDistinct. --- .../sql/catalyst/expressions/aggregates.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 0722b1330245..70135c401bec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import com.clearspring.analytics.stream.cardinality.HyperLogLog + import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.errors.TreeNodeException @@ -169,6 +171,44 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi override def newInstance() = new CountDistinctFunction(expressions, this) } +case class ApproxCountDistinctPartition(child: Expression) + extends AggregateExpression with trees.UnaryNode[Expression] { + override def references = child.references + override def nullable = false + override def dataType = child.dataType + override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" + override def newInstance() = new ApproxCountDistinctPartitionFunction(child, this) +} + +case class ApproxCountDistinctMerge(child: Expression) + extends AggregateExpression with trees.UnaryNode[Expression] { + override def references = child.references + override def nullable = false + override def dataType = IntegerType + override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" + override def newInstance() = new ApproxCountDistinctMergeFunction(child, this) +} + +object ApproxCountDistinct { + val RelativeSD = 0.05 +} + +case class ApproxCountDistinct(child: Expression) + extends PartialAggregate with trees.UnaryNode[Expression] { + override def references = child.references + override def nullable = false + override def dataType = IntegerType + override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" + + override def asPartial: SplitEvaluation = { + val partialCount = Alias(ApproxCountDistinctPartition(child), + "PartialApproxCountDistinct")() + SplitEvaluation(ApproxCountDistinctMerge(partialCount.toAttribute), partialCount :: Nil) + } + + override def newInstance() = new CountDistinctFunction((child :: Nil), this) +} + case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false @@ -268,6 +308,34 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag override def eval(input: Row): Any = count } +case class ApproxCountDistinctPartitionFunction(expr: Expression, base: AggregateExpression) + extends AggregateFunction { + def this() = this(null, null) // Required for serialization. 
+ + private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD) + + override def update(input: Row): Unit = { + val evaluatedExpr = expr.eval(input) + Option(evaluatedExpr).foreach(hyperLogLog.offer(_)) + } + + override def eval(input: Row): Any = hyperLogLog +} + +case class ApproxCountDistinctMergeFunction(expr: Expression, base: AggregateExpression) + extends AggregateFunction { + def this() = this(null, null) // Required for serialization. + + private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD) + + override def update(input: Row): Unit = { + val evaluatedExpr = expr.eval(input) + Option(evaluatedExpr.asInstanceOf[HyperLogLog]).foreach(hyperLogLog.addAll(_)) + } + + override def eval(input: Row): Any = hyperLogLog.cardinality() +} + case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) // Required for serialization. From f57917dee2a40d7171515932df7970dfb0fb7009 Mon Sep 17 00:00:00 2001 From: larvaboy Date: Sun, 11 May 2014 21:58:54 -0700 Subject: [PATCH 5/8] Add the parser for the approximate count. --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index b3a3a1ef1b5e..bb1b0802e326 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -93,6 +93,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val AND = Keyword("AND") protected val AS = Keyword("AS") protected val ASC = Keyword("ASC") + protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AVG = Keyword("AVG") protected val BY = Keyword("BY") protected val CAST = Keyword("CAST") @@ -318,6 +319,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { COUNT ~> "(" ~ "*" <~ ")" ^^ { case _ => Count(Literal(1)) } | COUNT ~> "(" ~ expression <~ ")" ^^ { case dist ~ exp => Count(exp) } | COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => CountDistinct(exp :: Nil) } | + APPROXIMATE ~> COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => ApproxCountDistinct(exp) } | FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } | AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } | MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } | From 95b40678db548ae19767d30a2b5d7081a25a2660 Mon Sep 17 00:00:00 2001 From: larvaboy Date: Sun, 11 May 2014 22:05:15 -0700 Subject: [PATCH 6/8] Add a test case for count distinct and approximate count distinct. 
--- .../org/apache/spark/sql/SQLQuerySuite.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e966d89c30cf..c2d4e6bb277b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -96,8 +96,19 @@ class SQLQuerySuite extends QueryTest { test("count") { checkAnswer( sql("SELECT COUNT(*) FROM testData2"), - testData2.count() - ) + testData2.count()) + } + + test("count distinct") { + checkAnswer( + sql("SELECT COUNT(DISTINCT b) FROM testData2"), + 2) + } + + test("approximate count distinct") { + checkAnswer( + sql("SELECT APPROXIMATE COUNT(DISTINCT a) FROM testData2"), + 3) } // No support for primitive nulls yet. From 9ba8360d74aa009da34767c72ef013011da28d78 Mon Sep 17 00:00:00 2001 From: larvaboy Date: Tue, 13 May 2014 00:20:25 -0700 Subject: [PATCH 7/8] Fix alignment and null handling issues. --- .../sql/catalyst/expressions/aggregates.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 70135c401bec..ffb92d702a11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -172,7 +172,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi } case class ApproxCountDistinctPartition(child: Expression) - extends AggregateExpression with trees.UnaryNode[Expression] { + extends AggregateExpression with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false override def dataType = child.dataType @@ -181,7 +181,7 @@ case class ApproxCountDistinctPartition(child: Expression) } case class ApproxCountDistinctMerge(child: Expression) - extends AggregateExpression with trees.UnaryNode[Expression] { + extends AggregateExpression with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false override def dataType = IntegerType @@ -194,19 +194,18 @@ object ApproxCountDistinct { } case class ApproxCountDistinct(child: Expression) - extends PartialAggregate with trees.UnaryNode[Expression] { + extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false override def dataType = IntegerType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" override def asPartial: SplitEvaluation = { - val partialCount = Alias(ApproxCountDistinctPartition(child), - "PartialApproxCountDistinct")() + val partialCount = Alias(ApproxCountDistinctPartition(child), "PartialApproxCountDistinct")() SplitEvaluation(ApproxCountDistinctMerge(partialCount.toAttribute), partialCount :: Nil) } - override def newInstance() = new CountDistinctFunction((child :: Nil), this) + override def newInstance() = new CountDistinctFunction(child :: Nil, this) } case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -309,28 +308,30 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag } case class ApproxCountDistinctPartitionFunction(expr: Expression, base: 
AggregateExpression) - extends AggregateFunction { + extends AggregateFunction { def this() = this(null, null) // Required for serialization. private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD) override def update(input: Row): Unit = { val evaluatedExpr = expr.eval(input) - Option(evaluatedExpr).foreach(hyperLogLog.offer(_)) + if (evaluatedExpr != null) { + hyperLogLog.offer(evaluatedExpr) + } } override def eval(input: Row): Any = hyperLogLog } case class ApproxCountDistinctMergeFunction(expr: Expression, base: AggregateExpression) - extends AggregateFunction { + extends AggregateFunction { def this() = this(null, null) // Required for serialization. private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD) override def update(input: Row): Unit = { val evaluatedExpr = expr.eval(input) - Option(evaluatedExpr.asInstanceOf[HyperLogLog]).foreach(hyperLogLog.addAll(_)) + hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog]) } override def eval(input: Row): Any = hyperLogLog.cardinality() From bd8ef3f201627c72c07fc707129bcadfd1d5b7cb Mon Sep 17 00:00:00 2001 From: larvaboy Date: Tue, 13 May 2014 02:16:00 -0700 Subject: [PATCH 8/8] Add support of user-provided standard deviation to ApproxCountDistinct. --- .../apache/spark/sql/catalyst/SqlParser.scala | 7 +++- .../sql/catalyst/expressions/aggregates.scala | 40 +++++++++++-------- .../org/apache/spark/sql/SQLQuerySuite.scala | 6 +++ 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index bb1b0802e326..f2b9b2c1a3ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -319,7 +319,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers { COUNT ~> "(" ~ "*" <~ ")" ^^ { case _ => Count(Literal(1)) } | COUNT ~> "(" ~ expression <~ ")" ^^ { case dist ~ exp => Count(exp) } | COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => CountDistinct(exp :: Nil) } | - APPROXIMATE ~> COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => ApproxCountDistinct(exp) } | + APPROXIMATE ~> COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { + case exp => ApproxCountDistinct(exp) + } | + APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ COUNT ~ "(" ~ DISTINCT ~ expression <~ ")" ^^ { + case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble) + } | FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } | AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } | MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index ffb92d702a11..5dbaaa3b0ce3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -171,29 +171,25 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi override def newInstance() = new CountDistinctFunction(expressions, this) } -case class ApproxCountDistinctPartition(child: Expression) +case class ApproxCountDistinctPartition(child: Expression, relativeSD: Double) extends AggregateExpression with trees.UnaryNode[Expression] { 
override def references = child.references override def nullable = false override def dataType = child.dataType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" - override def newInstance() = new ApproxCountDistinctPartitionFunction(child, this) + override def newInstance() = new ApproxCountDistinctPartitionFunction(child, this, relativeSD) } -case class ApproxCountDistinctMerge(child: Expression) +case class ApproxCountDistinctMerge(child: Expression, relativeSD: Double) extends AggregateExpression with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false override def dataType = IntegerType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" - override def newInstance() = new ApproxCountDistinctMergeFunction(child, this) + override def newInstance() = new ApproxCountDistinctMergeFunction(child, this, relativeSD) } -object ApproxCountDistinct { - val RelativeSD = 0.05 -} - -case class ApproxCountDistinct(child: Expression) +case class ApproxCountDistinct(child: Expression, relativeSD: Double = 0.05) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false @@ -201,8 +197,12 @@ case class ApproxCountDistinct(child: Expression) override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" override def asPartial: SplitEvaluation = { - val partialCount = Alias(ApproxCountDistinctPartition(child), "PartialApproxCountDistinct")() - SplitEvaluation(ApproxCountDistinctMerge(partialCount.toAttribute), partialCount :: Nil) + val partialCount = + Alias(ApproxCountDistinctPartition(child, relativeSD), "PartialApproxCountDistinct")() + + SplitEvaluation( + ApproxCountDistinctMerge(partialCount.toAttribute, relativeSD), + partialCount :: Nil) } override def newInstance() = new CountDistinctFunction(child :: Nil, this) @@ -307,11 +307,14 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag override def eval(input: Row): Any = count } -case class ApproxCountDistinctPartitionFunction(expr: Expression, base: AggregateExpression) +case class ApproxCountDistinctPartitionFunction( + expr: Expression, + base: AggregateExpression, + relativeSD: Double) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. + def this() = this(null, null, 0) // Required for serialization. - private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD) + private val hyperLogLog = new HyperLogLog(relativeSD) override def update(input: Row): Unit = { val evaluatedExpr = expr.eval(input) @@ -323,11 +326,14 @@ case class ApproxCountDistinctPartitionFunction(expr: Expression, base: Aggregat override def eval(input: Row): Any = hyperLogLog } -case class ApproxCountDistinctMergeFunction(expr: Expression, base: AggregateExpression) +case class ApproxCountDistinctMergeFunction( + expr: Expression, + base: AggregateExpression, + relativeSD: Double) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. + def this() = this(null, null, 0) // Required for serialization. 
- private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD) + private val hyperLogLog = new HyperLogLog(relativeSD) override def update(input: Row): Unit = { val evaluatedExpr = expr.eval(input) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c2d4e6bb277b..524549eb544f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -111,6 +111,12 @@ class SQLQuerySuite extends QueryTest { 3) } + test("approximate count distinct with user provided standard deviation") { + checkAnswer( + sql("SELECT APPROXIMATE(0.04) COUNT(DISTINCT a) FROM testData2"), + 3) + } + // No support for primitive nulls yet. ignore("null count") { checkAnswer(
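Taken together, the eight patches wire APPROXIMATE COUNT(DISTINCT ...) end to end: the parser builds an ApproxCountDistinct, which splits into a per-partition HyperLogLog sketch and a merge step, with the Kryo serializer from patch 3 shuttling sketches between stages. The data flow described in patch 4 can be sketched outside Spark as follows (illustrative driver code only, assuming stream-lib; the partition contents and names are made up):

    import com.clearspring.analytics.stream.cardinality.HyperLogLog

    object HllMergeSketch {
      def main(args: Array[String]): Unit = {
        val relativeSD = 0.05
        // Stand-ins for the values each partition would see.
        val partitions = Seq(Seq("a", "b", "c"), Seq("c", "d"), Seq("d", "e", "f"))

        // Map side, as in ApproxCountDistinctPartitionFunction.update:
        // offer every non-null value into a per-partition sketch.
        val partials = partitions.map { part =>
          val hll = new HyperLogLog(relativeSD)
          part.filter(_ != null).foreach(v => hll.offer(v))
          hll
        }

        // Reduce side, as in ApproxCountDistinctMergeFunction.update:
        // fold the partial sketches together with addAll, then read
        // the estimate with cardinality(), as eval() does.
        val merged = new HyperLogLog(relativeSD)
        partials.foreach(h => merged.addAll(h))
        println(merged.cardinality()) // ~6 for the six distinct values
      }
    }

At the SQL surface, the new tests pin down both accepted forms: SELECT APPROXIMATE COUNT(DISTINCT a) FROM testData2 uses the default relativeSD of 0.05, while SELECT APPROXIMATE(0.04) COUNT(DISTINCT a) FROM testData2 passes the user-provided value through to the HyperLogLog constructors.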