Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,39 +131,56 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
input: Seq[Attribute],
resolver: Resolver): Option[NamedExpression] = {

val parts = name.split("\\.")
val parts = name.split("\\.").toList

// Reference to hive SemanticAnalyzer
// we should filter some input Attribute if 'name' contains table alias.
// For example:
// CREATE TABLE t1(x INT);
// CREATE TABLE t2(a STRUCT<x: INT>, k INT);
// SELECT a.x FROM t1 a JOIN t2 b;
val inputMatchTableAlias = input.filter(_.qualifiers.exists(resolver(_, parts.head)))

val (finalParts, finalInput) =
if(parts.size == 1 || inputMatchTableAlias.isEmpty) {
// should be column reference
(parts, input)
} else {
// 'name' contains table alias
(parts.drop(1), inputMatchTableAlias)
}

// Collect all attributes that are output by this nodes children where either the first part
// matches the name or where the first part matches the scope and the second part matches the
// name. Return these matches along with any remaining parts, which represent dotted access to
// struct fields.
val options = input.flatMap { option =>
val options = finalInput.flatMap { option =>
// If the first part of the desired name matches a qualifier for this possible match, drop it.
val remainingParts =
if (option.qualifiers.find(resolver(_, parts.head)).nonEmpty && parts.size > 1) {
parts.drop(1)
} else {
parts
if (resolver(option.name, finalParts.head)) {
val optionalNestedReferences = finalParts.tail
(option.dataType, optionalNestedReferences) match {
// No nesting
case (_, Nil) => (option.withName(finalParts.head), Nil) :: Nil
// Points to nested field(s) of a structure
case (_: StructType, nestedReferences) =>
resolveNesting(nestedReferences, option, resolver) match {
case Some(expression) => (expression, nestedReferences.last) :: Nil
case invalidReference => Nil
}
// Invalid
case _ => Nil
}

if (resolver(option.name, remainingParts.head)) {
// Preserve the case of the user's attribute reference.
(option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil
} else {
Nil
}
}

// for sql like : select a from (select a, a from b)
// we should filter the duplicated attributes.
options.distinct match {
// One match, no nested fields, use it.
case Seq((a, Nil)) => Some(a)
case Seq((a: Attribute, Nil)) => Some(a)

// One match, but we also need to extract the requested nested field.
case Seq((a, nestedFields)) =>
case Seq((nestedExpression: Expression, last:String)) =>
val aliased =
Alias(
resolveNesting(nestedFields, a, resolver),
nestedFields.last)() // Preserve the case of the user's field access.
Alias(nestedExpression, last)() // Preserve the case of the user's field access.
Some(aliased)

// No matches.
Expand All @@ -185,22 +202,26 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
private def resolveNesting(
nestedFields: List[String],
expression: Expression,
resolver: Resolver): Expression = {
resolver: Resolver): Option[Expression] = {

(nestedFields, expression.dataType) match {
case (Nil, _) => expression
case (Nil, _) => Some(expression)
case (requestedField :: rest, StructType(fields)) =>
val actualField = fields.filter(f => resolver(f.name, requestedField))
actualField match {
case Seq() =>
sys.error(
logTrace(
s"No such struct field $requestedField in ${fields.map(_.name).mkString(", ")}")
None
case Seq(singleMatch) =>
resolveNesting(rest, GetField(expression, singleMatch.name), resolver)
case multipleMatches =>
sys.error(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}")
logTrace(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}")
None
}
case (_, dt) => sys.error(s"Can't access nested field in type $dt")
case (_, dt) =>
logTrace(s"Can't access nested field in type $dt")
None
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.types

import java.sql.{Date, Timestamp}

import scala.math.Numeric.{BigDecimalAsIfIntegral, DoubleAsIfIntegral, FloatAsIfIntegral}
import scala.math.Numeric.{FloatAsIfIntegral, BigDecimalAsIfIntegral, DoubleAsIfIntegral}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
import scala.util.parsing.combinator.RegexParsers
Expand Down Expand Up @@ -459,6 +459,8 @@ case class StructType(fields: Seq[StructField]) extends DataType {
override private[sql] def jsonValue =
("type" -> typeName) ~
("fields" -> fields.map(_.jsonValue))

def simpleString: String = "struct"
}

object MapType {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

case class Nested(a: Int, B: Int)
case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])

case class A1(x: Int)
case class A2(a: A3, k:Int)
case class A3(x: String)
case class A4(x: String)

/**
* A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
*/
Expand Down Expand Up @@ -79,6 +86,23 @@ class HiveResolutionSuite extends HiveComparisonTest {
assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
}

createQueryTest("test ambiguousReferences resolved as hive",
"""
|CREATE TABLE t1 (x int);
|CREATE TABLE t2 (a STRUCT<x:string>, x int);
|INSERT OVERWRITE TABLE t1 SELECT 1 FROM src LIMIT 1;
|INSERT OVERWRITE TABLE t2 SELECT named_struct("x","str"),1 FROM src LIMIT 1;
|SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.x;
""".stripMargin)

test("test ambiguousReferences exception thrown") {
sparkContext.parallelize(A3("a") :: Nil).registerTempTable("t3")
sparkContext.parallelize(A4("b") :: Nil).registerTempTable("t4")
intercept[TreeNodeException[Project]] {
sql("SELECT x FROM t3 a JOIN t4 b").collect()
}
}

/**
* Negative examples. Currently only left here for documentation purposes.
* TODO(marmbrus): Test that catalyst fails on these queries.
Expand Down