From d9ce1688319eafbf42cfa6634637a270e8828c55 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 10 Jul 2023 15:42:59 +0300
Subject: [PATCH 1/6] Add a test

---
 .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 48bdd799017c..9187ed048d29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2528,6 +2528,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-XXXXX: CTE on top of INSERT INTO") {
+    withTable("t") {
+      sql("CREATE TABLE t(i int, part1 int, part2 int) using parquet")
+      sql("WITH v1(c1) as (values (1)) INSERT INTO t select c1, 2, 3 from v1")
+      checkAnswer(spark.table("t"), Row(1, 2, 3))
+    }
+  }
+
   test("SELECT clause with star wildcard") {
     withTable("t1") {
       sql("CREATE TABLE t1(c1 int, c2 string) using parquet")

From afa48f13bff9cede8d97e8e4d5de11d8e0a3e1d7 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 10 Jul 2023 15:43:28 +0300
Subject: [PATCH 2/6] Exchange InsertIntoStatement and WithCTE

---
 .../catalyst/analysis/CTESubstitution.scala | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 4e3234f9c0dc..67231ea6dbe5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, InsertIntoStatement, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.TypeUtils._
@@ -52,20 +52,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
     if (!plan.containsPattern(UNRESOLVED_WITH)) {
       return plan
     }
-    val isCommand = plan.exists {
+    val isUnsupportedCommand = plan.exists {
+      case _: InsertIntoStatement => false
       case _: Command | _: ParsedStatement | _: InsertIntoDir => true
       case _ => false
     }
+    // New plan with CTEs moved to command's query.
+    val planWithCTE = plan match {
+      case UnresolvedWith(child: InsertIntoStatement, cteRelations) =>
+        child.copy(query = UnresolvedWith(child.query, cteRelations))
+      case _ => plan
+    }
     val cteDefs = ArrayBuffer.empty[CTERelationDef]
     val (substituted, firstSubstituted) =
       LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
         case LegacyBehaviorPolicy.EXCEPTION =>
-          assertNoNameConflictsInCTE(plan)
-          traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
+          assertNoNameConflictsInCTE(planWithCTE)
+          traverseAndSubstituteCTE(planWithCTE, isUnsupportedCommand, Seq.empty, cteDefs)
         case LegacyBehaviorPolicy.LEGACY =>
-          (legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
+          (legacyTraverseAndSubstituteCTE(planWithCTE, cteDefs), None)
         case LegacyBehaviorPolicy.CORRECTED =>
-          traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(planWithCTE, isUnsupportedCommand, Seq.empty, cteDefs)
       }
     if (cteDefs.isEmpty) {
       substituted

From d487b0c0e41f74d3a2b9dec11d4b15fa38a6736f Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 10 Jul 2023 19:47:00 +0300
Subject: [PATCH 3/6] Fix a test

---
 .../catalyst/analysis/CTESubstitution.scala | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 67231ea6dbe5..7b9fd514e745 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -52,27 +52,26 @@ object CTESubstitution extends Rule[LogicalPlan] {
     if (!plan.containsPattern(UNRESOLVED_WITH)) {
       return plan
     }
-    val isUnsupportedCommand = plan.exists {
-      case _: InsertIntoStatement => false
-      case _: Command | _: ParsedStatement | _: InsertIntoDir => true
-      case _ => false
-    }
     // New plan with CTEs moved to command's query.
-    val planWithCTE = plan match {
+    val (planWithCTE, isCommandWithCTE) = plan match {
       case UnresolvedWith(child: InsertIntoStatement, cteRelations) =>
-        child.copy(query = UnresolvedWith(child.query, cteRelations))
-      case _ => plan
+        (child.copy(query = UnresolvedWith(child.query, cteRelations)), true)
+      case _ => (plan, false)
+    }
+    val isCommand = !isCommandWithCTE && plan.exists {
+      case _: Command | _: ParsedStatement | _: InsertIntoDir => true
+      case _ => false
     }
     val cteDefs = ArrayBuffer.empty[CTERelationDef]
     val (substituted, firstSubstituted) =
       LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
         case LegacyBehaviorPolicy.EXCEPTION =>
           assertNoNameConflictsInCTE(planWithCTE)
-          traverseAndSubstituteCTE(planWithCTE, isUnsupportedCommand, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(planWithCTE, isCommand, Seq.empty, cteDefs)
         case LegacyBehaviorPolicy.LEGACY =>
           (legacyTraverseAndSubstituteCTE(planWithCTE, cteDefs), None)
         case LegacyBehaviorPolicy.CORRECTED =>
-          traverseAndSubstituteCTE(planWithCTE, isUnsupportedCommand, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(planWithCTE, isCommand, Seq.empty, cteDefs)
       }
     if (cteDefs.isEmpty) {
       substituted

From ed7b30af57600b2ca3402bd20f4d9dde0783a86a Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 10 Jul 2023 20:10:10 +0300
Subject: [PATCH 4/6] Re-gen with.sql.out

---
 .../analyzer-results/postgreSQL/with.sql.out | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
index e53480e96bed..f58f8faa0be3 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
@@ -452,10 +452,14 @@ with test as (select 42) insert into test select * from test
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/test], Append, `spark_catalog`.`default`.`test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/test), [i]
 +- Project [cast(42#x as int) AS i#x]
-   +- Project [42#x]
-      +- SubqueryAlias test
-         +- Project [42 AS 42#x]
-            +- OneRowRelation
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias test
+      :     +- Project [42 AS 42#x]
+      :        +- OneRowRelation
+      +- Project [42#x]
+         +- SubqueryAlias test
+            +- CTERelationRef xxxx, true, [42#x]
 
 
 -- !query

From 86ebe377300426b3adce42225beb11276d607694 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 10 Jul 2023 20:17:22 +0300
Subject: [PATCH 5/6] SPARK-XXXXX -> SPARK-44356

---
 .../test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 9187ed048d29..54eb4f4b2df1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2528,7 +2528,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-XXXXX: CTE on top of INSERT INTO") {
+  test("SPARK-44356: CTE on top of INSERT INTO") {
     withTable("t") {
       sql("CREATE TABLE t(i int, part1 int, part2 int) using parquet")
parquet") sql("WITH v1(c1) as (values (1)) INSERT INTO t select c1, 2, 3 from v1") From a999675a5d599b496a4e9b43d008e322abbc954d Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 11 Jul 2023 08:35:10 +0300 Subject: [PATCH 6/6] Trigger build