diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 23e4709bbd882..0aa406884a166 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1374,9 +1374,10 @@ class Analyzer(
  * Removes [[SubqueryAlias]] operators from the plan. Subqueries are only required to provide
  * scoping information for attributes and can be removed once analysis is complete.
  */
+
 object EliminateSubqueryAliases extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case SubqueryAlias(_, child) => child
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case SubqueryAlias(_, child, _) => child
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8095083f336e1..7bfe09dc56b3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -238,26 +238,66 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     }
   }
 
+  private def doMatch(
+      nameParts: Seq[String],
+      input: Attribute,
+      resolver: Resolver): Option[(Attribute, List[String])] = {
+    var i = 0
+    var j = 0
+    val qualifiers = input.qualifiers
+    while (i < qualifiers.size && j < nameParts.size) {
+      if (resolver(qualifiers(i), nameParts(j))) {
+        i += 1
+        j += 1
+      } else {
+        return None
+      }
+    }
+    // nameParts.slice(j, nameParts.size) would be empty when the attribute name is the same
+    // as the table name, e.g. SELECT tableName FROM tableName, so it is not matched here.
+    if (i == qualifiers.size && j != nameParts.size) {
+      resolveAsColumn(nameParts.slice(j, nameParts.size), resolver, input)
+    } else {
+      None
+    }
+  }
+
+  private def resolveAsDbTableColumn(
+      nameParts: Seq[String],
+      input: Seq[Attribute],
+      resolver: Resolver): Seq[(Attribute, List[String])] = {
+    input.flatMap { option =>
+      doMatch(nameParts, option, resolver)
+    }
+  }
+
   /** Performs attribute resolution given a name and a sequence of possible attributes. */
   protected def resolve(
       nameParts: Seq[String],
       input: Seq[Attribute],
       resolver: Resolver): Option[NamedExpression] = {
 
+    // Try to resolve the name strictly as `db.table.column` first.
+    var candidates: Seq[(Attribute, List[String])] =
+      resolveAsDbTableColumn(nameParts, input, resolver)
+
     // A sequence of possible candidate matches.
     // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
     // of parts that are to be resolved.
     // For example, consider an example where "a" is the table name, "b" is the column name,
     // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
     // and the second element will be List("c").
-    var candidates: Seq[(Attribute, List[String])] = {
-      // If the name has 2 or more parts, try to resolve it as `table.column` first.
-      if (nameParts.length > 1) {
-        input.flatMap { option =>
-          resolveAsTableColumn(nameParts, resolver, option)
+    // Otherwise, fall back to resolving as `table.column`.
+    if (candidates.isEmpty) {
+      candidates = {
+        // If the name has 2 or more parts, match the first part against the qualifiers.
+        if (nameParts.length > 1) {
+          input.flatMap { option =>
+            resolveAsTableColumn(nameParts, resolver, option)
+          }
+        } else {
+          Seq.empty
        }
-      } else {
-        Seq.empty
       }
     }
 
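
The strict `db.table.column` pass added by doMatch and resolveAsDbTableColumn can be pictured with a small standalone sketch. Attr, Resolver, caseInsensitive and matchQualified below are simplified stand-ins invented for illustration, not the Catalyst API:

object QualifierMatchSketch {
  // Simplified stand-ins for Catalyst's Attribute and Resolver, invented for this sketch.
  case class Attr(name: String, qualifiers: Seq[String])
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Consume the leading name parts that match all of the attribute's qualifiers in order;
  // whatever remains must still name the column (nested struct fields may follow).
  def matchQualified(
      nameParts: Seq[String],
      attr: Attr,
      resolver: Resolver): Option[(Attr, Seq[String])] = {
    val matched =
      nameParts.zip(attr.qualifiers).takeWhile { case (n, q) => resolver(q, n) }.length
    val rest = nameParts.drop(matched)
    if (matched == attr.qualifiers.length && rest.nonEmpty && resolver(rest.head, attr.name)) {
      Some((attr, rest.drop(1)))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val attr = Attr("a", Seq("db1", "tmp"))
    // "db1.tmp.a.b" -> the attribute `a` of db1.tmp, with "b" left over as a struct field.
    println(matchQualified(Seq("db1", "tmp", "a", "b"), attr, caseInsensitive))
    // "tmp.a.b" does not consume all qualifiers, so this strict pass returns None
    // and resolution falls back to the existing `table.column` path.
    println(matchQualified(Seq("tmp", "a", "b"), attr, caseInsensitive))
  }
}
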
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d2a65b716b24..c82d3ea4631bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -644,9 +644,16 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
   }
 }
 
-case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output.map(_.withQualifiers(alias :: Nil))
+case class SubqueryAlias(alias: String, child: LogicalPlan, databaseName: Option[String] = None)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output.map(
+    _.withQualifiers {
+      if (databaseName.isDefined) {
+        databaseName.get :: alias :: Nil
+      } else {
+        alias :: Nil
+      }
+    })
 }
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3788736fd13c6..96233eeea309d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -420,7 +420,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
     if (table.properties.get("spark.sql.sources.provider").isDefined) {
       val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
-      val tableWithQualifiers = SubqueryAlias(qualifiedTableName.name, dataSourceTable)
+      val tableWithQualifiers = SubqueryAlias(qualifiedTableName.name, dataSourceTable,
+        Some(qualifiedTableName.database))
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
       alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
@@ -897,7 +898,7 @@ private[hive] case class MetastoreRelation(
       HiveMetastoreTypes.toDataType(f.dataType),
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
-    )(qualifiers = Seq(alias.getOrElse(tableName)))
+    )(qualifiers = if (alias.isEmpty) Seq(databaseName, tableName) else Seq(alias.get))
   }
 
   /** PartitionKey attributes */
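
The new databaseName parameter only changes the qualifier list carried by each output attribute, and the MetastoreRelation change does the same directly when no alias is given. A REPL-style sketch of the resulting qualifiers, with db1/tmp as made-up example names:

// Sketch of the qualifiers an attribute ends up with (names are illustrative only).
val alias = "tmp"
val databaseName: Option[String] = Some("db1")

// Before: SubqueryAlias(alias, child) always produced a single-part qualifier.
val oldQualifiers: Seq[String] = alias :: Nil                       // List("tmp")

// After: a defined databaseName yields a two-part qualifier, which is exactly what
// the strict db.table.column pass in LogicalPlan.resolve matches against.
val newQualifiers: Seq[String] =
  if (databaseName.isDefined) databaseName.get :: alias :: Nil else alias :: Nil
                                                                    // List("db1", "tmp")
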
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 5182af9d20e58..1afe3eb1115e6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -136,7 +136,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
       // Parentheses is not used for persisted data source relations
       // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
-      case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) =>
+      case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation, _) =>
         build(toSQL(p.child), "AS", p.alias)
       case _ =>
         build("(" + toSQL(p.child) + ")", "AS", p.alias)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6624494e08ea8..b96d4becced3b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1748,4 +1748,57 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", BigDecimal("3.14"), "hello"))
   }
+
+  test("resolve attributes with full qualifiers: db.table.field") {
+    sql("create database db1")
+    sql("use db1")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).write.saveAsTable("db1.tmp")
+    sqlContext.sql("create table test1 as select * from db1.tmp")
+    sql("create database db2")
+    sql("use db2")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"a": {"b": 2}}""" :: Nil)).write.saveAsTable("db2.tmp")
+    sqlContext.sql("create table test1 as select * from db2.tmp")
+
+    // Validate tables created through the DataFrame API
+    checkAnswer(sql("SELECT db1.tmp.a.b, db2.tmp.a.b from db1.tmp, db2.tmp"), Row(1, 2) :: Nil)
+    checkAnswer(sql("SELECT tmp.a.b from db1.tmp"), Row(1) :: Nil)
+    checkAnswer(sql("SELECT a.b from db1.tmp"), Row(1) :: Nil)
+    checkAnswer(sql("SELECT a.b from tmp"), Row(2) :: Nil)
+
+    // Validate Hive tables
+    checkAnswer(sql("SELECT a.b from test1"), Row(2) :: Nil)
+    checkAnswer(sql("SELECT test1.a.b from db1.test1"), Row(1) :: Nil)
+    checkAnswer(sql("SELECT db1.test1.a.b from db1.test1"), Row(1) :: Nil)
+    checkAnswer(sql(
+      """SELECT db1.test1.a.b, db2.test1.a.b from db1.test1
+        |left outer join db2.test1 on (db1.test1.a = db2.test1.a)""".stripMargin),
+      Row(1, null) :: Nil)
+    // Test aliases
+    checkAnswer(sql(
+      """SELECT t1.a.b, t2.a.b from db1.test1 as t1
+        |left outer join db2.test1 as t2 on (t1.a = t2.a)""".stripMargin),
+      Row(1, null) :: Nil)
+
+    // Special cases: `a.b` may mean database.table, table.column or column.field
+    sql("create database a")
+    sql("use a")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"b": {"c": 1}}""" :: Nil)).write.saveAsTable("a.a")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"c": 2}""" :: Nil)).write.saveAsTable("a.b")
+    checkAnswer(sql("select a.b.c from a, b"), Row(2) :: Nil)
+    checkAnswer(sql("select a.b from a, b"), Row(Row(1)) :: Nil)
+
+    sql("drop table db1.test1")
+    sql("drop table db1.tmp")
+    sql("drop table db2.test1")
+    sql("drop table db2.tmp")
+    sql("drop database db1")
+    sql("drop database db2")
+    sql("drop table a.a")
+    sql("drop table a.b")
+    sql("drop database a")
+  }
 }
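
A note on the precedence that the "Special cases" block pins down: `a.b.c` is first tried strictly as database.table.column, so it resolves to database `a`, table `b`, column `c` and returns Row(2). `a.b` has no strict match (against table `a`.`a` the second part mismatches the qualifiers, and against `a`.`b` the whole name is consumed by the qualifiers with no column part left), so resolution falls back to `table.column` and picks the struct column `b` of table `a`, hence Row(Row(1)).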