From 1231ebb129f0930813190c058d6e9fd7e6efd8b7 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 3 Jul 2019 21:08:52 +0800 Subject: [PATCH 1/4] fix the behavior of table name resolution with multi-catalog --- .../spark/sql/catalog/v2/LookupCatalog.scala | 40 +-- .../sql/catalyst/analysis/Analyzer.scala | 48 +-- .../sql/catalyst/catalog/CatalogManager.scala | 49 +++ .../sql/catalyst/catalog/SessionCatalog.scala | 92 ++++-- .../catalog/SessionCatalogSuite.scala | 10 +- .../catalog/v2/LookupCatalogSuite.scala | 17 +- .../org/apache/spark/sql/SparkSession.scala | 6 - .../spark/sql/execution/command/tables.scala | 7 +- .../spark/sql/execution/command/views.scala | 23 +- .../datasources/DataSourceResolution.scala | 86 +++-- .../internal/BaseSessionStateBuilder.scala | 5 +- .../command/PlanResolutionSuite.scala | 186 ++++++----- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 300 ++++++++++++------ .../sql/hive/HiveSessionStateBuilder.scala | 5 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 15 files changed, 517 insertions(+), 359 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 5464a7496d23..d0c91f8f3d7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalog.v2 import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogManager /** * A trait to encapsulate catalog lookup function and helpful extractors. @@ -26,24 +27,29 @@ import org.apache.spark.sql.catalyst.TableIdentifier @Experimental trait LookupCatalog { - protected def lookupCatalog(name: String): CatalogPlugin - - type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier) + val catalogManager: CatalogManager /** * Extract catalog plugin and identifier from a multi-part identifier. 
*/ object CatalogObjectIdentifier { - def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match { - case Seq(name) => - Some((None, Identifier.of(Array.empty, name))) - case Seq(catalogName, tail @ _*) => + def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = { + assert(parts.nonEmpty) + if (parts.length == 1) { + catalogManager.getDefaultCatalog().map { catalog => + (catalog, Identifier.of(Array.empty, parts.last)) + } + } else { try { - Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last))) + val catalog = catalogManager.getCatalog(parts.head) + Some((catalog, Identifier.of(parts.tail.init.toArray, parts.last))) } catch { case _: CatalogNotFoundException => - Some((None, Identifier.of(parts.init.toArray, parts.last))) + catalogManager.getDefaultCatalog().map { catalog => + (catalog, Identifier.of(parts.init.toArray, parts.last)) + } } + } } } @@ -54,17 +60,11 @@ trait LookupCatalog { */ object AsTableIdentifier { def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match { - case CatalogObjectIdentifier(None, ident) => - ident.namespace match { - case Array() => - Some(TableIdentifier(ident.name)) - case Array(database) => - Some(TableIdentifier(ident.name, Some(database))) - case _ => - None - } - case _ => - None + case Seq(tblName) => Some(TableIdentifier(tblName)) + case Seq(dbName, tblName) => Some(TableIdentifier(tblName, Some(dbName))) + case CatalogObjectIdentifier(_, _) => + throw new IllegalStateException(parts.mkString(".") + " is not a TableIdentifier.") + case _ => None } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5d37e909f80a..9476d845d4b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -37,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -98,15 +96,12 @@ class Analyzer( catalog: SessionCatalog, conf: SQLConf, maxIterations: Int) - extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { + extends RuleExecutor[LogicalPlan] with CheckAnalysis { def this(catalog: SessionCatalog, conf: SQLConf) = { this(catalog, conf, conf.optimizerMaxIterations) } - override protected def lookupCatalog(name: String): CatalogPlugin = - throw new CatalogNotFoundException("No catalog lookup function") - def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { val analyzed = executeAndTrack(plan, tracker) @@ -163,7 +158,6 @@ class Analyzer( new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: - ResolveTables :: 
ResolveRelations :: ResolveReferences :: ResolveCreateNamedStruct :: @@ -658,20 +652,6 @@ class Analyzer( } } - /** - * Resolve table relations with concrete relations from v2 catalog. - * - * [[ResolveRelations]] still resolves v1 tables. - */ - object ResolveTables extends Rule[LogicalPlan] { - import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._ - - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => - loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) - } - } - /** * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ @@ -704,7 +684,7 @@ class Analyzer( // Note this is compatible with the views defined by older versions of Spark(before 2.2), which // have empty defaultDatabase and all the relations in viewText have database part defined. def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match { - case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) => + case u @ UnresolvedRelation(ident) => val defaultDatabase = AnalysisContext.get.defaultDatabase val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase) if (foundRelation != u) { @@ -735,7 +715,7 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _) + case i @ InsertIntoTable(u @ UnresolvedRelation(ident), _, child, _, _) if child.resolved => EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match { case v: View => @@ -752,29 +732,21 @@ class Analyzer( // and the default database is only used to look up a view); // 3. Use the currentDb of the SessionCatalog. private def lookupTableFromCatalog( - tableIdentifier: TableIdentifier, + ident: Seq[String], u: UnresolvedRelation, defaultDatabase: Option[String] = None): LogicalPlan = { - val tableIdentWithDb = tableIdentifier.copy( - database = tableIdentifier.database.orElse(defaultDatabase)) + val identWithDb = if (ident.length == 1) { + defaultDatabase.toSeq ++ ident + } else { + ident + } try { - catalog.lookupRelation(tableIdentWithDb) + catalog.lookupRelation(identWithDb) } catch { case _: NoSuchTableException | _: NoSuchDatabaseException => u } } - - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved in the rule `ResolveDataSource`. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. 
- private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = { - table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && - (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table)) - } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala new file mode 100644 index 000000000000..354abf51189a --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import scala.collection.mutable + +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs} +import org.apache.spark.sql.internal.SQLConf + +/** + * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow + * the caller to look up a catalog by name. + * + * TODO: it keeps an [[ExternalCatalog]] because we haven't migrated it to the new + * catalog API. + */ +class CatalogManager(conf: SQLConf, val externalCatalog: ExternalCatalog) { + + /** + * Tracks all the registered catalogs. + */ + private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] + + /** + * Looks up a catalog by name. 
+ */ + def getCatalog(name: String): CatalogPlugin = synchronized { + catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) + } + + def getDefaultCatalog(): Option[CatalogPlugin] = { + conf.defaultV2Catalog.map(getCatalog) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 74559f5d8879..93d4416a23a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.v2.{Identifier, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder @@ -38,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Im import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View} import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.types.StructType @@ -61,7 +63,7 @@ class SessionCatalog( conf: SQLConf, hadoopConf: Configuration, parser: ParserInterface, - functionResourceLoader: FunctionResourceLoader) extends Logging { + functionResourceLoader: FunctionResourceLoader) extends Logging with LookupCatalog { import SessionCatalog._ import CatalogTypes.TablePartitionSpec @@ -90,6 +92,7 @@ class SessionCatalog( lazy val externalCatalog = externalCatalogBuilder() lazy val globalTempViewManager = globalTempViewManagerBuilder() + override lazy val catalogManager = new CatalogManager(conf, externalCatalog) /** List of temporary views, mapping from table name to their logical plan. */ @GuardedBy("this") @@ -710,12 +713,15 @@ class SessionCatalog( /** * Return a [[LogicalPlan]] that represents the given table or view. * - * If a database is specified in `name`, this will return the table/view from that database. - * If no database is specified, this will first attempt to return a temporary view with - * the same name, then, if that does not exist, return the table/view from the current database. + * In general, the rule is: + * 1. If the name matches a temp view or global temp view, return that view. + * 2. If the first part of the name matches a registered catalog, look up the table from that + * catalog. + * 3. If the default catalog config is set, look up the table from that default catalog. + * 4. Otherwise, look up the table from the `ExternalCatalog`. * - * Note that, the global temp view database is also valid here, this will return the global temp - * view matching the given name. + * Note that, the name resolution is case sensitive for the new catalog, and case insensitive for + * temp views and the `ExternalCatalog`. * * If the relation is a view, we generate a [[View]] operator from the view description, and * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view. 
@@ -723,32 +729,62 @@ class SessionCatalog( * * @param name The name of the table/view that we look up. */ - def lookupRelation(name: TableIdentifier): LogicalPlan = { + def lookupRelation(name: Seq[String]): LogicalPlan = { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.CatalogHelper + assert(name.nonEmpty) + + def relationForExternalCatalog(tblName: String, dbName: String, table: CatalogTable) = { + if (table.tableType == CatalogTableType.VIEW) { + val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) + logDebug(s"'$viewText' will be used for the view($table).") + // The relation is a view, so we wrap the relation by: + // 1. Add a [[View]] operator over the relation to keep track of the view desc; + // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. + val child = View( + desc = table, + output = table.schema.toAttributes, + child = parser.parsePlan(viewText)) + SubqueryAlias(tblName, dbName, child) + } else { + SubqueryAlias(tblName, dbName, UnresolvedCatalogRelation(table)) + } + } + + val tblName = formatTableName(name.last) synchronized { - val db = formatDatabaseName(name.database.getOrElse(currentDb)) - val table = formatTableName(name.table) - if (db == globalTempViewManager.database) { - globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(table, db, viewDef) - }.getOrElse(throw new NoSuchTableException(db, table)) - } else if (name.database.isDefined || !tempViews.contains(table)) { - val metadata = externalCatalog.getTable(db, table) - if (metadata.tableType == CatalogTableType.VIEW) { - val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text.")) - logDebug(s"'$viewText' will be used for the view($table).") - // The relation is a view, so we wrap the relation by: - // 1. Add a [[View]] operator over the relation to keep track of the view desc; - // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. 
- val child = View( - desc = metadata, - output = metadata.schema.toAttributes, - child = parser.parsePlan(viewText)) - SubqueryAlias(table, db, child) + if (name.length == 1) { + if (tempViews.contains(tblName)) { + SubqueryAlias(tblName, tempViews(tblName)) + } else { + catalogManager.getDefaultCatalog().map { catalog => + val table = catalog.asTableCatalog.loadTable(Identifier.of(Array.empty, name.head)) + DataSourceV2Relation.create(table) + }.getOrElse { + val dbName = formatDatabaseName(currentDb) + val table = externalCatalog.getTable(dbName, tblName) + relationForExternalCatalog(tblName, dbName, table) + } + } + } else if (name.length == 2) { + val dbName = formatDatabaseName(name.head) + if (dbName == globalTempViewManager.database) { + globalTempViewManager.get(tblName).map { viewDef => + SubqueryAlias(tblName, dbName, viewDef) + }.getOrElse(throw new NoSuchTableException(dbName, tblName)) } else { - SubqueryAlias(table, db, UnresolvedCatalogRelation(metadata)) + name match { + case CatalogObjectIdentifier(catalog, ident) => + val table = catalog.asTableCatalog.loadTable(ident) + DataSourceV2Relation.create(table) + case _ => + val table = externalCatalog.getTable(dbName, tblName) + relationForExternalCatalog(tblName, dbName, table) + } } } else { - SubqueryAlias(table, tempViews(table)) + val table = catalogManager.getCatalog(name.head).asTableCatalog + .loadTable(Identifier.of(name.tail.init.toArray, name.last)) + DataSourceV2Relation.create(table) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index bce85534ce7e..40a42bfe8aec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -606,14 +606,14 @@ abstract class SessionCatalogSuite extends AnalysisTest { catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database - assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head + assert(catalog.lookupRelation(Seq("db2", "tbl1")).children.head .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) // Otherwise, we'll first look up a temporary table with the same name - assert(catalog.lookupRelation(TableIdentifier("tbl1")) + assert(catalog.lookupRelation(Seq("tbl1")) == SubqueryAlias("tbl1", tempTable1)) // Then, if that does not exist, look up the relation in the current database catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head + assert(catalog.lookupRelation(Seq("tbl1")).children.head .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) } } @@ -626,11 +626,11 @@ abstract class SessionCatalogSuite extends AnalysisTest { assert(metadata.viewText.isDefined) val view = View(desc = metadata, output = metadata.schema.toAttributes, child = CatalystSqlParser.parsePlan(metadata.viewText.get)) - comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))), + comparePlans(catalog.lookupRelation(Seq("db3", "view1")), SubqueryAlias("view1", "db3", view)) // Look up a view using current database of the session catalog. 
catalog.setCurrentDatabase("db3") - comparePlans(catalog.lookupRelation(TableIdentifier("view1")), + comparePlans(catalog.lookupRelation(Seq("view1")), SubqueryAlias("view1", "db3", view)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala index 783751ff7986..0fad25314b47 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala @@ -16,12 +16,16 @@ */ package org.apache.spark.sql.catalyst.catalog.v2 +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, when} +import org.mockito.invocation.InvocationOnMock import org.scalatest.Inside import org.scalatest.Matchers._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogManager import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -33,10 +37,17 @@ private case class TestCatalogPlugin(override val name: String) extends CatalogP class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { import CatalystSqlParser._ - private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap + private val catalogs = Seq("prod", "test").map(x => x -> TestCatalogPlugin(x)).toMap - override def lookupCatalog(name: String): CatalogPlugin = - catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + override val catalogManager: CatalogManager = { + val manager = mock(classOf[CatalogManager]) + when(manager.getCatalog(any())).thenAnswer((invocation: InvocationOnMock) => { + val name = invocation.getArgument[String](0) + catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + }) + when(manager.getDefaultCatalog()).thenReturn(None) + manager + } test("catalog object identifier") { Seq( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 90d1b9205787..e0d0062e976c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -607,12 +607,6 @@ class SparkSession private( */ @transient lazy val catalog: Catalog = new CatalogImpl(self) - @transient private lazy val catalogs = new mutable.HashMap[String, CatalogPlugin]() - - private[sql] def catalog(name: String): CatalogPlugin = synchronized { - catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf)) - } - /** * Returns the specified table/view as a `DataFrame`. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 03aca89bc642..5ef42e9163d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql.execution.command -import java.io.File import java.net.{URI, URISyntaxException} -import java.nio.file.FileSystems import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -29,7 +27,7 @@ import org.apache.hadoop.fs.{FileContext, FsConstants, Path} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -547,7 +545,8 @@ case class DescribeTableCommand( throw new AnalysisException( s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}") } - describeSchema(catalog.lookupRelation(table).schema, result, header = false) + val tblNameParts = table.database.toSeq :+ table.table + describeSchema(catalog.lookupRelation(tblNameParts).schema, result, header = false) } else { val metadata = catalog.getTableMetadata(table) if (metadata.schema.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 4d3eb11250c3..799cec317a49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -181,8 +181,6 @@ case class CreateViewCommand( * Permanent views are not allowed to reference temp objects, including temp function and views */ private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = { - import sparkSession.sessionState.analyzer.AsTableIdentifier - if (!isTemporary) { // This func traverses the unresolved plan `child`. Below are the reasons: // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding @@ -190,13 +188,21 @@ case class CreateViewCommand( // added/generated from a temporary view. // 2) The temp functions are represented by multiple classes. Most are inaccessible from this // package (e.g., HiveGenericUDF). - child.collect { + child.foreach { // Disallow creating permanent views based on temporary views. - case UnresolvedRelation(AsTableIdentifier(ident)) - if sparkSession.sessionState.catalog.isTemporaryTable(ident) => - // temporary views are only stored in the session catalog - throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temporary view $ident") + case UnresolvedRelation(parts) => + // The `DataSourceResolution` rule guarantees this. 
+ assert(parts.nonEmpty && parts.length <= 2) + val tblIdent = if (parts.length == 1) { + TableIdentifier(parts.head) + } else { + TableIdentifier(parts.last, Some(parts.head)) + } + if (sparkSession.sessionState.catalog.isTemporaryTable(tblIdent)) { + // temporary views are only stored in the session catalog + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temporary view $tblIdent") + } case other if !other.resolved => other.expressions.flatMap(_.collect { // Disallow creating permanent views based on temporary UDFs. case e: UnresolvedFunction @@ -204,6 +210,7 @@ case class CreateViewCommand( throw new AnalysisException(s"Not allowed to create a permanent view $name by " + s"referencing a temporary function `${e.name}`") }) + case _ => } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 26f7230c8fe8..a6c87ce197da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -22,11 +22,11 @@ import java.util.Locale import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} +import org.apache.spark.sql.catalog.v2.{Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogManager, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType} import org.apache.spark.sql.catalyst.rules.Rule @@ -37,61 +37,55 @@ import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBui case class DataSourceResolution( conf: SQLConf, - findCatalog: String => CatalogPlugin) + catalogManager: CatalogManager) extends Rule[LogicalPlan] with CastSupport with LookupCatalog { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name) + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case c: CreateTableStatement => + c.tableName match { + case CatalogObjectIdentifier(catalog, ident) => + convertCreateTable(catalog.asTableCatalog, ident, c) + + // TODO: The query will fail if the provider is v2. We can fix this by introducing v2 + // session catalog, see SPARK-27919. 
+ case AsTableIdentifier(ident) => + val tableDesc = buildCatalogTable(ident, c.tableSchema, c.partitioning, c.bucketSpec, + c.properties, c.provider, c.options, c.location, c.comment, c.ifNotExists) + val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, None) + + case _ => + throw new AnalysisException( + s"No catalog specified for table ${c.tableName.quoted} and no default catalog is set") + } - def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog) + case c: CreateTableAsSelectStatement => + c.tableName match { + case CatalogObjectIdentifier(catalog, ident) => + convertCTAS(catalog.asTableCatalog, ident, c) + + // TODO: The query will fail if the provider is v2. We can fix this by introducing v2 + // session catalog, see SPARK-27919. + case AsTableIdentifier(ident) => + val tableDesc = buildCatalogTable(ident, new StructType(), c.partitioning, c.bucketSpec, + c.properties, c.provider, c.options, c.location, c.comment, c.ifNotExists) + val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, Some(c.asSelect)) + + case _ => + throw new AnalysisException( + s"No catalog specified for table ${c.tableName.quoted} and no default catalog is set") + } - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case CreateTableStatement( - AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, - V1WriteProvider(provider), options, location, comment, ifNotExists) => - - val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, None) - - case create: CreateTableStatement => - // the provider was not a v1 source, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - val catalog = maybeCatalog.orElse(defaultCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog - convertCreateTable(catalog, identifier, create) - - case CreateTableAsSelectStatement( - AsTableIdentifier(table), query, partitionCols, bucketSpec, properties, - V1WriteProvider(provider), options, location, comment, ifNotExists) => - - val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, - properties, provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, Some(query)) - - case create: CreateTableAsSelectStatement => - // the provider was not a v1 source, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - val catalog = maybeCatalog.orElse(defaultCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog - convertCTAS(catalog, identifier, create) - - case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => + case DropTableStatement(CatalogObjectIdentifier(catalog, ident), ifExists, _) => DropTable(catalog.asTableCatalog, ident, ifExists) case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) => DropTableCommand(tableName, ifExists, isView = false, purge) - case DropViewStatement(CatalogObjectIdentifier(Some(catalog), ident), 
_) => + case DropViewStatement(CatalogObjectIdentifier(catalog, ident), _) => throw new AnalysisException( s"Can not specify catalog `${catalog.name}` for view $ident " + s"because view support in catalog has not been implemented yet") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 8dc30eaa3a31..9aec504010cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.internal import org.apache.spark.SparkConf import org.apache.spark.annotation.{Experimental, Unstable} import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _} -import org.apache.spark.sql.catalog.v2.CatalogPlugin import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer @@ -170,7 +169,7 @@ abstract class BaseSessionStateBuilder( new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf, session.catalog(_)) +: + DataSourceResolution(conf, session.sessionState.catalog.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = @@ -186,8 +185,6 @@ abstract class BaseSessionStateBuilder( V2WriteSupportCheck +: V2StreamingScanSupportCheck +: customCheckRules - - override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 727160dafc5d..bb5c682f3d89 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -20,11 +20,15 @@ package org.apache.spark.sql.execution.command import java.net.URI import java.util.Locale +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, when} +import org.mockito.invocation.InvocationOnMock + import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog, TestTableCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, Identifier, TableCatalog, TestTableCatalog} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogManager, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} @@ -43,17 +47,26 @@ class PlanResolutionSuite extends AnalysisTest { newCatalog } - private val lookupCatalog: String => CatalogPlugin = { - case "testcat" => - testCat - case name => - throw new CatalogNotFoundException(s"No such catalog: $name") + 
private def createCatalogManager(hasDefaultCatalog: Boolean): CatalogManager = { + val manager = mock(classOf[CatalogManager]) + when(manager.getCatalog(any())).thenAnswer((invocation: InvocationOnMock) => { + val name = invocation.getArgument[String](0) + if (name == "testcat") { + testCat + } else { + throw new CatalogNotFoundException(s"No such catalog: $name") + } + }) + if (hasDefaultCatalog) { + when(manager.getDefaultCatalog()).thenReturn(Some(testCat)) + } else { + when(manager.getDefaultCatalog()).thenReturn(None) + } + manager } - def parseAndResolve(query: String): LogicalPlan = { - val newConf = conf.copy() - newConf.setConfString("spark.sql.default.catalog", "testcat") - DataSourceResolution(newConf, lookupCatalog).apply(parsePlan(query)) + def parseAndResolve(query: String, hasDefaultCatalog: Boolean = false): LogicalPlan = { + DataSourceResolution(conf, createCatalogManager(hasDefaultCatalog)).apply(parsePlan(query)) } private def parseResolveCompare(query: String, expected: LogicalPlan): Unit = @@ -84,7 +97,7 @@ class PlanResolutionSuite extends AnalysisTest { case CreateTable(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableCommand].getName} from query," + s"got ${other.getClass.getName}: $query") } } @@ -306,7 +319,7 @@ class PlanResolutionSuite extends AnalysisTest { | id bigint, | description string, | point struct) - |USING parquet + |USING xyz |COMMENT 'table comment' |TBLPROPERTIES ('p1'='v1', 'p2'='v2') |OPTIONS (path 's3://bucket/path/to/data', other 20) @@ -316,63 +329,76 @@ class PlanResolutionSuite extends AnalysisTest { "p1" -> "v1", "p2" -> "v2", "other" -> "20", - "provider" -> "parquet", + "provider" -> "xyz", "location" -> "s3://bucket/path/to/data", "comment" -> "table comment") - parseAndResolve(sql) match { - case create: CreateV2Table => - assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(create.tableSchema == new StructType() + Seq(true, false).foreach { hasDefaultCatalog => + // Since the identifier specifies a registered catalog, the default catalog doesn't matter. 
+ parseAndResolve(sql, hasDefaultCatalog) match { + case create: CreateV2Table => + assert(create.catalog.name == "testcat") + assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) - assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) - assert(create.ignoreIfExists) + assert(create.partitioning.isEmpty) + assert(create.properties == expectedProperties) + assert(create.ignoreIfExists) - case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + case other => + fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + s"got ${other.getClass.getName}: $sql") + } } } - test("Test v2 CreateTable with data source v2 provider") { + test("Test v2 CreateTable with default catalog") { val sql = - s""" - |CREATE TABLE IF NOT EXISTS mydb.page_view ( - | id bigint, - | description string, - | point struct) - |USING $orc2 - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + """ + |CREATE TABLE mydb.table_name(i int) USING xyz + |OPTIONS (a 1, b 0.1, c TRUE) """.stripMargin val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> orc2, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") + "a" -> "1", + "b" -> "0.1", + "c" -> "true", + "provider" -> "xyz") - parseAndResolve(sql) match { + parseAndResolve(sql, hasDefaultCatalog = true) match { case create: CreateV2Table => assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(create.tableSchema == new StructType() - .add("id", LongType) - .add("description", StringType) - .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) + assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.tableSchema == new StructType().add("i", IntegerType)) assert(create.partitioning.isEmpty) assert(create.properties == expectedProperties) - assert(create.ignoreIfExists) + assert(!create.ignoreIfExists) + } + } - case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("Test v2 CTAS with default catalog") { + val sql = + """ + |CREATE TABLE mydb.table_name USING xyz + |OPTIONS (a 1, b 0.1, c TRUE) + |AS SELECT * FROM src + """.stripMargin + + val expectedProperties = Map( + "a" -> "1", + "b" -> "0.1", + "c" -> "true", + "provider" -> "xyz") + + parseAndResolve(sql, hasDefaultCatalog = true) match { + case create: CreateTableAsSelect => + assert(create.catalog.name == "testcat") + assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.partitioning.isEmpty) + assert(create.properties == expectedProperties) + assert(!create.ignoreIfExists) } } @@ -380,7 +406,7 @@ class PlanResolutionSuite extends AnalysisTest { val sql = s""" |CREATE TABLE IF NOT EXISTS testcat.mydb.table_name - |USING parquet + |USING xyz |COMMENT 'table comment' |TBLPROPERTIES ('p1'='v1', 'p2'='v2') |OPTIONS (path 's3://bucket/path/to/data', other 20) @@ -391,55 +417,25 @@ class PlanResolutionSuite extends AnalysisTest { "p1" -> "v1", "p2" -> "v2", "other" -> "20", - "provider" -> "parquet", + "provider" -> "xyz", "location" -> "s3://bucket/path/to/data", "comment" -> 
"table comment") - parseAndResolve(sql) match { - case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) - assert(ctas.writeOptions == Map("other" -> "20")) - assert(ctas.partitioning.isEmpty) - assert(ctas.ignoreIfExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - - test("Test v2 CTAS with data source v2 provider") { - val sql = - s""" - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING $orc2 - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> orc2, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") - - parseAndResolve(sql) match { - case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(ctas.properties == expectedProperties) - assert(ctas.writeOptions.isEmpty) - assert(ctas.partitioning.isEmpty) - assert(ctas.ignoreIfExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + + Seq(true, false).foreach { hasDefaultCatalog => + // Since the identifier specifies a registered catalog, the default catalog doesn't matter. + parseAndResolve(sql, hasDefaultCatalog) match { + case ctas: CreateTableAsSelect => + assert(ctas.catalog.name == "testcat") + assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(ctas.properties == expectedProperties) + assert(ctas.writeOptions == Map("other" -> "20")) + assert(ctas.partitioning.isEmpty) + assert(ctas.ignoreIfExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + s"got ${other.getClass.getName}: $sql") + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 96345e22dbd5..41bd3ecc157f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -21,23 +21,18 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, Identifier} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { - - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - - private val orc2 = classOf[OrcDataSourceV2].getName - before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) 
spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) - spark.conf.set("spark.sql.default.catalog", "testcat") val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") @@ -45,46 +40,77 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn df2.createOrReplaceTempView("source2") } + private def getTestCatalog() = { + spark.sessionState.catalog.catalogManager.getCatalog("testcat") + .asInstanceOf[TestInMemoryTableCatalog] + } + after { - spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() + getTestCatalog().clearTables() spark.sql("DROP TABLE source") spark.sql("DROP TABLE source2") } - test("CreateTable: use v2 plan because catalog is set") { - spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") - - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + test("CreateTable: basic") { + def checkTestCatalog(sql: String): Unit = { + spark.sql(sql) + val testCatalog = getTestCatalog() + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) - assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) - } + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) + } - test("CreateTable: use v2 plan because provider is v2") { - spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $orc2") + withClue("table identifier specifies catalog") { + withTable("testcat.table_name") { + checkTestCatalog("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + } + } - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + withClue("table identifier doesn't specify catalog") { + // This would create table in ExternalCatalog. + val e = intercept[Exception] { + spark.sql("CREATE TABLE table_name (id bigint, data string) USING foo") + } + assert(e.getMessage.contains("Failed to find data source: foo")) + + withTable("table_name") { + spark.sql("CREATE TABLE table_name (id bigint, data string) USING json") + intercept[NoSuchTableException] { + getTestCatalog().loadTable(Identifier.of(Array(), "table_name")) + } + val table = spark.sharedState.externalCatalog.getTable("default", "table_name") + assert(table.identifier == TableIdentifier("table_name", Some("default"))) + } + } - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> orc2).asJava) - assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) + withClue("table identifier doesn't specify catalog and has more than 2 name parts") { + // Spark tries to fallback to ExternalCatalog, which can't handle 3-part table name. 
+ val e = intercept[AnalysisException] { + spark.sql("CREATE TABLE ns1.ns2.table_name (id bigint, data string) USING json") + } + assert(e.message.contains("No catalog specified for table ns1.ns2.table_name")) + assert(e.message.contains("no default catalog is set")) + } - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("table_name") { + checkTestCatalog("CREATE TABLE table_name (id bigint, data string) USING foo") + } + } + } } test("CreateTable: fail if table exists") { spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -115,7 +141,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn spark.sql( "CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING foo") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -138,61 +164,75 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("CreateTable: fail analysis when default catalog is needed but missing") { - val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog") - try { - conf.unsetConf("spark.sql.default.catalog") - - val exc = intercept[AnalysisException] { - spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") - } - - assert(exc.getMessage.contains("No catalog specified for table")) - assert(exc.getMessage.contains("table_name")) - assert(exc.getMessage.contains("no default catalog is set")) - - } finally { - conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) + val exc = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE a.b.c USING foo") } - } - test("CreateTableAsSelect: use v2 plan because catalog is set") { - spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + assert(exc.getMessage.contains("No catalog specified for table")) + assert(exc.getMessage.contains("a.b.c")) + assert(exc.getMessage.contains("no default catalog is set")) + } - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + test("CreateTableAsSelect: basic") { + def checkTestCatalog(sql: String): Unit = { + spark.sql(sql) + val testCatalog = getTestCatalog() + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) - assert(table.schema == new StructType() - .add("id", LongType, nullable = false) - .add("data", StringType)) + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType().add("a", StringType, false)) - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), 
spark.table("source")) - } + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Row("1")) + } - test("CreateTableAsSelect: use v2 plan because provider is v2") { - spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") + withClue("table identifier specifies catalog") { + withTable("testcat.table_name") { + checkTestCatalog("CREATE TABLE testcat.table_name USING foo AS SELECT '1' AS a") + } + } - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + withClue("table identifier doesn't specify catalog") { + // This would create table in ExternalCatalog. + val e = intercept[Exception] { + spark.sql("CREATE TABLE table_name USING foo AS SELECT '1' AS a") + } + assert(e.getMessage.contains("Failed to find data source: foo")) + + withTable("table_name") { + spark.sql("CREATE TABLE table_name USING json AS SELECT '1' AS a") + intercept[NoSuchTableException] { + getTestCatalog().loadTable(Identifier.of(Array(), "table_name")) + } + val table = spark.sharedState.externalCatalog.getTable("default", "table_name") + assert(table.schema == new StructType().add("a", StringType)) + } + } - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> orc2).asJava) - assert(table.schema == new StructType() - .add("id", LongType, nullable = false) - .add("data", StringType)) + withClue("table identifier doesn't specify catalog and has more than 2 name parts") { + // Spark tries to fallback to ExternalCatalog, which can't handle 3-part table name. + val e = intercept[AnalysisException] { + spark.sql("CREATE TABLE ns1.ns2.table_name USING foo AS SELECT '1' AS a") + } + assert(e.message.contains("No catalog specified for table ns1.ns2.table_name")) + assert(e.message.contains("no default catalog is set")) + } - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("table_name") { + checkTestCatalog("CREATE TABLE table_name USING foo AS SELECT '1' AS a") + } + } + } } test("CreateTableAsSelect: fail if table exists") { spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -230,7 +270,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn spark.sql( "CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -252,30 +292,22 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("CreateTableAsSelect: fail analysis when default catalog is needed but missing") { - val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog") - try { - conf.unsetConf("spark.sql.default.catalog") - - val exc = intercept[AnalysisException] { - 
spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") - } - - assert(exc.getMessage.contains("No catalog specified for table")) - assert(exc.getMessage.contains("table_name")) - assert(exc.getMessage.contains("no default catalog is set")) - - } finally { - conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) + val exc = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE a.b.c USING foo AS SELECT id, data FROM source") } + + assert(exc.getMessage.contains("No catalog specified for table")) + assert(exc.getMessage.contains("a.b.c")) + assert(exc.getMessage.contains("no default catalog is set")) } test("DropTable: basic") { val tableName = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source") - assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === true) + assert(getTestCatalog().tableExists(ident) === true) sql(s"DROP TABLE $tableName") - assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === false) + assert(getTestCatalog().tableExists(ident) === false) } test("DropTable: if exists") { @@ -286,11 +318,85 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("Relation: basic") { - val t1 = "testcat.ns1.ns2.tbl" - withTable(t1) { - sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") - checkAnswer(sql(s"TABLE $t1"), spark.table("source")) - checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source")) + def checkTableScan(tblNameParts: Array[String]): Unit = { + val tblName = tblNameParts.mkString(".") + withTable(tblName) { + sql(s"CREATE TABLE $tblName USING json AS SELECT '1' AS a") + checkAnswer(sql(s"TABLE $tblName"), Row("1")) + checkAnswer(sql(s"SELECT * FROM $tblName"), Row("1")) + } + } + + withClue("table identifier specifies catalog") { + checkTableScan(Array("testcat", "ns1", "ns2", "tbl")) + } + + withClue("table identifier doesn't specify catalog") { + checkTableScan(Array("tbl")) + } + + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + checkTableScan(Array("tbl")) + } + } + } + + test("Relation: table not found") { + withClue("table identifier specifies catalog") { + val e = intercept[AnalysisException] { + spark.sql("SELECT * FROM testcat.tbl") + } + assert(e.message.contains("Table or view not found: testcat.tbl")) + } + + withClue("table identifier doesn't specify catalog") { + val e = intercept[AnalysisException] { + spark.sql("SELECT * FROM tbl") + } + assert(e.message.contains("Table or view not found: tbl")) + } + + withClue("table identifier doesn't specify catalog and has more than 2 name parts") { + val e = intercept[CatalogNotFoundException] { + spark.sql("SELECT * FROM a.b.c") + } + assert(e.getMessage.contains("Catalog 'a' plugin class not found")) + } + + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + val e = intercept[AnalysisException] { + spark.sql("SELECT * FROM tbl") + } + assert(e.message.contains("Table or view not found: tbl")) + } + } + } + + test("Relation: name conflicts with temp view") { + withTempView("v") { + spark.range(10).createOrReplaceTempView("v") + + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("v") { + spark.sql("CREATE TABLE v(i INT) USING foo") + checkAnswer(sql("SELECT * FROM v"), 
spark.range(10).toDF()) + } + } + } + } + + test("Relation: name conflicts with global temp view") { + withTempView("v") { + spark.range(10).createOrReplaceGlobalTempView("v") + + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("global_temp.v") { + spark.sql("CREATE TABLE global_temp.v(i INT) USING foo") + checkAnswer(sql("SELECT * FROM global_temp.v"), spark.range(10).toDF()) + } + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index b04b3f1031d7..ffcc93d82f18 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import org.apache.spark.annotation.{Experimental, Unstable} import org.apache.spark.sql._ -import org.apache.spark.sql.catalog.v2.CatalogPlugin import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -74,7 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf, session.catalog(_)) +: + DataSourceResolution(conf, session.sessionState.catalog.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = @@ -92,8 +91,6 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session V2WriteSupportCheck +: V2StreamingScanSupportCheck +: customCheckRules - - override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name) } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index acfb84ede7ad..35694bd41aa7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -956,7 +956,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { override def run(): Unit = { val tableName = s"SPARK_6618_table_$i" sql(s"CREATE TABLE $tableName (col1 string)") - sessionState.catalog.lookupRelation(TableIdentifier(tableName)) + sessionState.catalog.lookupRelation(Seq(tableName)) table(tableName) tables() sql(s"DROP TABLE $tableName") From d3f23b99d4e0203f4e2fe93ad8703db34768524c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 9 Jul 2019 11:23:23 +0800 Subject: [PATCH 2/4] fix a bug --- .../scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index d0c91f8f3d7d..f58c8c7edaba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -60,10 +60,10 @@ trait LookupCatalog { */ object AsTableIdentifier { def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match { - case Seq(tblName) => Some(TableIdentifier(tblName)) - case Seq(dbName, tblName) => 
Some(TableIdentifier(tblName, Some(dbName))) case CatalogObjectIdentifier(_, _) => throw new IllegalStateException(parts.mkString(".") + " is not a TableIdentifier.") + case Seq(tblName) => Some(TableIdentifier(tblName)) + case Seq(dbName, tblName) => Some(TableIdentifier(tblName, Some(dbName))) case _ => None } } From 6a27efef9e73158f3f4db187fe531e75e4038587 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 9 Jul 2019 14:23:04 +0800 Subject: [PATCH 3/4] revert changes in SessionCatalog --- .../sql/catalyst/analysis/Analyzer.scala | 43 +++++++-- .../sql/catalyst/catalog/CatalogManager.scala | 5 +- .../sql/catalyst/catalog/SessionCatalog.scala | 92 ++++++------------- .../spark/sql/catalyst/identifiers.scala | 9 ++ .../catalog/SessionCatalogSuite.scala | 10 +- .../spark/sql/execution/command/tables.scala | 7 +- .../spark/sql/execution/command/views.scala | 6 +- .../internal/BaseSessionStateBuilder.scala | 4 +- .../spark/sql/internal/SessionState.scala | 2 + .../sql/sources/v2/DataSourceV2SQLSuite.scala | 4 +- .../sql/hive/HiveSessionStateBuilder.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 12 files changed, 93 insertions(+), 93 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9476d845d4b2..b51a589fc7db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -96,12 +98,14 @@ class Analyzer( catalog: SessionCatalog, conf: SQLConf, maxIterations: Int) - extends RuleExecutor[LogicalPlan] with CheckAnalysis { + extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { def this(catalog: SessionCatalog, conf: SQLConf) = { this(catalog, conf, conf.optimizerMaxIterations) } + override lazy val catalogManager: CatalogManager = new CatalogManager(conf) + def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { val analyzed = executeAndTrack(plan, tracker) @@ -732,21 +736,48 @@ class Analyzer( // and the default database is only used to look up a view); // 3. Use the currentDb of the SessionCatalog. 
private def lookupTableFromCatalog( - ident: Seq[String], + nameParts: Seq[String], u: UnresolvedRelation, defaultDatabase: Option[String] = None): LogicalPlan = { - val identWithDb = if (ident.length == 1) { - defaultDatabase.toSeq ++ ident + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.CatalogHelper + + val namePartsWithDb = if (nameParts.length == 1) { + defaultDatabase.toSeq ++ nameParts } else { - ident + nameParts } + try { - catalog.lookupRelation(identWithDb) + namePartsWithDb match { + case AsTempViewIdentifier(ident) => catalog.lookupRelation(ident) + + case CatalogObjectIdentifier(v2Catalog, ident) => + val table = v2Catalog.asTableCatalog.loadTable(ident) + DataSourceV2Relation.create(table) + + case _ => + // The builtin hive catalog doesn't support more than 2 table name parts. Here we assume + // the first name part is a catalog which doesn't exist. + if (namePartsWithDb.length > 2) { + throw new CatalogNotFoundException(s"Catalog '${namePartsWithDb.head}' not found.") + } + catalog.lookupRelation(TableIdentifier(namePartsWithDb)) + } } catch { case _: NoSuchTableException | _: NoSuchDatabaseException => u } } + + object AsTempViewIdentifier { + def unapply(parts: Seq[String]): Option[TableIdentifier] = { + if (parts.nonEmpty && parts.length <= 2) { + Some(TableIdentifier(parts)).filter(catalog.isTemporaryTable) + } else { + None + } + } + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala index 354abf51189a..12c6f5bcd994 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala @@ -25,11 +25,8 @@ import org.apache.spark.sql.internal.SQLConf /** * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow * the caller to look up a catalog by name. - * - * TODO: it keeps an [[ExternalCatalog]] because we haven't migrated it to the new - * catalog API. */ -class CatalogManager(conf: SQLConf, val externalCatalog: ExternalCatalog) { +class CatalogManager(conf: SQLConf) { /** * Tracks all the registered catalogs. 
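For reference, the precedence that the reworked lookupTableFromCatalog above follows can be modeled with a small standalone sketch. This is illustrative only and not part of the patch: ResolutionOrderSketch and its case classes are invented names, and the real Spark types (CatalogPlugin, TableIdentifier, SessionCatalog) are replaced with plain strings and sequences.

object ResolutionOrderSketch {
  sealed trait Resolved
  case class TempView(name: String) extends Resolved
  case class V2Table(catalog: String, nameParts: Seq[String]) extends Resolved
  case class SessionTable(db: String, table: String) extends Resolved

  // Mirrors the order of the match in lookupTableFromCatalog:
  // temp view, then explicit catalog, then default catalog, then session catalog.
  def resolve(
      parts: Seq[String],
      isTempView: Seq[String] => Boolean,
      registeredCatalogs: Set[String],
      defaultCatalog: Option[String],
      currentDb: String): Resolved = parts match {
    // 1. A 1- or 2-part name that matches a temp view always wins.
    case _ if parts.length <= 2 && isTempView(parts) =>
      TempView(parts.mkString("."))
    // 2. A multi-part name whose head is a registered catalog goes to that catalog.
    case head +: rest if parts.length > 1 && registeredCatalogs.contains(head) =>
      V2Table(head, rest)
    // 3. Otherwise the default v2 catalog, if set, receives the whole name.
    case _ if defaultCatalog.isDefined =>
      V2Table(defaultCatalog.get, parts)
    // 4. Finally fall back to the session catalog, which only handles 1 or 2 parts.
    case Seq(table) => SessionTable(currentDb, table)
    case Seq(db, table) => SessionTable(db, table)
    case _ =>
      // Stand-in for the CatalogNotFoundException thrown in the real code.
      throw new IllegalArgumentException(s"Catalog '${parts.head}' not found.")
  }
}

Under this order, with a temp view v and a default catalog set, SELECT * FROM v still reads the temp view, which is exactly what the "Relation: name conflicts with temp view" test above asserts.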
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 93d4416a23a0..74559f5d8879 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{Identifier, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder @@ -39,7 +38,6 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Im import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View} import org.apache.spark.sql.catalyst.util.StringUtils -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.types.StructType @@ -63,7 +61,7 @@ class SessionCatalog( conf: SQLConf, hadoopConf: Configuration, parser: ParserInterface, - functionResourceLoader: FunctionResourceLoader) extends Logging with LookupCatalog { + functionResourceLoader: FunctionResourceLoader) extends Logging { import SessionCatalog._ import CatalogTypes.TablePartitionSpec @@ -92,7 +90,6 @@ class SessionCatalog( lazy val externalCatalog = externalCatalogBuilder() lazy val globalTempViewManager = globalTempViewManagerBuilder() - override lazy val catalogManager = new CatalogManager(conf, externalCatalog) /** List of temporary views, mapping from table name to their logical plan. */ @GuardedBy("this") @@ -713,15 +710,12 @@ class SessionCatalog( /** * Return a [[LogicalPlan]] that represents the given table or view. * - * In general, the rule is: - * 1. If the name matches a temp view or global temp view, return that view. - * 2. If the first part of the name matches a registered catalog, look up the table from that - * catalog. - * 3. If the default catalog config is set, look up the table from that default catalog. - * 4. Otherwise, look up the table from the `ExternalCatalog`. + * If a database is specified in `name`, this will return the table/view from that database. + * If no database is specified, this will first attempt to return a temporary view with + * the same name, then, if that does not exist, return the table/view from the current database. * - * Note that, the name resolution is case sensitive for the new catalog, and case insensitive for - * temp views and the `ExternalCatalog`. + * Note that, the global temp view database is also valid here, this will return the global temp + * view matching the given name. * * If the relation is a view, we generate a [[View]] operator from the view description, and * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view. @@ -729,62 +723,32 @@ class SessionCatalog( * * @param name The name of the table/view that we look up. 
*/ - def lookupRelation(name: Seq[String]): LogicalPlan = { - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.CatalogHelper - assert(name.nonEmpty) - - def relationForExternalCatalog(tblName: String, dbName: String, table: CatalogTable) = { - if (table.tableType == CatalogTableType.VIEW) { - val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) - logDebug(s"'$viewText' will be used for the view($table).") - // The relation is a view, so we wrap the relation by: - // 1. Add a [[View]] operator over the relation to keep track of the view desc; - // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. - val child = View( - desc = table, - output = table.schema.toAttributes, - child = parser.parsePlan(viewText)) - SubqueryAlias(tblName, dbName, child) - } else { - SubqueryAlias(tblName, dbName, UnresolvedCatalogRelation(table)) - } - } - - val tblName = formatTableName(name.last) + def lookupRelation(name: TableIdentifier): LogicalPlan = { synchronized { - if (name.length == 1) { - if (tempViews.contains(tblName)) { - SubqueryAlias(tblName, tempViews(tblName)) - } else { - catalogManager.getDefaultCatalog().map { catalog => - val table = catalog.asTableCatalog.loadTable(Identifier.of(Array.empty, name.head)) - DataSourceV2Relation.create(table) - }.getOrElse { - val dbName = formatDatabaseName(currentDb) - val table = externalCatalog.getTable(dbName, tblName) - relationForExternalCatalog(tblName, dbName, table) - } - } - } else if (name.length == 2) { - val dbName = formatDatabaseName(name.head) - if (dbName == globalTempViewManager.database) { - globalTempViewManager.get(tblName).map { viewDef => - SubqueryAlias(tblName, dbName, viewDef) - }.getOrElse(throw new NoSuchTableException(dbName, tblName)) + val db = formatDatabaseName(name.database.getOrElse(currentDb)) + val table = formatTableName(name.table) + if (db == globalTempViewManager.database) { + globalTempViewManager.get(table).map { viewDef => + SubqueryAlias(table, db, viewDef) + }.getOrElse(throw new NoSuchTableException(db, table)) + } else if (name.database.isDefined || !tempViews.contains(table)) { + val metadata = externalCatalog.getTable(db, table) + if (metadata.tableType == CatalogTableType.VIEW) { + val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text.")) + logDebug(s"'$viewText' will be used for the view($table).") + // The relation is a view, so we wrap the relation by: + // 1. Add a [[View]] operator over the relation to keep track of the view desc; + // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. 
+ val child = View( + desc = metadata, + output = metadata.schema.toAttributes, + child = parser.parsePlan(viewText)) + SubqueryAlias(table, db, child) } else { - name match { - case CatalogObjectIdentifier(catalog, ident) => - val table = catalog.asTableCatalog.loadTable(ident) - DataSourceV2Relation.create(table) - case _ => - val table = externalCatalog.getTable(dbName, tblName) - relationForExternalCatalog(tblName, dbName, table) - } + SubqueryAlias(table, db, UnresolvedCatalogRelation(metadata)) } } else { - val table = catalogManager.getCatalog(name.head).asTableCatalog - .loadTable(Identifier.of(name.tail.init.toArray, name.last)) - DataSourceV2Relation.create(table) + SubqueryAlias(table, tempViews(table)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index deceec73dda3..9f79bdc7789e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -85,6 +85,15 @@ case class QualifiedTableName(database: String, name: String) { object TableIdentifier { def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) + + def apply(nameParts: Seq[String]): TableIdentifier = { + assert(nameParts.nonEmpty && nameParts.length <= 2) + if (nameParts.length == 1) { + TableIdentifier(nameParts.last) + } else { + TableIdentifier(nameParts.last, Some(nameParts.head)) + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 40a42bfe8aec..bce85534ce7e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -606,14 +606,14 @@ abstract class SessionCatalogSuite extends AnalysisTest { catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database - assert(catalog.lookupRelation(Seq("db2", "tbl1")).children.head + assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) // Otherwise, we'll first look up a temporary table with the same name - assert(catalog.lookupRelation(Seq("tbl1")) + assert(catalog.lookupRelation(TableIdentifier("tbl1")) == SubqueryAlias("tbl1", tempTable1)) // Then, if that does not exist, look up the relation in the current database catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(catalog.lookupRelation(Seq("tbl1")).children.head + assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) } } @@ -626,11 +626,11 @@ abstract class SessionCatalogSuite extends AnalysisTest { assert(metadata.viewText.isDefined) val view = View(desc = metadata, output = metadata.schema.toAttributes, child = CatalystSqlParser.parsePlan(metadata.viewText.get)) - comparePlans(catalog.lookupRelation(Seq("db3", "view1")), + comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))), SubqueryAlias("view1", "db3", view)) // Look up a view using current database of the session catalog. 
catalog.setCurrentDatabase("db3") - comparePlans(catalog.lookupRelation(Seq("view1")), + comparePlans(catalog.lookupRelation(TableIdentifier("view1")), SubqueryAlias("view1", "db3", view)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5ef42e9163d6..03aca89bc642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution.command +import java.io.File import java.net.{URI, URISyntaxException} +import java.nio.file.FileSystems import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -27,7 +29,7 @@ import org.apache.hadoop.fs.{FileContext, FsConstants, Path} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -545,8 +547,7 @@ case class DescribeTableCommand( throw new AnalysisException( s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}") } - val tblNameParts = table.database.toSeq :+ table.table - describeSchema(catalog.lookupRelation(tblNameParts).schema, result, header = false) + describeSchema(catalog.lookupRelation(table).schema, result, header = false) } else { val metadata = catalog.getTableMetadata(table) if (metadata.schema.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 799cec317a49..3bd7a3241060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -193,11 +193,7 @@ case class CreateViewCommand( case UnresolvedRelation(parts) => // The `DataSourceResolution` rule guarantees this. 
assert(parts.nonEmpty && parts.length <= 2) - val tblIdent = if (parts.length == 1) { - TableIdentifier(parts.head) - } else { - TableIdentifier(parts.last, Some(parts.head)) - } + val tblIdent = TableIdentifier(parts) if (sparkSession.sessionState.catalog.isTemporaryTable(tblIdent)) { // temporary views are only stored in the session catalog throw new AnalysisException(s"Not allowed to create a permanent view $name by " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 9aec504010cb..3bad925b9659 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.annotation.{Experimental, Unstable} import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _} import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.catalog.{CatalogManager, SessionCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -169,7 +169,7 @@ abstract class BaseSessionStateBuilder( new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf, session.sessionState.catalog.catalogManager) +: + DataSourceResolution(conf, this.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b962ab6feabc..24ac9cf9ca39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -77,6 +77,8 @@ private[sql] class SessionState( // The following fields are lazy to avoid creating the Hive client when creating SessionState. 
lazy val catalog: SessionCatalog = catalogBuilder() + lazy val catalogManager: CatalogManager = new CatalogManager(conf) + lazy val analyzer: Analyzer = analyzerBuilder() lazy val optimizer: Optimizer = optimizerBuilder() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 41bd3ecc157f..082af271232a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -41,7 +41,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } private def getTestCatalog() = { - spark.sessionState.catalog.catalogManager.getCatalog("testcat") + spark.sessionState.analyzer.catalogManager.getCatalog("testcat") .asInstanceOf[TestInMemoryTableCatalog] } @@ -361,7 +361,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn val e = intercept[CatalogNotFoundException] { spark.sql("SELECT * FROM a.b.c") } - assert(e.getMessage.contains("Catalog 'a' plugin class not found")) + assert(e.getMessage.contains("Catalog 'a' not found")) } withClue("table identifier doesn't specify catalog but default catalog is set") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index ffcc93d82f18..3d3c7ac766f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -73,7 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf, session.sessionState.catalog.catalogManager) +: + DataSourceResolution(conf, this.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 35694bd41aa7..acfb84ede7ad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -956,7 +956,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { override def run(): Unit = { val tableName = s"SPARK_6618_table_$i" sql(s"CREATE TABLE $tableName (col1 string)") - sessionState.catalog.lookupRelation(Seq(tableName)) + sessionState.catalog.lookupRelation(TableIdentifier(tableName)) table(tableName) tables() sql(s"DROP TABLE $tableName") From 4ed4399ad558249ceb2c56e1d1c88837ae4602df Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 9 Jul 2019 20:27:45 +0800 Subject: [PATCH 4/4] fix test --- .../sql/catalyst/catalog/v2/LookupCatalogSuite.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala index 0fad25314b47..bb2666b1f3e1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala @@ -65,8 +65,9 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { case (sql, expectedCatalog, namespace, name) => inside(parseMultipartIdentifier(sql)) { case CatalogObjectIdentifier(catalog, ident) => - catalog shouldEqual expectedCatalog + Some(catalog) shouldEqual expectedCatalog ident shouldEqual Identifier.of(namespace.toArray, name) + case _ => assert(expectedCatalog.isEmpty) } } } @@ -89,10 +90,11 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { "prod.func", "prod.db.tbl", "ns1.ns2.tbl").foreach { sql => - parseMultipartIdentifier(sql) match { - case AsTableIdentifier(_) => - fail(s"$sql should not be resolved as TableIdentifier") - case _ => + val nameParts = parseMultipartIdentifier(sql) + if (nameParts.head == "prod") { + intercept[IllegalStateException](AsTableIdentifier.unapply(nameParts)) + } else { + assert(AsTableIdentifier.unapply(nameParts).isEmpty) } } }
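
The updated LookupCatalogSuite expectations boil down to the following contract for the two extractors. The sketch below is illustrative only: FakeCatalogManager, catalogObjectIdentifier and asTableIdentifier are invented stand-ins for CatalogManager and the real extractors, and it assumes a registered catalog named "prod" and no default catalog, as in the suite.

object ExtractorContractSketch {
  class FakeCatalogManager(registered: Set[String], default: Option[String]) {
    def getCatalog(name: String): String =
      if (registered.contains(name)) name
      else throw new NoSuchElementException(s"Catalog '$name' not found.")
    def getDefaultCatalog: Option[String] = default
  }

  // Mirrors CatalogObjectIdentifier after this patch: returns the owning catalog and the
  // remaining name parts, or None when neither an explicit nor a default catalog applies.
  def catalogObjectIdentifier(
      cm: FakeCatalogManager,
      parts: Seq[String]): Option[(String, Seq[String])] = {
    require(parts.nonEmpty)
    if (parts.length == 1) {
      cm.getDefaultCatalog.map(c => (c, parts))
    } else {
      try Some((cm.getCatalog(parts.head), parts.tail)) catch {
        case _: NoSuchElementException => cm.getDefaultCatalog.map(c => (c, parts))
      }
    }
  }

  // Mirrors AsTableIdentifier after this patch: a name that any catalog claims is an error
  // for v1 callers; otherwise only 1- or 2-part names become a (table, database) pair.
  def asTableIdentifier(
      cm: FakeCatalogManager,
      parts: Seq[String]): Option[(String, Option[String])] = parts match {
    case _ if catalogObjectIdentifier(cm, parts).isDefined =>
      throw new IllegalStateException(parts.mkString(".") + " is not a TableIdentifier.")
    case Seq(tbl) => Some((tbl, None))
    case Seq(db, tbl) => Some((tbl, Some(db)))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val cm = new FakeCatalogManager(Set("prod"), default = None)
    println(asTableIdentifier(cm, Seq("tbl")))               // Some((tbl,None))
    println(asTableIdentifier(cm, Seq("db", "tbl")))         // Some((tbl,Some(db)))
    println(asTableIdentifier(cm, Seq("ns1", "ns2", "tbl"))) // None: no catalog, too many parts
    // asTableIdentifier(cm, Seq("prod", "db", "tbl")) would throw IllegalStateException,
    // matching the intercept in the suite above.
  }
}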