Skip to content

Conversation

@maryannxue
Copy link
Contributor

@maryannxue maryannxue commented Aug 6, 2021

What changes were proposed in this pull request?

This PR fixes an existing correctness issue where a non-deterministic With-CTE can be executed multiple times producing different results, by deferring the inline of With-CTE to after the analysis stage. This fix also provides the future opportunity of performance improvement by executing deterministic With-CTEs only once in some circumstances.

The major changes include:

  1. Added new With-CTE logical nodes: CTERelationDef, CTERelationRef, WithCTE. Each CTERelationDef has a unique ID and the mapping between CTE def and CTE ref is based on IDs rather than names. WithCTE is a resolved version of With, only that: 1) WithCTE is a multi-children logical node so that most logical rules can automatically apply to CTE defs; 2) In the main query and each subquery, there can only be at most one WithCTE, which means nested With-CTEs are combined.
  2. Changed CTESubstitution rule so that if NOT in legacy mode, CTE defs will not be inlined immediately, but rather transformed into a CTERelationRef per reference.
  3. Added new With-CTE rules: 1) ResolveWithCTE - to update CTERelationRefs with resolved output from corresponding CTERelationDefs; 2) InlineCTE - to inline deterministic CTEs or non-deterministic CTEs with only ONE reference; 3) UpdateCTERelationStats - to update stats for CTERelationRefs that are not inlined.
  4. Added a CTE physical planning strategy to plan CTERelationRefs as an independent shuffle with round-robin partitioning so that such CTEs will only be materialized once and different references will later be a shuffle reuse.

A current limitation is that With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

Why are the changes needed?

This is a correctness issue. Non-deterministic CTEs should produce the same output regardless of how many times it is referenced/used in query, while under the current implementation there is no such guarantee and would lead to incorrect query results.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UTs.
Regenerated golden files for TPCDS plan stability tests. There is NO change to the simplified.txt files, the only differences are expression IDs.

@github-actions github-actions bot added the SQL label Aug 6, 2021
@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Test build #142172 has finished for PR 33671 at commit 4f24104.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class UnresolvedWith(
  • case class CTERelationDef(child: LogicalPlan, id: Long = CTERelationDef.newId) extends UnaryNode
  • case class CTERelationRef(
  • case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends LogicalPlan

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46684/

@maryannxue
Copy link
Contributor Author

cc @peter-toth @cloud-fan @sigmod

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46687/

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46687/

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46688/

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46688/

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Test build #142175 has finished for PR 33671 at commit 4d08307.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 6, 2021

Test build #142176 has finished for PR 33671 at commit 7ab352a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use substituted.resolveOperatorsWithPruning(_ => !done) { to break out?

@peter-toth
Copy link
Contributor

Thanks @maryannxue for pinging me. Unfortunately, I can take a closer look at this PR only the week after next...

@maryannxue
Copy link
Contributor Author

Never mind, @peter-toth! Hopefully the added logical nodes make sense to you. We can always improve the implementation later on.

@SparkQA
Copy link

SparkQA commented Aug 9, 2021

Test build #142229 has started for PR 33671 at commit dd4fc28.

@SparkQA
Copy link

SparkQA commented Aug 9, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46742/

@SparkQA
Copy link

SparkQA commented Aug 9, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46742/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46786/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46785/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Test build #142277 has finished for PR 33671 at commit 2c30d4a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46790/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46790/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Test build #142278 has finished for PR 33671 at commit 7443fb9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@sigmod sigmod left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

legacyTraverseAndSubstituteCTE(plan)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan)
val isCommand = plan.find {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plan.collectFirst would be quicker and less memory-hungry, wouldn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing sth, but I don't see how so.


case other =>
other.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
case e: SubqueryExpression => e.withNewPlan(resolveWithCTE(e.plan, cteDefMap))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the main query has more than one subqueries, when resolving the second subquery, the cteDefMap will contain CTE defs from the first subquery. I think we should clone the map here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary, right? The real resolving has happened in CTESubstitution earlier. Now there's a strict 1 to 1 ID mapping, so the map can only contain some unrelated CTE defs at its worst.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, makes sense

.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
case e: SubqueryExpression =>
val forceInline =
e.plan.find(_.expressions.exists(_.isInstanceOf[OuterReference])).nonEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an easier way to check correlated subquery: e.outerAttrs.nonEmpty

* @param statsOpt The optional statistics inferred from the corresponding CTE definition.
*/
case class CTERelationRef(
cteId: Long,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 4 spaces indentation

val newLogicalPlan = logicalPlan.transformDown {
case p if p.eq(logicalNode) => newLogicalNode
}
assert(newLogicalPlan != logicalPlan,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this assert is removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because stages in CTE defs with multiple references can be "replaced" more than once, but those are just the reuses of the same exchange, so we can ignore them.

@@ -4215,6 +4215,255 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}

test("SPARK-36447: non-deterministic CTE dedup") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we create a new CTEInlineSuite?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and run it twice with AQE on and off

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea

// an Exchange reuse at runtime.
// TODO create a new identity partitioning instead of using RoundRobinPartitioning.
exchange.ShuffleExchangeExec(
RoundRobinPartitioning(conf.numShufflePartitions),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RoundRobin sorts data before shuffling right? That will slow things a lot.

Copy link
Contributor Author

@maryannxue maryannxue Aug 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's why we put a TODO there.

@SparkQA
Copy link

SparkQA commented Aug 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46858/

@SparkQA
Copy link

SparkQA commented Aug 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46858/

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Test build #142349 has finished for PR 33671 at commit 09c1c27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

abstract class SubqueryExpression(
plan: LogicalPlan,
outerAttrs: Seq[Expression],
val outerAttrs: Seq[Expression],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this needs to touch all the subclasses, which is a bit messy. How about we just add a new method in this class?

def isCorrelated: Boolean = outerAttrs.nonEmpty

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except one minor comment

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Test build #142386 has finished for PR 33671 at commit 3746d76.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46894/

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46894/

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46899/

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46899/

@SparkQA
Copy link

SparkQA commented Aug 12, 2021

Test build #142393 has finished for PR 33671 at commit 4cc52f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.2 (since it's a correctness fix)!

@cloud-fan cloud-fan closed this in 29b1e39 Aug 13, 2021
cloud-fan pushed a commit that referenced this pull request Aug 13, 2021
This PR fixes an existing correctness issue where a non-deterministic With-CTE can be executed multiple times producing different results, by deferring the inline of With-CTE to after the analysis stage. This fix also provides the future opportunity of performance improvement by executing deterministic With-CTEs only once in some circumstances.

The major changes include:
1. Added new With-CTE logical nodes: `CTERelationDef`, `CTERelationRef`, `WithCTE`. Each `CTERelationDef` has a unique ID and the mapping between CTE def and CTE ref is based on IDs rather than names. `WithCTE` is a resolved version of `With`, only that: 1) `WithCTE` is a multi-children logical node so that most logical rules can automatically apply to CTE defs; 2) In the main query and each subquery, there can only be at most one `WithCTE`, which means nested With-CTEs are combined.
2. Changed `CTESubstitution` rule so that if NOT in legacy mode, CTE defs will not be inlined immediately, but rather transformed into a `CTERelationRef` per reference.
3. Added new With-CTE rules: 1) `ResolveWithCTE` - to update `CTERelationRef`s with resolved output from corresponding `CTERelationDef`s; 2) `InlineCTE` - to inline deterministic CTEs or non-deterministic CTEs with only ONE reference; 3) `UpdateCTERelationStats` - to update stats for `CTERelationRef`s that are not inlined.
4. Added a CTE physical planning strategy to plan `CTERelationRef`s as an independent shuffle with round-robin partitioning so that such CTEs will only be materialized once and different references will later be a shuffle reuse.

