diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index ec97fe603c44..a8c243874535 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -339,6 +339,16 @@ public Row copy() { } @Override + public MutableRow makeMutable() { + GenericMutableRow mr = new GenericMutableRow(this.size()); + for (int i = 0; i < mr.size(); ++i) { + mr.update(i, get(i)); + } + + return mr; + } + + @Override public boolean anyNull() { return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 0d460b634d9b..960fcf091a1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import scala.util.hashing.MurmurHash3 -import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, GenericRow} import org.apache.spark.sql.types.StructType object Row { @@ -348,6 +348,17 @@ trait Row extends Serializable { */ def copy(): Row + def makeMutable(): MutableRow = { + val totalSize = length + val copiedValues = new Array[Any](totalSize) + var i = 0 + while(i < totalSize) { + copiedValues(i) = apply(i) + i += 1 + } + new GenericMutableRow(copiedValues) + } + /** Returns true if there are any NULL values in this row. */ def anyNull: Boolean = { val len = length diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate2/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate2/aggregates.scala new file mode 100644 index 000000000000..a996c65dfb91 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate2/aggregates.scala @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate2 + +import java.util.{Set => JSet} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + + +/** + * This is from org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode + * Just a hint for the UDAF developers which stage we are about to process, + * However, we probably don't want the developers knows so many details, here + * is just for keep consistent with Hive (when integrated with Hive), need to + * figure out if we have work around for that soon. 
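+ * (In this patch, AggregatePreShuffle initializes its functions with PARTIAL1,
+ * AggregatePostShuffle with FINAL, and DistinctAggregate with COMPLETE; the
+ * built-in functions below never read the mode, it is only consulted by the
+ * Hive UDAF adapter HiveGenericUdaf2.)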
+ */
+@deprecated
+trait Mode
+
+/**
+ * PARTIAL1: from original data to partial aggregation data: iterate() and
+ * terminatePartial() will be called.
+ */
+@deprecated
+case object PARTIAL1 extends Mode
+
+/**
+ * PARTIAL2: from partial aggregation data to partial aggregation data:
+ * merge() and terminatePartial() will be called.
+ */
+@deprecated
+case object PARTIAL2 extends Mode
+
+/**
+ * FINAL: from partial aggregation to full aggregation: merge() and
+ * terminate() will be called.
+ */
+@deprecated
+case object FINAL extends Mode
+
+/**
+ * COMPLETE: from original data directly to full aggregation: iterate() and
+ * terminate() will be called.
+ */
+@deprecated
+case object COMPLETE extends Mode
+
+/**
+ * Aggregation Function Interface.
+ * All of these functions will be called within Spark executors.
+ */
+trait AggregateFunction2 {
+  self: Product =>
+
+  // Specify the BoundReferences for the aggregate buffer
+  def initialize(buffers: Seq[BoundReference]): Unit
+
+  // Initialize (reinitialize) the aggregation buffer
+  def reset(buf: MutableRow): Unit
+
+  // Extract the children's values from the input row and merge them into the
+  // given aggregate buffer. `seen` is the set of values observed so far, which
+  // is useful for distinct aggregates; it may be null for non-distinct
+  // aggregates.
+  def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit
+
+  // Merge 2 aggregation buffers, and write the result back to the latter one
+  def merge(value: Row, buf: MutableRow): Unit
+
+  // Semantically we probably don't need this, however, we need it when
+  // integrating with Hive UDAFs (GenericUDAF)
+  @deprecated
+  def terminatePartial(buf: MutableRow): Unit = {}
+
+  // Output the final result by feeding it the aggregation buffer
+  def terminate(buffer: Row): Any
+}
+
+trait AggregateExpression2 extends Expression with AggregateFunction2 {
+  self: Product =>
+  implicit def boundReferenceToIndex(br: BoundReference): Int = br.ordinal
+
+  type EvaluatedType = Any
+
+  var mode: Mode = COMPLETE // will only be used by Hive UDAF
+
+  def initial(m: Mode): Unit = {
+    this.mode = m
+  }
+
+  // Aggregation buffer data types
+  def bufferDataType: Seq[DataType] = Nil
+  // Is it a distinct aggregate expression?
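+  // When this is true, DistinctAggregate hands update() a non-null `seen` set and
+  // expects it to skip values that are already in it (see Sum, Average and
+  // CountDistinct below for examples).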
+ def distinct: Boolean + + def nullable: Boolean = true + + final override def eval(aggrBuffer: Row): EvaluatedType = terminate(aggrBuffer) +} + +abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression2 { + self: Product => +} + +case class Min(child: Expression) extends UnaryAggregateExpression { + + override def distinct: Boolean = false + override def dataType: DataType = child.dataType + override def bufferDataType: Seq[DataType] = dataType :: Nil + override def toString: String = s"MIN($child)" + + /* The below code will be called in executors, be sure to make the instance transientable */ + @transient var arg: MutableLiteral = _ + @transient var buffer: MutableLiteral = _ + @transient var cmp: LessThan = _ + @transient var aggr: BoundReference = _ + + /* Initialization on executors */ + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + arg = MutableLiteral(null, dataType) + buffer = MutableLiteral(null, dataType) + cmp = LessThan(arg, buffer) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = null + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + // we don't care about if the argument has existed or not in the seen + val argument = child.eval(input) + if (argument != null) { + arg.value = argument + buffer.value = buf(aggr) + if (buf.isNullAt(aggr) || cmp.eval(null) == true) { + buf(aggr) = argument + } + } + } + + override def merge(value: Row, rowBuf: MutableRow): Unit = { + if (!value.isNullAt(aggr)) { + arg.value = value(aggr) + buffer.value = rowBuf(aggr) + if (rowBuf.isNullAt(aggr) || cmp.eval(null) == true) { + rowBuf(aggr) = arg.value + } + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} + +case class Average(child: Expression, distinct: Boolean = false) + extends UnaryAggregateExpression { + override def nullable: Boolean = false + + override def dataType: DataType = child.dataType match { + case DecimalType.Fixed(precision, scale) => + DecimalType(precision + 4, scale + 4) // Add 4 digits after decimal point, like Hive + case DecimalType.Unlimited => + DecimalType.Unlimited + case _ => + DoubleType + } + + override def bufferDataType: Seq[DataType] = LongType :: dataType :: Nil + override def toString: String = s"AVG($child)" + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var count: BoundReference = _ + @transient var sum: BoundReference = _ + + // for iterate + @transient var arg: MutableLiteral = _ + @transient var cast: Expression = _ + @transient var add: Add = _ + + // for merge + @transient var argInMerge: MutableLiteral = _ + @transient var addInMerge: Add = _ + + // for terminate + @transient var divide: Divide = _ + + /* Initialization on executors */ + override def initialize(buffers: Seq[BoundReference]): Unit = { + count = buffers(0) + sum = buffers(1) + + arg = MutableLiteral(null, child.dataType) + cast = if (arg.dataType != dataType) Cast(arg, dataType) else arg + add = Add(cast, sum) + + argInMerge = MutableLiteral(null, dataType) + addInMerge = Add(argInMerge, sum) + + divide = Divide(sum, Cast(count, dataType)) + } + + override def reset(buf: MutableRow): Unit = { + buf(count) = 0L + buf(sum) = null + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + val argument = child.eval(input) + if (argument != null) { + if (!distinct || !seen.contains(argument)) { + arg.value = argument + buf(count) = buf.getLong(count) + 1 + if 
(buf.isNullAt(sum)) { + buf(sum) = cast.eval() + } else { + buf(sum) = add.eval(buf) + } + if (distinct) seen.add(argument) + } + } + } + + override def merge(value: Row, buf: MutableRow): Unit = { + if (!value.isNullAt(sum)) { + buf(count) = value.getLong(count) + buf.getLong(count) + if (buf.isNullAt(sum)) { + buf(sum) = value(sum) + } else { + argInMerge.value = value(sum) + buf(sum) = addInMerge.eval(buf) + } + } + } + + override def terminate(row: Row): Any = if (count.eval(row) == 0) null else divide.eval(row) +} + +case class Max(child: Expression) extends UnaryAggregateExpression { + override def distinct: Boolean = false + + override def nullable: Boolean = true + override def dataType: DataType = child.dataType + override def bufferDataType: Seq[DataType] = dataType :: Nil + override def toString: String = s"MAX($child)" + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var aggr: BoundReference = _ + @transient var arg: MutableLiteral = _ + @transient var buffer: MutableLiteral = _ + @transient var cmp: GreaterThan = _ + + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + arg = MutableLiteral(null, dataType) + buffer = MutableLiteral(null, dataType) + cmp = GreaterThan(arg, buffer) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = null + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + // we don't care about if the argument has existed or not in the seen + val argument = child.eval(input) + if (argument != null) { + arg.value = argument + buffer.value = buf(aggr) + if (buf.isNullAt(aggr) || cmp.eval(null) == true) { + buf(aggr) = argument + } + } + } + + override def merge(value: Row, rowBuf: MutableRow): Unit = { + if (!value.isNullAt(aggr)) { + arg.value = value(aggr) + buffer.value = rowBuf(aggr) + if (rowBuf.isNullAt(aggr) || cmp.eval(null) == true) { + rowBuf(aggr) = arg.value + } + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} + +case class Count(child: Expression) + extends UnaryAggregateExpression { + def distinct: Boolean = false + override def nullable: Boolean = false + override def dataType: DataType = LongType + override def bufferDataType: Seq[DataType] = LongType :: Nil + override def toString: String = s"COUNT($child)" + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var aggr: BoundReference = _ + + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = 0L + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + // we don't care about if the argument has existed or not in the seen + // we here only handle the non distinct case + val argument = child.eval(input) + if (argument != null) { + if (buf.isNullAt(aggr)) { + buf(aggr) = 1L + } else { + buf(aggr) = buf.getLong(aggr) + 1L + } + } + } + + override def merge(value: Row, rowBuf: MutableRow): Unit = { + if (value.isNullAt(aggr)) { + // do nothing + } else if (rowBuf.isNullAt(aggr)) { + rowBuf(aggr) = value(aggr) + } else { + rowBuf(aggr) = value.getLong(aggr) + rowBuf.getLong(aggr) + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} + +case class CountDistinct(children: Seq[Expression]) + extends AggregateExpression2 { + def distinct: Boolean = true + override def nullable: Boolean = false + override def dataType: DataType = LongType + override 
def toString: String = s"COUNT($children)" + override def bufferDataType: Seq[DataType] = LongType :: Nil + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var aggr: BoundReference = _ + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = 0L + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + val arguments = children.map(_.eval(input)) + if (!arguments.exists(_ == null)) { + // CountDistinct supports multiple expression, and ONLY IF + // none of its expressions value equals null + if (!seen.contains(arguments)) { + if (buf.isNullAt(aggr)) { + buf(aggr) = 1L + } else { + buf(aggr) = buf.getLong(aggr) + 1L + } + seen.add(arguments) + } + } + } + + override def merge(value: Row, rowBuf: MutableRow): Unit = { + if (value.isNullAt(aggr)) { + // do nothing + } else if (rowBuf.isNullAt(aggr)) { + rowBuf(aggr) = value(aggr) + } else { + rowBuf(aggr) = value.getLong(aggr) + rowBuf.getLong(aggr) + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} + + /** + * Sum should satisfy 3 cases: + * 1) sum of all null values = zero + * 2) sum for table column with no data = null + * 3) sum of column with null and not null values = sum of not null values + * Require separate CombineSum Expression and function as it has to distinguish "No data" case + * versus "data equals null" case, while aggregating results and at each partial expression.i.e., + * Combining PartitionLevel InputData + * <-- null + * Zero <-- Zero <-- null + * + * <-- null <-- no data + * null <-- null <-- no data + */ +case class Sum(child: Expression, distinct: Boolean = false) + extends UnaryAggregateExpression { + override def nullable: Boolean = true + override def dataType: DataType = child.dataType match { + case DecimalType.Fixed(precision, scale) => + DecimalType(precision + 10, scale) // Add 10 digits left of decimal point, like Hive + case DecimalType.Unlimited => + DecimalType.Unlimited + case _ => + child.dataType + } + + override def bufferDataType: Seq[DataType] = dataType :: Nil + override def toString: String = s"SUM($child)" + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var aggr: BoundReference = _ + @transient var arg: MutableLiteral = _ + @transient var sum: Add = _ + + lazy val DEFAULT_VALUE = Cast(Literal.create(0, IntegerType), dataType).eval() + + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + arg = MutableLiteral(null, dataType) + sum = Add(arg, aggr) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = null + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + val argument = child.eval(input) + if (!distinct || !seen.contains(argument)) { + if (argument != null) { + if (buf.isNullAt(aggr)) { + buf(aggr) = argument + } else { + arg.value = argument + buf(aggr) = sum.eval(buf) + } + } else { + if (buf.isNullAt(aggr)) { + buf(aggr) = DEFAULT_VALUE + } + } + if (distinct) seen.add(argument) + } + } + + override def merge(value: Row, buf: MutableRow): Unit = { + if (!value.isNullAt(aggr)) { + arg.value = value(aggr) + if (buf.isNullAt(aggr)) { + buf(aggr) = arg.value + } else { + buf(aggr) = sum.eval(buf) + } + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} + +case class First(child: Expression, distinct: Boolean = false) + extends 
UnaryAggregateExpression { + override def nullable: Boolean = true + override def dataType: DataType = child.dataType + override def bufferDataType: Seq[DataType] = dataType :: Nil + override def toString: String = s"FIRST($child)" + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var aggr: BoundReference = _ + + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = null + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + // we don't care about if the argument has existed or not in the seen + val argument = child.eval(input) + if (buf.isNullAt(aggr)) { + if (argument != null) { + buf(aggr) = argument + } + } + } + + override def merge(value: Row, buf: MutableRow): Unit = { + if (buf.isNullAt(aggr)) { + if (!value.isNullAt(aggr)) { + buf(aggr) = value(aggr) + } + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} + +case class Last(child: Expression, distinct: Boolean = false) + extends UnaryAggregateExpression { + override def nullable: Boolean = true + override def dataType: DataType = child.dataType + override def bufferDataType: Seq[DataType] = dataType :: Nil + override def toString: String = s"LAST($child)" + + /* The below code will be called in executors, be sure to mark the instance as transient */ + @transient var aggr: BoundReference = _ + + override def initialize(buffers: Seq[BoundReference]): Unit = { + aggr = buffers(0) + } + + override def reset(buf: MutableRow): Unit = { + buf(aggr) = null + } + + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + // we don't care about if the argument has existed or not in the seen + val argument = child.eval(input) + if (argument != null) { + buf(aggr) = argument + } + } + + override def merge(value: Row, buf: MutableRow): Unit = { + if (!value.isNullAt(aggr)) { + buf(aggr) = value(aggr) + } + } + + override def terminate(row: Row): Any = aggr.eval(row) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index 5fd892c42e69..a29ed83c9cd0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -57,6 +57,7 @@ object EmptyRow extends Row { override def getString(i: Int): String = throw new UnsupportedOperationException override def getAs[T](i: Int): T = throw new UnsupportedOperationException override def copy(): Row = this + override def makeMutable(): MutableRow = throw new UnsupportedOperationException } /** @@ -174,6 +175,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { } override def copy(): Row = this + override def makeMutable(): MutableRow = new GenericMutableRow(values.clone()) } class GenericRowWithSchema(values: Array[Any], override val schema: StructType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 1dd75a884630..405c7f3f2d36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.planning +import 
org.apache.spark.sql.catalyst.expressions.aggregate2.AggregateExpression2 + import scala.annotation.tailrec import org.apache.spark.Logging @@ -106,6 +108,103 @@ object PhysicalOperation extends PredicateHelper { } } +/** + * + * TODO: This is a temporal solution to substitute the expression tree from + * AggregateExpression with aggregate2.AggregateExpression2, and will be + * removed once the aggregate2.AggregateExpression2 is stable enough. + */ +class AggregateExpressionSubsitution { + def subsitute(aggr: AggregateExpression): AggregateExpression2 = aggr match { + case Min(child) => aggregate2.Min(child) + case Max(child) => aggregate2.Max(child) + case Count(child) => aggregate2.Count(child) + case CountDistinct(children) => aggregate2.CountDistinct(children) + // TODO: we don't support approximate in aggregate2 yet. + case ApproxCountDistinct(child, sd) => aggregate2.CountDistinct(child :: Nil) + case Average(child) => aggregate2.Average(child) + case Sum(child) => aggregate2.Sum(child) + case SumDistinct(child) => aggregate2.Sum(child, true) + case First(child) => aggregate2.First(child) + case Last(child) => aggregate2.Last(child) + } +} + +// TODO: Will be removed once aggregate2.AggregateExpression2 is stable enough +object AggregateExpressionSubsitution extends AggregateExpressionSubsitution + +/** + * Matches a logical aggregation that can be performed on distributed data in two steps. The first + * operates on the data in each partition performing partial aggregation for each group. The second + * occurs after the shuffle and completes the aggregation. + * + * This pattern will only match if all aggregate expressions can be computed partially and will + * return the rewritten aggregation expressions for both phases. + * + * The returned values for this match are as follows: + * - Grouping expression (transformed to NamedExpression) list . + * - Aggregate Expressions extract from the projection. + * - Rewritten Projection for post shuffled. + * - Original Projection. + * - Child logical plan. + */ +object PartialAggregation2 { + type ReturnType = + (Seq[NamedExpression], + Seq[aggregate2.AggregateExpression2], + Seq[NamedExpression], + Seq[NamedExpression], + LogicalPlan) + + def unapply(plan: LogicalPlan) + : Option[ReturnType] = plan match { + case logical.Aggregate(groupingExpressions, projection, child) => + // Collect all aggregate expressions that can be computed partially. + val allAggregates = projection.flatMap(_ collect { + case a: aggregate2.AggregateExpression2 => a + }) + + // We need to pass all grouping expressions though so the grouping can happen a second + // time. However some of them might be unnamed so we alias them allowing them to be + // referenced in the second aggregation. + val namedGroupingExpressions = groupingExpressions.map { + case n: NamedExpression => (n, n) + case other => (other, Alias(other, "PartialGroup")()) + } + val substitutions = namedGroupingExpressions.toMap + + // Replace aggregations with a new expression that computes the result from the already + // computed partial evaluations and grouping values. 
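+ // For example (illustrative): for `SELECT key, AVG(value) FROM t GROUP BY key`,
+ // the post-shuffle projection keeps the AVG expression itself (so that it can
+ // merge the partial buffers), but its child `value` is replaced with a typed
+ // MutableLiteral placeholder, while the grouping expression `key` is replaced
+ // by the attribute produced by the pre-shuffle aggregate.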
+ val rewrittenProjection = projection.map(_.transformUp { + case e: Expression if substitutions.contains(e) => + substitutions(e).toAttribute + case e: AggregateExpression2 => e.transformChildrenDown { + // replace the child expression of the aggregate expression, with + // Literal, as in PostShuffle, we don't need children expression any + // more(Only aggregate buffer required), otherwise, it will + // cause the attribute not binding exceptions. + case expr => MutableLiteral(null, expr.dataType, expr.nullable) + } + case e: Expression => + // Should trim aliases around `GetField`s. These aliases are introduced while + // resolving struct field accesses, because `GetField` is not a `NamedExpression`. + // (Should we just turn `GetField` into a `NamedExpression`?) + substitutions + .get(e.transform { case Alias(g: ExtractValue, _) => g }) + .map(_.toAttribute) + .getOrElse(e) + }).asInstanceOf[Seq[NamedExpression]] + + Some( + (namedGroupingExpressions.map(_._2), + allAggregates, + rewrittenProjection, + projection, + child)) + case _ => None + } +} + /** * Matches a logical aggregation that can be performed on distributed data in two steps. The first * operates on the data in each partition performing partial aggregation for each group. The second @@ -126,7 +225,10 @@ object PartialAggregation { (Seq[Attribute], Seq[NamedExpression], Seq[Expression], Seq[NamedExpression], LogicalPlan) def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { - case logical.Aggregate(groupingExpressions, aggregateExpressions, child) => + case logical.Aggregate(groupingExpressions, aggregateExpressions, child) + if (aggregateExpressions.flatMap(_.collect { + case a: aggregate2.AggregateExpression2 => a + })).length == 0 => // Collect all aggregate expressions. val allAggregates = aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a}) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 77c6af27d100..fc7515a421c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -90,6 +90,9 @@ private[spark] object SQLConf { val USE_JACKSON_STREAMING_API = "spark.sql.json.useJacksonStreamingAPI" + // Whether enable the aggregate2, which is the refactor one + val AGGREGATE_2 = "spark.sql.aggregate2" + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -264,6 +267,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean + private[spark] def aggregate2: Boolean = + getConf(AGGREGATE_2, "false").toBoolean + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ddb54025baa2..b2d3a866e98d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -21,6 +21,8 @@ import java.beans.Introspector import java.util.Properties import java.util.concurrent.atomic.AtomicReference +import org.apache.spark.sql.catalyst.planning.AggregateExpressionSubsitution + import scala.collection.JavaConversions._ import scala.collection.immutable import scala.language.implicitConversions @@ -825,6 +827,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def strategies: Seq[Strategy] = experimental.extraStrategies ++ ( + new HashAggregation2(AggregateExpressionSubsitution) :: DataSourceStrategy :: DDLStrategy :: TakeOrdered :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7a1331a39151..6de9c3f93543 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate2.AggregateExpression2 import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -117,6 +118,44 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + class HashAggregation2(aggrSubsitution: AggregateExpressionSubsitution) extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case aggr @ logical.Aggregate(groupingExpressions, aggregateExpressions, child) + if sqlContext.conf.aggregate2 => + + val subusitutedAggrExpression = aggregateExpressions.map(_.transformUp { + case a: AggregateExpression => aggrSubsitution.subsitute(a) + }.asInstanceOf[NamedExpression]) + + aggr.copy(aggregateExpressions = subusitutedAggrExpression) match { + // Aggregations that can be performed in two phases, before and after the shuffle. + case PartialAggregation2( + namedGroupingExpressions, + aggregates, + rewrittenProjection, + originalProjection, + child) => + if (aggregates.exists(_.distinct)) { + aggregate2.DistinctAggregate( + namedGroupingExpressions, + originalProjection, + rewrittenProjection, + planLater(child)) :: Nil + } else { + aggregate2.AggregatePostShuffle( + namedGroupingExpressions.map(_.toAttribute), + rewrittenProjection, + aggregate2.AggregatePreShuffle( + namedGroupingExpressions, + aggregates, + planLater(child))) :: Nil + } + case _ => Nil + } + case _ => Nil + } + } + object HashAggregation extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // Aggregations that can be performed in two phases, before and after the shuffle. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate2/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate2/Aggregate.scala new file mode 100644 index 000000000000..b714435ee8c6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate2/Aggregate.scala @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.aggregate2 + +import java.util.{HashSet => JHashSet, Set => JSet} + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate2._ +import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} + +import org.apache.spark.util.collection.OpenHashMap + +import org.apache.spark.sql.catalyst.errors._ +import org.apache.spark.sql.catalyst.plans.physical._ + +// A class of the Aggregate buffer & Seen Set pair +sealed class BufferSeens(var buffer: MutableRow, var seens: Array[JSet[Any]] = null) { + def this() { + this(new GenericMutableRow(0), null) + } + + def withBuffer(row: MutableRow): BufferSeens = { + this.buffer = row + this + } + + def withSeens(seens: Array[JSet[Any]]): BufferSeens = { + this.seens = seens + this + } +} + +// A MutableRow for AggregateBuffers and GroupingExpression Values +sealed class BufferAndKey(leftLen: Int, rightLen: Int) + extends GenericMutableRow(leftLen + rightLen) { + + def this(leftLen: Int, keys: Row) = { + this(leftLen, keys.length) + // copy the keys to the last + var idx = leftLen + var idx2 = 0 + while (idx2 < keys.length) { + this.values(idx) = keys(idx2) + idx2 += 1 + idx += 1 + } + } +} + +sealed trait Aggregate { + self: Product => + // HACK: Generators don't correctly preserve their output through serializations so we grab + // out child's output attributes statically here. + val childOutput = child.output + + // initialize the aggregate functions, this will be called in the beginning of every partition + // data processing + def initializedAndGetAggregates( + mode: Mode, + aggregates: Seq[AggregateExpression2]) + : Array[AggregateExpression2] = { + var pos = 0 + + aggregates.map { ae => + ae.initial(mode) + + // We connect all of the aggregation buffers in a single Row, + // and "BIND" the attribute references in a Hack way, as we believe + // the Pre/Post Shuffle Aggregate are actually tightly coupled + val bufferDataTypes = ae.bufferDataType + ae.initialize(for (i <- 0 until bufferDataTypes.length) yield { + BoundReference(pos + i, bufferDataTypes(i), true) + }) + pos += bufferDataTypes.length + + ae + }.toArray + } + + // This is provided by SparkPlan + def child: SparkPlan + + // The schema of the aggregate buffers, as we lines those buffers + // in a single row. 
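+ // For example (illustrative): for the aggregates AVG(a) and MAX(b) the flattened
+ // buffer row is [count, sum, max]; initializedAndGetAggregates binds AVG to
+ // ordinals 0 and 1 and MAX to ordinal 2, and BufferAndKey appends the grouping
+ // key values after these buffer slots.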
+ def bufferSchemaFromAggregate(aggregates: Seq[AggregateExpression2]): Seq[Attribute] = + aggregates.zipWithIndex.flatMap { case (ca, idx) => + ca.bufferDataType.zipWithIndex.map { case (dt, i) => + // the attribute names is useless here, as we bind the attribute + // in a hack way, see [[initializedAndGetAggregates]] + AttributeReference(s"aggr.${idx}_$i", dt)().toAttribute } + } +} + +sealed trait PostShuffle extends Aggregate { + self: Product => + + // extract the aggregate function from the projection + def computedAggregates(projectionList: Seq[Expression]): Seq[AggregateExpression2] = { + projectionList.flatMap { expr => + expr.collect { + case ae: AggregateExpression2 => ae + } + } + } +} + +/** + * :: DeveloperApi :: + * Groups input data by `groupingExpressions` and computes the `projection` for each + * group. + * + * @param groupingExpressions the attributes represent the output of the grouping expressions + * @param originalProjection Unbound Aggregate Function List. + * @param child the input data source. + */ +@DeveloperApi +case class AggregatePreShuffle( + groupingExpressions: Seq[NamedExpression], + originalProjection: Seq[AggregateExpression2], + child: SparkPlan) + extends UnaryNode with Aggregate { + + private val aggregateExpressions: Seq[AggregateExpression2] = originalProjection.map { + BindReferences.bindReference(_, childOutput) + } + + private val buffersSchema = bufferSchemaFromAggregate(aggregateExpressions) + override def requiredChildDistribution: Seq[Distribution] = UnspecifiedDistribution :: Nil + override val output: Seq[Attribute] = buffersSchema ++ groupingExpressions.map(_.toAttribute) + + /** + * Create Iterator for the in-memory hash map. + */ + private[this] def createIterator( + functions: Array[AggregateExpression2], + iterator: Iterator[BufferSeens]) = { + new Iterator[Row] { + override final def hasNext: Boolean = iterator.hasNext + + override final def next(): Row = { + val keyBuffer = iterator.next() + var idx = 0 + while (idx < functions.length) { + // terminatedPartial is for Hive UDAF, we + // provide an opportunity to transform its internal aggregate buffer into + // the catalyst data. 
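+ // (For the built-in functions in aggregate2 this is a no-op; only the Hive UDAF
+ // adapter HiveGenericUdaf2 overrides it, serializing its AggregationBuffer back
+ // into the row before it is shuffled.)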
+ functions(idx).terminatePartial(keyBuffer.buffer) + idx += 1 + } + + keyBuffer.buffer + } + } + } + + override def doExecute(): RDD[Row] = attachTree(this, "execute") { + if (groupingExpressions.length == 0) { + child.execute().mapPartitions { iter => + // the input is every single row + val aggregates = initializedAndGetAggregates(PARTIAL1, aggregateExpressions) + // without group by keys + val buffer = new GenericMutableRow(buffersSchema.length) + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.reset(buffer) + idx += 1 + } + + while (iter.hasNext) { + val currentRow = iter.next() + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.update(currentRow, buffer, null) + idx += 1 + } + } + + createIterator(aggregates, Iterator(new BufferSeens().withBuffer(buffer))) + } + } else { + child.execute().mapPartitions { iter => + // the input is every single row + val aggregates = initializedAndGetAggregates(PARTIAL1, aggregateExpressions) + + val groupByProjection = new InterpretedMutableProjection(groupingExpressions, childOutput) + + val results = new OpenHashMap[Row, BufferSeens]() + while (iter.hasNext) { + val currentRow = iter.next() + + val keys = groupByProjection(currentRow) + results(keys) match { + case null => + val buffer = new BufferAndKey(buffersSchema.length, keys) + // update the aggregate buffers + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.reset(buffer) + ae.update(currentRow, buffer, null) + idx += 1 + } + + results(keys.copy()) = new BufferSeens(buffer, null) + case inputbuffer => + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.update(currentRow, inputbuffer.buffer, null) + idx += 1 + } + } + } + + // The output is the (Aggregate Buffers + Grouping Expression Values) + createIterator(aggregates, results.iterator.map(_._2)) + } + } + } +} + +case class AggregatePostShuffle( + groupingExpressions: Seq[Attribute], + rewrittenProjection: Seq[NamedExpression], + child: SparkPlan) extends UnaryNode with PostShuffle { + + override def output: Seq[Attribute] = rewrittenProjection.map(_.toAttribute) + + override def requiredChildDistribution: Seq[Distribution] = if (groupingExpressions == Nil) { + AllTuples :: Nil + } else { + ClusteredDistribution(groupingExpressions) :: Nil + } + + override def doExecute(): RDD[Row] = attachTree(this, "execute") { + if (groupingExpressions.length == 0) { + child.execute().mapPartitions { iter => + // The input Row in the format of (AggregateBuffers) + val aggregates = + initializedAndGetAggregates(FINAL, computedAggregates(rewrittenProjection)) + val finalProjection = new InterpretedMutableProjection(rewrittenProjection, childOutput) + + val buffer = new GenericMutableRow(childOutput.length) + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.reset(buffer) + idx += 1 + } + + while (iter.hasNext) { + val currentRow = iter.next() + + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.merge(currentRow, buffer) + idx += 1 + } + } + + Iterator(finalProjection(buffer)) + } + } else { + child.execute().mapPartitions { iter => + // The input Row in the format of (AggregateBuffers + GroupingExpression Values) + val aggregates = + initializedAndGetAggregates(FINAL, computedAggregates(rewrittenProjection)) + val finalProjection = new InterpretedMutableProjection(rewrittenProjection, childOutput) + + val results = new OpenHashMap[Row, BufferSeens]() + val groupByProjection 
= new InterpretedMutableProjection(groupingExpressions, childOutput) + + while (iter.hasNext) { + val currentRow = iter.next() + val keys = groupByProjection(currentRow) + results(keys) match { + case null => + // TODO actually what we need to copy is the grouping expression values + // as the aggregate buffer will be reset. + val buffer = currentRow.makeMutable() + // The reason why need to reset it first and merge with the input row is, + // in Hive UDAF, we need to provide an opportunity that the buffer can be the + // custom type, Otherwise, HIVE UDAF will wrap/unwrap in every merge() method + // calls, which is every expensive + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.reset(buffer) + ae.merge(currentRow, buffer) + idx += 1 + } + results(keys.copy()) = new BufferSeens(buffer, null) + case pair => + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.merge(currentRow, pair.buffer) + idx += 1 + } + } + } + + // final Project is simple a rewrite version of output expression list + // which will project as the final output + results.iterator.map { it => finalProjection(it._2.buffer)} + } + } + } +} + +// TODO Currently even if only a single DISTINCT exists in the aggregate expressions, we will +// not do partial aggregation (aggregating before shuffling), all of the data have to be shuffled +// to the reduce side and do aggregation directly, this probably causes the performance regression +// for Aggregation Function like CountDistinct etc. +case class DistinctAggregate( + groupingExpressions: Seq[NamedExpression], + originalProjection: Seq[NamedExpression], + rewrittenProjection: Seq[NamedExpression], + child: SparkPlan) extends UnaryNode with PostShuffle { + + override def output: Seq[Attribute] = rewrittenProjection.map(_.toAttribute) + + override def requiredChildDistribution: Seq[Distribution] = if (groupingExpressions == Nil) { + AllTuples :: Nil + } else { + ClusteredDistribution(groupingExpressions) :: Nil + } + + // binding the expression, which takes the child's output as input + private val aggregateExpressions: Seq[Expression] = originalProjection.map { + BindReferences.bindReference(_: Expression, childOutput) + } + + override def doExecute(): RDD[Row] = attachTree(this, "execute") { + if (groupingExpressions.length == 0) { + child.execute().mapPartitions { iter => + // initialize the aggregate functions for input rows + // (update/terminatePartial will be called) + val aggregates = + initializedAndGetAggregates(COMPLETE, computedAggregates(aggregateExpressions)) + + val buffersSchema = bufferSchemaFromAggregate(aggregates) + + // initialize the aggregate functions for the final output (merge/terminate will be called) + initializedAndGetAggregates(COMPLETE, computedAggregates(rewrittenProjection)) + // binding the output projection, which takes the aggregate buffer and grouping keys + // as the input row. 
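+ // (In this no-grouping branch the projection's input row holds only the buffer
+ // slots described by buffersSchema; in the group-by branch below it is
+ // buffersSchema ++ grouping attributes.)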
+ val finalProjection = new InterpretedMutableProjection(rewrittenProjection, buffersSchema) + + val buffer = new GenericMutableRow(buffersSchema.length) + val seens = new Array[JSet[Any]](aggregates.length) + + // reset the aggregate buffer + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.reset(buffer) + + if (ae.distinct) { + seens(idx) = new JHashSet[Any]() + } + + idx += 1 + } + val ibs = new BufferSeens().withBuffer(buffer).withSeens(seens) + + // update the aggregate buffer + while (iter.hasNext) { + val currentRow = iter.next() + + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.update(currentRow, buffer, seens(idx)) + + idx += 1 + } + } + + // only single output for non grouping keys case + Iterator(finalProjection(ibs.buffer)) + } + } else { + child.execute().mapPartitions { iter => + // initialize the aggregate functions for input rows + // (update will be called) + val aggregates = + initializedAndGetAggregates(COMPLETE, computedAggregates(aggregateExpressions)) + + val buffersSchema = bufferSchemaFromAggregate(aggregates) + val outputSchema = buffersSchema ++ groupingExpressions.map(_.toAttribute) + + // initialize the aggregate functions for the final output + // (merge/terminate will be called) + initializedAndGetAggregates(COMPLETE, computedAggregates(rewrittenProjection)) + // binding the output projection, which takes the aggregate buffer and grouping keys + // as the input row. + val finalProjection = new InterpretedMutableProjection(rewrittenProjection, outputSchema) + + val results = new OpenHashMap[Row, BufferSeens]() + val groupByProjection = new InterpretedMutableProjection(groupingExpressions, childOutput) + + while (iter.hasNext) { + val currentRow = iter.next() + + val keys = groupByProjection(currentRow) + results(keys) match { + case null => + val buffer = new BufferAndKey(buffersSchema.length, keys) + val seens = new Array[JSet[Any]](aggregates.length) + + // update the aggregate buffers + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.reset(buffer) + + if (ae.distinct) { + val seen = new JHashSet[Any]() + seens(idx) = seen + } + + ae.update(currentRow, buffer, seens(idx)) + idx += 1 + } + + results(keys.copy()) = new BufferSeens(buffer, seens) + + case bufferSeens => + var idx = 0 + while (idx < aggregates.length) { + val ae = aggregates(idx) + ae.update(currentRow, bufferSeens.buffer, bufferSeens.seens(idx)) + + idx += 1 + } + } + } + + results.iterator.map(it => finalProjection(it._2.buffer)) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5babc4332cc7..39fef43d49f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -174,7 +174,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { case generatedAgg: GeneratedAggregate => hasGeneratedAgg = true case _ => } - if (!hasGeneratedAgg) { + if (!hasGeneratedAgg && sqlContext.getConf(SQLConf.AGGREGATE_2) == false) { fail( s""" |Codegen is enabled, but query $sqlText does not have GeneratedAggregate in the plan. 
@@ -1365,3 +1365,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { } } } + +class SQLNewUDAFQuerySuite extends SQLQuerySuite { + override def beforeAll() { + super.beforeAll() + sqlContext.setConf(SQLConf.AGGREGATE_2, "true") + } + + override def afterAll() { + sqlContext.setConf(SQLConf.AGGREGATE_2, "false") + super.afterAll() + } +} diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 048f78b4daa8..7ec0be473a5d 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -1007,3 +1007,15 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "view_inputs" ) } + +class HiveNewUDAFCompatibilitySuite extends HiveCompatibilitySuite { + override def beforeAll() { + super.beforeAll() + TestHive.setConf(SQLConf.AGGREGATE_2, "true") + } + + override def afterAll() { + TestHive.setConf(SQLConf.AGGREGATE_2, "false") + super.afterAll() + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b8f294c262af..e06f89c655a2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.spark.sql.catalyst.ParserDialect +import org.apache.spark.sql.catalyst.planning.AggregateExpressionSubsitution + import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.language.implicitConversions @@ -445,6 +447,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val hiveContext = self override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq( + new HashAggregation2(HiveAggregateExpressionSubsitution), DataSourceStrategy, HiveCommandStrategy(self), HiveDDLStrategy, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 01f47352b231..da3d7b2b6953 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.hive +import java.util.{Set => JSet} + import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.aggregate2.{AggregateExpression2, COMPLETE, FINAL, PARTIAL1} +import org.apache.spark.sql.catalyst.planning.AggregateExpressionSubsitution import scala.collection.mutable.ArrayBuffer @@ -468,6 +472,118 @@ private[hive] case class HiveUdaf( def newInstance(): HiveUdafFunction = new HiveUdafFunction(funcWrapper, children, this, true) } +private[hive] case class HiveGenericUdaf2( + funcWrapper: HiveFunctionWrapper, + children: Seq[Expression], + distinct: Boolean, + isSimpleUDAF: Boolean) extends AggregateExpression2 with HiveInspectors { + type UDFType = AbstractGenericUDAFResolver + + protected def 
createEvaluator = resolver.getEvaluator( + new SimpleGenericUDAFParameterInfo(inspectors, false, false)) + + // Hive UDAF evaluator + @transient + lazy val evaluator = createEvaluator + + @transient + protected lazy val resolver: AbstractGenericUDAFResolver = if (isSimpleUDAF) { + // if it's the Simple UDAF, we need the UDAF bridge + new GenericUDAFBridge(funcWrapper.createFunction()) + } else { + funcWrapper.createFunction() + } + + // Output data object inspector + @transient + lazy val objectInspector = createEvaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) + + // Aggregation Buffer Inspector + @transient + lazy val bufferObjectInspector = { + createEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors) + } + + // Input arguments object inspectors + @transient + lazy val inspectors = children.map(toInspector).toArray + + private val distinctLike: Boolean = { + val annotation = evaluator.getClass().getAnnotation(classOf[HiveUDFType]) + if (annotation == null || !annotation.distinctLike()) false else true + } + override def toString: String = + s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" + + // Aggregation Buffer Data Type, We assume only 1 element for the Hive Aggregation Buffer + // It will be StructType if more than 1 element (Actually will be StructSettableObjectInspector) + override def bufferDataType: Seq[DataType] = inspectorToDataType(bufferObjectInspector) :: Nil + + // Output data type + override def dataType: DataType = inspectorToDataType(objectInspector) + + /////////////////////////////////////////////////////////////////////////////////////////////// + // The following code will be called within the executors // + /////////////////////////////////////////////////////////////////////////////////////////////// + @transient var bound: BoundReference = _ + + override def initialize(buffers: Seq[BoundReference]): Unit = { + bound = buffers(0) + mode match { + case FINAL => evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(bufferObjectInspector)) + case COMPLETE => evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) + case PARTIAL1 => evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors) + } + } + + // Initialize (reinitialize) the aggregation buffer + override def reset(buf: MutableRow): Unit = { + val buffer = evaluator.getNewAggregationBuffer + evaluator.reset(buffer) + // This is a hack, we never use the mutable row as buffer, but define our own buffer, + // which is set as the first element of the buffer + buf(bound.ordinal) = buffer + } + + // Expect the aggregate function fills the aggregation buffer when fed with each value + // in the group + override def update(input: Row, buf: MutableRow, seen: JSet[Any]): Unit = { + val arguments = children.map(_.eval(input)) + // We assume the memory is much more critical than computation, + // so we prefer computation other than put the into a in-memory Set + // when the UDAF is distinct-Like + if (distinctLike || !distinct || !seen.contains(arguments)) { + val args = arguments.zip(inspectors).map { + case (value, oi) => wrap(value, oi) + }.toArray + + evaluator.iterate( + buf.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal), args) + if (distinct && !distinctLike) seen.add(arguments) + } + } + + // Merge 2 aggregation buffer, and write back to the later one + override def merge(value: Row, buf: MutableRow): Unit = { + val buffer = buf.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal) + evaluator.merge(buffer, 
wrap(value.get(bound.ordinal), bufferObjectInspector)) + } + + @deprecated + override def terminatePartial(buf: MutableRow): Unit = { + val buffer = buf.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal) + // this is for serialization + buf(bound.ordinal) = unwrap(evaluator.terminatePartial(buffer), bufferObjectInspector) + } + + // Output the final result by feeding the aggregation buffer + override def terminate(input: Row): Any = { + unwrap(evaluator.terminate( + input.getAs[GenericUDAFEvaluator.AggregationBuffer](bound.ordinal)), + objectInspector) + } +} + /** * Converts a Hive Generic User Defined Table Generating Function (UDTF) to a * [[catalyst.expressions.Generator Generator]]. Note that the semantics of Generators do not allow @@ -587,3 +703,13 @@ private[hive] case class HiveUdafFunction( } } +private[hive] object HiveAggregateExpressionSubsitution extends AggregateExpressionSubsitution { + override def subsitute(aggr: AggregateExpression): AggregateExpression2 = aggr match { + // TODO: we don't support distinct for Hive UDAF(Generic) yet from the HiveQL Parser yet + case HiveGenericUdaf(funcWrapper, children) => + HiveGenericUdaf2(funcWrapper, children, distinct = false, isSimpleUDAF = false) + case HiveUdaf(funcWrapper, children) => + HiveGenericUdaf2(funcWrapper, children, distinct = false, isSimpleUDAF = true) + case _ => super.subsitute(aggr) + } +} diff --git a/sql/hive/src/test/resources/golden/aggregation with group by expressions #1-0-f94fcc218d98298e058589b40b66e54a b/sql/hive/src/test/resources/golden/aggregation with group by expressions #1-0-f94fcc218d98298e058589b40b66e54a new file mode 100644 index 000000000000..aa9769a5e9ab --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation with group by expressions #1-0-f94fcc218d98298e058589b40b66e54a @@ -0,0 +1,5 @@ +3 3 0 0 +5 1 2 2 +7 1 4 4 +8 3 5 5 +11 1 8 8 diff --git a/sql/hive/src/test/resources/golden/aggregation with group by expressions #2-0-9e3a5b01c29dc63023bde64d85c2b7e7 b/sql/hive/src/test/resources/golden/aggregation with group by expressions #2-0-9e3a5b01c29dc63023bde64d85c2b7e7 new file mode 100644 index 000000000000..26ae72edc75d --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation with group by expressions #2-0-9e3a5b01c29dc63023bde64d85c2b7e7 @@ -0,0 +1,5 @@ +3 3 0 0 0 +5 1 2 2 2 +7 1 4 4 4 +8 3 5 5 15 +11 1 8 8 8 diff --git a/sql/hive/src/test/resources/golden/aggregation with group by expressions #3-0-bb30af32082fac87c7e2720e40978c87 b/sql/hive/src/test/resources/golden/aggregation with group by expressions #3-0-bb30af32082fac87c7e2720e40978c87 new file mode 100644 index 000000000000..59abfa88c9da --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation with group by expressions #3-0-bb30af32082fac87c7e2720e40978c87 @@ -0,0 +1,5 @@ +3 1 0 0 0 +5 1 2 2 2 +7 1 4 4 4 +8 1 5 5 5 +11 1 8 8 8 diff --git a/sql/hive/src/test/resources/golden/aggregation with group by expressions #5-0-68ffb9106a13d35ba8c36741e894a93f b/sql/hive/src/test/resources/golden/aggregation with group by expressions #5-0-68ffb9106a13d35ba8c36741e894a93f new file mode 100644 index 000000000000..29bba1d1d6ea --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation with group by expressions #5-0-68ffb9106a13d35ba8c36741e894a93f @@ -0,0 +1,5 @@ +6 4 6 6 6 +10 6 10 10 10 +14 8 14 14 14 +16 9 16 16 16 +22 12 22 22 22 diff --git a/sql/hive/src/test/resources/golden/aggregation with group by expressions #6-0-5f3e67d7a3abd388c85220eb3af07976 b/sql/hive/src/test/resources/golden/aggregation 
with group by expressions #6-0-5f3e67d7a3abd388c85220eb3af07976 new file mode 100644 index 000000000000..1a4fc65a86c4 --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation with group by expressions #6-0-5f3e67d7a3abd388c85220eb3af07976 @@ -0,0 +1,5 @@ +0.0 0.0 +0.0 0.0 +0.0 0.0 +0.0 0.0 +0.0 0.0 diff --git a/sql/hive/src/test/resources/golden/aggregation with group by expressions #8-0-d9161a0e40862ba94a35e5b65daea51a b/sql/hive/src/test/resources/golden/aggregation with group by expressions #8-0-d9161a0e40862ba94a35e5b65daea51a new file mode 100644 index 000000000000..0daf0e02b136 --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation with group by expressions #8-0-d9161a0e40862ba94a35e5b65daea51a @@ -0,0 +1,5 @@ +4 +6 +8 +9 +12 diff --git a/sql/hive/src/test/resources/golden/aggregation without group by expressions #1-0-30038eb221d9d91ff4a098a57c1a5f9 b/sql/hive/src/test/resources/golden/aggregation without group by expressions #1-0-30038eb221d9d91ff4a098a57c1a5f9 new file mode 100644 index 000000000000..5111dd17161f --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation without group by expressions #1-0-30038eb221d9d91ff4a098a57c1a5f9 @@ -0,0 +1 @@ +500 498 0 diff --git a/sql/hive/src/test/resources/golden/aggregation without group by expressions #2-0-75a3974aac80b9c47f23519da6a68876 b/sql/hive/src/test/resources/golden/aggregation without group by expressions #2-0-75a3974aac80b9c47f23519da6a68876 new file mode 100644 index 000000000000..b4d2e5cc256d --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation without group by expressions #2-0-75a3974aac80b9c47f23519da6a68876 @@ -0,0 +1 @@ +500 498 0 130091 diff --git a/sql/hive/src/test/resources/golden/aggregation without group by expressions #3-0-8341e7bf739124bef28729aabb9fe542 b/sql/hive/src/test/resources/golden/aggregation without group by expressions #3-0-8341e7bf739124bef28729aabb9fe542 new file mode 100644 index 000000000000..276664a61678 --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation without group by expressions #3-0-8341e7bf739124bef28729aabb9fe542 @@ -0,0 +1 @@ +309 498 0 79136 diff --git a/sql/hive/src/test/resources/golden/aggregation without group by expressions #4-0-8341e7bf739124bef28729aabb9fe542 b/sql/hive/src/test/resources/golden/aggregation without group by expressions #4-0-8341e7bf739124bef28729aabb9fe542 new file mode 100644 index 000000000000..276664a61678 --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation without group by expressions #4-0-8341e7bf739124bef28729aabb9fe542 @@ -0,0 +1 @@ +309 498 0 79136 diff --git a/sql/hive/src/test/resources/golden/aggregation without group by expressions #5-0-1e35f970b831ecfffdaff828428aea51 b/sql/hive/src/test/resources/golden/aggregation without group by expressions #5-0-1e35f970b831ecfffdaff828428aea51 new file mode 100644 index 000000000000..ce71b00ee105 --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation without group by expressions #5-0-1e35f970b831ecfffdaff828428aea51 @@ -0,0 +1 @@ +503 499 2 130096 diff --git a/sql/hive/src/test/resources/golden/aggregation without group by expressions #6-0-9f51fa0a008712e35c70d7187a55ee35 b/sql/hive/src/test/resources/golden/aggregation without group by expressions #6-0-9f51fa0a008712e35c70d7187a55ee35 new file mode 100644 index 000000000000..418739f242dd --- /dev/null +++ b/sql/hive/src/test/resources/golden/aggregation without group by expressions #6-0-9f51fa0a008712e35c70d7187a55ee35 @@ -0,0 +1 @@ +313 500 3 79140 diff --git 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregateSuite.scala
new file mode 100644
index 000000000000..d1c0502bfa3e
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregateSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.BeforeAndAfter
+
+class AggregateSuite extends HiveComparisonTest with BeforeAndAfter {
+  override def beforeAll() {
+    TestHive.cacheTables = true
+    TestHive.setConf(SQLConf.AGGREGATE_2, "true")
+  }
+
+  override def afterAll() {
+    TestHive.cacheTables = false
+    TestHive.setConf(SQLConf.AGGREGATE_2, "false")
+  }
+
+  createQueryTest("aggregation without group by expressions #1",
+    """
+      |SELECT
+      |  count(value),
+      |  max(key),
+      |  min(key)
+      |FROM src
+    """.stripMargin, false)
+
+  createQueryTest("aggregation without group by expressions #2",
+    """
+      |SELECT
+      |  count(value),
+      |  max(key),
+      |  min(key),
+      |  sum(key)
+      |FROM src
+    """.stripMargin, false)
+
+  createQueryTest("aggregation without group by expressions #3",
+    """
+      |SELECT
+      |  count(distinct value),
+      |  max(key),
+      |  min(key),
+      |  sum(distinct key)
+      |FROM src
+    """.stripMargin, false)
+
+  createQueryTest("aggregation without group by expressions #4",
+    """
+      |SELECT
+      |  count(distinct value),
+      |  max(key),
+      |  min(key),
+      |  sum(distinct key)
+      |FROM src
+    """.stripMargin, false)
+
+  createQueryTest("aggregation without group by expressions #5",
+    """
+      |SELECT
+      |  count(value) + 3,
+      |  max(key) + 1,
+      |  min(key) + 2,
+      |  sum(key) + 5
+      |FROM src
+    """.stripMargin, false)
+
+  createQueryTest("aggregation without group by expressions #6",
+    """
+      |SELECT
+      |  count(distinct value) + 4,
+      |  max(key) + 2,
+      |  min(key) + 3,
+      |  sum(distinct key) + 4
+      |FROM src
+    """.stripMargin, false)
+
+  createQueryTest("aggregation with group by expressions #1",
+    """
+      |SELECT key + 3 as a, count(value), max(key), min(key)
+      |FROM src group by key, value
+      |ORDER BY a LIMIT 5
+    """.stripMargin, false)
+
+  createQueryTest("aggregation with group by expressions #2",
+    """
+      |SELECT
+      |  key + 3 as a,
+      |  count(value),
+      |  max(key),
+      |  min(key),
+      |  sum(key)
+      |FROM src
+      |GROUP BY key, value
+      |ORDER BY a LIMIT 5
+    """.stripMargin, false)
+
+  createQueryTest("aggregation with group by expressions #3",
+    """
+      |SELECT
+      |  key + 3 as a,
+      |  count(distinct value),
+      |  max(key), min(key),
+      |  sum(distinct key)
+      |FROM src
+      |GROUP BY key, value
+      |ORDER BY a LIMIT 5
+    """.stripMargin, false)
+
+  createQueryTest("aggregation with group by expressions #5",
+    """
+      |SELECT
+      |  (key + 3) * 2 as a,
+      |  (key + 3) + count(distinct value),
+      |  (key + 3) + max(key + 3),
+      |  (key + 3) + min(key + 3),
+      |  (key + 3) + sum(distinct (key + 3))
+      |FROM src
+      |GROUP BY key + 3, value
+      |ORDER BY a LIMIT 5
+    """.stripMargin, false)
+
+  createQueryTest("aggregation with group by expressions #6",
+    """
+      |SELECT
+      |  stddev_pop(key) as a,
+      |  stddev_samp(key) as b
+      |FROM src
+      |GROUP BY key + 3, value
+      |ORDER BY a, b LIMIT 5
+    """.stripMargin, false)
+
+// TODO: the HiveQL parser doesn't currently support DISTINCT
+// in Hive UDAFs
+// createQueryTest("aggregation with group by expressions #7",
+//   """
+//     |SELECT
+//     |  stddev_pop(distinct key) as a,
+//     |  stddev_samp(distinct key) as b
+//     |FROM src
+//     |GROUP BY key + 3, value
+//     |ORDER BY a, b LIMIT 5
+//   """.stripMargin, false)
+
+  createQueryTest("aggregation with group by expressions #8",
+    """
+      |SELECT
+      |  (key + 3) + count(distinct value, key) as a
+      |FROM src
+      |GROUP BY key + 3, value
+      |ORDER BY a LIMIT 5
+    """.stripMargin, false)
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 6d8d99ebc816..9b295deeb1b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkFiles, SparkException}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.{SQLConf, AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
@@ -47,6 +47,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   import org.apache.spark.sql.hive.test.TestHive.implicits._
 
   override def beforeAll() {
+    TestHive.reset()
     TestHive.cacheTables = true
     // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -1134,3 +1135,94 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
 // for SPARK-2180 test
 case class HavingRow(key: Int, value: String, attr: Int)
+
+// TODO: ideally this class should inherit from HiveQuerySuite. However, the
+// tables/configuration cannot be reset, which causes exceptions such as
+// "table already exists".
+class HiveNewUDAFQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+  override def beforeAll() {
+    TestHive.reset()
+    TestHive.cacheTables = true
+    // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+    // Add Locale setting
+    Locale.setDefault(Locale.US)
+    TestHive.setConf(SQLConf.AGGREGATE_2, "true")
+  }
+
+  override def afterAll() {
+    TestHive.cacheTables = false
+    TimeZone.setDefault(originalTimeZone)
+    Locale.setDefault(originalLocale)
+    TestHive.setConf(SQLConf.AGGREGATE_2, "false")
+  }
+
+  def isExplanation(result: DataFrame): Boolean = {
+    val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
+    explanation.contains("== Physical Plan ==")
+  }
+
+  createQueryTest("having no references",
+    "SELECT key FROM src GROUP BY key HAVING COUNT(*) > 1")
+
+  createQueryTest("Constant Folding Optimization for AVG_SUM_COUNT",
+    "SELECT AVG(0), SUM(0), COUNT(null), COUNT(value) FROM src GROUP BY key")
+
+  test("SPARK-1704: Explain commands as a DataFrame") {
+    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+
+    val df = sql("explain select key, count(value) from src group by key")
+    assert(isExplanation(df))
+
+    TestHive.reset()
+  }
+
+  test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") {
+    val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3))
+      .zipWithIndex.map { case Pair(Pair(value, attr), key) => HavingRow(key, value, attr) }
+    TestHive.sparkContext.parallelize(fixture).toDF().registerTempTable("having_test")
+    val results =
+      sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3")
+        .collect()
+        .map(x => Pair(x.getString(0), x.getInt(1)))
+
+    assert(results === Array(Pair("foo", 4)))
+    TestHive.reset()
+  }
+
+  test("SPARK-2180: HAVING with non-boolean clause raises no exceptions") {
+    sql("select key, count(*) c from src group by key having c").collect()
+  }
+
+  test("SPARK-2225: turn HAVING without GROUP BY into a simple filter") {
+    assert(sql("select key from src having key > 490").collect().size < 100)
+  }
+
+  test("Query Hive native command execution result") {
+    val databaseName = "test_native_commands"
+
+    assertResult(0) {
+      sql(s"DROP DATABASE IF EXISTS $databaseName").count()
+    }
+
+    assertResult(0) {
+      sql(s"CREATE DATABASE $databaseName").count()
+    }
+
+    assert(
+      sql("SHOW DATABASES")
+        .select('result)
+        .collect()
+        .map(_.getString(0))
+        .contains(databaseName))
+
+    assert(isExplanation(sql(s"EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")))
+
+    TestHive.reset()
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 40a35674e4cb..bd80f146ea9f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
@@ -28,6 +30,7 @@ import org.apache.spark.sql.hive.{HiveQLDialect, MetastoreRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -905,3 +908,48 @@ class SQLQuerySuite extends QueryTest {
     }
   }
 }
+
+// TODO: ideally this class should inherit from SQLQuerySuite. However, the
+// tables/configuration cannot be reset, which causes exceptions such as
+// "table already exists".
+class SQLNewUDAFQuerySuite extends QueryTest with BeforeAndAfter {
+  def beforeAll() {
+    TestHive.setConf(SQLConf.AGGREGATE_2, "true")
+  }
+
+  def afterAll() {
+    TestHive.setConf(SQLConf.AGGREGATE_2, "false")
+  }
+
+  test("ordering not in agg") {
+    checkAnswer(
+      sql("SELECT key FROM src GROUP BY key, value ORDER BY value"),
+      sql("""
+        SELECT key
+        FROM (
+          SELECT key, value
+          FROM src
+          GROUP BY key, value
+          ORDER BY value) a""").collect().toSeq)
+  }
+
+  test("SPARK-2554 SumDistinct partial aggregation") {
+    checkAnswer(sql("SELECT sum(distinct key) FROM src group by key order by key"),
+      sql("SELECT distinct key FROM src order by key").collect().toSeq)
+  }
+
+  test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
+    val rdd = sparkContext.makeRDD("""{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
+    jsonRDD(rdd).registerTempTable("data")
+    checkAnswer(
+      sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
+      Row("str-1", 1970))
+
+    dropTempTable("data")
+
+    jsonRDD(rdd).registerTempTable("data")
+    checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
+
+    dropTempTable("data")
+  }
+}
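One review note on SQLNewUDAFQuerySuite: its beforeAll()/afterAll() compile without the override keyword, which suggests QueryTest does not mix in ScalaTest's BeforeAndAfterAll, so these methods are likely never invoked automatically and the AGGREGATE_2 flag may never actually be switched on for this suite. A possible tightening, shown only as a sketch and not as part of the patch, would be:

    import org.scalatest.BeforeAndAfterAll

    // Mixing in BeforeAndAfterAll lets ScalaTest call the lifecycle hooks,
    // so the new aggregation path is enabled before any test runs.
    class SQLNewUDAFQuerySuite extends QueryTest with BeforeAndAfterAll {
      override def beforeAll(): Unit = {
        TestHive.setConf(SQLConf.AGGREGATE_2, "true")
      }

      override def afterAll(): Unit = {
        TestHive.setConf(SQLConf.AGGREGATE_2, "false")
      }

      // ... tests unchanged ...
    }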