From d9e087eef196f7e3ed1b25247614faf676ee7204 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 7 May 2024 20:48:31 +0800 Subject: [PATCH 1/3] CheckAnalysis should see the entire query plan --- .../sql/catalyst/analysis/CheckAnalysis.scala | 49 +++++++++++++++++-- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7f10bdbc80ca9..f4df1db7637cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -141,17 +141,56 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB errorClass, missingCol, orderedCandidates, a.origin) } + private def checkUnreferencedCTERelations( + cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])], + visited: mutable.Map[Long, Boolean], + danglingCTERelations: mutable.ArrayBuffer[CTERelationDef], + cteId: Long): Unit = { + if (visited(cteId)) { + return + } + val (cteDef, _, refMap) = cteMap(cteId) + refMap.foreach { case (id, _) => + checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id) + } + danglingCTERelations.append(cteDef) + visited(cteId) = true + } + def checkAnalysis(plan: LogicalPlan): Unit = { val inlineCTE = InlineCTE(alwaysInline = true) val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])] inlineCTE.buildCTEMap(plan, cteMap) - cteMap.values.foreach { case (relation, refCount, _) => - // If a CTE relation is never used, it will disappear after inline. Here we explicitly check - // analysis for it, to make sure the entire query plan is valid. 
- if (refCount == 0) checkAnalysis0(relation.child) + val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef] + val visited: mutable.Map[Long, Boolean] = mutable.Map.empty.withDefaultValue(false) + // If a CTE relation is never used, it will disappear after inline. Here we explicitly collect + // these dangling CTE relations, and put them back in the main query, to make sure the entire + // query plan is valid. + cteMap.foreach { case (cteId, (_, refCount, _)) => + // If a CTE relation ref count is 0, the other CTE relations that it references should also be + // collected. This code will also guarantee the leaf relations that do not reference + // any others are collected first. + if (refCount == 0) { + checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, cteId) + } } // Inline all CTEs in the plan to help check query plan structures in subqueries. - checkAnalysis0(inlineCTE(plan)) + var inlinedPlan: LogicalPlan = plan + try { + inlinedPlan = inlineCTE(plan) + } catch { + case e: AnalysisException => + throw new ExtendedAnalysisException(e, plan) + } + if (danglingCTERelations.nonEmpty) { + inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq) + } + try { + checkAnalysis0(inlinedPlan) + } catch { + case e: AnalysisException => + throw new ExtendedAnalysisException(e, inlinedPlan) + } plan.setAnalyzed() } From 734f2d281f98ac983f4691242bb04ac9d58533e4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 7 May 2024 23:44:24 +0800 Subject: [PATCH 2/3] Update CheckAnalysis.scala --- .../sql/catalyst/analysis/CheckAnalysis.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index f4df1db7637cd..56ff85c2f6f9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -176,21 +176,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB } // Inline all CTEs in the plan to help check query plan structures in subqueries. var inlinedPlan: LogicalPlan = plan - try { - inlinedPlan = inlineCTE(plan) - } catch { - case e: AnalysisException => - throw new ExtendedAnalysisException(e, plan) - } + inlinedPlan = inlineCTE(plan) if (danglingCTERelations.nonEmpty) { inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq) } - try { - checkAnalysis0(inlinedPlan) - } catch { - case e: AnalysisException => - throw new ExtendedAnalysisException(e, inlinedPlan) - } + checkAnalysis0(inlinedPlan) plan.setAnalyzed() } From 96127523deade8fc47ad0fc044437e2ff3c823b5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 7 May 2024 23:45:02 +0800 Subject: [PATCH 3/3] Update CheckAnalysis.scala --- .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 56ff85c2f6f9c..485015f2efab4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -175,8 +175,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB } } // Inline all CTEs in the plan to help check query plan structures in subqueries. - var inlinedPlan: LogicalPlan = plan - inlinedPlan = inlineCTE(plan) + var inlinedPlan: LogicalPlan = inlineCTE(plan) if (danglingCTERelations.nonEmpty) { inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq) }