A current limitation is that With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

This is a correctness issue. Non-deterministic CTEs should produce the same output regardless of how many times it is referenced/used in query, while under the current implementation there is no such guarantee and would lead to incorrect query results.

No.

Added UTs.
Regenerated golden files for TPCDS plan stability tests. There is NO change to the `simplified.txt` files, the only differences are expression IDs.

Closes #33671 from maryannxue/spark-36447.

Authored-by: Maryann Xue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 29b1e39)
Signed-off-by: Wenchen Fan <[email protected]>
* SELECT * FROM t
* )
* @param plan the plan to be traversed
* @return the plan where CTE substitution is applied
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the return value of traverseAndSubstituteCTE changed a bit, this docs could use some update.

@peter-toth
Copy link
Contributor

Late LGTM. Just a very minor comment.

@zinking
Copy link

zinking commented Dec 16, 2021

@maryannxue I find after this change, the eventually expanded query now contains duplicated expression Ids.

is this expected? I am thinking of the assumption expression id would be unique globally.

turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
### What changes were proposed in this pull request?
This PR adds recursive query feature to Spark SQL.

A recursive query is defined using the `WITH RECURSIVE` keywords and referring the name of the common table expression within the query.
The implementation complies with SQL standard and follows similar rules to other relational databases:
- A query is made of an anchor followed by a recursive term.
- The anchor terms doesn't contain self reference and it is used to initialize the query.
- The recursive term contains a self reference and it is used to expand the current set of rows with new ones.
- The anchor and recursive terms must be joined with each other by `UNION` or `UNION ALL` operators.
- New rows can only be derived from the newly added rows of the previous iteration (or from the initial set of rows of anchor term). This limitation implies that recursive references can't be used with some of the joins, aggregations or subqueries.

Please see `cte-recursive.sql` for some examples.

The implemetation has the same limiation that [SPARK-36447](https://issues.apache.org/jira/browse/SPARK-36447) / apache#33671 has: 

> With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

which means that recursive queries are not supported in SQL commands and DMLs.
With apache#42036 this restriction is lifted and a recursive CTE only doesn't work when the CTE is force inlined (`spark.sql.legacy.inlineCTEInCommands=true` or the command is a multi-insert statement).

### Why are the changes needed?
Recursive query is an ANSI SQL feature that is useful to process hierarchical data.

### Does this PR introduce _any_ user-facing change?
Yes, adds recursive query feature.

### How was this patch tested?
Added new UTs and tests in `cte-recursion.sql`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants