Skip to content

Commit e139042

Browse files
maropufishcus
authored andcommitted
[SPARK-34833][SQL] Apply right-padding correctly for correlated subqueries
### What changes were proposed in this pull request? This PR intends to fix the bug that does not apply right-padding for char types inside correlated subquries. For example, a query below returns nothing in master, but a correct result is `c`. ``` scala> sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING parquet") scala> sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(7)) USING parquet") scala> sql("INSERT INTO t1 VALUES ('c', 'b')") scala> sql("INSERT INTO t2 VALUES ('a', 'b')") scala> val df = sql(""" |SELECT v FROM t1 |WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = t1.c )""".stripMargin) scala> df.show() +---+ | v| +---+ +---+ ``` This is because `ApplyCharTypePadding` does not handle the case above to apply right-padding into `'abc'`. This PR modifies the code in `ApplyCharTypePadding` for handling it correctly. ``` // Before this PR: scala> df.explain(true) == Analyzed Logical Plan == v: string Project [v#13] +- Filter a IN (list#12 [c#14]) : +- Project [v#15] : +- Filter (c#16 = outer(c#14)) : +- SubqueryAlias spark_catalog.default.t2 : +- Relation default.t2[v#15,c#16] parquet +- SubqueryAlias spark_catalog.default.t1 +- Relation default.t1[v#13,c#14] parquet scala> df.show() +---+ | v| +---+ +---+ // After this PR: scala> df.explain(true) == Analyzed Logical Plan == v: string Project [v#43] +- Filter a IN (list#42 [c#44]) : +- Project [v#45] : +- Filter (c#46 = rpad(outer(c#44), 7, )) : +- SubqueryAlias spark_catalog.default.t2 : +- Relation default.t2[v#45,c#46] parquet +- SubqueryAlias spark_catalog.default.t1 +- Relation default.t1[v#43,c#44] parquet scala> df.show() +---+ | v| +---+ | c| +---+ ``` This fix is lated to TPCDS q17; the query returns nothing because of this bug: https://github.com/apache/spark/pull/31886/files#r599333799 ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests added. Closes apache#31940 from maropu/FixCharPadding. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Takeshi Yamamuro <[email protected]> (cherry picked from commit 150769b) Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 5a22ad8 commit e139042

File tree

2 files changed

+79
-23
lines changed

2 files changed

+79
-23
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3921,16 +3921,28 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
39213921

39223922
override def apply(plan: LogicalPlan): LogicalPlan = {
39233923
plan.resolveOperatorsUp {
3924-
case operator if operator.resolved => operator.transformExpressionsUp {
3924+
case operator => operator.transformExpressionsUp {
3925+
case e if !e.childrenResolved => e
3926+
39253927
// String literal is treated as char type when it's compared to a char type column.
39263928
// We should pad the shorter one to the longer length.
39273929
case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable =>
3928-
padAttrLitCmp(attr, lit).map { newChildren =>
3930+
padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
39293931
b.withNewChildren(newChildren)
39303932
}.getOrElse(b)
39313933

39323934
case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable =>
3933-
padAttrLitCmp(attr, lit).map { newChildren =>
3935+
padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
3936+
b.withNewChildren(newChildren.reverse)
3937+
}.getOrElse(b)
3938+
3939+
case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) if lit.foldable =>
3940+
padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
3941+
b.withNewChildren(newChildren)
3942+
}.getOrElse(b)
3943+
3944+
case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) if lit.foldable =>
3945+
padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
39343946
b.withNewChildren(newChildren.reverse)
39353947
}.getOrElse(b)
39363948

@@ -3954,6 +3966,12 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
39543966
case b @ BinaryComparison(left: Attribute, right: Attribute) =>
39553967
b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, right)))
39563968

3969+
case b @ BinaryComparison(OuterReference(left: Attribute), right: Attribute) =>
3970+
b.withNewChildren(padOuterRefAttrCmp(left, right))
3971+
3972+
case b @ BinaryComparison(left: Attribute, OuterReference(right: Attribute)) =>
3973+
b.withNewChildren(padOuterRefAttrCmp(right, left).reverse)
3974+
39573975
case i @ In(attr: Attribute, list) if list.forall(_.isInstanceOf[Attribute]) =>
39583976
val newChildren = CharVarcharUtils.addPaddingInStringComparison(
39593977
attr +: list.map(_.asInstanceOf[Attribute]))
@@ -3962,19 +3980,22 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
39623980
}
39633981
}
39643982

