Skip to content

Commit da84f81

Browse files
MaxGekk and cloud-fan committed
[SPARK-44355][SQL] Move WithCTE into command queries
This PR is based on #42022 to fix tests, as the PR author is on vacation. ### What changes were proposed in this pull request? In the PR, I propose to add a new trait `CTEInChildren` and mix it into some commands that should have `WithCTE` on top of their children (queries) instead of on top of the main query. Also, I modified the `CTESubstitution` rule and removed the special handling of `Command`s and similar nodes. After the changes, `Command`, `ParsedStatement` and `InsertIntoDir` are handled in the same way as other queries, by referring to CTE Defs. The only difference is that `WithCTE` is not placed on top of the main query but on top of the command's queries. Closes #41922 ### Why are the changes needed? To improve code maintenance. Right now the CTE resolution code path has diverged: queries with commands go into the CTE inline code path, whereas non-command queries go into the CTEDef code path. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the new test: ``` $ build/sbt "test:testOnly *InsertSuite" ``` Closes #42036 from cloud-fan/help. Lead-authored-by: Max Gekk <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 304d6ac commit da84f81

File tree

18 files changed

+473
-54
lines changed

18 files changed

+473
-54
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,18 @@ package org.apache.spark.sql.catalyst.analysis
2020
import scala.collection.mutable.ArrayBuffer
2121

2222
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
23-
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
23+
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
2424
import org.apache.spark.sql.catalyst.rules.Rule
2525
import org.apache.spark.sql.catalyst.trees.TreePattern._
2626
import org.apache.spark.sql.catalyst.util.TypeUtils._
2727
import org.apache.spark.sql.errors.QueryCompilationErrors
28+
import org.apache.spark.sql.internal.SQLConf
2829
import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}
2930

