From fc3ac0f91e26c2069b7544715d21eb8c98a06465 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 27 May 2014 19:22:33 +0900 Subject: [PATCH 1/4] Fix evaluated value type of ApproxCountDistinctMergeFunction to Int. --- .../org/apache/spark/sql/catalyst/expressions/aggregates.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 79937b129aeae..75e54944dd118 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -340,7 +340,7 @@ case class ApproxCountDistinctMergeFunction( hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog]) } - override def eval(input: Row): Any = hyperLogLog.cardinality() + override def eval(input: Row): Any = hyperLogLog.cardinality().toInt } case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { From cbe7c71bbc90a17252796fadda6cebef95e8acee Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 28 May 2014 12:29:29 +0900 Subject: [PATCH 2/4] Revert a change. --- .../org/apache/spark/sql/catalyst/expressions/aggregates.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 75e54944dd118..79937b129aeae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -340,7 +340,7 @@ case class ApproxCountDistinctMergeFunction( hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog]) } - override def eval(input: Row): Any = hyperLogLog.cardinality().toInt + override def eval(input: Row): Any = hyperLogLog.cardinality() } case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { From 5ad7ec1c4d2a915a903a8390bbc1096f962030a9 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 28 May 2014 12:41:05 +0900 Subject: [PATCH 3/4] Make dataType for each of CountDistinct, ApproxCountDistinctMerge and ApproxCountDistinct LongType. --- .../spark/sql/catalyst/expressions/aggregates.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 79937b129aeae..7e5beb66da0ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -166,7 +166,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi override def children = expressions override def references = expressions.flatMap(_.references).toSet override def nullable = false - override def dataType = IntegerType + override def dataType = LongType override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")})" override def newInstance() = new CountDistinctFunction(expressions, this) } @@ -184,7 +184,7 @@ case class ApproxCountDistinctMerge(child: Expression, relativeSD: Double) extends AggregateExpression with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false - override def dataType = IntegerType + override def dataType = LongType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" override def newInstance() = new ApproxCountDistinctMergeFunction(child, this, relativeSD) } @@ -193,7 +193,7 @@ case class ApproxCountDistinct(child: Expression, relativeSD: Double = 0.05) extends PartialAggregate with trees.UnaryNode[Expression] { override def references = child.references override def nullable = false - override def dataType = IntegerType + override def dataType = LongType override def toString = s"APPROXIMATE COUNT(DISTINCT $child)" override def asPartial: SplitEvaluation = { @@ -391,7 +391,7 @@ case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpressio } } - override def eval(input: Row): Any = seen.size + override def eval(input: Row): Any = seen.size.toLong } case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { From 3970e8871ca9810bfc68a1dcc8870d6e1c79b286 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 28 May 2014 12:44:25 +0900 Subject: [PATCH 4/4] Remove a superfluous line. --- .../org/apache/spark/sql/catalyst/expressions/aggregates.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 7e5beb66da0ba..10d6ceaf9c4ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -82,7 +82,6 @@ abstract class AggregateFunction override def dataType = base.dataType def update(input: Row): Unit - override def eval(input: Row): Any // Do we really need this? override def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)