3965-
private def padAttrLitCmp(attr: Attribute, lit: Expression): Option[Seq[Expression]] = {
3966-
if (attr.dataType == StringType) {
3967-
CharVarcharUtils.getRawType(attr.metadata).flatMap {
3983+
private def padAttrLitCmp(
3984+
expr: Expression,
3985+
metadata: Metadata,
3986+
lit: Expression): Option[Seq[Expression]] = {
3987+
if (expr.dataType == StringType) {
3988+
CharVarcharUtils.getRawType(metadata).flatMap {
39683989
case CharType(length) =>
39693990
val str = lit.eval().asInstanceOf[UTF8String]
39703991
if (str == null) {
39713992
None
39723993
} else {
39733994
val stringLitLen = str.numChars()
39743995
if (length < stringLitLen) {
3975-
Some(Seq(StringRPad(attr, Literal(stringLitLen)), lit))
3996+
Some(Seq(StringRPad(expr, Literal(stringLitLen)), lit))
39763997
} else if (length > stringLitLen) {
3977-
Some(Seq(attr, StringRPad(lit, Literal(length))))
3998+
Some(Seq(expr, StringRPad(lit, Literal(length))))
39783999
} else {
39794000
None
39804001
}
@@ -3986,6 +4007,14 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
39864007
}
39874008
}
39884009

4010+
private def padOuterRefAttrCmp(outerAttr: Attribute, attr: Attribute): Seq[Expression] = {
4011+
val Seq(r, newAttr) = CharVarcharUtils.addPaddingInStringComparison(Seq(outerAttr, attr))
4012+
val newOuterRef = r.transform {
4013+
case ar: Attribute if ar.semanticEquals(outerAttr) => OuterReference(ar)
4014+
}
4015+
Seq(newOuterRef, newAttr)
4016+
}
4017+
39894018
private def addPadding(expr: Expression, charLength: Int, targetLength: Int): Expression = {
39904019
if (targetLength > charLength) StringRPad(expr, Literal(targetLength)) else expr
39914020
}

sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -584,21 +584,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
584584
}
585585
}
586586

587-
test("SPARK-33992: char/varchar resolution in correlated sub query") {
588-
withTable("t1", "t2") {
589-
sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format")
590-
sql(s"CREATE TABLE t2(v VARCHAR(3), c CHAR(5)) USING $format")
591-
sql("INSERT INTO t1 VALUES ('c', 'b')")
592-
sql("INSERT INTO t2 VALUES ('a', 'b')")
593-
594-
checkAnswer(sql(
595-
"""
596-
|SELECT v FROM t1
597-
|WHERE 'a' IN (SELECT v FROM t2 WHERE t1.c = t2.c )""".stripMargin),
598-
Row("c"))
599-
}
600-
}
601-
602587
test("SPARK-34003: fix char/varchar fails w/ both group by and order by ") {
603588
withTable("t") {
604589
sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
@@ -633,6 +618,48 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
633618
checkAnswer(spark.table("t"), Row("c "))
634619
}
635620
}
621+
622+
test("SPARK-34833: right-padding applied correctly for correlated subqueries - join keys") {
623+
withTable("t1", "t2") {
624+
sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format")
625+
sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(8)) USING $format")
626+
sql("INSERT INTO t1 VALUES ('c', 'b')")
627+
sql("INSERT INTO t2 VALUES ('a', 'b')")
628+
Seq("t1.c = t2.c", "t2.c = t1.c",
629+
"t1.c = 'b'", "'b' = t1.c", "t1.c = 'b '", "'b ' = t1.c",
630+
"t1.c = 'b '", "'b ' = t1.c").foreach { predicate =>
631+
checkAnswer(sql(
632+
s"""
633+
|SELECT v FROM t1
634+
|WHERE 'a' IN (SELECT v FROM t2 WHERE $predicate)
635+
""".stripMargin),
636+
Row("c"))
637+
}
638+
}
639+
}
640+
641+
test("SPARK-34833: right-padding applied correctly for correlated subqueries - other preds") {
642+
withTable("t") {
643+
sql(s"CREATE TABLE t(c0 INT, c1 CHAR(5), c2 CHAR(7)) USING $format")
644+
sql("INSERT INTO t VALUES (1, 'abc', 'abc')")
645+
Seq("c1 = 'abc'", "'abc' = c1", "c1 = 'abc '", "'abc ' = c1",
646+
"c1 = 'abc '", "'abc ' = c1", "c1 = c2", "c2 = c1",
647+
"c1 IN ('xxx', 'abc', 'xxxxx')", "c1 IN ('xxx', 'abc ', 'xxxxx')",
648+
"c1 IN ('xxx', 'abc ', 'xxxxx')",
649+
"c1 IN (c2)", "c2 IN (c1)").foreach { predicate =>
650+
checkAnswer(sql(
651+
s"""
652+
|SELECT c0 FROM t t1
653+
|WHERE (
654+
| SELECT count(*) AS c
655+
| FROM t
656+
| WHERE c0 = t1.c0 AND $predicate
657+
|) > 0
658+
""".stripMargin),
659+
Row(1))
660+
}
661+
}
662+
}
636663
}
637664

638665
// Some basic char/varchar tests which doesn't rely on table implementation.

0 commit comments

Comments
 (0)