Skip to content

Commit 161f28b

Browse files
cloud-fan authored and sunchao committed
[SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places
### What changes were proposed in this pull request?

This is a follow-up of apache#31940. This PR generalizes the matching of attributes and outer references, so that outer references are handled everywhere.

Note that correlated subqueries currently have a lot of limitations in Spark, so the newly covered cases cannot actually occur yet. This PR is therefore purely a code refactor.

### Why are the changes needed?

Code cleanup.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes apache#31959 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
(cherry picked from commit 658e95c)
Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 80414a7 commit 161f28b

File tree

1 file changed

+41
-26
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+41
-26
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3920,34 +3920,32 @@ object UpdateOuterReferences extends Rule[LogicalPlan] {
39203920
*/
39213921
object ApplyCharTypePadding extends Rule[LogicalPlan] {
39223922

3923+
/**
 * Extractor that matches either a bare [[Attribute]] or an [[OuterReference]] wrapping an
 * [[Attribute]], yielding the underlying attribute in both cases. Callers that need the
 * original (possibly wrapped) expression bind it separately, e.g. `e @ AttrOrOuterRef(attr)`.
 */
object AttrOrOuterRef {
3924+
def unapply(e: Expression): Option[Attribute] = e match {
3925+
// A plain attribute is returned as-is.
case a: Attribute => Some(a)
3926+
// An outer reference is unwrapped to the attribute it wraps.
case OuterReference(a: Attribute) => Some(a)
3927+
// Any other expression does not match.
case _ => None
3928+
}
3929+
}
3930+
39233931
override def apply(plan: LogicalPlan): LogicalPlan = {
39243932
plan.resolveOperatorsUp {
39253933
case operator => operator.transformExpressionsUp {
39263934
case e if !e.childrenResolved => e
39273935

39283936
// String literal is treated as char type when it's compared to a char type column.
39293937
// We should pad the shorter one to the longer length.
3930-
case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable =>
3931-
padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
3932-
b.withNewChildren(newChildren)
3933-
}.getOrElse(b)
3934-
3935-
case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable =>
3936-
padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
3937-
b.withNewChildren(newChildren.reverse)
3938-
}.getOrElse(b)
3939-
3940-
case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) if lit.foldable =>
3941-
padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
3938+
case b @ BinaryComparison(e @ AttrOrOuterRef(attr), lit) if lit.foldable =>
3939+
padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
39423940
b.withNewChildren(newChildren)
39433941
}.getOrElse(b)
39443942

3945-
case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) if lit.foldable =>
3946-
padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
3943+
case b @ BinaryComparison(lit, e @ AttrOrOuterRef(attr)) if lit.foldable =>
3944+
padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
39473945
b.withNewChildren(newChildren.reverse)
39483946
}.getOrElse(b)
39493947

3950-
case i @ In(attr: Attribute, list)
3948+
case i @ In(e @ AttrOrOuterRef(attr), list)
39513949
if attr.dataType == StringType && list.forall(_.foldable) =>
39523950
CharVarcharUtils.getRawType(attr.metadata).flatMap {
39533951
case CharType(length) =>
@@ -3956,27 +3954,44 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
39563954
val literalCharLengths = literalChars.map(_.numChars())
39573955
val targetLen = (length +: literalCharLengths).max
39583956
Some(i.copy(
3959-
value = addPadding(attr, length, targetLen),
3957+
value = addPadding(e, length, targetLen),
39603958
list = list.zip(literalCharLengths).map {
39613959
case (lit, charLength) => addPadding(lit, charLength, targetLen)
39623960
} ++ nulls.map(Literal.create(_, StringType))))
39633961
case _ => None
39643962
}.getOrElse(i)
39653963

39663964
// For char type column or inner field comparison, pad the shorter one to the longer length.
3967-
case b @ BinaryComparison(left: Attribute, right: Attribute) =>
3968-
b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, right)))
3969-
3970-
case b @ BinaryComparison(OuterReference(left: Attribute), right: Attribute) =>
3971-
b.withNewChildren(padOuterRefAttrCmp(left, right))
3972-
3973-
case b @ BinaryComparison(left: Attribute, OuterReference(right: Attribute)) =>
3974-
b.withNewChildren(padOuterRefAttrCmp(right, left).reverse)
3965+
case b @ BinaryComparison(e1 @ AttrOrOuterRef(left), e2 @ AttrOrOuterRef(right))
3966+
// For the same attribute, they must be the same length and no padding is needed.
3967+
if !left.semanticEquals(right) =>
3968+
val outerRefs = (e1, e2) match {
3969+
case (_: OuterReference, _: OuterReference) => Seq(left, right)
3970+
case (_: OuterReference, _) => Seq(left)
3971+
case (_, _: OuterReference) => Seq(right)
3972+
case _ => Nil
3973+
}
3974+
val newChildren = CharVarcharUtils.addPaddingInStringComparison(Seq(left, right))
3975+
if (outerRefs.nonEmpty) {
3976+
b.withNewChildren(newChildren.map(_.transform {
3977+
case a: Attribute if outerRefs.exists(_.semanticEquals(a)) => OuterReference(a)
3978+
}))
3979+
} else {
3980+
b.withNewChildren(newChildren)
3981+
}
39753982

3976-
case i @ In(attr: Attribute, list) if list.forall(_.isInstanceOf[Attribute]) =>
3983+
case i @ In(e @ AttrOrOuterRef(attr), list) if list.forall(_.isInstanceOf[Attribute]) =>
39773984
val newChildren = CharVarcharUtils.addPaddingInStringComparison(
39783985
attr +: list.map(_.asInstanceOf[Attribute]))
3979-
i.copy(value = newChildren.head, list = newChildren.tail)
3986+
if (e.isInstanceOf[OuterReference]) {
3987+
i.copy(
3988+
value = newChildren.head.transform {
3989+
case a: Attribute if a.semanticEquals(attr) => OuterReference(a)
3990+
},
3991+
list = newChildren.tail)
3992+
} else {
3993+
i.copy(value = newChildren.head, list = newChildren.tail)
3994+
}
39803995
}
39813996
}
39823997
}

0 commit comments

Comments
 (0)