@@ -1374,9 +1374,10 @@ class Analyzer(
  * Removes [[SubqueryAlias]] operators from the plan. Subqueries are only required to provide
  * scoping information for attributes and can be removed once analysis is complete.
  */
+
 object EliminateSubqueryAliases extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case SubqueryAlias(_, child) => child
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case SubqueryAlias(_, child, _) => child
   }
 }
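The rule stays tiny because a SubqueryAlias carries no data of its own: once analysis has used it for scoping, each alias node can simply be replaced by its child. A standalone toy sketch of the idea, using a simplified plan ADT rather than Catalyst's LogicalPlan (all names below are illustrative):

object EliminateAliasSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Alias(name: String, child: Plan, db: Option[String] = None) extends Plan

  // Replace every alias node by its child, as the rule above does via transform.
  def eliminate(plan: Plan): Plan = plan match {
    case Alias(_, child, _) => eliminate(child)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // A doubly aliased relation collapses to the bare relation.
    println(eliminate(Alias("x", Alias("t", Relation("t"), Some("db1"))))) // Relation(t)
  }
}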
@@ -238,26 +238,66 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     }
   }
 
+  private def doMatch(
+      nameParts: Seq[String],
+      input: Attribute,
+      resolver: Resolver): Option[(Attribute, List[String])] = {
+    var i = 0
+    var j = 0
+    val qualifiers = input.qualifiers
+    while (i < qualifiers.size && j < nameParts.size) {
+      if (resolver(qualifiers(i), nameParts(j))) {
+        i += 1
+        j += 1
+      } else {
+        return None
+      }
+    }
+    // nameParts.slice(j, nameParts.size) would be empty when the attribute has the same
+    // name as the table, e.g. SELECT tableName FROM tableName.
+    if (i == qualifiers.size && j != nameParts.size) {
+      resolveAsColumn(nameParts.slice(j, nameParts.size), resolver, input)
+    } else {
+      None
+    }
+  }
+
+  private def resolveAsDbTableColumn(
+      nameParts: Seq[String],
+      input: Seq[Attribute],
+      resolver: Resolver): Seq[(Attribute, List[String])] = {
+    input.flatMap { option =>
+      doMatch(nameParts, option, resolver)
+    }
+  }
+
   /** Performs attribute resolution given a name and a sequence of possible attributes. */
   protected def resolve(
       nameParts: Seq[String],
       input: Seq[Attribute],
       resolver: Resolver): Option[NamedExpression] = {
 
-    // A sequence of possible candidate matches.
-    // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
-    // of parts that are to be resolved.
-    // For example, consider an example where "a" is the table name, "b" is the column name,
-    // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
-    // and the second element will be List("c").
-    var candidates: Seq[(Attribute, List[String])] = {
-      // If the name has 2 or more parts, try to resolve it as `table.column` first.
-      if (nameParts.length > 1) {
-        input.flatMap { option =>
-          resolveAsTableColumn(nameParts, resolver, option)
-        }
-      } else {
-        Seq.empty
-      }
-    }
+    // Try to resolve as `db.table.column` strictly first.
+    var candidates: Seq[(Attribute, List[String])] =
+      resolveAsDbTableColumn(nameParts, input, resolver)
+
+    // Otherwise, fall back to resolving as `table.column`.
+    if (candidates.isEmpty) {
+      candidates = {
+        // If the name has 2 or more parts, try to identify the related attribute by its qualifiers.
+        if (nameParts.length > 1) {
+          input.flatMap { option =>
+            resolveAsTableColumn(nameParts, resolver, option)
+          }
+        } else {
+          Seq.empty
+        }
+      }
+    }
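The core of the new resolution path is doMatch: it consumes the attribute's qualifiers as a strict prefix of the name parts and hands only the remainder to resolveAsColumn. A self-contained sketch of that prefix match in plain Scala (the Resolver is modeled as a bare comparison function; this is an illustration, not the Catalyst API):

object PrefixMatchSketch {
  // Catalyst's Resolver boils down to a (String, String) => Boolean.
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Consume `qualifiers` as a strict prefix of `nameParts` and return the
  // remaining parts. An empty remainder also yields None, which covers
  // queries like SELECT tableName FROM tableName.
  def matchQualifiers(
      nameParts: Seq[String],
      qualifiers: Seq[String],
      resolver: Resolver): Option[Seq[String]] = {
    if (nameParts.size <= qualifiers.size) {
      None
    } else if (qualifiers.zip(nameParts).forall { case (q, n) => resolver(q, n) }) {
      Some(nameParts.drop(qualifiers.size))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val qualifiers = Seq("db1", "tmp") // an attribute qualified as db1.tmp
    // db1.tmp.a.b: both qualifiers match, leaving a.b for column/field resolution.
    println(matchQualifiers(Seq("db1", "tmp", "a", "b"), qualifiers, caseInsensitive))
    // db2.tmp.a: the database part differs, so there is no candidate.
    println(matchQualifiers(Seq("db2", "tmp", "a"), qualifiers, caseInsensitive))
  }
}

Trying the strict db.table.column match first and falling back to table.column only when it yields no candidates keeps the old behavior intact for unqualified and singly qualified names.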
@@ -644,9 +644,16 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   }
 }
 
-case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output.map(_.withQualifiers(alias :: Nil))
+case class SubqueryAlias(alias: String, child: LogicalPlan, databaseName: Option[String] = None)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output.map(
+    _.withQualifiers {
+      if (databaseName.isDefined) {
+        databaseName.get :: alias :: Nil
+      } else {
+        alias :: Nil
+      }
+    })
 }
 
 /**
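With the new optional databaseName, the output attributes of a SubqueryAlias are qualified by db.alias instead of just alias whenever the database is known. The if/else amounts to prepending the optional database; a minimal standalone sketch of the resulting qualifier lists (illustrative only, not Catalyst code):

object QualifierListSketch {
  // Option.toList turns Some(db) into List(db) and None into Nil, so the
  // two branches above collapse into a single concatenation.
  def qualifiersFor(alias: String, databaseName: Option[String]): List[String] =
    databaseName.toList ::: alias :: Nil

  def main(args: Array[String]): Unit = {
    println(qualifiersFor("tmp", Some("db1"))) // List(db1, tmp)
    println(qualifiersFor("tmp", None))        // List(tmp)
  }
}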
@@ -420,7 +420,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
 
     if (table.properties.get("spark.sql.sources.provider").isDefined) {
       val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
-      val tableWithQualifiers = SubqueryAlias(qualifiedTableName.name, dataSourceTable)
+      val tableWithQualifiers = SubqueryAlias(qualifiedTableName.name, dataSourceTable,
+        Some(qualifiedTableName.database))
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
       alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
@@ -897,7 +898,7 @@ private[hive] case class MetastoreRelation(
       HiveMetastoreTypes.toDataType(f.dataType),
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
-    )(qualifiers = Seq(alias.getOrElse(tableName)))
+    )(qualifiers = if (alias.isEmpty) Seq(databaseName, tableName) else Seq(alias.get))
   }
 
   /** PartitionKey attributes */
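MetastoreRelation gets the same treatment: when no alias is given, each attribute now carries both the database and table name as qualifiers, so db.table.column and table.column both resolve; an explicit alias still shadows them. A standalone sketch of that choice (hypothetical helper name, not the actual class):

object MetastoreQualifierSketch {
  def metastoreQualifiers(
      databaseName: String,
      tableName: String,
      alias: Option[String]): Seq[String] =
    // One qualifier when aliased, otherwise database + table.
    alias.map(a => Seq(a)).getOrElse(Seq(databaseName, tableName))

  def main(args: Array[String]): Unit = {
    println(metastoreQualifiers("db1", "test1", None))       // List(db1, test1)
    println(metastoreQualifiers("db1", "test1", Some("t1"))) // List(t1)
  }
}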
@@ -136,7 +136,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
       s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
     // Parentheses is not used for persisted data source relations
     // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
-    case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) =>
+    case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation, _) =>
       build(toSQL(p.child), "AS", p.alias)
     case _ =>
       build("(" + toSQL(p.child) + ")", "AS", p.alias)
@@ -1748,4 +1748,57 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", BigDecimal("3.14"), "hello"))
   }
+
+  test("test attribute with full qualifiers: db.table.field") {
+    sql("create database db1")
+    sql("use db1")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).write.saveAsTable("db1.tmp")
+    sqlContext.sql("create table test1 as select * from db1.tmp")
+    sql("create database db2")
+    sql("use db2")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"a": {"b": 2}}""" :: Nil)).write.saveAsTable("db2.tmp")
+    sqlContext.sql("create table test1 as select * from db2.tmp")
+
+    // Validate tables created through the DataFrame API
+    checkAnswer(sql("SELECT db1.tmp.a.b, db2.tmp.a.b from db1.tmp, db2.tmp"), Row(1, 2) :: Nil)
+    checkAnswer(sql("SELECT tmp.a.b from db1.tmp"), Row(1) :: Nil)
+    checkAnswer(sql("SELECT a.b from db1.tmp"), Row(1) :: Nil)
+    checkAnswer(sql("SELECT a.b from tmp"), Row(2) :: Nil)
+
+    // Validate Hive tables
+    checkAnswer(sql("SELECT a.b from test1"), Row(2) :: Nil)
+    checkAnswer(sql("SELECT test1.a.b from db1.test1"), Row(1) :: Nil)
+    checkAnswer(sql("SELECT db1.test1.a.b from db1.test1"), Row(1) :: Nil)
+    checkAnswer(sql(
+      """SELECT db1.test1.a.b, db2.test1.a.b from db1.test1
+        |left outer join db2.test1 on (db1.test1.a = db2.test1.a)""".stripMargin),
+      Row(1, null) :: Nil)
+    // Test aliases
+    checkAnswer(sql(
+      """SELECT t1.a.b, t2.a.b from db1.test1 as t1
+        |left outer join db2.test1 as t2 on (t1.a = t2.a)""".stripMargin),
+      Row(1, null) :: Nil)
+
+    // Special cases where database, table, and column names collide
+    sql("create database a")
+    sql("use a")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"b": {"c": 1}}""" :: Nil)).write.saveAsTable("a.a")
+    sqlContext.read.json(
+      sparkContext.makeRDD("""{"c": 2}""" :: Nil)).write.saveAsTable("a.b")
+    checkAnswer(sql("select a.b.c from a, b"), Row(2) :: Nil)
+    checkAnswer(sql("select a.b from a, b"), Row(Row(1)) :: Nil)
+
+    sql("drop table db1.test1")
+    sql("drop table db1.tmp")
+    sql("drop table db2.test1")
+    sql("drop table db2.tmp")
+    sql("drop database db1")
+    sql("drop database db2")
+    sql("drop table a.a")
+    sql("drop table a.b")
+    sql("drop database a")
+  }
 }