From 2f5a66fc700695d1a5507b96788a899646f8409a Mon Sep 17 00:00:00 2001 From: tianyi Date: Fri, 26 Sep 2014 14:43:58 +0800 Subject: [PATCH 01/13] SPARK-3688 fix logical bug in LogicalPlan and add a test case for this bug --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 15 ++++++++++++--- .../spark/sql/hive/execution/SQLQuerySuite.scala | 6 ++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index ed578e081be73..cb5f2be05db57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -146,10 +146,19 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { parts } - if (resolver(option.name, remainingParts.head)) { - // Preserve the case of the user's attribute reference. - (option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil + if(resolver(option.name, remainingParts.head)) { + if(remainingParts.length == 1) { + //for simple data type + (option.withName(remainingParts.head), Nil) :: Nil + } else if (option.dataType.isInstanceOf[StructType]) { + //for complex data type + (option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil + } else { + //for simple data type but multiple parts remaining + Nil + } } else { + //did not match column name Nil } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 4f96a327ee2c7..5906f742d0f63 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -116,6 +116,12 @@ class SQLQuerySuite extends QueryTest { 1) } + test("test tianyi") { + checkAnswer( + sql("SELECT key.value, count(1) FROM src key join src b GROUP BY key.value"), + sql("SELECT a.value, count(1) FROM src a join src b GROUP BY a.value").collect().toSeq) + } + test("test CTAS") { checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row]) checkAnswer( From f10fe7fd4f00e835af75c4bb10bd502286676eef Mon Sep 17 00:00:00 2001 From: tianyi Date: Fri, 26 Sep 2014 14:49:02 +0800 Subject: [PATCH 02/13] SPARK-3688 fix some code style --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index cb5f2be05db57..0bd7ffe492b49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -146,8 +146,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { parts } - if(resolver(option.name, remainingParts.head)) { - if(remainingParts.length == 1) { + if (resolver(option.name, remainingParts.head)) { + if (remainingParts.length == 1) { //for simple data type (option.withName(remainingParts.head), Nil) :: Nil } else if (option.dataType.isInstanceOf[StructType]) { From 8834920360c3fac2bef92d51e1c425b4903b9f95 Mon Sep 17 00:00:00 2001 From: tianyi Date: Fri, 26 Sep 2014 17:41:54 +0800 Subject: [PATCH 03/13] SPARK-3688 fix test name and code style --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 6 +++--- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0bd7ffe492b49..c9b22ceb761f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -148,13 +148,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { if (resolver(option.name, remainingParts.head)) { if (remainingParts.length == 1) { - //for simple data type + // for simple data type (option.withName(remainingParts.head), Nil) :: Nil } else if (option.dataType.isInstanceOf[StructType]) { - //for complex data type + // for complex data type (option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil } else { - //for simple data type but multiple parts remaining + // for simple data type but multiple parts remaining Nil } } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5906f742d0f63..e2cd818ab6ead 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -116,7 +116,7 @@ class SQLQuerySuite extends QueryTest { 1) } - test("test tianyi") { + test("test particular table alias") { checkAnswer( sql("SELECT key.value, count(1) FROM src key join src b GROUP BY key.value"), sql("SELECT a.value, count(1) FROM src a join src b GROUP BY a.value").collect().toSeq) From 266d116aa97db645d32b429c9f92454563928e49 Mon Sep 17 00:00:00 2001 From: tianyi Date: Fri, 26 Sep 2014 17:49:32 +0800 Subject: [PATCH 04/13] SPARK-3688 fix code style --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index c9b22ceb761f0..2f3b3ab4e4136 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -158,7 +158,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { Nil } } else { - //did not match column name + // did not match column name Nil } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e2cd818ab6ead..b0776bc2f75a9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -118,8 +118,8 @@ class SQLQuerySuite extends QueryTest { test("test particular table alias") { checkAnswer( - sql("SELECT key.value, count(1) FROM src key join src b GROUP BY key.value"), - sql("SELECT a.value, count(1) FROM src a join src b GROUP BY a.value").collect().toSeq) + sql("SELECT key.value, COUNT(1) FROM src key JOIN src b GROUP BY key.value"), + sql("SELECT a.value, COUNT(1) FROM src a JOIN src b GROUP BY a.value").collect().toSeq) } test("test CTAS") { From b2c0c95c1d1c28dd56a9dd13e87b99db501cab41 Mon Sep 17 00:00:00 2001 From: tianyi Date: Sun, 28 Sep 2014 00:11:08 +0800 Subject: [PATCH 05/13] SPARK-3688 add a function in StructType to check valid reference --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 3 ++- .../spark/sql/catalyst/types/dataTypes.scala | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 2f3b3ab4e4136..063bb65b8b629 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -150,7 +150,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { if (remainingParts.length == 1) { // for simple data type (option.withName(remainingParts.head), Nil) :: Nil - } else if (option.dataType.isInstanceOf[StructType]) { + } else if (option.dataType.isInstanceOf[StructType] && + option.dataType.asInstanceOf[StructType].isValidField(remainingParts.drop(1).mkString("."), resolver)) { // for complex data type (option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index b9cf37d53ffd2..60775c77a918b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.types import java.sql.{Date, Timestamp} -import scala.math.Numeric.{BigDecimalAsIfIntegral, DoubleAsIfIntegral, FloatAsIfIntegral} +import org.apache.spark.sql.catalyst.analysis._ + +import scala.math.Numeric.{FloatAsIfIntegral, BigDecimalAsIfIntegral, DoubleAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} import scala.util.parsing.combinator.RegexParsers @@ -416,6 +418,7 @@ case class StructType(fields: Seq[StructField]) extends DataType { lazy val fieldNames: Seq[String] = fields.map(_.name) private lazy val fieldNamesSet: Set[String] = fieldNames.toSet private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap + /** * Extracts a [[StructField]] of the given name. If the [[StructType]] object does not * have a name matching the given name, `null` will be returned. @@ -459,6 +462,16 @@ case class StructType(fields: Seq[StructField]) extends DataType { override private[sql] def jsonValue = ("type" -> typeName) ~ ("fields" -> fields.map(_.jsonValue)) + + private lazy val validReference: Seq[String] = fields.flatMap( x => x.dataType match { + case st:StructType => st.validReference.map(y => x.name + "." + y) ++ Seq(x.name) + case _ => Seq(x.name) + }) + + def isValidField(ref: String, resolver: Resolver): Boolean = + validReference.count(resolver(_, ref)) == 1 + + def simpleString: String = "struct" } object MapType { From 56b48672a67e5149b0a5a60b04aa8a3cc8c51bdd Mon Sep 17 00:00:00 2001 From: tianyi Date: Sun, 28 Sep 2014 00:11:57 +0800 Subject: [PATCH 06/13] SPARK-3688 add a function in StructType to check valid reference --- .../catalyst/plans/logical/LogicalPlan.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 063bb65b8b629..78f291d0fc7b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -131,7 +131,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { input: Seq[Attribute], resolver: Resolver): Option[NamedExpression] = { - val parts = name.split("\\.") + val parts = name.split("\\.").toList // Collect all attributes that are output by this nodes children where either the first part // matches the name or where the first part matches the scope and the second part matches the @@ -147,33 +147,33 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { } if (resolver(option.name, remainingParts.head)) { - if (remainingParts.length == 1) { - // for simple data type - (option.withName(remainingParts.head), Nil) :: Nil - } else if (option.dataType.isInstanceOf[StructType] && - option.dataType.asInstanceOf[StructType].isValidField(remainingParts.drop(1).mkString("."), resolver)) { - // for complex data type - (option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil - } else { - // for simple data type but multiple parts remaining - Nil + (option.dataType, remainingParts) match { + // No nesting + case (_, _ :: Nil) => (option, Nil) :: Nil + // Points to nested field(s) of a structure + case (_: StructType, _ :: tail) => { + try { + (resolveNesting(tail, option, resolver), tail.last) :: Nil + } catch { + case _ => Nil + } + } + // Invalid + case _ => Nil } } else { - // did not match column name Nil } } options.distinct match { // One match, no nested fields, use it. - case Seq((a, Nil)) => Some(a) + case Seq((a:Attribute, Nil)) => Some(a) // One match, but we also need to extract the requested nested field. - case Seq((a, nestedFields)) => + case Seq((nestedExpression, last:String)) => val aliased = - Alias( - resolveNesting(nestedFields, a, resolver), - nestedFields.last)() // Preserve the case of the user's field access. + Alias(nestedExpression, last)() // Preserve the case of the user's field access. Some(aliased) // No matches. From 5d63c0aa3999fc6be2f430daccd34dfe8cb01bdb Mon Sep 17 00:00:00 2001 From: tianyi Date: Sun, 28 Sep 2014 10:07:46 +0800 Subject: [PATCH 07/13] revert datatye change, use resolveNesting instead. --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- .../org/apache/spark/sql/catalyst/types/dataTypes.scala | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 78f291d0fc7b4..22ad2e682ffcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -155,7 +155,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { try { (resolveNesting(tail, option, resolver), tail.last) :: Nil } catch { - case _ => Nil + case _: Throwable => Nil } } // Invalid diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 60775c77a918b..982aa95e2fb83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -463,14 +463,6 @@ case class StructType(fields: Seq[StructField]) extends DataType { ("type" -> typeName) ~ ("fields" -> fields.map(_.jsonValue)) - private lazy val validReference: Seq[String] = fields.flatMap( x => x.dataType match { - case st:StructType => st.validReference.map(y => x.name + "." + y) ++ Seq(x.name) - case _ => Seq(x.name) - }) - - def isValidField(ref: String, resolver: Resolver): Boolean = - validReference.count(resolver(_, ref)) == 1 - def simpleString: String = "struct" } From 5fdaff9a00ba74975e8e7cad4d8ad60a14454bc0 Mon Sep 17 00:00:00 2001 From: tianyi Date: Sun, 28 Sep 2014 10:42:45 +0800 Subject: [PATCH 08/13] revert datatye change --- .../scala/org/apache/spark/sql/catalyst/types/dataTypes.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 982aa95e2fb83..85d84e75fe72e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst.types import java.sql.{Date, Timestamp} -import org.apache.spark.sql.catalyst.analysis._ - import scala.math.Numeric.{FloatAsIfIntegral, BigDecimalAsIfIntegral, DoubleAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} @@ -418,7 +416,6 @@ case class StructType(fields: Seq[StructField]) extends DataType { lazy val fieldNames: Seq[String] = fields.map(_.name) private lazy val fieldNamesSet: Set[String] = fieldNames.toSet private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap - /** * Extracts a [[StructField]] of the given name. If the [[StructType]] object does not * have a name matching the given name, `null` will be returned. From d86885caffdd6700b3680fdb3c5387a594d48af3 Mon Sep 17 00:00:00 2001 From: tianyi Date: Sun, 28 Sep 2014 14:01:42 +0800 Subject: [PATCH 09/13] fix case insensitivity issue --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 22ad2e682ffcc..b3867e68ea79b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -149,7 +149,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { if (resolver(option.name, remainingParts.head)) { (option.dataType, remainingParts) match { // No nesting - case (_, _ :: Nil) => (option, Nil) :: Nil + case (_, _ :: Nil) => (option.withName(remainingParts.head), Nil) :: Nil // Points to nested field(s) of a structure case (_: StructType, _ :: tail) => { try { From 84b35ad44c242e80dc2188dd7598cc1c5496b667 Mon Sep 17 00:00:00 2001 From: tianyi Date: Wed, 8 Oct 2014 12:59:16 +0800 Subject: [PATCH 10/13] add codes to implement the compatibility of with hive, and some test cases --- .../catalyst/plans/logical/LogicalPlan.scala | 62 ++++++++++--------- .../sql/hive/execution/SQLQuerySuite.scala | 26 ++++++++ 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index b3867e68ea79b..3b96f9dfe5f87 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -132,32 +132,32 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { resolver: Resolver): Option[NamedExpression] = { val parts = name.split("\\.").toList + // Reference to hive SemanticAnalyzer + // we should filter some input Attribute if 'name' contains table alias. + val inputMatchTableAlias = input.filter(_.qualifiers.exists(resolver(_, parts.head))) + + val (finalParts, finalInput) = + if(parts.size == 1 || inputMatchTableAlias.isEmpty) { + // should be column reference + (parts, input) + } else { + // 'name' contains table alias + (parts.drop(1), inputMatchTableAlias) + } - // Collect all attributes that are output by this nodes children where either the first part - // matches the name or where the first part matches the scope and the second part matches the - // name. Return these matches along with any remaining parts, which represent dotted access to - // struct fields. - val options = input.flatMap { option => + val options = finalInput.flatMap { option => // If the first part of the desired name matches a qualifier for this possible match, drop it. - val remainingParts = - if (option.qualifiers.find(resolver(_, parts.head)).nonEmpty && parts.size > 1) { - parts.drop(1) - } else { - parts - } - - if (resolver(option.name, remainingParts.head)) { - (option.dataType, remainingParts) match { + if (resolver(option.name, finalParts.head)) { + val optionalNestedReferences = finalParts.tail + (option.dataType, optionalNestedReferences) match { // No nesting - case (_, _ :: Nil) => (option.withName(remainingParts.head), Nil) :: Nil + case (_, Nil) => (option.withName(finalParts.head), Nil) :: Nil // Points to nested field(s) of a structure - case (_: StructType, _ :: tail) => { - try { - (resolveNesting(tail, option, resolver), tail.last) :: Nil - } catch { - case _: Throwable => Nil + case (_: StructType, nestedReferences) => + resolveNesting(nestedReferences, option, resolver) match { + case Some(expression) => (expression, nestedReferences.last) :: Nil + case invalidReference => Nil } - } // Invalid case _ => Nil } @@ -166,12 +166,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { } } + // for sql like : select a from (select a, a from b) + // we should filter the duplicated attributes. options.distinct match { // One match, no nested fields, use it. - case Seq((a:Attribute, Nil)) => Some(a) + case Seq((a: Attribute, Nil)) => Some(a) // One match, but we also need to extract the requested nested field. - case Seq((nestedExpression, last:String)) => + case Seq((nestedExpression: Expression, last:String)) => val aliased = Alias(nestedExpression, last)() // Preserve the case of the user's field access. Some(aliased) @@ -195,22 +197,26 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { private def resolveNesting( nestedFields: List[String], expression: Expression, - resolver: Resolver): Expression = { + resolver: Resolver): Option[Expression] = { (nestedFields, expression.dataType) match { - case (Nil, _) => expression + case (Nil, _) => Some(expression) case (requestedField :: rest, StructType(fields)) => val actualField = fields.filter(f => resolver(f.name, requestedField)) actualField match { case Seq() => - sys.error( + logTrace( s"No such struct field $requestedField in ${fields.map(_.name).mkString(", ")}") + None case Seq(singleMatch) => resolveNesting(rest, GetField(expression, singleMatch.name), resolver) case multipleMatches => - sys.error(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}") + logTrace(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}") + None } - case (_, dt) => sys.error(s"Can't access nested field in type $dt") + case (_, dt) => + logTrace(s"Can't access nested field in type $dt") + None } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b0776bc2f75a9..b1caab04309fc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,12 +20,20 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan} +import org.apache.spark.sql.hive.HiveQl import org.apache.spark.sql.hive.test.TestHive._ case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) case class Nested3(f3: Int) +case class A1(x: Int) +case class A2(a: A3, k:Int) +case class A3(x: String) +case class A4(x: String) + /** * A collection of hive query tests where we generate the answers ourselves instead of depending on * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is @@ -116,6 +124,24 @@ class SQLQuerySuite extends QueryTest { 1) } + test("test ambiguousReferences resolved as hive") { + sparkContext.parallelize(A1(1) :: Nil).registerTempTable("t1") + sparkContext.parallelize(A2(A3("test"), 1) :: Nil).registerTempTable("t2") + checkAnswer( + sql("SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k"), + 1) + } + + test("test ambiguousReferences exception thrown") { + sparkContext.parallelize(A3("a") :: Nil).registerTempTable("t3") + sparkContext.parallelize(A4("b") :: Nil).registerTempTable("t4") + intercept[TreeNodeException[Project]] { + checkAnswer( + sql("SELECT x FROM t3 a JOIN t4 b"), + "a") + } + } + test("test particular table alias") { checkAnswer( sql("SELECT key.value, COUNT(1) FROM src key JOIN src b GROUP BY key.value"), From c92705687561a0fb355c1b292ad79538fa02632b Mon Sep 17 00:00:00 2001 From: tianyi Date: Mon, 13 Oct 2014 08:27:33 +0800 Subject: [PATCH 11/13] move tests to HiveResolutionSuite --- .../catalyst/plans/logical/LogicalPlan.scala | 5 +++ .../hive/execution/HiveResolutionSuite.scala | 24 ++++++++++++++ .../sql/hive/execution/SQLQuerySuite.scala | 32 ------------------- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 3b96f9dfe5f87..641db1163a4d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -132,8 +132,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { resolver: Resolver): Option[NamedExpression] = { val parts = name.split("\\.").toList + // Reference to hive SemanticAnalyzer // we should filter some input Attribute if 'name' contains table alias. + // For example: + // CREATE TABLE t1(x INT); + // CREATE TABLE t2(a STRUCT, k INT); + // SELECT a.x FROM t1 a JOIN t2 b; val inputMatchTableAlias = input.filter(_.qualifiers.exists(resolver(_, parts.head))) val (finalParts, finalInput) = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index ee9d08ff75450..17d704b840653 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -17,12 +17,19 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ case class Nested(a: Int, B: Int) case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested]) +case class A1(x: Int) +case class A2(a: A3, k:Int) +case class A3(x: String) +case class A4(x: String) + /** * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. */ @@ -79,6 +86,23 @@ class HiveResolutionSuite extends HiveComparisonTest { assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1) } + createQueryTest("test ambiguousReferences resolved as hive", + """ + |CREATE TABLE t1 (x int); + |CREATE TABLE t2 (a STRUCT, x int); + |INSERT OVERWRITE TABLE t1 SELECT 1 FROM src LIMIT 1; + |INSERT OVERWRITE TABLE t2 SELECT named_struct("x","str"),1 FROM src LIMIT 1; + |SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k; + """.stripMargin) + + test("test ambiguousReferences exception thrown") { + sparkContext.parallelize(A3("a") :: Nil).registerTempTable("t3") + sparkContext.parallelize(A4("b") :: Nil).registerTempTable("t4") + intercept[TreeNodeException[Project]] { + sql("SELECT x FROM t3 a JOIN t4 b").collect() + } + } + /** * Negative examples. Currently only left here for documentation purposes. * TODO(marmbrus): Test that catalyst fails on these queries. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b1caab04309fc..4f96a327ee2c7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,20 +20,12 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan} -import org.apache.spark.sql.hive.HiveQl import org.apache.spark.sql.hive.test.TestHive._ case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) case class Nested3(f3: Int) -case class A1(x: Int) -case class A2(a: A3, k:Int) -case class A3(x: String) -case class A4(x: String) - /** * A collection of hive query tests where we generate the answers ourselves instead of depending on * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is @@ -124,30 +116,6 @@ class SQLQuerySuite extends QueryTest { 1) } - test("test ambiguousReferences resolved as hive") { - sparkContext.parallelize(A1(1) :: Nil).registerTempTable("t1") - sparkContext.parallelize(A2(A3("test"), 1) :: Nil).registerTempTable("t2") - checkAnswer( - sql("SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k"), - 1) - } - - test("test ambiguousReferences exception thrown") { - sparkContext.parallelize(A3("a") :: Nil).registerTempTable("t3") - sparkContext.parallelize(A4("b") :: Nil).registerTempTable("t4") - intercept[TreeNodeException[Project]] { - checkAnswer( - sql("SELECT x FROM t3 a JOIN t4 b"), - "a") - } - } - - test("test particular table alias") { - checkAnswer( - sql("SELECT key.value, COUNT(1) FROM src key JOIN src b GROUP BY key.value"), - sql("SELECT a.value, COUNT(1) FROM src a JOIN src b GROUP BY a.value").collect().toSeq) - } - test("test CTAS") { checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row]) checkAnswer( From 8bfac8ba7f10d3be080e276f89cde997e7d4c3c0 Mon Sep 17 00:00:00 2001 From: tianyi Date: Tue, 14 Oct 2014 16:01:35 +0800 Subject: [PATCH 12/13] fix error in test sql --- .../apache/spark/sql/hive/execution/HiveResolutionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 17d704b840653..fabdc4ab5c993 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -92,7 +92,7 @@ class HiveResolutionSuite extends HiveComparisonTest { |CREATE TABLE t2 (a STRUCT, x int); |INSERT OVERWRITE TABLE t1 SELECT 1 FROM src LIMIT 1; |INSERT OVERWRITE TABLE t2 SELECT named_struct("x","str"),1 FROM src LIMIT 1; - |SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k; + |SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.x; """.stripMargin) test("test ambiguousReferences exception thrown") { From b708fc7636143562b950fda5fda778e1cd447ae1 Mon Sep 17 00:00:00 2001 From: tianyi Date: Thu, 30 Oct 2014 12:01:08 +0800 Subject: [PATCH 13/13] add golden files --- ...eferences resolved as hive-0-f4267f0d858d3a03986e52bf22a56d54 | 0 ...eferences resolved as hive-1-7c9cae90e4fe1ff29926d04e3d01a84a | 0 ...eferences resolved as hive-2-26f54240cf5b909086fc34a34d7fdb56 | 0 ...eferences resolved as hive-3-a68f010702d12801a761fbb7a8cac3b3 | 0 ...eferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 | 1 + 5 files changed, 1 insertion(+) create mode 100644 sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-0-f4267f0d858d3a03986e52bf22a56d54 create mode 100644 sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-1-7c9cae90e4fe1ff29926d04e3d01a84a create mode 100644 sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-2-26f54240cf5b909086fc34a34d7fdb56 create mode 100644 sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-3-a68f010702d12801a761fbb7a8cac3b3 create mode 100644 sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-0-f4267f0d858d3a03986e52bf22a56d54 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-0-f4267f0d858d3a03986e52bf22a56d54 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-1-7c9cae90e4fe1ff29926d04e3d01a84a b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-1-7c9cae90e4fe1ff29926d04e3d01a84a new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-2-26f54240cf5b909086fc34a34d7fdb56 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-2-26f54240cf5b909086fc34a34d7fdb56 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-3-a68f010702d12801a761fbb7a8cac3b3 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-3-a68f010702d12801a761fbb7a8cac3b3 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 new file mode 100644 index 0000000000000..d00491fd7e5bb --- /dev/null +++ b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 @@ -0,0 +1 @@ +1