diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index ed578e081be73..641db1163a4d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -131,39 +131,56 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { input: Seq[Attribute], resolver: Resolver): Option[NamedExpression] = { - val parts = name.split("\\.") + val parts = name.split("\\.").toList + + // Reference to hive SemanticAnalyzer + // we should filter some input Attribute if 'name' contains table alias. + // For example: + // CREATE TABLE t1(x INT); + // CREATE TABLE t2(a STRUCT, k INT); + // SELECT a.x FROM t1 a JOIN t2 b; + val inputMatchTableAlias = input.filter(_.qualifiers.exists(resolver(_, parts.head))) + + val (finalParts, finalInput) = + if(parts.size == 1 || inputMatchTableAlias.isEmpty) { + // should be column reference + (parts, input) + } else { + // 'name' contains table alias + (parts.drop(1), inputMatchTableAlias) + } - // Collect all attributes that are output by this nodes children where either the first part - // matches the name or where the first part matches the scope and the second part matches the - // name. Return these matches along with any remaining parts, which represent dotted access to - // struct fields. - val options = input.flatMap { option => + val options = finalInput.flatMap { option => // If the first part of the desired name matches a qualifier for this possible match, drop it. - val remainingParts = - if (option.qualifiers.find(resolver(_, parts.head)).nonEmpty && parts.size > 1) { - parts.drop(1) - } else { - parts + if (resolver(option.name, finalParts.head)) { + val optionalNestedReferences = finalParts.tail + (option.dataType, optionalNestedReferences) match { + // No nesting + case (_, Nil) => (option.withName(finalParts.head), Nil) :: Nil + // Points to nested field(s) of a structure + case (_: StructType, nestedReferences) => + resolveNesting(nestedReferences, option, resolver) match { + case Some(expression) => (expression, nestedReferences.last) :: Nil + case invalidReference => Nil + } + // Invalid + case _ => Nil } - - if (resolver(option.name, remainingParts.head)) { - // Preserve the case of the user's attribute reference. - (option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil } else { Nil } } + // for sql like : select a from (select a, a from b) + // we should filter the duplicated attributes. options.distinct match { // One match, no nested fields, use it. - case Seq((a, Nil)) => Some(a) + case Seq((a: Attribute, Nil)) => Some(a) // One match, but we also need to extract the requested nested field. - case Seq((a, nestedFields)) => + case Seq((nestedExpression: Expression, last:String)) => val aliased = - Alias( - resolveNesting(nestedFields, a, resolver), - nestedFields.last)() // Preserve the case of the user's field access. + Alias(nestedExpression, last)() // Preserve the case of the user's field access. Some(aliased) // No matches. @@ -185,22 +202,26 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { private def resolveNesting( nestedFields: List[String], expression: Expression, - resolver: Resolver): Expression = { + resolver: Resolver): Option[Expression] = { (nestedFields, expression.dataType) match { - case (Nil, _) => expression + case (Nil, _) => Some(expression) case (requestedField :: rest, StructType(fields)) => val actualField = fields.filter(f => resolver(f.name, requestedField)) actualField match { case Seq() => - sys.error( + logTrace( s"No such struct field $requestedField in ${fields.map(_.name).mkString(", ")}") + None case Seq(singleMatch) => resolveNesting(rest, GetField(expression, singleMatch.name), resolver) case multipleMatches => - sys.error(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}") + logTrace(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}") + None } - case (_, dt) => sys.error(s"Can't access nested field in type $dt") + case (_, dt) => + logTrace(s"Can't access nested field in type $dt") + None } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index b9cf37d53ffd2..85d84e75fe72e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.types import java.sql.{Date, Timestamp} -import scala.math.Numeric.{BigDecimalAsIfIntegral, DoubleAsIfIntegral, FloatAsIfIntegral} +import scala.math.Numeric.{FloatAsIfIntegral, BigDecimalAsIfIntegral, DoubleAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} import scala.util.parsing.combinator.RegexParsers @@ -459,6 +459,8 @@ case class StructType(fields: Seq[StructField]) extends DataType { override private[sql] def jsonValue = ("type" -> typeName) ~ ("fields" -> fields.map(_.jsonValue)) + + def simpleString: String = "struct" } object MapType { diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-0-f4267f0d858d3a03986e52bf22a56d54 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-0-f4267f0d858d3a03986e52bf22a56d54 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-1-7c9cae90e4fe1ff29926d04e3d01a84a b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-1-7c9cae90e4fe1ff29926d04e3d01a84a new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-2-26f54240cf5b909086fc34a34d7fdb56 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-2-26f54240cf5b909086fc34a34d7fdb56 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-3-a68f010702d12801a761fbb7a8cac3b3 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-3-a68f010702d12801a761fbb7a8cac3b3 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 new file mode 100644 index 0000000000000..d00491fd7e5bb --- /dev/null +++ b/sql/hive/src/test/resources/golden/test ambiguousReferences resolved as hive-4-21f26bbf4e87cafc6cc4ab3abaf7cef9 @@ -0,0 +1 @@ +1 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index ee9d08ff75450..fabdc4ab5c993 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -17,12 +17,19 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ case class Nested(a: Int, B: Int) case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested]) +case class A1(x: Int) +case class A2(a: A3, k:Int) +case class A3(x: String) +case class A4(x: String) + /** * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. */ @@ -79,6 +86,23 @@ class HiveResolutionSuite extends HiveComparisonTest { assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1) } + createQueryTest("test ambiguousReferences resolved as hive", + """ + |CREATE TABLE t1 (x int); + |CREATE TABLE t2 (a STRUCT, x int); + |INSERT OVERWRITE TABLE t1 SELECT 1 FROM src LIMIT 1; + |INSERT OVERWRITE TABLE t2 SELECT named_struct("x","str"),1 FROM src LIMIT 1; + |SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.x; + """.stripMargin) + + test("test ambiguousReferences exception thrown") { + sparkContext.parallelize(A3("a") :: Nil).registerTempTable("t3") + sparkContext.parallelize(A4("b") :: Nil).registerTempTable("t4") + intercept[TreeNodeException[Project]] { + sql("SELECT x FROM t3 a JOIN t4 b").collect() + } + } + /** * Negative examples. Currently only left here for documentation purposes. * TODO(marmbrus): Test that catalyst fails on these queries.