@@ -17,54 +17,127 @@

package org.apache.spark.sql.catalog.v2

import scala.util.control.NonFatal

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier

/**
* A trait to encapsulate catalog lookup function and helpful extractors.
*/
@Experimental
trait LookupCatalog {
trait LookupCatalog extends Logging {

import LookupCatalog._

protected def defaultCatalogName: Option[String] = None
protected def lookupCatalog(name: String): CatalogPlugin

type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
/**
* Returns the default catalog. When set, this catalog is used for all identifiers that do not
* set a specific catalog. When this is None, the session catalog is responsible for the
* identifier.
*
* If this is None and a table's provider (source) is a v2 provider, the v2 session catalog will
* be used.
*/
def defaultCatalog: Option[CatalogPlugin] = {
try {
defaultCatalogName.map(lookupCatalog)
} catch {
case NonFatal(e) =>
logError(s"Cannot load default v2 catalog: ${defaultCatalogName.get}", e)
None
}
}

/**
* Extract catalog plugin and identifier from a multi-part identifier.
* This catalog is a v2 catalog that delegates to the v1 session catalog. It is used when the
* session catalog is responsible for an identifier, but the source requires the v2 catalog API.
* This happens when the source implementation extends the v2 TableProvider API and is not listed
* in the fallback configuration, spark.sql.sources.write.useV1SourceList.
*/
object CatalogObjectIdentifier {
def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match {
case Seq(name) =>
Some((None, Identifier.of(Array.empty, name)))
def sessionCatalog: Option[CatalogPlugin] = {
try {
Some(lookupCatalog(SESSION_CATALOG_NAME))
} catch {
case NonFatal(e) =>
logError("Cannot load v2 session catalog", e)
None
}
}

/**
* Extract catalog plugin and remaining identifier names.
*
* This does not substitute the default catalog if no catalog is set in the identifier.
*/
private object CatalogAndIdentifier {
def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {
case Seq(_) =>
Some((None, parts))
case Seq(catalogName, tail @ _*) =>
try {
Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last)))
Some((Some(lookupCatalog(catalogName)), tail))
} catch {
case _: CatalogNotFoundException =>
Some((None, Identifier.of(parts.init.toArray, parts.last)))
Some((None, parts))
}
}
}

type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)

/**
* Extract catalog and identifier from a multi-part identifier with the default catalog if needed.
*/
object CatalogObjectIdentifier {
def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match {
case CatalogAndIdentifier(maybeCatalog, nameParts) =>
Some((
maybeCatalog.orElse(defaultCatalog),
Identifier.of(nameParts.init.toArray, nameParts.last)
))
}
}

/**
* Extract legacy table identifier from a multi-part identifier.
*
* For legacy support only. Please use [[CatalogObjectIdentifier]] instead on DSv2 code paths.
*/
object AsTableIdentifier {
def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
case CatalogObjectIdentifier(None, ident) =>
ident.namespace match {
case Array() =>
Some(TableIdentifier(ident.name))
case Array(database) =>
Some(TableIdentifier(ident.name, Some(database)))
case CatalogAndIdentifier(None, names) if defaultCatalog.isEmpty =>
names match {
case Seq(name) =>
Some(TableIdentifier(name))
case Seq(database, name) =>
Some(TableIdentifier(name, Some(database)))
case _ =>
None
}
case _ =>
None
}
}

/**
* For temp views, extract a table identifier from a multi-part identifier if it has no catalog.
*/
object AsTemporaryViewIdentifier {
def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
case CatalogAndIdentifier(None, Seq(table)) =>
Some(TableIdentifier(table))
case CatalogAndIdentifier(None, Seq(database, table)) =>
Some(TableIdentifier(table, Some(database)))
case _ =>
None
}
}
}

object LookupCatalog {
val SESSION_CATALOG_NAME: String = "session"
Contributor:
where do we register the v2 session catalog?

BTW this should be an internal thing, I think we just need to ask LookupCatalog implementations to provide the v2 session catalog.

Contributor Author:
The v2 session catalog is configured by adding a catalog named "session" in SQL config. That needs to be added to SQLConf defaults, but I wasn't sure whether it should happen in this PR or later. I can add it in this PR if you'd prefer.
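As a concrete illustration of that comment (not part of this PR's diff), a catalog named "session" is registered through SQL config like any other v2 catalog. The config key and the V2SessionCatalog class below mirror the SQLConf default added later in this diff; the rest of the snippet is a minimal, hypothetical setup.

    import org.apache.spark.sql.SparkSession

    // Register the v2 session catalog under the reserved name "session"
    // (LookupCatalog.SESSION_CATALOG_NAME). Any other CatalogPlugin
    // implementation could be supplied here to override the default.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.session",
        "org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
      .getOrCreate()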

Contributor:
Why would we allow users to configure the session catalog? I see V2SessionCatalog as a way to fall back to the built-in Hive catalog before we migrate the Hive catalog to the v2 catalog API. This is a completely internal thing.

Contributor Author:
We can do that if you'd like. This PR uses a different implementation to test SQL behavior because the built-in sources don't behave correctly.

Contributor Author:
I added a default config for this that uses the V2SessionCatalog. May as well add it now, but we do need to be able to override it until we have a built-in source that passes all SQL tests.

}
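Before moving on to the analyzer changes, a quick illustration of how the extractors above compose inside a LookupCatalog implementation (the Analyzer mixes this trait in below). The describe helper is hypothetical and not part of the diff; the extractor names, their result shapes, and CatalogPlugin.name() come from this file.

    // Hypothetical routing helper inside a LookupCatalog implementation:
    // temp-view candidates first, then named v2 catalogs (including the
    // default catalog), then the v1 session catalog as the legacy fallback.
    def describe(nameParts: Seq[String]): String = nameParts match {
      case AsTemporaryViewIdentifier(ident) =>
        // no catalog part and at most two name parts; a temp-view lookup
        // (like catalog.isTemporaryTable in the Analyzer) would guard this case
        s"temp view candidate: $ident"
      case CatalogObjectIdentifier(Some(catalog), ident) =>
        s"v2 catalog '${catalog.name}': $ident"
      case AsTableIdentifier(ident) =>
        // no catalog in the name and no default catalog configured
        s"v1 session catalog: $ident"
      case other =>
        s"unresolved: ${other.mkString(".")}"
    }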
@@ -104,6 +104,8 @@ class Analyzer(
this(catalog, conf, conf.optimizerMaxIterations)
}

override protected def defaultCatalogName: Option[String] = conf.defaultV2Catalog

override protected def lookupCatalog(name: String): CatalogPlugin =
throw new CatalogNotFoundException("No catalog lookup function")

@@ -667,6 +669,10 @@ class Analyzer(
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
if catalog.isTemporaryTable(ident) =>
u // temporary views take precedence over catalog table names

case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
}
@@ -704,6 +710,10 @@ class Analyzer(
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
// have empty defaultDatabase and all the relations in viewText have database part defined.
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
if catalog.isTemporaryTable(ident) =>
resolveRelation(lookupTableFromCatalog(ident, u, AnalysisContext.get.defaultDatabase))

case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
val defaultDatabase = AnalysisContext.get.defaultDatabase
val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
@@ -37,7 +37,7 @@ object UpdateAttributeNullability extends Rule[LogicalPlan] {
case p if !p.resolved => p
// Skip leaf node, as it has no child and no need to update nullability.
case p: LeafNode => p
case p: LogicalPlan =>
case p: LogicalPlan if p.childrenResolved =>
Contributor:
This is a concerning change in light of the class comment. If failing to apply this rule can cause wrong results, how sure are we that the rule doesn't need to be applied when children are unresolved?

Contributor Author:
If a child is not resolved, then the next line will fail because the children do not have expression IDs. If the children are never resolved, then the query will fail the resolution check. If the children are eventually resolved, then this simply runs later, when the next line will not fail.

This was a preexisting bug that was uncovered by this PR and is caused by not throwing an analysis exception immediately in ResolveRelations. There have been several updates like this to fix places that didn't check child resolution.

Contributor:
Ok, that makes sense.

Contributor:
why is this needed when the first case is case p if !p.resolved => p? Do you mean there is a plan that is resolved but its children are not?

Contributor Author:
@cloud-fan, feel free to look into that, but this case was matching with an unresolved child that did not have expression IDs. I thought the best approach was to require resolved children, but if you'd like to suggest a different approach I can update here.

Otherwise, this solves the problem and it is clearly correct to only access expression IDs when children are resolved.

Contributor:
I'm not sure what problem this resolves. When we reach here, p is already resolved, which usually means p.childrenResolved is always true.

Contributor Author (@rdblue, Jul 5, 2019):
@cloud-fan, not necessarily. Some plans may not check that children are resolved. CheckAnalysis accounts for this case by using foreachUp:

    plan.foreachUp {
      case o if !o.resolved =>
        failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
      case _ =>
    }
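To make the failure mode concrete, here is a tiny self-contained sketch (a toy model, not Spark code; Node, UnresolvedLeaf, and Parent are made-up names) of a plan node whose resolved flag never consults its children. This is exactly the situation the new childrenResolved guard protects against.

    // Toy model: `resolved` on a parent can be computed without looking at its
    // children, so a rule that reads child output (e.g. expression IDs) must
    // also check childrenResolved before touching it.
    sealed trait Node {
      def children: Seq[Node]
      def resolved: Boolean
      def childrenResolved: Boolean = children.forall(_.resolved)
    }

    case object UnresolvedLeaf extends Node {
      def children: Seq[Node] = Nil
      def resolved: Boolean = false
    }

    // Mirrors the pre-fix CreateTableAsSelect below: resolved ignores children.
    case class Parent(children: Seq[Node]) extends Node {
      def resolved: Boolean = true
    }

    val p = Parent(Seq(UnresolvedLeaf))
    assert(p.resolved && !p.childrenResolved) // the case the new guard now skips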

val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
// If there are multiple Attributes having the same ExprId, we need to resolve
// the conflict of nullable field. We do not really expect this to happen.
@@ -432,7 +432,7 @@ case class CreateTableAsSelect(

override def children: Seq[LogicalPlan] = Seq(query)

override lazy val resolved: Boolean = {
override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
// that the columns referenced by the table's partitioning exist in the query schema
val references = partitioning.flatMap(_.references).toSet
@@ -1833,6 +1833,11 @@ object SQLConf {
.stringConf
.createOptional

val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
.doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
.stringConf
.createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")

val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
.doc("When true, the upcast will be loose and allows string to atomic types.")
.booleanConf
@@ -85,4 +85,111 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
}
}
}

test("temporary table identifier") {
Seq(
("tbl", TableIdentifier("tbl")),
("db.tbl", TableIdentifier("tbl", Some("db"))),
("`db.tbl`", TableIdentifier("db.tbl")),
("parquet.`file:/tmp/db.tbl`", TableIdentifier("file:/tmp/db.tbl", Some("parquet"))),
("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))).foreach {
case (sqlIdent: String, expectedTableIdent: TableIdentifier) =>
// when there is no catalog and the namespace has one part, the rule should match
inside(parseMultipartIdentifier(sqlIdent)) {
case AsTemporaryViewIdentifier(ident) =>
ident shouldEqual expectedTableIdent
}
}

Seq("prod.func", "prod.db.tbl", "test.db.tbl", "ns1.ns2.tbl", "test.ns1.ns2.ns3.tbl")
.foreach { sqlIdent =>
inside(parseMultipartIdentifier(sqlIdent)) {
case AsTemporaryViewIdentifier(_) =>
fail("AsTemporaryViewIdentifier should not match when " +
"the catalog is set or the namespace has multiple parts")
case _ =>
// expected
}
}
}
}

class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog with Inside {
import CatalystSqlParser._

private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap

override def defaultCatalogName: Option[String] = Some("prod")

override def lookupCatalog(name: String): CatalogPlugin =
catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))

test("catalog object identifier") {
Seq(
("tbl", catalogs.get("prod"), Seq.empty, "tbl"),
("db.tbl", catalogs.get("prod"), Seq("db"), "tbl"),
("prod.func", catalogs.get("prod"), Seq.empty, "func"),
("ns1.ns2.tbl", catalogs.get("prod"), Seq("ns1", "ns2"), "tbl"),
("prod.db.tbl", catalogs.get("prod"), Seq("db"), "tbl"),
("test.db.tbl", catalogs.get("test"), Seq("db"), "tbl"),
("test.ns1.ns2.ns3.tbl", catalogs.get("test"), Seq("ns1", "ns2", "ns3"), "tbl"),
("`db.tbl`", catalogs.get("prod"), Seq.empty, "db.tbl"),
("parquet.`file:/tmp/db.tbl`", catalogs.get("prod"), Seq("parquet"), "file:/tmp/db.tbl"),
("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", catalogs.get("prod"),
Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json")).foreach {
case (sql, expectedCatalog, namespace, name) =>
inside(parseMultipartIdentifier(sql)) {
case CatalogObjectIdentifier(catalog, ident) =>
catalog shouldEqual expectedCatalog
ident shouldEqual Identifier.of(namespace.toArray, name)
}
}
}

test("table identifier") {
Seq(
"tbl",
"db.tbl",
"`db.tbl`",
"parquet.`file:/tmp/db.tbl`",
"`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
"prod.func",
"prod.db.tbl",
"ns1.ns2.tbl").foreach { sql =>
parseMultipartIdentifier(sql) match {
case AsTableIdentifier(_) =>
fail(s"$sql should not be resolved as TableIdentifier")
case _ =>
}
}
}

test("temporary table identifier") {
Seq(
("tbl", TableIdentifier("tbl")),
("db.tbl", TableIdentifier("tbl", Some("db"))),
("`db.tbl`", TableIdentifier("db.tbl")),
("parquet.`file:/tmp/db.tbl`", TableIdentifier("file:/tmp/db.tbl", Some("parquet"))),
("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))).foreach {
case (sqlIdent: String, expectedTableIdent: TableIdentifier) =>
// when there is no catalog and the namespace has one part, the rule should match
inside(parseMultipartIdentifier(sqlIdent)) {
case AsTemporaryViewIdentifier(ident) =>
ident shouldEqual expectedTableIdent
}
}

Seq("prod.func", "prod.db.tbl", "test.db.tbl", "ns1.ns2.tbl", "test.ns1.ns2.ns3.tbl")
.foreach { sqlIdent =>
inside(parseMultipartIdentifier(sqlIdent)) {
case AsTemporaryViewIdentifier(_) =>
fail("AsTemporaryViewIdentifier should not match when " +
"the catalog is set or the namespace has multiple parts")
case _ =>
// expected
}
}
}
}