From 1b075567ee4f5a4e3bfece94b301f4a3ceb299cb Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Sun, 14 Jun 2015 12:10:03 +0900
Subject: [PATCH 1/9] [SPARK-8357] [SQL] Memory leakage on unsafe aggregation
 path with empty input

---
 .../org/apache/spark/sql/execution/GeneratedAggregate.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index ba2c8f53d702d..383df3635f905 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -270,7 +270,9 @@ case class GeneratedAggregate(
 
     val joinedRow = new JoinedRow3
 
-    if (groupingExpressions.isEmpty) {
+    if (!iter.hasNext) {
+      Iterator[InternalRow]()
+    } else if (groupingExpressions.isEmpty) {
       // TODO: Codegening anything other than the updateProjection is probably over kill.
       val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
       var currentRow: InternalRow = null

From d396589c8669c2bbb38a1d025261d486e15f0c0a Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Sun, 14 Jun 2015 14:14:13 +0900
Subject: [PATCH 2/9] added comments

---
 .../org/apache/spark/sql/execution/GeneratedAggregate.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 383df3635f905..c1f2efa81c244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -271,6 +271,7 @@ case class GeneratedAggregate(
     val joinedRow = new JoinedRow3
 
     if (!iter.hasNext) {
+      // unsafe aggregation buffer is not released if input is empty (see SPARK-8357)
       Iterator[InternalRow]()
     } else if (groupingExpressions.isEmpty) {
       // TODO: Codegening anything other than the updateProjection is probably over kill.
@@ -286,6 +287,7 @@ case class GeneratedAggregate(
       val resultProjection = resultProjectionBuilder()
       Iterator(resultProjection(buffer))
     } else if (unsafeEnabled && schemaSupportsUnsafe) {
+      assert(iter.hasNext, "There should be at least one row for this path")
       log.info("Using Unsafe-based aggregator")
       val aggregationMap = new UnsafeFixedWidthAggregationMap(
         newAggregationBuffer(EmptyRow),

From 15c5afc1a1dcc052e248484f937ef0566aa6e6d4 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Mon, 22 Jun 2015 17:29:18 +0900
Subject: [PATCH 3/9] added a test as suggested by JoshRosen

---
 .../spark/sql/execution/AggregateSuite.scala | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
new file mode 100644
index 0000000000000..88620baddb520
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types.DataTypes._
+
+class AggregateSuite extends SparkPlanTest {
+
+  test("SPARK-8357 Memory leakage on unsafe aggregation path with empty input") {
+
+    val df = Seq.empty[(String, Int, Double)].toDF("a", "b", "c")
+
+    val groupExpr = df.col("b").expr
+    val aggrExpr = Alias(Count(Cast(groupExpr, LongType)), "Count")()
+
+    SparkEnv.get.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    for (codegen <- Seq(false, true); partial <- Seq(false, true); unsafe <- Seq(false, true)) {
+      TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
+      checkAnswer(
+        df,
+        GeneratedAggregate(partial, groupExpr :: Nil, aggrExpr :: Nil, unsafe, _: SparkPlan),
+        Seq.empty[(String, Int, Double)])
+    }
+  }
+}

From 4d326b908952e1c67eb0abc1c8a5e87375e5f519 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Mon, 22 Jun 2015 22:38:36 +0900
Subject: [PATCH 4/9] fixed test failures

---
 .../apache/spark/sql/execution/GeneratedAggregate.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index c1f2efa81c244..8be63c677845a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -270,10 +270,8 @@ case class GeneratedAggregate(
 
     val joinedRow = new JoinedRow3
 
-    if (!iter.hasNext) {
-      // unsafe aggregation buffer is not released if input is empty (see SPARK-8357)
-      Iterator[InternalRow]()
-    } else if (groupingExpressions.isEmpty) {
+    if (groupingExpressions.isEmpty) {
+      // even with the empty input, value of empty buffer should be forwarded
       // TODO: Codegening anything other than the updateProjection is probably over kill.
       val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
       var currentRow: InternalRow = null
@@ -286,7 +284,10 @@ case class GeneratedAggregate(
 
       val resultProjection = resultProjectionBuilder()
       Iterator(resultProjection(buffer))
+    } else if (!iter.hasNext) {
+      Iterator[InternalRow]()
     } else if (unsafeEnabled && schemaSupportsUnsafe) {
+      // unsafe aggregation buffer is not released if input is empty (see SPARK-8357)
       assert(iter.hasNext, "There should be at least one row for this path")
       log.info("Using Unsafe-based aggregator")
       val aggregationMap = new UnsafeFixedWidthAggregationMap(

From 51178e829e6321c9bd06424c81655abc1e90ed19 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Tue, 23 Jun 2015 10:45:26 +0900
Subject: [PATCH 5/9] addressed comments

---
 .../main/scala/org/apache/spark/sql/SQLConf.scala  |  4 ++++
 .../spark/sql/execution/GeneratedAggregate.scala   |  8 ++++----
 .../apache/spark/sql/test/TestSQLContext.scala     | 15 +++++++++++++++
 .../spark/sql/execution/AggregateSuite.scala       |  5 ++---
 .../spark/sql/execution/SparkPlanTest.scala        | 10 ++++++++++
 5 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9a10a23937fbb..b3fb097fb02cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -550,6 +550,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
     props.foreach { case (k, v) => setConfString(k, v) }
   }
 
+  def setConf(props: Map[String, String]): Unit = settings.synchronized {
+    props.foreach { case (k, v) => setConfString(k, v) }
+  }
+
   /** Set the given Spark SQL configuration property using a `string` value. */
   def setConfString(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 8be63c677845a..9b4ddc3ac2d94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -270,8 +270,10 @@ case class GeneratedAggregate(
 
     val joinedRow = new JoinedRow3
 
-    if (groupingExpressions.isEmpty) {
-      // even with the empty input, value of empty buffer should be forwarded
+    if (!iter.hasNext && (partial || groupingExpressions.nonEmpty)) {
+      // even with empty input, final-global groupby should forward value of empty buffer
+      Iterator[InternalRow]()
+    } else if (groupingExpressions.isEmpty) {
       // TODO: Codegening anything other than the updateProjection is probably over kill.
       val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
       var currentRow: InternalRow = null
@@ -284,8 +286,6 @@ case class GeneratedAggregate(
 
       val resultProjection = resultProjectionBuilder()
      Iterator(resultProjection(buffer))
-    } else if (!iter.hasNext) {
-      Iterator[InternalRow]()
     } else if (unsafeEnabled && schemaSupportsUnsafe) {
       // unsafe aggregation buffer is not released if input is empty (see SPARK-8357)
       assert(iter.hasNext, "There should be at least one row for this path")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 9fa394525d65c..1e8ec9ab81b60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.test
 
+import scala.collection.immutable
 import scala.language.implicitConversions
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -36,9 +37,18 @@ class LocalSQLContext
   }
 
   protected[sql] class SQLSession extends super.SQLSession {
+    var backup: immutable.Map[String, String] = null
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
       override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
+      backup = getAllConfs
+    }
+
+    protected[sql] def reset() = {
+      if (backup != null) {
+        conf.clear()
+        conf.setConf(backup)
+      }
     }
   }
 
@@ -50,6 +60,11 @@ class LocalSQLContext
     DataFrame(this, plan)
   }
 
+  /**
+   * Reset session conf to initial state
+   */
+  protected[sql] def resetConf(): Unit = currentSession().asInstanceOf[SQLSession].reset
+
 }
 
 object TestSQLContext extends LocalSQLContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
index 88620baddb520..fb088028f9a3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types.DataTypes._
@@ -31,8 +30,8 @@ class AggregateSuite extends SparkPlanTest {
     val groupExpr = df.col("b").expr
     val aggrExpr = Alias(Count(Cast(groupExpr, LongType)), "Count")()
 
-    SparkEnv.get.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
-    for (codegen <- Seq(false, true); partial <- Seq(false, true); unsafe <- Seq(false, true)) {
+    for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
+      partial <- Seq(false, true)) {
       TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
       checkAnswer(
         df,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 13f3be8ca28d6..d454ca24dcd0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import org.scalatest.Tag
+
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
@@ -43,6 +45,14 @@ class SparkPlanTest extends SparkFunSuite {
     TestSQLContext.implicits.localSeqToDataFrameHolder(data)
   }
 
+  protected override def test(testName: String, testTags: Tag*)(testFun: => Unit): Unit = {
+    try {
+      super.test(testName, testTags: _*)(testFun)
+    } finally {
+      TestSQLContext.resetConf()
+    }
+  }
+
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param input the input data to be used.

From 1a02a5550fd579c704eb1ea282344f96479c3613 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Tue, 23 Jun 2015 16:29:53 +0900
Subject: [PATCH 6/9] Rolled-back test-conf cleanup & fixed possible CCE &
 added more tests

---
 .../sql/catalyst/expressions/Projection.scala | 10 +++---
 .../scala/org/apache/spark/sql/SQLConf.scala  |  4 ----
 .../sql/execution/GeneratedAggregate.scala    |  2 +-
 .../spark/sql/execution/SparkPlan.scala       |  5 +--
 .../sql/parquet/ParquetTableOperations.scala  |  2 +-
 .../spark/sql/test/TestSQLContext.scala       | 15 --------
 .../spark/sql/execution/AggregateSuite.scala  | 34 ++++++++++++------
 .../spark/sql/execution/SparkPlanTest.scala   | 10 ------
 8 files changed, 33 insertions(+), 49 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index fcfe83ceb863a..6514694087ec1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -22,9 +22,11 @@ package org.apache.spark.sql.catalyst.expressions
  * @param expressions a sequence of expressions that determine the value of each column of the
  *                    output row.
  */
-class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
-  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
-    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
+  extends Projection {
+  def this(expressions: Seq[Expression],
+    inputSchema: Seq[Attribute], mutableRow: Boolean = false) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)), mutableRow)
 
   // null check is required for when Kryo invokes the no-arg constructor.
   protected val exprArray = if (expressions != null) expressions.toArray else null
@@ -36,7 +38,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
       outputArray(i) = exprArray(i).eval(input)
       i += 1
     }
-    new GenericInternalRow(outputArray)
+    if (mutableRow) new GenericMutableRow(outputArray) else new GenericInternalRow(outputArray)
   }
 
   override def toString: String = s"Row => [${exprArray.mkString(",")}]"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b3fb097fb02cf..9a10a23937fbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -550,10 +550,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
     props.foreach { case (k, v) => setConfString(k, v) }
   }
 
-  def setConf(props: Map[String, String]): Unit = settings.synchronized {
-    props.foreach { case (k, v) => setConfString(k, v) }
-  }
-
   /** Set the given Spark SQL configuration property using a `string` value. */
   def setConfString(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 9b4ddc3ac2d94..10cd29f6f7bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -246,7 +246,7 @@ case class GeneratedAggregate(
     child.execute().mapPartitions { iter =>
       // Builds a new custom class for holding the results of aggregation for a group.
       val initialValues = computeFunctions.flatMap(_.initialValues)
-      val newAggregationBuffer = newProjection(initialValues, child.output)
+      val newAggregationBuffer = newProjection(initialValues, child.output, mutableRow = true)
       log.info(s"Initial values: ${initialValues.mkString(",")}")
 
       // A projection that computes the group given an input tuple.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 47f56b2b7ebe6..99f8e9433c919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -153,13 +153,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   protected def newProjection(
-      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute], mutableRow: Boolean = false): Projection = {
     log.debug(
       s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
       GenerateProjection.generate(expressions, inputSchema)
     } else {
-      new InterpretedProjection(expressions, inputSchema)
+      new InterpretedProjection(expressions, inputSchema, mutableRow)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index b30fc171c0af1..3031886700225 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.{SparkContext, Logging, TaskContext}
 import org.apache.spark.util.SerializableConfiguration
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 1e8ec9ab81b60..9fa394525d65c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.test
 
-import scala.collection.immutable
 import scala.language.implicitConversions
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -37,18 +36,9 @@ class LocalSQLContext
   }
 
   protected[sql] class SQLSession extends super.SQLSession {
-    var backup: immutable.Map[String, String] = null
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
       override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
-      backup = getAllConfs
-    }
-
-    protected[sql] def reset() = {
-      if (backup != null) {
-        conf.clear()
-        conf.setConf(backup)
-      }
     }
   }
 
@@ -60,11 +50,6 @@ class LocalSQLContext
     DataFrame(this, plan)
   }
 
-  /**
-   * Reset session conf to initial state
-   */
-  protected[sql] def resetConf(): Unit = currentSession().asInstanceOf[SQLSession].reset
-
 }
 
 object TestSQLContext extends LocalSQLContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
index fb088028f9a3b..4b74b636982e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -25,18 +25,28 @@ class AggregateSuite extends SparkPlanTest {
 
   test("SPARK-8357 Memory leakage on unsafe aggregation path with empty input") {
 
-    val df = Seq.empty[(String, Int, Double)].toDF("a", "b", "c")
-
-    val groupExpr = df.col("b").expr
-    val aggrExpr = Alias(Count(Cast(groupExpr, LongType)), "Count")()
+    val input = Seq.empty[(String, Int, Double)]
+    val df = input.toDF("a", "b", "c")
+
+    val colB = df.col("b").expr
+    val colC = df.col("c").expr
+    val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()
+
+    // hack : current default parallelism of test local backend is two
+    val two = Seq(Tuple1(0L), Tuple1(0L))
+    val empty = Seq.empty[Tuple1[Long]]
 
-    for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
-      partial <- Seq(false, true)) {
-      TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
-      checkAnswer(
-        df,
-        GeneratedAggregate(partial, groupExpr :: Nil, aggrExpr :: Nil, unsafe, _: SparkPlan),
-        Seq.empty[(String, Int, Double)])
+    val codegenDefault = TestSQLContext.conf.getConfString("spark.sql.codegen")
+    try {
+      for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
+        partial <- Seq(false, true); groupExpr <- Seq(colB :: Nil, Seq.empty)) {
+        TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
+        checkAnswer(df,
+          GeneratedAggregate(partial, groupExpr, aggrExpr :: Nil, unsafe, _: SparkPlan),
+          if (groupExpr.isEmpty && !partial) two else empty)
+      }
+    } finally {
+      TestSQLContext.conf.setConfString("spark.sql.codegen", codegenDefault)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index d454ca24dcd0a..13f3be8ca28d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.scalatest.Tag
-
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
@@ -45,14 +43,6 @@ class SparkPlanTest extends SparkFunSuite {
     TestSQLContext.implicits.localSeqToDataFrameHolder(data)
   }
 
-  protected override def test(testName: String, testTags: Tag*)(testFun: => Unit): Unit = {
-    try {
-      super.test(testName, testTags: _*)(testFun)
-    } finally {
-      TestSQLContext.resetConf()
-    }
-  }
-
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param input the input data to be used.
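[Editorial note, not part of the series: PATCH 5 and PATCH 6 converge on a single guard in GeneratedAggregate.doExecute. A final (partial = false), global (no grouping expressions) aggregate must still emit one row for empty input, since COUNT over an empty table is 0; every other mode can return an empty iterator before the UnsafeFixedWidthAggregationMap that SPARK-8357 leaks is ever allocated. The standalone Scala sketch below models only that branch logic; EmptyInputGuard, numGroupingExpressions and emptyBufferRow are hypothetical names that do not appear in the patches.]

// Editorial sketch: a minimal model of the empty-input guard, not Spark code.
object EmptyInputGuard {
  def aggregate[T](
      iter: Iterator[T],
      partial: Boolean,
      numGroupingExpressions: Int,
      emptyBufferRow: => T): Iterator[T] = {
    // A final (partial = false) global (no grouping keys) aggregate must emit
    // one row even for empty input, e.g. COUNT over an empty table is 0.
    val needEmptyBufferForwarded = numGroupingExpressions == 0 && !partial
    if (!iter.hasNext && !needEmptyBufferForwarded) {
      // Short-circuit before any aggregation buffer or unsafe map is
      // allocated, so there is nothing left unreleased (SPARK-8357).
      Iterator.empty
    } else if (!iter.hasNext) {
      // The final-global case: forward one row built from the initial buffer.
      Iterator(emptyBufferRow)
    } else {
      // Non-empty input falls through to the real aggregation paths.
      iter
    }
  }
}

// Example: EmptyInputGuard.aggregate(Iterator[Long](), partial = false, 0, 0L).toList
// returns List(0L), while the partial or grouped modes return Nil for empty input.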
From 735972f8d894ddc6f302b9a1aaa3cd225b53a6cd Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Tue, 23 Jun 2015 16:38:53 +0900
Subject: [PATCH 7/9] used new conf apis

---
 .../org/apache/spark/sql/execution/AggregateSuite.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
index 4b74b636982e8..01f902aa655be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types.DataTypes._
@@ -36,17 +37,17 @@ class AggregateSuite extends SparkPlanTest {
     val two = Seq(Tuple1(0L), Tuple1(0L))
     val empty = Seq.empty[Tuple1[Long]]
 
-    val codegenDefault = TestSQLContext.conf.getConfString("spark.sql.codegen")
+    val codegenDefault = TestSQLContext.getConf(SQLConf.CODEGEN_ENABLED)
     try {
       for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
         partial <- Seq(false, true); groupExpr <- Seq(colB :: Nil, Seq.empty)) {
-        TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
+        TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegen)
         checkAnswer(df,
           GeneratedAggregate(partial, groupExpr, aggrExpr :: Nil, unsafe, _: SparkPlan),
           if (groupExpr.isEmpty && !partial) two else empty)
       }
     } finally {
-      TestSQLContext.conf.setConfString("spark.sql.codegen", codegenDefault)
+      TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault)
     }
   }
 }

From 143e1ef8a50ca4752b2b129cbd0097ee22c3caaa Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Fri, 26 Jun 2015 17:52:39 +0900
Subject: [PATCH 8/9] fixed format & added test for CCE case

---
 .../sql/catalyst/expressions/Projection.scala |  8 +++--
 .../sql/parquet/ParquetTableOperations.scala  |  2 +-
 .../spark/sql/execution/AggregateSuite.scala  | 34 +++++++++++--------
 3 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 6514694087ec1..66cdfd91cd831 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -24,9 +24,13 @@ package org.apache.spark.sql.catalyst.expressions
  */
 class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
   extends Projection {
-  def this(expressions: Seq[Expression],
-    inputSchema: Seq[Attribute], mutableRow: Boolean = false) =
+
+  def this(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute],
+      mutableRow: Boolean = false) = {
     this(expressions.map(BindReferences.bindReference(_, inputSchema)), mutableRow)
+  }
 
   // null check is required for when Kryo invokes the no-arg constructor.
   protected val exprArray = if (expressions != null) expressions.toArray else null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 3031886700225..b30fc171c0af1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{SparkContext, Logging, TaskContext}
+import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.util.SerializableConfiguration
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
index 01f902aa655be..b8ee523eb9c6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -26,25 +26,31 @@ class AggregateSuite extends SparkPlanTest {
 
   test("SPARK-8357 Memory leakage on unsafe aggregation path with empty input") {
 
-    val input = Seq.empty[(String, Int, Double)]
-    val df = input.toDF("a", "b", "c")
-
-    val colB = df.col("b").expr
-    val colC = df.col("c").expr
-    val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()
+    val input0 = Seq.empty[(String, Int, Double)]
+    val input1 = Seq(("Hello", 4, 2.0))
 
     // hack : current default parallelism of test local backend is two
-    val two = Seq(Tuple1(0L), Tuple1(0L))
-    val empty = Seq.empty[Tuple1[Long]]
+    val x0 = Seq(Tuple1(0L), Tuple1(0L))
+    val y0 = Seq.empty[Tuple1[Long]]
+
+    val x1 = Seq(Tuple1(0L), Tuple1(1L))
+    val y1 = Seq(Tuple1(1L))
 
     val codegenDefault = TestSQLContext.getConf(SQLConf.CODEGEN_ENABLED)
     try {
-      for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
-        partial <- Seq(false, true); groupExpr <- Seq(colB :: Nil, Seq.empty)) {
-        TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegen)
-        checkAnswer(df,
-          GeneratedAggregate(partial, groupExpr, aggrExpr :: Nil, unsafe, _: SparkPlan),
-          if (groupExpr.isEmpty && !partial) two else empty)
+      for ((input, x, y) <- Seq((input0, x0, y0), (input1, x1, y1))) {
+        val df = input.toDF("a", "b", "c")
+        val colB = df.col("b").expr
+        val colC = df.col("c").expr
+        val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()
+
+        for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
+          partial <- Seq(false, true); groupExpr <- Seq(colB :: Nil, Seq.empty)) {
+          TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegen)
+          checkAnswer(df,
+            GeneratedAggregate(partial, groupExpr, aggrExpr :: Nil, unsafe, _: SparkPlan),
+            if (groupExpr.isEmpty && !partial) x else y)
+        }
       }
     } finally {
       TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault)

From c5419b37f2ece4842d8c2bd7463a50b61245fbcf Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Tue, 30 Jun 2015 11:33:15 +0900
Subject: [PATCH 9/9] addressed comments

---
 .../spark/sql/execution/GeneratedAggregate.scala |  8 ++++++--
 .../apache/spark/sql/execution/SparkPlan.scala   |  3 ++-
 .../spark/sql/execution/AggregateSuite.scala     | 16 ++++++++--------
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 10cd29f6f7bc9..5f69a5e19b2be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -64,6 +64,11 @@ case class GeneratedAggregate(
     }
   }
 
+  // even with an empty input iterator, if this group-by operator is
+  // global (groupingExpressions.isEmpty) and final (partial = false),
+  // we still need to make a row from the empty aggregation buffer.
+  def needEmptyBufferForwarded: Boolean = groupingExpressions.isEmpty && !partial
+
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -270,8 +275,7 @@ case class GeneratedAggregate(
 
     val joinedRow = new JoinedRow3
 
-    if (!iter.hasNext && (partial || groupingExpressions.nonEmpty)) {
-      // even with empty input, final-global groupby should forward value of empty buffer
+    if (!iter.hasNext && !needEmptyBufferForwarded) {
       Iterator[InternalRow]()
     } else if (groupingExpressions.isEmpty) {
       // TODO: Codegening anything other than the updateProjection is probably over kill.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 99f8e9433c919..d15ee93bd7aea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -154,7 +154,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def newProjection(
       expressions: Seq[Expression],
-      inputSchema: Seq[Attribute], mutableRow: Boolean = false): Projection = {
+      inputSchema: Seq[Attribute],
+      mutableRow: Boolean = false): Projection = {
     log.debug(
       s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
index b8ee523eb9c6d..7c87024ce85bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -27,16 +27,17 @@ class AggregateSuite extends SparkPlanTest {
 
   test("SPARK-8357 Memory leakage on unsafe aggregation path with empty input") {
 
     val input0 = Seq.empty[(String, Int, Double)]
-    val input1 = Seq(("Hello", 4, 2.0))
-
-    // hack : current default parallelism of test local backend is two
+    // in the needEmptyBufferForwarded=true case, each task makes a row from the empty buffer
+    // even with empty input, and the current default parallelism of SparkPlanTest is two (local[2])
     val x0 = Seq(Tuple1(0L), Tuple1(0L))
     val y0 = Seq.empty[Tuple1[Long]]
 
+    val input1 = Seq(("Hello", 4, 2.0))
     val x1 = Seq(Tuple1(0L), Tuple1(1L))
     val y1 = Seq(Tuple1(1L))
 
     val codegenDefault = TestSQLContext.getConf(SQLConf.CODEGEN_ENABLED)
+    TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, true)
     try {
       for ((input, x, y) <- Seq((input0, x0, y0), (input1, x1, y1))) {
         val df = input.toDF("a", "b", "c")
@@ -44,12 +45,11 @@ class AggregateSuite extends SparkPlanTest {
         val colC = df.col("c").expr
         val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()
 
-        for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
-          partial <- Seq(false, true); groupExpr <- Seq(colB :: Nil, Seq.empty)) {
-          TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegen)
+        for (partial <- Seq(false, true); groupExpr <- Seq(Seq(colB), Seq.empty)) {
+          val aggregate = GeneratedAggregate(partial, groupExpr, Seq(aggrExpr), true, _: SparkPlan)
           checkAnswer(df,
-            GeneratedAggregate(partial, groupExpr, aggrExpr :: Nil, unsafe, _: SparkPlan),
-            if (groupExpr.isEmpty && !partial) x else y)
+            aggregate,
+            if (aggregate(null).needEmptyBufferForwarded) x else y)
         }
       }
     } finally {
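[Editorial note on the expected answers in AggregateSuite: checkAnswer applies GeneratedAggregate directly to each input partition, with no exchange in between, and SparkPlanTest runs on a local[2] backend, i.e. two partitions. The sketch below models where x0 = Seq(Tuple1(0L), Tuple1(0L)), x1 = Seq(Tuple1(0L), Tuple1(1L)) and y1 = Seq(Tuple1(1L)) come from; expectedCounts is a hypothetical helper, not part of the patches.]

// Editorial sketch only: per-partition COUNT results, assuming the operator
// runs once per partition with no shuffle in between.
def expectedCounts(partitionSizes: Seq[Int], forwardEmptyBuffer: Boolean): Seq[Long] =
  if (forwardEmptyBuffer) {
    // needEmptyBufferForwarded == true: every partition emits exactly one
    // count row, even when it holds no input rows.
    partitionSizes.map(_.toLong)
  } else {
    // partial or grouped modes: empty partitions emit nothing at all.
    partitionSizes.filter(_ > 0).map(_.toLong)
  }

// Two partitions, empty input:   expectedCounts(Seq(0, 0), true)  == Seq(0L, 0L)  // x0
//                                expectedCounts(Seq(0, 0), false) == Seq()        // y0
// Two partitions, one input row: expectedCounts(Seq(0, 1), true)  == Seq(0L, 1L)  // x1
//                                expectedCounts(Seq(0, 1), false) == Seq(1L)      // y1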