Skip to content

Commit b2f3aca

Browse files
JoshRosenyhuai
authored andcommitted
[SPARK-9286] [SQL] Methods in Unevaluable should be final and AlgebraicAggregate should extend Unevaluable.
This patch marks the Unevaluable.eval() and UnevaluablegenCode() methods as final and fixes two cases where they were overridden. It also updates AggregateFunction2 to extend Unevaluable. Author: Josh Rosen <[email protected]> Closes #7627 from JoshRosen/unevaluable-fix and squashes the following commits: 8d9ed22 [Josh Rosen] AlgebraicAggregate should extend Unevaluable 65329c2 [Josh Rosen] Do not have AggregateFunction1 inherit from AggregateExpression1 fa68a22 [Josh Rosen] Make eval() and genCode() final
1 parent 662d60d commit b2f3aca

File tree

3 files changed

+10
-20
lines changed

3 files changed

+10
-20
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,10 @@ abstract class Expression extends TreeNode[Expression] {
184184
*/
185185
trait Unevaluable extends Expression {
186186

187-
override def eval(input: InternalRow = null): Any =
187+
final override def eval(input: InternalRow = null): Any =
188188
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
189189

190-
override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
190+
final override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
191191
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
192192
}
193193

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ private[sql] case object Complete extends AggregateMode
6363
*/
6464
private[sql] case object NoOp extends Expression with Unevaluable {
6565
override def nullable: Boolean = true
66-
override def eval(input: InternalRow): Any = {
67-
throw new TreeNodeException(
68-
this, s"No function to evaluate expression. type: ${this.nodeName}")
69-
}
7066
override def dataType: DataType = NullType
7167
override def children: Seq[Expression] = Nil
7268
}
@@ -151,8 +147,7 @@ abstract class AggregateFunction2
151147
/**
152148
* A helper class for aggregate functions that can be implemented in terms of catalyst expressions.
153149
*/
154-
abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable {
155-
self: Product =>
150+
abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable {
156151

157152
val initialValues: Seq[Expression]
158153
val updateExpressions: Seq[Expression]
@@ -188,19 +183,15 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable {
188183
}
189184
}
190185

191-
override def update(buffer: MutableRow, input: InternalRow): Unit = {
186+
override final def update(buffer: MutableRow, input: InternalRow): Unit = {
192187
throw new UnsupportedOperationException(
193188
"AlgebraicAggregate's update should not be called directly")
194189
}
195190

196-
override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
191+
override final def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
197192
throw new UnsupportedOperationException(
198193
"AlgebraicAggregate's merge should not be called directly")
199194
}
200195

201-
override def eval(buffer: InternalRow): Any = {
202-
throw new UnsupportedOperationException(
203-
"AlgebraicAggregate's eval should not be called directly")
204-
}
205196
}
206197

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
2020
import com.clearspring.analytics.stream.cardinality.HyperLogLog
2121

2222
import org.apache.spark.sql.catalyst.InternalRow
23-
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2423
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
24+
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
2525
import org.apache.spark.sql.catalyst.util.TypeUtils
2626
import org.apache.spark.sql.types._
2727
import org.apache.spark.util.collection.OpenHashSet
@@ -71,8 +71,7 @@ trait PartialAggregate1 extends AggregateExpression1 {
7171
* A specific implementation of an aggregate function. Used to wrap a generic
7272
* [[AggregateExpression1]] with an algorithm that will be used to compute one specific result.
7373
*/
74-
abstract class AggregateFunction1
75-
extends LeafExpression with AggregateExpression1 with Serializable {
74+
abstract class AggregateFunction1 extends LeafExpression with Serializable {
7675

7776
/** Base should return the generic aggregate expression that this function is computing */
7877
val base: AggregateExpression1
@@ -82,9 +81,9 @@ abstract class AggregateFunction1
8281

8382
def update(input: InternalRow): Unit
8483

85-
// Do we really need this?
86-
override def newInstance(): AggregateFunction1 = {
87-
makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
84+
override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
85+
throw new UnsupportedOperationException(
86+
"AggregateFunction1 should not be used for generated aggregates")
8887
}
8988
}
9089

0 commit comments

Comments
 (0)