Skip to content

Commit b84d4b4

Browse files
smola authored and marmbrus committed
[SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.
ResolveReferences analysis rule now does not throw when it cannot resolve references in a self-join. Author: Santiago M. Mola <[email protected]> Closes apache#6853 from smola/SPARK-7088 and squashes the following commits: af71ac7 [Santiago M. Mola] [SPARK-7088] Fix analysis for 3rd party logical plan.
1 parent 43e6619 commit b84d4b4

File tree

2 files changed

+32
-18
lines changed

2 files changed

+32
-18
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ class Analyzer(
283283
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
284284
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
285285

286-
val (oldRelation, newRelation) = right.collect {
286+
right.collect {
287287
// Handle base relations that might appear more than once.
288288
case oldVersion: MultiInstanceRelation
289289
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
@@ -308,25 +308,27 @@ class Analyzer(
308308
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
309309
.nonEmpty =>
310310
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
311-
}.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
312-
sys.error(
313-
s"""
314-
|Failure when resolving conflicting references in Join:
315-
|$plan
316-
|
317-
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
318-
""".stripMargin)
319311
}
320-
321-
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
322-
val newRight = right transformUp {
323-
case r if r == oldRelation => newRelation
324-
} transformUp {
325-
case other => other transformExpressions {
326-
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
327-
}
312+
// Only handle first case, others will be fixed on the next pass.
313+
.headOption match {
314+
case None =>
315+
/*
316+
* No result implies that there is a logical plan node that produces new references
317+
* that this rule cannot handle. When that is the case, there must be another rule
318+
* that resolves these conflicts. Otherwise, the analysis will fail.
319+
*/
320+
j
321+
case Some((oldRelation, newRelation)) =>
322+
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
323+
val newRight = right transformUp {
324+
case r if r == oldRelation => newRelation
325+
} transformUp {
326+
case other => other transformExpressions {
327+
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
328+
}
329+
}
330+
j.copy(right = newRight)
328331
}
329-
j.copy(right = newRight)
330332

331333
// When resolve `SortOrder`s in Sort based on child, don't report errors as
332334
// we still have chance to resolve it based on grandchild

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ trait CheckAnalysis {
4848
// We transform up and order the rules so as to catch the first possible failure instead
4949
// of the result of cascading resolution failures.
5050
plan.foreachUp {
51+
5152
case operator: LogicalPlan =>
5253
operator transformExpressionsUp {
5354
case a: Attribute if !a.resolved =>
@@ -121,6 +122,17 @@ trait CheckAnalysis {
121122

122123
case _ => // Analysis successful!
123124
}
125+
126+
// Special handling for cases where a self-join introduces duplicate expression ids.
127+
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
128+
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
129+
failAnalysis(
130+
s"""
131+
|Failure when resolving conflicting references in Join:
132+
|$plan
133+
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
134+
|""".stripMargin)
135+
124136
}
125137
extendedCheckRules.foreach(_(plan))
126138
}

0 commit comments

Comments
 (0)