From 70a2d446c1be31528fcb500eec16f609275788d8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 1 Dec 2016 14:44:51 -0800 Subject: [PATCH 1/3] added an example. --- .../org/apache/spark/sql/DataFrameSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 312cd17c26d6..1e2bff16b359 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1711,6 +1711,26 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(joinedDf.join(df3, "a"), Row(3, 0, 4, 1)) } + test("SPARK-18674: improve the error message of using join") { + val schema = StructType( + StructField("col1", StructType( + StructField("field1", IntegerType) :: + StructField("field2", StringType) :: Nil)) :: Nil) + val rowRDD = sparkContext.makeRDD(Seq(Row(Row(1, "col1")), Row(Row(2, "col2")))) + val df = spark.createDataFrame(rowRDD, schema) + + checkAnswer( + df.as("df2").join(df.as("df1"), $"df1.col1.field1" === $"df2.col1.field1", "inner") + .select($"df1.col1.field1"), + Row(1) :: Row(2) :: Nil) + + val e = intercept[AnalysisException] { + df.as("df2").join(df.as("df1"), "col1.field1") + }.getMessage + assert(e.contains("USING column `col1.field1` can not be resolved with the left join side, " + + "the left output is: [col1]")) + } + test("SPARK-17123: Performing set operations that combine non-scala native types") { val dates = Seq( (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)), From e754ef515b5e563e597674adce175b21830deaa3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 1 Dec 2016 14:46:44 -0800 Subject: [PATCH 2/3] added a message --- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 1e2bff16b359..3e5385872d52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1719,16 +1719,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val rowRDD = sparkContext.makeRDD(Seq(Row(Row(1, "col1")), Row(Row(2, "col2")))) val df = spark.createDataFrame(rowRDD, schema) - checkAnswer( - df.as("df2").join(df.as("df1"), $"df1.col1.field1" === $"df2.col1.field1", "inner") - .select($"df1.col1.field1"), - Row(1) :: Row(2) :: Nil) - + // Using join does not work for the nested fields val e = intercept[AnalysisException] { df.as("df2").join(df.as("df1"), "col1.field1") }.getMessage assert(e.contains("USING column `col1.field1` can not be resolved with the left join side, " + "the left output is: [col1]")) + + // Nested fields can work well in join conditions. + checkAnswer( + df.as("df2").join(df.as("df1"), $"df1.col1.field1" === $"df2.col1.field1", "inner") + .select($"df1.col1.field1"), + Row(1) :: Row(2) :: Nil) } test("SPARK-17123: Performing set operations that combine non-scala native types") { From 4d43bca86c08ac5ee1ba5960ee448db93445d8a9 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 1 Dec 2016 22:06:32 -0800 Subject: [PATCH 3/3] address comments --- .../sql/catalyst/analysis/Analyzer.scala | 8 +++---- .../analysis/ResolveNaturalJoinSuite.scala | 16 +++++++++++--- .../org/apache/spark/sql/DataFrameSuite.scala | 22 ------------------- 3 files changed, 17 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 931e6599f8ca..8faf0eda548e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1974,14 +1974,14 @@ class Analyzer( condition: Option[Expression]) = { val leftKeys = joinNames.map { keyName => left.output.find(attr => resolver(attr.name, keyName)).getOrElse { - throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " + - s"left join side, the left output is: [${left.output.map(_.name).mkString(", ")}]") + throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the left " + + s"side of the join. The left-side columns: [${left.output.map(_.name).mkString(", ")}]") } } val rightKeys = joinNames.map { keyName => right.output.find(attr => resolver(attr.name, keyName)).getOrElse { - throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " + - s"right join side, the right output is: [${right.output.map(_.name).mkString(", ")}]") + throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the right " + + s"side of the join. The right-side columns: [${right.output.map(_.name).mkString(", ")}]") } } val joinPairs = leftKeys.zip(rightKeys) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala index 1421d36fdb2a..e449b9669cc7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala @@ -28,6 +28,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { lazy val a = 'a.string lazy val b = 'b.string lazy val c = 'c.string + lazy val d = 'd.struct('f1.int, 'f2.long) lazy val aNotNull = a.notNull lazy val bNotNull = b.notNull lazy val cNotNull = c.notNull @@ -35,6 +36,8 @@ class ResolveNaturalJoinSuite extends AnalysisTest { lazy val r2 = LocalRelation(c, a) lazy val r3 = LocalRelation(aNotNull, bNotNull) lazy val r4 = LocalRelation(cNotNull, bNotNull) + lazy val r5 = LocalRelation(d) + lazy val r6 = LocalRelation(d) test("natural/using inner join") { val naturalPlan = r1.join(r2, NaturalJoin(Inner), None) @@ -108,10 +111,10 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("using unresolved attribute") { assertAnalysisError( r1.join(r2, UsingJoin(Inner, Seq("d"))), - "USING column `d` can not be resolved with the left join side" :: Nil) + "USING column `d` cannot be resolved on the left side of the join" :: Nil) assertAnalysisError( r1.join(r2, UsingJoin(Inner, Seq("b"))), - "USING column `b` can not be resolved with the right join side" :: Nil) + "USING column `b` cannot be resolved on the right side of the join" :: Nil) } test("using join with a case sensitive analyzer") { @@ -122,7 +125,14 @@ class ResolveNaturalJoinSuite extends AnalysisTest { assertAnalysisError( r1.join(r2, UsingJoin(Inner, Seq("A"))), - "USING column `A` can not be resolved with the left join side" :: Nil) + "USING column `A` cannot be resolved on the left side of the join" :: Nil) + } + + test("using join on nested fields") { + assertAnalysisError( + r5.join(r6, UsingJoin(Inner, Seq("d.f1"))), + "USING column `d.f1` cannot be resolved on the left side of the join. " + + "The left-side columns: [d]" :: Nil) } test("using join with a case insensitive analyzer") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 3e5385872d52..312cd17c26d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1711,28 +1711,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(joinedDf.join(df3, "a"), Row(3, 0, 4, 1)) } - test("SPARK-18674: improve the error message of using join") { - val schema = StructType( - StructField("col1", StructType( - StructField("field1", IntegerType) :: - StructField("field2", StringType) :: Nil)) :: Nil) - val rowRDD = sparkContext.makeRDD(Seq(Row(Row(1, "col1")), Row(Row(2, "col2")))) - val df = spark.createDataFrame(rowRDD, schema) - - // Using join does not work for the nested fields - val e = intercept[AnalysisException] { - df.as("df2").join(df.as("df1"), "col1.field1") - }.getMessage - assert(e.contains("USING column `col1.field1` can not be resolved with the left join side, " + - "the left output is: [col1]")) - - // Nested fields can work well in join conditions. - checkAnswer( - df.as("df2").join(df.as("df1"), $"df1.col1.field1" === $"df2.col1.field1", "inner") - .select($"df1.col1.field1"), - Row(1) :: Row(2) :: Nil) - } - test("SPARK-17123: Performing set operations that combine non-scala native types") { val dates = Seq( (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),