From da2c82ebef5fb2a7e9279d8ce3d9364d2da61ae4 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 11 Dec 2018 12:24:28 +0100 Subject: [PATCH 1/7] [SPARK-26224][SQL] Avoid creating many project on subsequent calls to withColumn --- .../main/scala/org/apache/spark/sql/Dataset.scala | 14 +++++++------- .../scala/org/apache/spark/sql/DatasetSuite.scala | 13 +++++++++++++ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a664c7338bad..3e4da9aca4b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} -import org.apache.spark.sql.catalyst.optimizer.CombineUnions +import org.apache.spark.sql.catalyst.optimizer.{CollapseProject, CombineUnions} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -2146,7 +2146,7 @@ class Dataset[T] private[sql]( * Returns a new Dataset by adding columns or replacing the existing columns that has * the same names. */ - private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = { + private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = withPlan { require(colNames.size == cols.size, s"The size of column names: ${colNames.size} isn't equal to " + s"the size of columns: ${cols.size}") @@ -2164,16 +2164,16 @@ class Dataset[T] private[sql]( columnMap.find { case (colName, _) => resolver(field.name, colName) } match { - case Some((colName: String, col: Column)) => col.as(colName) - case _ => Column(field) + case Some((colName: String, col: Column)) => col.as(colName).named + case _ => field } } - val newColumns = columnMap.filter { case (colName, col) => + val newColumns = columnMap.filter { case (colName, _) => !output.exists(f => resolver(f.name, colName)) - }.map { case (colName, col) => col.as(colName) } + }.map { case (colName, col) => col.as(colName).named } - select(replacedAndExistingColumns ++ newColumns : _*) + CollapseProject(Project(replacedAndExistingColumns ++ newColumns, logicalPlan)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 525c7cef3956..c7f35df4ef6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScroogeLikeExample import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -1656,6 +1657,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkAnswer(df.groupBy(col("a")).agg(first(col("b"))), Seq(Row("0", BigDecimal.valueOf(0.1111)), 
Row("1", BigDecimal.valueOf(1.1111)))) } + + test("SPARK-26224: withColumn produces too many Projects") { + val N = 10 + val resDF = (1 to N).foldLeft(Seq(1).toDF("a")) { case (df, i) => + df.withColumn(s"col$i", lit(0)) + } + assert(resDF.queryExecution.logical.collect { + case _: Project => true + }.size == 1) + val result = Row(1 :: List.fill(N)(0): _*) + checkAnswer(resDF, result) + } } case class TestDataUnion(x: Int, y: Int, z: Int) From a40db106d4d26966af04afb79edb35a59efd1703 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 11 Dec 2018 14:19:27 +0100 Subject: [PATCH 2/7] fix UT failures --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3e4da9aca4b6..88d76d24aad3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2173,7 +2173,8 @@ class Dataset[T] private[sql]( !output.exists(f => resolver(f.name, colName)) }.map { case (colName, col) => col.as(colName).named } - CollapseProject(Project(replacedAndExistingColumns ++ newColumns, logicalPlan)) + val newPlan = Project(replacedAndExistingColumns ++ newColumns, logicalPlan) + CollapseProject(sparkSession.sessionState.analyzer.execute(newPlan)) } /** From fa25e2e0cf1aea9c94907338dddd86b76f494319 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 17 Dec 2018 17:03:00 +0100 Subject: [PATCH 3/7] Revert "fix UT failures" This reverts commit a40db106d4d26966af04afb79edb35a59efd1703. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 88d76d24aad3..3e4da9aca4b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2173,8 +2173,7 @@ class Dataset[T] private[sql]( !output.exists(f => resolver(f.name, colName)) }.map { case (colName, col) => col.as(colName).named } - val newPlan = Project(replacedAndExistingColumns ++ newColumns, logicalPlan) - CollapseProject(sparkSession.sessionState.analyzer.execute(newPlan)) + CollapseProject(Project(replacedAndExistingColumns ++ newColumns, logicalPlan)) } /** From 5162b13e6b6179a6f0c295746d728d3b4974a2ae Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 17 Dec 2018 17:03:07 +0100 Subject: [PATCH 4/7] Revert "[SPARK-26224][SQL] Avoid creating many project on subsequent calls to withColumn" This reverts commit da2c82ebef5fb2a7e9279d8ce3d9364d2da61ae4. 
--- .../main/scala/org/apache/spark/sql/Dataset.scala | 14 +++++++------- .../scala/org/apache/spark/sql/DatasetSuite.scala | 13 ------------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3e4da9aca4b6..a664c7338bad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} -import org.apache.spark.sql.catalyst.optimizer.{CollapseProject, CombineUnions} +import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -2146,7 +2146,7 @@ class Dataset[T] private[sql]( * Returns a new Dataset by adding columns or replacing the existing columns that has * the same names. */ - private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = withPlan { + private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = { require(colNames.size == cols.size, s"The size of column names: ${colNames.size} isn't equal to " + s"the size of columns: ${cols.size}") @@ -2164,16 +2164,16 @@ class Dataset[T] private[sql]( columnMap.find { case (colName, _) => resolver(field.name, colName) } match { - case Some((colName: String, col: Column)) => col.as(colName).named - case _ => field + case Some((colName: String, col: Column)) => col.as(colName) + case _ => Column(field) } } - val newColumns = columnMap.filter { case (colName, _) => + val newColumns = columnMap.filter { case (colName, col) => !output.exists(f => resolver(f.name, colName)) - }.map { case (colName, col) => col.as(colName).named } + }.map { case (colName, col) => col.as(colName) } - CollapseProject(Project(replacedAndExistingColumns ++ newColumns, logicalPlan)) + select(replacedAndExistingColumns ++ newColumns : _*) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index c7f35df4ef6c..525c7cef3956 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScroogeLikeExample import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} -import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -1657,18 +1656,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkAnswer(df.groupBy(col("a")).agg(first(col("b"))), Seq(Row("0", BigDecimal.valueOf(0.1111)), Row("1", BigDecimal.valueOf(1.1111)))) } - - test("SPARK-26224: withColumn produces too many Projects") { - val N = 10 - val resDF = (1 to N).foldLeft(Seq(1).toDF("a")) { case (df, i) => - df.withColumn(s"col$i", lit(0)) - } - 
assert(resDF.queryExecution.logical.collect { - case _: Project => true - }.size == 1) - val result = Row(1 :: List.fill(N)(0): _*) - checkAnswer(resDF, result) - } } case class TestDataUnion(x: Int, y: Int, z: Int) From e37c2d6553b9469af0e25589e987851be42a0472 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 17 Dec 2018 17:44:50 +0100 Subject: [PATCH 5/7] warning for too many withColumn --- .../main/scala/org/apache/spark/sql/Dataset.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a664c7338bad..d274ddebec91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -32,6 +32,7 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.analysis._ @@ -179,7 +180,7 @@ class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @Unstable @transient val queryExecution: QueryExecution, encoder: Encoder[T]) - extends Serializable { + extends Serializable with Logging { queryExecution.assertAnalyzed() @@ -2154,6 +2155,18 @@ class Dataset[T] private[sql]( colNames, "in given column names", sparkSession.sessionState.conf.caseSensitiveAnalysis) + var numProjects = 0 + var currPlan = logicalPlan + while (currPlan.isInstanceOf[Project] && numProjects < 50) { + numProjects += 1 + currPlan = currPlan.children.head // Since it is a Project, it has 1 and only 1 child + } + if (numProjects == 50) { + logWarning("The current plan contains many projects on the top. This happens usually when " + + "using `withColumn` in a loop. Please, avoid this pattern as it can seriously affect " + + "performance and even cause OOM due to the huge size of the generated plan. Please use " + + "a single select providing all the needed rows to it instead.") + } val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output From 802cb9e1c55109d17325b21f27934692546dd62d Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 14 Jan 2019 10:44:50 +0100 Subject: [PATCH 6/7] add conf --- .../apache/spark/sql/internal/SQLConf.scala | 11 ++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 26 +++++++++++-------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 86e068bf632b..345d68b18009 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1621,6 +1621,17 @@ object SQLConf { .intConf .createWithDefault(25) + val MAX_WITHCOLUMN_PROJECTS = buildConf("spark.sql.withColumn.maxProjects") + .internal() + .doc("Maximum number of projects on top of a plan when using `Dataset.withColumn`. When the " + + "threshold is exceeded, warnings are emitted in order to advice a different approach. " + + "This usually happens when adding too many columns in loops using withColumns. Indeed, " + + "this pattern can lead to serious performance issues and even OOM. 
Set to 0 in order to " + + "disable completely the check.") + .intConf + .checkValue(_ >= 0, "The max number of projects cannot be negative.") + .createWithDefault(50) + val SET_COMMAND_REJECTS_SPARK_CORE_CONFS = buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index d274ddebec91..82657f7fc77f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.execution.stat.StatFunctions +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.DataStreamWriter import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -2155,17 +2156,20 @@ class Dataset[T] private[sql]( colNames, "in given column names", sparkSession.sessionState.conf.caseSensitiveAnalysis) - var numProjects = 0 - var currPlan = logicalPlan - while (currPlan.isInstanceOf[Project] && numProjects < 50) { - numProjects += 1 - currPlan = currPlan.children.head // Since it is a Project, it has 1 and only 1 child - } - if (numProjects == 50) { - logWarning("The current plan contains many projects on the top. This happens usually when " + - "using `withColumn` in a loop. Please, avoid this pattern as it can seriously affect " + - "performance and even cause OOM due to the huge size of the generated plan. Please use " + - "a single select providing all the needed rows to it instead.") + val maxProjects = sparkSession.sessionState.conf.getConf(SQLConf.MAX_WITHCOLUMN_PROJECTS) + if (maxProjects > 0) { + var numProjects = 0 + var currPlan = logicalPlan + while (currPlan.isInstanceOf[Project] && numProjects < maxProjects) { + numProjects += 1 + currPlan = currPlan.children.head // Since it is a Project, it has 1 and only 1 child + } + if (numProjects == maxProjects) { + logWarning("The current plan contains many projects on the top. This happens usually " + + "when using `withColumn` in a loop. Please, avoid this pattern as it can seriously " + + "affect performance and even cause OOM due to the huge size of the generated plan. " + + "Please use a single select providing all the needed rows to it instead.") + } } val resolver = sparkSession.sessionState.analyzer.resolver From 5d1fc00ad40dec3b811af85c13bd3c01621ba0ed Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 28 Mar 2019 14:08:55 +0100 Subject: [PATCH 7/7] address comment --- .../apache/spark/sql/internal/SQLConf.scala | 11 --------- .../scala/org/apache/spark/sql/Dataset.scala | 24 +++++-------------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 345d68b18009..86e068bf632b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1621,17 +1621,6 @@ object SQLConf { .intConf .createWithDefault(25) - val MAX_WITHCOLUMN_PROJECTS = buildConf("spark.sql.withColumn.maxProjects") - .internal() - .doc("Maximum number of projects on top of a plan when using `Dataset.withColumn`. 
When the " + - "threshold is exceeded, warnings are emitted in order to advice a different approach. " + - "This usually happens when adding too many columns in loops using withColumns. Indeed, " + - "this pattern can lead to serious performance issues and even OOM. Set to 0 in order to " + - "disable completely the check.") - .intConf - .checkValue(_ >= 0, "The max number of projects cannot be negative.") - .createWithDefault(50) - val SET_COMMAND_REJECTS_SPARK_CORE_CONFS = buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 82657f7fc77f..674d3abca05e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -32,7 +32,6 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.analysis._ @@ -52,7 +51,6 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.execution.stat.StatFunctions -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.DataStreamWriter import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -181,7 +179,7 @@ class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @Unstable @transient val queryExecution: QueryExecution, encoder: Encoder[T]) - extends Serializable with Logging { + extends Serializable { queryExecution.assertAnalyzed() @@ -2139,6 +2137,11 @@ class Dataset[T] private[sql]( * `column`'s expression must only refer to attributes supplied by this Dataset. It is an * error to add a column that refers to some other Dataset. * + * Please notice that this method introduces a `Project`. This means that using it in loops in + * order to add several columns can generate very big plans which can cause huge performance + * issues and even `StackOverflowException`s. A much better alternative use `select` with the + * list of columns to add. + * * @group untypedrel * @since 2.0.0 */ @@ -2156,21 +2159,6 @@ class Dataset[T] private[sql]( colNames, "in given column names", sparkSession.sessionState.conf.caseSensitiveAnalysis) - val maxProjects = sparkSession.sessionState.conf.getConf(SQLConf.MAX_WITHCOLUMN_PROJECTS) - if (maxProjects > 0) { - var numProjects = 0 - var currPlan = logicalPlan - while (currPlan.isInstanceOf[Project] && numProjects < maxProjects) { - numProjects += 1 - currPlan = currPlan.children.head // Since it is a Project, it has 1 and only 1 child - } - if (numProjects == maxProjects) { - logWarning("The current plan contains many projects on the top. This happens usually " + - "when using `withColumn` in a loop. Please, avoid this pattern as it can seriously " + - "affect performance and even cause OOM due to the huge size of the generated plan. " + - "Please use a single select providing all the needed rows to it instead.") - } - } val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output