Skip to content

Commit 9ba8360

Browse files
larvaboyLiuyang Li
authored andcommitted
Fix alignment and null handling issues.
1 parent 95b4067 commit 9ba8360

File tree

1 file changed

+11
-10
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions

1 file changed

+11
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi
172172
}
173173

174174
case class ApproxCountDistinctPartition(child: Expression)
175-
extends AggregateExpression with trees.UnaryNode[Expression] {
175+
extends AggregateExpression with trees.UnaryNode[Expression] {
176176
override def references = child.references
177177
override def nullable = false
178178
override def dataType = child.dataType
@@ -181,7 +181,7 @@ case class ApproxCountDistinctPartition(child: Expression)
181181
}
182182

183183
case class ApproxCountDistinctMerge(child: Expression)
184-
extends AggregateExpression with trees.UnaryNode[Expression] {
184+
extends AggregateExpression with trees.UnaryNode[Expression] {
185185
override def references = child.references
186186
override def nullable = false
187187
override def dataType = IntegerType
@@ -194,19 +194,18 @@ object ApproxCountDistinct {
194194
}
195195

196196
case class ApproxCountDistinct(child: Expression)
197-
extends PartialAggregate with trees.UnaryNode[Expression] {
197+
extends PartialAggregate with trees.UnaryNode[Expression] {
198198
override def references = child.references
199199
override def nullable = false
200200
override def dataType = IntegerType
201201
override def toString = s"APPROXIMATE COUNT(DISTINCT $child)"
202202

203203
override def asPartial: SplitEvaluation = {
204-
val partialCount = Alias(ApproxCountDistinctPartition(child),
205-
"PartialApproxCountDistinct")()
204+
val partialCount = Alias(ApproxCountDistinctPartition(child), "PartialApproxCountDistinct")()
206205
SplitEvaluation(ApproxCountDistinctMerge(partialCount.toAttribute), partialCount :: Nil)
207206
}
208207

209-
override def newInstance() = new CountDistinctFunction((child :: Nil), this)
208+
override def newInstance() = new CountDistinctFunction(child :: Nil, this)
210209
}
211210

212211
case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -309,28 +308,30 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag
309308
}
310309

311310
case class ApproxCountDistinctPartitionFunction(expr: Expression, base: AggregateExpression)
312-
extends AggregateFunction {
311+
extends AggregateFunction {
313312
def this() = this(null, null) // Required for serialization.
314313

315314
private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD)
316315

317316
override def update(input: Row): Unit = {
318317
val evaluatedExpr = expr.eval(input)
319-
Option(evaluatedExpr).foreach(hyperLogLog.offer(_))
318+
if (evaluatedExpr != null) {
319+
hyperLogLog.offer(evaluatedExpr)
320+
}
320321
}
321322

322323
override def eval(input: Row): Any = hyperLogLog
323324
}
324325

325326
case class ApproxCountDistinctMergeFunction(expr: Expression, base: AggregateExpression)
326-
extends AggregateFunction {
327+
extends AggregateFunction {
327328
def this() = this(null, null) // Required for serialization.
328329

329330
private val hyperLogLog = new HyperLogLog(ApproxCountDistinct.RelativeSD)
330331

331332
override def update(input: Row): Unit = {
332333
val evaluatedExpr = expr.eval(input)
333-
Option(evaluatedExpr.asInstanceOf[HyperLogLog]).foreach(hyperLogLog.addAll(_))
334+
hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog])
334335
}
335336

336337
override def eval(input: Row): Any = hyperLogLog.cardinality()

0 commit comments

Comments
 (0)