@@ -47,12 +47,21 @@ class Analyzer(
4747
4848 val fixedPoint = FixedPoint (maxIterations)
4949
50+ /**
51+ * Override to provide additional rules for the "Substitution" batch.
52+ */
53+ val extendedSubstitutionRules : Seq [Rule [LogicalPlan ]] = Nil
54+
5055 /**
5156 * Override to provide additional rules for the "Resolution" batch.
5257 */
5358 val extendedResolutionRules : Seq [Rule [LogicalPlan ]] = Nil
5459
5560 lazy val batches : Seq [Batch ] = Seq (
61+ Batch (" Substitution" , fixedPoint,
62+ CTESubstitution ::
63+ Nil ++
64+ extendedSubstitutionRules : _* ),
5665 Batch (" Resolution" , fixedPoint,
5766 ResolveRelations ::
5867 ResolveReferences ::
@@ -68,6 +77,38 @@ class Analyzer(
6877 extendedResolutionRules : _* )
6978 )
7079
80+ /**
81+ * Substitute CTE definitions
82+ */
83+ object CTESubstitution extends Rule [LogicalPlan ] {
84+ def apply (plan : LogicalPlan ): LogicalPlan = {
85+ val (realPlan, cteRelations) = plan match {
86+ case With (child, relations) =>
87+ (child, relations)
88+ case other => (other, Map .empty[String , LogicalPlan ])
89+ }
90+ substituteCTE(realPlan, cteRelations)
91+ }
92+
93+ def substituteCTE (plan : LogicalPlan , cteRelations : Map [String , LogicalPlan ]): LogicalPlan = {
94+ plan transform {
95+ case i @ InsertIntoTable (u : UnresolvedRelation , _, _, _, _) =>
96+ // In hive, if there is same table name in database and CTE definition,
97+ // hive will use the table in database, not the CTE one.
98+ // Taking into account the reasonableness and the implementation complexity,
99+ // here use the CTE definition first, check table name only and ignore database name
100+ val relation = cteRelations.get(u.tableIdentifier.last)
101+ .map(relation => u.alias.map(Subquery (_, relation)).getOrElse(relation))
102+ .getOrElse(u)
103+ i.copy(table = relation)
104+ case u : UnresolvedRelation =>
105+ cteRelations.get(u.tableIdentifier.last)
106+ .map(relation => u.alias.map(Subquery (_, relation)).getOrElse(relation))
107+ .getOrElse(u)
108+ }
109+ }
110+ }
111+
71112 /**
72113 * Removes no-op Alias expressions from the plan.
73114 */
@@ -169,36 +210,20 @@ class Analyzer(
169210 * Replaces [[UnresolvedRelation ]]s with concrete relations from the catalog.
170211 */
171212 object ResolveRelations extends Rule [LogicalPlan ] {
172- def getTable (u : UnresolvedRelation , cteRelations : Map [ String , LogicalPlan ] ): LogicalPlan = {
213+ def getTable (u : UnresolvedRelation ): LogicalPlan = {
173214 try {
174- // In hive, if there is same table name in database and CTE definition,
175- // hive will use the table in database, not the CTE one.
176- // Taking into account the reasonableness and the implementation complexity,
177- // here use the CTE definition first, check table name only and ignore database name
178- cteRelations.get(u.tableIdentifier.last)
179- .map(relation => u.alias.map(Subquery (_, relation)).getOrElse(relation))
180- .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
215+ catalog.lookupRelation(u.tableIdentifier, u.alias)
181216 } catch {
182217 case _ : NoSuchTableException =>
183218 u.failAnalysis(s " no such table ${u.tableName}" )
184219 }
185220 }
186221
187- def apply (plan : LogicalPlan ): LogicalPlan = {
188- val (realPlan, cteRelations) = plan match {
189- // TODO allow subquery to define CTE
190- // Add cte table to a temp relation map,drop `with` plan and keep its child
191- case With (child, relations) => (child, relations)
192- case other => (other, Map .empty[String , LogicalPlan ])
193- }
194-
195- realPlan transform {
196- case i@ InsertIntoTable (u : UnresolvedRelation , _, _, _, _) =>
197- i.copy(
198- table = EliminateSubQueries (getTable(u, cteRelations)))
199- case u : UnresolvedRelation =>
200- getTable(u, cteRelations)
201- }
222+ def apply (plan : LogicalPlan ): LogicalPlan = plan transform {
223+ case i@ InsertIntoTable (u : UnresolvedRelation , _, _, _, _) =>
224+ i.copy(table = EliminateSubQueries (getTable(u)))
225+ case u : UnresolvedRelation =>
226+ getTable(u, cteRelations)
202227 }
203228 }
204229
0 commit comments