Skip to content

Commit b022ef7

Browse files
committed
Handle project aliases.
1 parent d8caa40 commit b022ef7

File tree

1 file changed

+21
-4
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+21
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,22 +237,35 @@ class Analyzer(catalog: Catalog,
237237
// Special handling for cases when self-join introduce duplicate expression ids.
238238
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
239239
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
240+
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
240241

241-
val (oldRelation, newRelation, attributeRewrites) = right.collect {
242+
val (oldRelation, newRelation) = right.collect {
243+
// Handle base relations that might appear more than once.
242244
case oldVersion: MultiInstanceRelation
243245
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
244246
val newVersion = oldVersion.newInstance()
245-
val newAttributes = AttributeMap(oldVersion.output.zip(newVersion.output))
246-
(oldVersion, newVersion, newAttributes)
247+
(oldVersion, newVersion)
248+
249+
// Handle projects that create conflicting aliases.
250+
case oldVersion @ Project(projectList, child)
251+
if newAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
252+
val newVersion =
253+
oldVersion.copy(
254+
projectList = projectList.map {
255+
case a: Alias => Alias(a.child, a.name)()
256+
case other => other
257+
})
258+
(oldVersion, newVersion)
247259
}.head // Only handle first case found, others will be fixed on the next pass.
248260

261+
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
249262
val newRight = right transformUp {
250263
case r if r == oldRelation => newRelation
264+
} transformUp {
251265
case other => other transformExpressions {
252266
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
253267
}
254268
}
255-
256269
j.copy(right = newRight)
257270

258271
case q: LogicalPlan =>
@@ -272,6 +285,10 @@ class Analyzer(catalog: Catalog,
272285
}
273286
}
274287

288+
def newAliases(projectList: Seq[NamedExpression]): AttributeSet = {
289+
AttributeSet(projectList.collect { case a: Alias => a.toAttribute })
290+
}
291+
275292
/**
276293
* Returns true if `exprs` contains a [[Star]].
277294
*/

0 commit comments

Comments
 (0)