Skip to content

Commit 729cadb

Browse files
cloud-fan authored and hvanhovell committed
[SPARK-18674][SQL] improve the error message of using join
## What changes were proposed in this pull request?

The current error message for a USING join is quite confusing. For example:

```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: using columns ['c1] can not be resolved given input columns: [c1, c2] ;;
'Join UsingJoin(Inner,List('c1))
:- Project [value#1 AS c1#3]
:  +- LocalRelation [value#1]
+- Project [value#7 AS c2#9]
   +- LocalRelation [value#7]
```

After this PR, it becomes:

```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: USING column `c1` can not be resolved with the right join side, the right output is: [c2];
```

## How was this patch tested?

Updated tests.

Author: Wenchen Fan <[email protected]>

Closes #16100 from cloud-fan/natural.

(cherry picked from commit e653484)

Signed-off-by: Herman van Hovell <[email protected]>
1 parent 6e3fd2b commit 729cadb

File tree

7 files changed

+34
-58
lines changed

7 files changed

+34
-58
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 9 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -1887,15 +1887,7 @@ class Analyzer(
18871887
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
18881888
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
18891889
if left.resolved && right.resolved && j.duplicateResolved =>
1890-
// Resolve the column names referenced in using clause from both the legs of join.
1891-
val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, resolver))
1892-
val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, resolver))
1893-
if ((lCols.length == usingCols.length) && (rCols.length == usingCols.length)) {
1894-
val joinNames = lCols.map(exp => exp.name)
1895-
commonNaturalJoinProcessing(left, right, joinType, joinNames, None)
1896-
} else {
1897-
j
1898-
}
1890+
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
18991891
case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
19001892
// find common column names from both sides
19011893
val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
@@ -1910,18 +1902,16 @@ class Analyzer(
19101902
joinNames: Seq[String],
19111903
condition: Option[Expression]) = {
19121904
val leftKeys = joinNames.map { keyName =>
1913-
val joinColumn = left.output.find(attr => resolver(attr.name, keyName))
1914-
assert(
1915-
joinColumn.isDefined,
1916-
s"$keyName should exist in ${left.output.map(_.name).mkString(",")}")
1917-
joinColumn.get
1905+
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
1906+
throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " +
1907+
s"left join side, the left output is: [${left.output.map(_.name).mkString(", ")}]")
1908+
}
19181909
}
19191910
val rightKeys = joinNames.map { keyName =>
1920-
val joinColumn = right.output.find(attr => resolver(attr.name, keyName))
1921-
assert(
1922-
joinColumn.isDefined,
1923-
s"$keyName should exist in ${right.output.map(_.name).mkString(",")}")
1924-
joinColumn.get
1911+
right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
1912+
throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " +
1913+
s"right join side, the right output is: [${right.output.map(_.name).mkString(", ")}]")
1914+
}
19251915
}
19261916
val joinPairs = leftKeys.zip(rightKeys)
19271917

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -180,12 +180,6 @@ trait CheckAnalysis extends PredicateHelper {
180180
case e =>
181181
}
182182

183-
case j @ Join(_, _, UsingJoin(_, cols), _) =>
184-
val from = operator.inputSet.map(_.name).mkString(", ")
185-
failAnalysis(
186-
s"using columns [${cols.mkString(",")}] " +
187-
s"can not be resolved given input columns: [$from] ")
188-
189183
case j @ Join(_, _, _, Some(condition)) if condition.dataType != BooleanType =>
190184
failAnalysis(
191185
s"join condition '${condition.sql}' " +

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -558,10 +558,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
558558
// Resolve the join type and join condition
559559
val (joinType, condition) = Option(join.joinCriteria) match {
560560
case Some(c) if c.USING != null =>
561-
val columns = c.identifier.asScala.map { column =>
562-
UnresolvedAttribute.quoted(column.getText)
563-
}
564-
(UsingJoin(baseJoinType, columns), None)
561+
(UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None)
565562
case Some(c) if c.booleanExpression != null =>
566563
(baseJoinType, Option(expression(c.booleanExpression)))
567564
case None if join.NATURAL != null =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,7 @@ case class NaturalJoin(tpe: JoinType) extends JoinType {
8484
override def sql: String = "NATURAL " + tpe.sql
8585
}
8686

87-
case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) extends JoinType {
87+
case class UsingJoin(tpe: JoinType, usingColumns: Seq[String]) extends JoinType {
8888
require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti).contains(tpe),
8989
"Unsupported using join type " + tpe)
9090
override def sql: String = "USING " + tpe.sql

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala

Lines changed: 21 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -38,31 +38,31 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
3838

3939
test("natural/using inner join") {
4040
val naturalPlan = r1.join(r2, NaturalJoin(Inner), None)
41-
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
41+
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
4242
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
4343
checkAnalysis(naturalPlan, expected)
4444
checkAnalysis(usingPlan, expected)
4545
}
4646

4747
test("natural/using left join") {
4848
val naturalPlan = r1.join(r2, NaturalJoin(LeftOuter), None)
49-
val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("a"))), None)
49+
val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq("a")), None)
5050
val expected = r1.join(r2, LeftOuter, Some(EqualTo(a, a))).select(a, b, c)
5151
checkAnalysis(naturalPlan, expected)
5252
checkAnalysis(usingPlan, expected)
5353
}
5454

