diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index fcfe83ceb863a..66cdfd91cd831 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -22,9 +22,15 @@ package org.apache.spark.sql.catalyst.expressions
  * @param expressions a sequence of expressions that determine the value of each column of the
  *                    output row.
  */
-class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
-  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
-    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
+  extends Projection {
+
+  def this(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute],
+      mutableRow: Boolean = false) = {
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)), mutableRow)
+  }
 
   // null check is required for when Kryo invokes the no-arg constructor.
   protected val exprArray = if (expressions != null) expressions.toArray else null
@@ -36,7 +42,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
       outputArray(i) = exprArray(i).eval(input)
       i += 1
     }
-    new GenericInternalRow(outputArray)
+    if (mutableRow) new GenericMutableRow(outputArray) else new GenericInternalRow(outputArray)
   }
 
   override def toString: String = s"Row => [${exprArray.mkString(",")}]"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index ba2c8f53d702d..5f69a5e19b2be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -64,6 +64,11 @@ case class GeneratedAggregate(
     }
   }
 
+  // Even with an empty input iterator, a group-by operator that is both
+  // global (groupingExpressions.isEmpty) and final (partial = false)
+  // must still produce one row from the empty aggregation buffer.
+  def needEmptyBufferForwarded: Boolean = groupingExpressions.isEmpty && !partial
+
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -246,7 +251,7 @@ case class GeneratedAggregate(
     child.execute().mapPartitions { iter =>
       // Builds a new custom class for holding the results of aggregation for a group.
       val initialValues = computeFunctions.flatMap(_.initialValues)
-      val newAggregationBuffer = newProjection(initialValues, child.output)
+      val newAggregationBuffer = newProjection(initialValues, child.output, mutableRow = true)
       log.info(s"Initial values: ${initialValues.mkString(",")}")
 
       // A projection that computes the group given an input tuple.
@@ -270,7 +275,9 @@ case class GeneratedAggregate(
 
       val joinedRow = new JoinedRow3
 
-      if (groupingExpressions.isEmpty) {
+      if (!iter.hasNext && !needEmptyBufferForwarded) {
+        Iterator[InternalRow]()
+      } else if (groupingExpressions.isEmpty) {
         // TODO: Codegening anything other than the updateProjection is probably over kill.
         val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
         var currentRow: InternalRow = null
@@ -284,6 +291,8 @@ case class GeneratedAggregate(
         val resultProjection = resultProjectionBuilder()
         Iterator(resultProjection(buffer))
       } else if (unsafeEnabled && schemaSupportsUnsafe) {
+        // The unsafe aggregation buffer is never released when the input is empty (see SPARK-8357).
+        assert(iter.hasNext, "There should be at least one row for this path")
         log.info("Using Unsafe-based aggregator")
         val aggregationMap = new UnsafeFixedWidthAggregationMap(
           newAggregationBuffer(EmptyRow),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 47f56b2b7ebe6..d15ee93bd7aea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -153,13 +153,15 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   protected def newProjection(
-    expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute],
+      mutableRow: Boolean = false): Projection = {
     log.debug(
       s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
       GenerateProjection.generate(expressions, inputSchema)
     } else {
-      new InterpretedProjection(expressions, inputSchema)
+      new InterpretedProjection(expressions, inputSchema, mutableRow)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
new file mode 100644
index 0000000000000..7c87024ce85bb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types.DataTypes._
+
+class AggregateSuite extends SparkPlanTest {
+
+  test("SPARK-8357 Memory leakage on unsafe aggregation path with empty input") {
+
+    val input0 = Seq.empty[(String, Int, Double)]
+    // When needEmptyBufferForwarded=true, each task emits one row built from the empty
+    // buffer even on empty input; SparkPlanTest's default parallelism is two (local[2]).
+    val x0 = Seq(Tuple1(0L), Tuple1(0L))
+    val y0 = Seq.empty[Tuple1[Long]]
+
+    val input1 = Seq(("Hello", 4, 2.0))
+    val x1 = Seq(Tuple1(0L), Tuple1(1L))
+    val y1 = Seq(Tuple1(1L))
+
+    val codegenDefault = TestSQLContext.getConf(SQLConf.CODEGEN_ENABLED)
+    TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, true)
+    try {
+      for ((input, x, y) <- Seq((input0, x0, y0), (input1, x1, y1))) {
+        val df = input.toDF("a", "b", "c")
+        val colB = df.col("b").expr
+        val colC = df.col("c").expr
+        val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()
+
+        for (partial <- Seq(false, true); groupExpr <- Seq(Seq(colB), Seq.empty)) {
+          val aggregate = GeneratedAggregate(partial, groupExpr, Seq(aggrExpr), true, _: SparkPlan)
+          checkAnswer(df,
+            aggregate,
+            if (aggregate(null).needEmptyBufferForwarded) x else y)
+        }
+      }
+    } finally {
+      TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault)
+    }
+  }
+}
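
Note for reviewers: the `needEmptyBufferForwarded` flag exists to preserve standard SQL semantics, under which a global aggregate over empty input still returns exactly one row while a grouped aggregate returns none. A minimal sketch of the behavior being preserved, using the DataFrame API of this branch (the column names and the use of `TestSQLContext` here are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.test.TestSQLContext

val sqlContext = TestSQLContext
import sqlContext.implicits._

val empty = Seq.empty[(String, Int)].toDF("a", "b")

// Global aggregate: one row must come back even though there is no input.
empty.agg(count($"b")).collect()                 // Array(Row(0))
// Grouped aggregate: no input means no groups, hence no rows.
empty.groupBy($"a").agg(count($"b")).collect()   // Array()
```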
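The new `mutableRow` flag matters because the non-grouped path immediately casts the projected buffer with `asInstanceOf[MutableRow]`, and an `InterpretedProjection` previously returned a `GenericInternalRow`, which fails that cast whenever the interpreted fallback is taken. A rough sketch of the distinction, assuming the catalyst row classes as they exist on this branch:

```scala
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow, MutableRow}

// The mutable variant supports the in-place updates the aggregation loop performs.
val buffer: MutableRow = new GenericMutableRow(Array[Any](0L))
buffer.setLong(0, buffer.getLong(0) + 1)

// The immutable variant compiles but would fail the cast at runtime:
// new GenericInternalRow(Array[Any](0L)).asInstanceOf[MutableRow]  // ClassCastException
```

The codegen path appears unaffected because rows produced by `GenerateProjection` are already mutable, which is presumably why only the interpreted fallback needs the flag.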
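In the new `AggregateSuite`, the trailing `_: SparkPlan` partially applies the `GeneratedAggregate` constructor, producing the plan-building function that `SparkPlanTest.checkAnswer` applies to the physical input plan; `aggregate(null)` instantiates a throwaway copy just to read `needEmptyBufferForwarded`. An equivalent, more explicit spelling inside the test's scope (same names as in the test above):

```scala
val aggregate: SparkPlan => GeneratedAggregate =
  (child: SparkPlan) => GeneratedAggregate(partial, groupExpr, Seq(aggrExpr), true, child)

val expected = if (aggregate(null).needEmptyBufferForwarded) x else y
checkAnswer(df, aggregate, expected)
```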