@@ -807,8 +807,10 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
case u: UnresolvedRelation =>
lookupV2Relation(u.multipartIdentifier)
.map(SubqueryAlias(u.multipartIdentifier, _))
.getOrElse(u)
.map { rel =>
val ident = rel.identifier.get
SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
}.getOrElse(u)

case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
CatalogV2Util.loadTable(catalog, ident)
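
The substance of the hunk above is the alias name: instead of aliasing the resolved relation with whatever the user typed (u.multipartIdentifier), the SubqueryAlias is built from the resolved catalog name, namespace, and table name. Below is a minimal, Spark-independent sketch of why that enables the qualified column names exercised in the tests further down; the Ident type and the suffix-matching rule are stand-ins assumed for illustration, not Spark's actual resolution code.

// Sketch only: toy stand-ins for the catalog and identifier objects in the diff above.
final case class Ident(namespace: Seq[String], name: String)

object QualifierSketch {
  // Mirrors rel.catalog.get.name +: ident.namespace :+ ident.name from the diff.
  def aliasQualifier(catalogName: String, ident: Ident): Seq[String] =
    catalogName +: ident.namespace :+ ident.name

  // Assumption for illustration: a reference like testcat.ns1.ns2.tbl.id resolves
  // when its qualifier (everything before the column name) is a suffix of the alias.
  def resolves(columnQualifier: Seq[String], alias: Seq[String]): Boolean =
    alias.endsWith(columnQualifier)

  def main(args: Array[String]): Unit = {
    val alias = aliasQualifier("testcat", Ident(Seq("ns1", "ns2"), "tbl"))
    // The qualified forms exercised by the test below all match as suffixes.
    assert(resolves(Seq("testcat", "ns1", "ns2", "tbl"), alias))
    assert(resolves(Seq("ns1", "ns2", "tbl"), alias))
    assert(resolves(Seq("ns2", "tbl"), alias))
    assert(resolves(Seq("tbl"), alias))
    // Mirrors the failing case in the test (ns1.ns2.ns3.tbl.id): not a suffix, does not resolve.
    assert(!resolves(Seq("ns1", "ns2", "ns3", "tbl"), alias))
    println(alias.mkString("."))  // prints testcat.ns1.ns2.tbl
  }
}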
@@ -933,7 +935,7 @@ class Analyzer(
v1SessionCatalog.getRelation(v1Table.v1Table)
case table =>
SubqueryAlias(
identifier,
ident.asMultipartIdentifier,
Contributor (inline comment on the line above):
shall we add catalog name too? to support cases like select spark_catalog.default.t.i from t.

Contributor Author (@imback82, Feb 26, 2020):

We could, but it would not be consistent with v1 table behavior. I was thinking about adding this support when I update the resolution rule for session catalogs: #27391 (comment). What do you think, should I do it now?

Contributor:
ah ok. Let's leave it for now.

DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
}
val key = catalog.name +: ident.namespace :+ ident.name
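
By contrast, the session-catalog branch above aliases the relation with ident.asMultipartIdentifier, i.e. namespace plus table name with no catalog name, which is what keeps column qualification consistent with v1 behavior as discussed in the thread. A short REPL-style continuation of the sketch above, under the same suffix-matching assumption:

// Sketch only: the two alias-qualifier shapes, as plain sequences.
val v2Alias      = Seq("testcat", "ns1", "ns2", "tbl") // v2 catalog: catalog name included
val sessionAlias = Seq("default", "t")                 // session catalog: namespace + name only

assert(v2Alias.endsWith(Seq("testcat", "ns1", "ns2", "tbl")))        // testcat.ns1.ns2.tbl.id resolves
assert(sessionAlias.endsWith(Seq("default", "t")))                   // default.t.i resolves
assert(!sessionAlias.endsWith(Seq("spark_catalog", "default", "t"))) // spark_catalog.default.t.i does not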
@@ -685,12 +685,21 @@ class DataSourceV2SQLSuite
sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
sql(s"INSERT INTO $t VALUES (1, (10, 20))")

checkAnswer(
sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
Row(1, 10))
checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
def check(tbl: String): Unit = {
checkAnswer(
sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $tbl"),
Row(1, 10))
checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $tbl"), Row(1, 10))
checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $tbl"), Row(1, 10))
checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $tbl"), Row(1, 10))
}

// Test with qualified table name "testcat.ns1.ns2.tbl".
check(t)

// Test if current catalog and namespace is respected in column resolution.
sql("USE testcat.ns1.ns2")
check("tbl")

val ex = intercept[AnalysisException] {
sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
@@ -700,19 +709,30 @@
}

test("qualified column names for v1 tables") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)

withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
Seq(true, false).foreach { useV1Table =>
val format = if (useV1Table) "json" else v2Format
if (useV1Table) {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
Contributor (inline comment on the line above):
shall we keep this comment: // unset this config to use the default v2 session catalog.

} else {
spark.conf.set(
V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
}

// catalog name cannot be used for v1 tables.
val ex = intercept[AnalysisException] {
sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
withTable("t") {
sql(s"CREATE TABLE t USING $format AS SELECT 1 AS i")
checkAnswer(sql("select i from t"), Row(1))
checkAnswer(sql("select t.i from t"), Row(1))
checkAnswer(sql("select default.t.i from t"), Row(1))
checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))

// catalog name cannot be used for tables in the session catalog.
val ex = intercept[AnalysisException] {
sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
}
assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
}
assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
}
}
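
For reference, the end-to-end behavior these tests pin down, written as a spark-shell style sketch. This assumes a session configured like the suite's (a testcat v2 catalog registered and the test-only "foo" format available), so it is illustrative rather than copy-paste runnable against a stock Spark build.

// Illustrative spark-shell sketch; testcat and foo come from the test setup, not stock Spark.
spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id BIGINT, point STRUCT<x: BIGINT, y: BIGINT>) USING foo")
spark.sql("SELECT testcat.ns1.ns2.tbl.id FROM testcat.ns1.ns2.tbl")   // resolves
spark.sql("USE testcat.ns1.ns2")
spark.sql("SELECT testcat.ns1.ns2.tbl.id FROM tbl")                   // still resolves after USE

// Session catalog tables: qualification stops at the database, as with v1 tables.
spark.sql("CREATE TABLE t USING json AS SELECT 1 AS i")
spark.sql("SELECT default.t.i FROM t")                 // resolves
spark.sql("SELECT spark_catalog.default.t.i FROM t")   // AnalysisException: cannot resolve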