5555
test("natural/using right join") {
5656
val naturalPlan = r1.join(r2, NaturalJoin(RightOuter), None)
57-
val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq(UnresolvedAttribute("a"))), None)
57+
val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq("a")), None)
5858
val expected = r1.join(r2, RightOuter, Some(EqualTo(a, a))).select(a, b, c)
5959
checkAnalysis(naturalPlan, expected)
6060
checkAnalysis(usingPlan, expected)
6161
}
6262

6363
test("natural/using full outer join") {
6464
val naturalPlan = r1.join(r2, NaturalJoin(FullOuter), None)
65-
val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq(UnresolvedAttribute("a"))), None)
65+
val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq("a")), None)
6666
val expected = r1.join(r2, FullOuter, Some(EqualTo(a, a))).select(
6767
Alias(Coalesce(Seq(a, a)), "a")(), b, c)
6868
checkAnalysis(naturalPlan, expected)
@@ -71,7 +71,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
7171

7272
test("natural/using inner join with no nullability") {
7373
val naturalPlan = r3.join(r4, NaturalJoin(Inner), None)
74-
val usingPlan = r3.join(r4, UsingJoin(Inner, Seq(UnresolvedAttribute("b"))), None)
74+
val usingPlan = r3.join(r4, UsingJoin(Inner, Seq("b")), None)
7575
val expected = r3.join(r4, Inner, Some(EqualTo(bNotNull, bNotNull))).select(
7676
bNotNull, aNotNull, cNotNull)
7777
checkAnalysis(naturalPlan, expected)
@@ -80,7 +80,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
8080

8181
test("natural/using left join with no nullability") {
8282
val naturalPlan = r3.join(r4, NaturalJoin(LeftOuter), None)
83-
val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("b"))), None)
83+
val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq("b")), None)
8484
val expected = r3.join(r4, LeftOuter, Some(EqualTo(bNotNull, bNotNull))).select(
8585
bNotNull, aNotNull, c)
8686
checkAnalysis(naturalPlan, expected)
@@ -89,7 +89,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
8989

9090
test("natural/using right join with no nullability") {
9191
val naturalPlan = r3.join(r4, NaturalJoin(RightOuter), None)
92-
val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq(UnresolvedAttribute("b"))), None)
92+
val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq("b")), None)
9393
val expected = r3.join(r4, RightOuter, Some(EqualTo(bNotNull, bNotNull))).select(
9494
bNotNull, a, cNotNull)
9595
checkAnalysis(naturalPlan, expected)
@@ -98,48 +98,43 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
9898

9999
test("natural/using full outer join with no nullability") {
100100
val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None)
101-
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None)
101+
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq("b")), None)
102102
val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select(
103103
Alias(Coalesce(Seq(b, b)), "b")(), a, c)
104104
checkAnalysis(naturalPlan, expected)
105105
checkAnalysis(usingPlan, expected)
106106
}
107107

108108
test("using unresolved attribute") {
109-
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("d"))), None)
110-
val error = intercept[AnalysisException] {
111-
SimpleAnalyzer.checkAnalysis(usingPlan)
112-
}
113-
assert(error.message.contains(
114-
"using columns ['d] can not be resolved given input columns: [b, a, c]"))
109+
assertAnalysisError(
110+
r1.join(r2, UsingJoin(Inner, Seq("d"))),
111+
"USING column `d` can not be resolved with the left join side" :: Nil)
112+
assertAnalysisError(
113+
r1.join(r2, UsingJoin(Inner, Seq("b"))),
114+
"USING column `b` can not be resolved with the right join side" :: Nil)
115115
}
116116

117117
test("using join with a case sensitive analyzer") {
118118
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
119119

120-
{
121-
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
122-
checkAnalysis(usingPlan, expected, caseSensitive = true)
123-
}
120+
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
121+
checkAnalysis(usingPlan, expected, caseSensitive = true)
124122

125-
{
126-
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None)
127-
assertAnalysisError(
128-
usingPlan,
129-
Seq("using columns ['A] can not be resolved given input columns: [b, a, c, a]"))
130-
}
123+
assertAnalysisError(
124+
r1.join(r2, UsingJoin(Inner, Seq("A"))),
125+
"USING column `A` can not be resolved with the left join side" :: Nil)
131126
}
132127

133128
test("using join with a case insensitive analyzer") {
134129
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
135130

136131
{
137-
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
132+
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
138133
checkAnalysis(usingPlan, expected, caseSensitive = false)
139134
}
140135

141136
{
142-
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None)
137+
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("A")), None)
143138
checkAnalysis(usingPlan, expected, caseSensitive = false)
144139
}
145140
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -336,7 +336,7 @@ class PlanParserSuite extends PlanTest {
336336
val testUsingJoin = (sql: String, jt: JoinType) => {
337337
assertEqual(
338338
s"select * from t $sql u using(a, b)",
339-
table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), None).select(star()))
339+
table("t").join(table("u"), UsingJoin(jt, Seq("a", "b")), None).select(star()))
340340
}
341341
val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin)
342342
val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin)

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -642,7 +642,7 @@ class Dataset[T] private[sql](
642642
Join(
643643
joined.left,
644644
joined.right,
645-
UsingJoin(JoinType(joinType), usingColumns.map(UnresolvedAttribute(_))),
645+
UsingJoin(JoinType(joinType), usingColumns),
646646
None)
647647
}
648648
}

0 commit comments

Comments
 (0)