3031
/**
3132
* Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending
3233
* on the conditions below:
33-
* 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE
34-
* definitions, i.e., inline CTEs.
34+
* 1. If in legacy mode, replace with CTE definitions, i.e., inline CTEs.
3535
* 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not
3636
* inline will be made later by the rule `InlineCTE` after query analysis.
3737
*
@@ -46,42 +46,62 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega
4646
* dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A,
4747
* A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an
4848
* analysis exception will be thrown later by relation resolving rules.
49+
*
50+
* If the query is a SQL command or DML statement (extends `CTEInChildren`),
51+
* place `WithCTE` on top of its children.
4952
*/
5053
object CTESubstitution extends Rule[LogicalPlan] {
5154
def apply(plan: LogicalPlan): LogicalPlan = {
5255
if (!plan.containsPattern(UNRESOLVED_WITH)) {
5356
return plan
5457
}
55-
val isCommand = plan.exists {
56-
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
57-
case _ => false
58+
59+
val commands = plan.collect {
60+
case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
5861
}
62+
val forceInline = if (commands.length == 1) {
63+
if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
64+
// The legacy behavior always inlines the CTE relations for queries in commands.
65+
true
66+
} else {
67+
// If there is only one command and it's `CTEInChildren`, we can resolve
68+
// CTE normally and don't need to force inline.
69+
!commands.head.isInstanceOf[CTEInChildren]
70+
}
71+
} else if (commands.length > 1) {
72+
// This can happen with the multi-insert statement. We should fall back to
73+
// the legacy behavior.
74+
true
75+
} else {
76+
false
77+
}
78+
5979
val cteDefs = ArrayBuffer.empty[CTERelationDef]
6080
val (substituted, firstSubstituted) =
6181
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
6282
case LegacyBehaviorPolicy.EXCEPTION =>
6383
assertNoNameConflictsInCTE(plan)
64-
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
84+
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
6585
case LegacyBehaviorPolicy.LEGACY =>
6686
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
6787
case LegacyBehaviorPolicy.CORRECTED =>
68-
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
88+
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
6989
}
7090
if (cteDefs.isEmpty) {
7191
substituted
7292
} else if (substituted eq firstSubstituted.get) {
73-
WithCTE(substituted, cteDefs.toSeq)
93+
withCTEDefs(substituted, cteDefs.toSeq)
7494
} else {
7595
var done = false
7696
substituted.resolveOperatorsWithPruning(_ => !done) {
7797
case p if p eq firstSubstituted.get =>
7898
// `firstSubstituted` is the parent of all other CTEs (if any).
7999
done = true
80-
WithCTE(p, cteDefs.toSeq)
100+
withCTEDefs(p, cteDefs.toSeq)
81101
case p if p.children.count(_.containsPattern(CTE)) > 1 =>
82102
// This is the first common parent of all CTEs.
83103
done = true
84-
WithCTE(p, cteDefs.toSeq)
104+
withCTEDefs(p, cteDefs.toSeq)
85105
}
86106
}
87107
}
@@ -131,7 +151,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
131151
plan.resolveOperatorsUp {
132152
case UnresolvedWith(child, relations) =>
133153
val resolvedCTERelations =
134-
resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs)
154+
resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
135155
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
136156
}
137157
}
@@ -168,27 +188,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
168188
* SELECT * FROM t
169189
* )
170190
* @param plan the plan to be traversed
171-
* @param isCommand if this is a command
191+
* @param forceInline always inline the CTE relations if this is true
172192
* @param outerCTEDefs already resolved outer CTE definitions with names
173193
* @param cteDefs all accumulated CTE definitions
174194
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
175195
* where CTE definitions will be gathered to
176196
*/
177197
private def traverseAndSubstituteCTE(
178198
plan: LogicalPlan,
179-
isCommand: Boolean,
199+
forceInline: Boolean,
180200
outerCTEDefs: Seq[(String, CTERelationDef)],
181201
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
182202
var firstSubstituted: Option[LogicalPlan] = None
183203
val newPlan = plan.resolveOperatorsDownWithPruning(
184204
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
185205
case UnresolvedWith(child: LogicalPlan, relations) =>
186206
val resolvedCTERelations =
187-
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++
207+
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
188208
outerCTEDefs
189209
val substituted = substituteCTE(
190-
traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1,
191-
isCommand,
210+
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
211+
forceInline,
192212
resolvedCTERelations)
193213
if (firstSubstituted.isEmpty) {
194214
firstSubstituted = Some(substituted)
@@ -206,10 +226,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
206226
private def resolveCTERelations(
207227
relations: Seq[(String, SubqueryAlias)],
208228
isLegacy: Boolean,
209-
isCommand: Boolean,
229+
forceInline: Boolean,
210230
outerCTEDefs: Seq[(String, CTERelationDef)],
211231
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
212-
var resolvedCTERelations = if (isLegacy || isCommand) {
232+
val alwaysInline = isLegacy || forceInline
233+
var resolvedCTERelations = if (alwaysInline) {
213234
Seq.empty
214235
} else {
215236
outerCTEDefs
@@ -232,12 +253,12 @@ object CTESubstitution extends Rule[LogicalPlan] {
232253
// WITH t3 AS (SELECT * FROM t1)
233254
// )
234255
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
235-
traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1
256+
traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
236257
}
237258
// CTE definition can reference a previous one
238-
val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations)
259+
val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
239260
val cteRelation = CTERelationDef(substituted)
240-
if (!(isLegacy || isCommand)) {
261+
if (!alwaysInline) {
241262
cteDefs += cteRelation
242263
}
243264
// Prepending new CTEs makes sure that those have higher priority over outer ones.
@@ -249,7 +270,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
249270
private def substituteCTE(
250271
plan: LogicalPlan,
251272
alwaysInline: Boolean,
252-
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
273+
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
253274
plan.resolveOperatorsUpWithPruning(
254275
_.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
255276
case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
@@ -273,4 +294,22 @@ object CTESubstitution extends Rule[LogicalPlan] {
273294
e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
274295
}
275296
}
297+
}
298+
299+
/**
300+
* For commands which extend `CTEInChildren`, we should place the `WithCTE` node on their
301+
* children. There are two reasons:
302+
* 1. Some rules will pattern match the root command nodes, and we should keep command
303+
* as the root node to not break them.
304+
* 2. `Dataset` eagerly executes the commands inside a query plan. For example,
305+
* sql("WITH v ... CREATE TABLE t AS SELECT * FROM v") will create the table instead of just
306+
* analyzing the command. However, the CTE references inside commands will be invalid if we
307+
* execute the command alone, as the CTE definitions are outside of the command.
308+
*/
309+
private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
310+
p match {
311+
case c: CTEInChildren => c.withCTEDefs(cteDefs)
312+
case _ => WithCTE(p, cteDefs)
313+
}
314+
}
276315
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ case class InsertIntoDir(
684684
provider: Option[String],
685685
child: LogicalPlan,
686686
overwrite: Boolean = true)
687-
extends UnaryNode {
687+
extends UnaryNode with CTEInChildren {
688688

689689
override def output: Seq[Attribute] = Seq.empty
690690
override def metadataOutput: Seq[Attribute] = Nil
@@ -894,6 +894,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi
894894
}
895895
}
896896

897+
/**
898+
* The logical node which is able to place the `WithCTE` node on its children.
899+
*/
900+
trait CTEInChildren extends LogicalPlan {
901+
def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
902+
withNewChildren(children.map(WithCTE(_, cteDefs)))
903+
}
904+
}
905+
906+
897907
case class WithWindowDefinition(
898908
windowDefinitions: Map[String, WindowSpecDefinition],
899909
child: LogicalPlan) extends UnaryNode {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.DataType
4040
* Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is
4141
* kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
4242
*/
43-
abstract class ParsedStatement extends LogicalPlan {
43+
abstract class ParsedStatement extends LogicalPlan with CTEInChildren {
4444
// Redact properties and options when parsed nodes are used by generic methods like toString
4545
override def productIterator: Iterator[Any] = super.productIterator.map {
4646
case mapArg: Map[_, _] => conf.redactOptions(mapArg)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command {
4646
/**
4747
* Base trait for DataSourceV2 write commands
4848
*/
49-
trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery {
49+
trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren {
5050
def table: NamedRelation
5151
def query: LogicalPlan
5252
def isByName: Boolean
@@ -392,9 +392,16 @@ case class WriteDelta(
392392
}
393393
}
394394

395-
trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand {
395+
trait V2CreateTableAsSelectPlan
396+
extends V2CreateTablePlan
397+
with AnalysisOnlyCommand
398+
with CTEInChildren {
396399
def query: LogicalPlan
397400

401+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
402+
withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs))
403+
}
404+
398405
override lazy val resolved: Boolean = childrenResolved && {
399406
// the table schema is created from the query schema, so the only resolution needed is to check
400407
// that the columns referenced by the table's partitioning exist in the query schema
@@ -1234,12 +1241,16 @@ case class RepairTable(
12341241
case class AlterViewAs(
12351242
child: LogicalPlan,
12361243
originalText: String,
1237-
query: LogicalPlan) extends BinaryCommand {
1244+
query: LogicalPlan) extends BinaryCommand with CTEInChildren {
12381245
override def left: LogicalPlan = child
12391246
override def right: LogicalPlan = query
12401247
override protected def withNewChildrenInternal(
12411248
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
12421249
copy(child = newLeft, query = newRight)
1250+
1251+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
1252+
withNewChildren(Seq(child, WithCTE(query, cteDefs)))
1253+
}
12431254
}
12441255

12451256
/**
@@ -1253,12 +1264,16 @@ case class CreateView(
12531264
originalText: Option[String],
12541265
query: LogicalPlan,
12551266
allowExisting: Boolean,
1256-
replace: Boolean) extends BinaryCommand {
1267+
replace: Boolean) extends BinaryCommand with CTEInChildren {
12571268
override def left: LogicalPlan = child
12581269
override def right: LogicalPlan = query
12591270
override protected def withNewChildrenInternal(
12601271
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
12611272
copy(child = newLeft, query = newRight)
1273+
1274+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
1275+
withNewChildren(Seq(child, WithCTE(query, cteDefs)))
1276+
}
12621277
}
12631278

12641279
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3766,6 +3766,15 @@ object SQLConf {
37663766
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
37673767
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
37683768

3769+
val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands")
3770+
.internal()
3771+
.doc("If true, always inline the CTE relations for the queries in commands. This is the " +
3772+
"legacy behavior which may produce incorrect results because Spark may evaluate a CTE " +
3773+
"relation more than once, even if it's nondeterministic.")
3774+
.version("4.0.0")
3775+
.booleanConf
3776+
.createWithDefault(false)
3777+
37693778
val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
37703779
.internal()
37713780
.doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +

sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
2424
import org.apache.spark.SparkContext
2525
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
2626
import org.apache.spark.sql.catalyst.expressions.Attribute
27-
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
27+
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, UnaryCommand}
2828
import org.apache.spark.sql.errors.QueryCompilationErrors
2929
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
3030
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
@@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
3535
/**
3636
* A special `Command` which writes data out and updates metrics.
3737
*/
38-
trait DataWritingCommand extends UnaryCommand {
38+
trait DataWritingCommand extends UnaryCommand with CTEInChildren {
3939
/**
4040
* The input query plan that produces the data to be written.
4141
* IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns

sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
1919

2020
import org.apache.spark.sql._
2121
import org.apache.spark.sql.catalyst.catalog._
22-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
22+
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
2323
import org.apache.spark.sql.errors.QueryExecutionErrors
2424
import org.apache.spark.sql.execution.datasources._
2525

@@ -42,7 +42,7 @@ case class InsertIntoDataSourceDirCommand(
4242
storage: CatalogStorageFormat,
4343
provider: String,
4444
query: LogicalPlan,
45-
overwrite: Boolean) extends LeafRunnableCommand {
45+
overwrite: Boolean) extends LeafRunnableCommand with CTEInChildren {
4646

4747
override def innerChildren: Seq[LogicalPlan] = query :: Nil
4848

@@ -76,4 +76,8 @@ case class InsertIntoDataSourceDirCommand(
7676

7777
Seq.empty[Row]
7878
}
79+
80+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
81+
copy(query = WithCTE(query, cteDefs))
82+
}
7983
}

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.net.URI
2121

2222
import org.apache.spark.sql._
2323
import org.apache.spark.sql.catalyst.catalog._
24-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
2525
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
2626
import org.apache.spark.sql.errors.QueryCompilationErrors
2727
import org.apache.spark.sql.execution.CommandExecutionMode
@@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand(
141141
mode: SaveMode,
142142
query: LogicalPlan,
143143
outputColumnNames: Seq[String])
144-
extends LeafRunnableCommand {
144+
extends LeafRunnableCommand with CTEInChildren {
145145
assert(query.resolved)
146146
override def innerChildren: Seq[LogicalPlan] = query :: Nil
147147

@@ -233,4 +233,8 @@ case class CreateDataSourceTableAsSelectCommand(
233233
throw ex
234234
}
235235
}
236+
237+
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
238+
copy(query = WithCTE(query, cteDefs))
239+
}
236240
}

0 commit comments

Comments
 (0)