Skip to content

Commit 975baa6

Browse files
cloud-fanfishcus
authored andcommitted
[SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places
### What changes were proposed in this pull request? This is a follow-up of apache#31940 . This PR generalizes the matching of attributes and outer references, so that outer references are handled everywhere. Note that, currently correlated subquery has a lot of limitations in Spark, and the newly covered cases are not possible to happen. So this PR is a code refactor. ### Why are the changes needed? code cleanup ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#31959 from cloud-fan/follow. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Takeshi Yamamuro <[email protected]> (cherry picked from commit 658e95c) Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 544a035 commit 975baa6

File tree

1 file changed

+41
-26
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+41
-26
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3919,34 +3919,32 @@ object UpdateOuterReferences extends Rule[LogicalPlan] {
39193919
*/
39203920
object ApplyCharTypePadding extends Rule[LogicalPlan] {
39213921

3922+
object AttrOrOuterRef {
3923+
def unapply(e: Expression): Option[Attribute] = e match {
3924+
case a: Attribute => Some(a)
3925+
case OuterReference(a: Attribute) => Some(a)
3926+
case _ => None
3927+
}
3928+
}
3929+
39223930
override def apply(plan: LogicalPlan): LogicalPlan = {
39233931
plan.resolveOperatorsUp {
39243932
case operator => operator.transformExpressionsUp {
39253933
case e if !e.childrenResolved => e
39263934

39273935
// String literal is treated as char type when it's compared to a char type column.
39283936
// We should pad the shorter one to the longer length.
3929-
case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable =>
3930-
padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
3931-
b.withNewChildren(newChildren)
3932-
}.getOrElse(b)
3933-
3934-
case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable =>
3935-
padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
3936-
b.withNewChildren(newChildren.reverse)
3937-
}.getOrElse(b)
3938-
3939-
case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) if lit.foldable =>
3940-
padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
3937+
case b @ BinaryComparison(e @ AttrOrOuterRef(attr), lit) if lit.foldable =>
3938+
padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
39413939
b.withNewChildren(newChildren)
39423940
}.getOrElse(b)
39433941

3944-
case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) if lit.foldable =>
3945-
padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
3942+
case b @ BinaryComparison(lit, e @ AttrOrOuterRef(attr)) if lit.foldable =>
3943+
padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
39463944
b.withNewChildren(newChildren.reverse)
39473945
}.getOrElse(b)
39483946

3949-
case i @ In(attr: Attribute, list)
3947+
case i @ In(e @ AttrOrOuterRef(attr), list)
39503948
if attr.dataType == StringType && list.forall(_.foldable) =>
39513949
CharVarcharUtils.getRawType(attr.metadata).flatMap {
39523950
case CharType(length) =>
@@ -3955,27 +3953,44 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
39553953
val literalCharLengths = literalChars.map(_.numChars())
39563954
val targetLen = (length +: literalCharLengths).max
39573955
Some(i.copy(
3958-
value = addPadding(attr, length, targetLen),
3956+
value = addPadding(e, length, targetLen),
39593957
list = list.zip(literalCharLengths).map {
39603958
case (lit, charLength) => addPadding(lit, charLength, targetLen)
39613959
} ++ nulls.map(Literal.create(_, StringType))))
39623960
case _ => None
39633961
}.getOrElse(i)
39643962

39653963
// For char type column or inner field comparison, pad the shorter one to the longer length.
3966-
case b @ BinaryComparison(left: Attribute, right: Attribute) =>
3967-
b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, right)))
3968-
3969-
case b @ BinaryComparison(OuterReference(left: Attribute), right: Attribute) =>
3970-
b.withNewChildren(padOuterRefAttrCmp(left, right))
3971-
3972-
case b @ BinaryComparison(left: Attribute, OuterReference(right: Attribute)) =>
3973-
b.withNewChildren(padOuterRefAttrCmp(right, left).reverse)
3964+
case b @ BinaryComparison(e1 @ AttrOrOuterRef(left), e2 @ AttrOrOuterRef(right))
3965+
// For the same attribute, they must be the same length and no padding is needed.
3966+
if !left.semanticEquals(right) =>
3967+
val outerRefs = (e1, e2) match {
3968+
case (_: OuterReference, _: OuterReference) => Seq(left, right)
3969+
case (_: OuterReference, _) => Seq(left)
3970+
case (_, _: OuterReference) => Seq(right)
3971+
case _ => Nil
3972+
}
3973+
val newChildren = CharVarcharUtils.addPaddingInStringComparison(Seq(left, right))
3974+
if (outerRefs.nonEmpty) {
3975+
b.withNewChildren(newChildren.map(_.transform {
3976+
case a: Attribute if outerRefs.exists(_.semanticEquals(a)) => OuterReference(a)
3977+
}))
3978+
} else {
3979+
b.withNewChildren(newChildren)
3980+
}
39743981

3975-
case i @ In(attr: Attribute, list) if list.forall(_.isInstanceOf[Attribute]) =>
3982+
case i @ In(e @ AttrOrOuterRef(attr), list) if list.forall(_.isInstanceOf[Attribute]) =>
39763983
val newChildren = CharVarcharUtils.addPaddingInStringComparison(
39773984
attr +: list.map(_.asInstanceOf[Attribute]))
3978-
i.copy(value = newChildren.head, list = newChildren.tail)
3985+
if (e.isInstanceOf[OuterReference]) {
3986+
i.copy(
3987+
value = newChildren.head.transform {
3988+
case a: Attribute if a.semanticEquals(attr) => OuterReference(a)
3989+
},
3990+
list = newChildren.tail)
3991+
} else {
3992+
i.copy(value = newChildren.head, list = newChildren.tail)
3993+
}
39793994
}
39803995
}
39813996
}

0 commit comments

Comments
 (0)