diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 5464a7496d23..f58c8c7edaba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalog.v2 import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogManager /** * A trait to encapsulate catalog lookup function and helpful extractors. @@ -26,24 +27,29 @@ import org.apache.spark.sql.catalyst.TableIdentifier @Experimental trait LookupCatalog { - protected def lookupCatalog(name: String): CatalogPlugin - - type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier) + val catalogManager: CatalogManager /** * Extract catalog plugin and identifier from a multi-part identifier. */ object CatalogObjectIdentifier { - def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match { - case Seq(name) => - Some((None, Identifier.of(Array.empty, name))) - case Seq(catalogName, tail @ _*) => + def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = { + assert(parts.nonEmpty) + if (parts.length == 1) { + catalogManager.getDefaultCatalog().map { catalog => + (catalog, Identifier.of(Array.empty, parts.last)) + } + } else { try { - Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last))) + val catalog = catalogManager.getCatalog(parts.head) + Some((catalog, Identifier.of(parts.tail.init.toArray, parts.last))) } catch { case _: CatalogNotFoundException => - Some((None, Identifier.of(parts.init.toArray, parts.last))) + catalogManager.getDefaultCatalog().map { catalog => + (catalog, Identifier.of(parts.init.toArray, parts.last)) + } } + } } } @@ -54,17 +60,11 @@ trait LookupCatalog { */ object AsTableIdentifier { def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match { - case CatalogObjectIdentifier(None, ident) => - ident.namespace match { - case Array() => - Some(TableIdentifier(ident.name)) - case Array(database) => - Some(TableIdentifier(ident.name, Some(database))) - case _ => - None - } - case _ => - None + case CatalogObjectIdentifier(_, _) => + throw new IllegalStateException(parts.mkString(".") + " is not a TableIdentifier.") + case Seq(tblName) => Some(TableIdentifier(tblName)) + case Seq(dbName, tblName) => Some(TableIdentifier(tblName, Some(dbName))) + case _ => None } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5d37e909f80a..b51a589fc7db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -104,8 +104,7 @@ class Analyzer( this(catalog, conf, 
conf.optimizerMaxIterations) } - override protected def lookupCatalog(name: String): CatalogPlugin = - throw new CatalogNotFoundException("No catalog lookup function") + override lazy val catalogManager: CatalogManager = new CatalogManager(conf) def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { @@ -163,7 +162,6 @@ class Analyzer( new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: - ResolveTables :: ResolveRelations :: ResolveReferences :: ResolveCreateNamedStruct :: @@ -658,20 +656,6 @@ class Analyzer( } } - /** - * Resolve table relations with concrete relations from v2 catalog. - * - * [[ResolveRelations]] still resolves v1 tables. - */ - object ResolveTables extends Rule[LogicalPlan] { - import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._ - - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => - loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) - } - } - /** * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ @@ -704,7 +688,7 @@ class Analyzer( // Note this is compatible with the views defined by older versions of Spark(before 2.2), which // have empty defaultDatabase and all the relations in viewText have database part defined. def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match { - case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) => + case u @ UnresolvedRelation(ident) => val defaultDatabase = AnalysisContext.get.defaultDatabase val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase) if (foundRelation != u) { @@ -735,7 +719,7 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _) + case i @ InsertIntoTable(u @ UnresolvedRelation(ident), _, child, _, _) if child.resolved => EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match { case v: View => @@ -752,28 +736,47 @@ class Analyzer( // and the default database is only used to look up a view); // 3. Use the currentDb of the SessionCatalog. private def lookupTableFromCatalog( - tableIdentifier: TableIdentifier, + nameParts: Seq[String], u: UnresolvedRelation, defaultDatabase: Option[String] = None): LogicalPlan = { - val tableIdentWithDb = tableIdentifier.copy( - database = tableIdentifier.database.orElse(defaultDatabase)) + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.CatalogHelper + + val namePartsWithDb = if (nameParts.length == 1) { + defaultDatabase.toSeq ++ nameParts + } else { + nameParts + } + try { - catalog.lookupRelation(tableIdentWithDb) + namePartsWithDb match { + case AsTempViewIdentifier(ident) => catalog.lookupRelation(ident) + + case CatalogObjectIdentifier(v2Catalog, ident) => + val table = v2Catalog.asTableCatalog.loadTable(ident) + DataSourceV2Relation.create(table) + + case _ => + // The builtin hive catalog doesn't support more than 2 table name parts. Here we assume + // the first name part is a catalog which doesn't exist. 
+ if (namePartsWithDb.length > 2) { + throw new CatalogNotFoundException(s"Catalog '${namePartsWithDb.head}' not found.") + } + catalog.lookupRelation(TableIdentifier(namePartsWithDb)) + } } catch { case _: NoSuchTableException | _: NoSuchDatabaseException => u } } - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved in the rule `ResolveDataSource`. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. - private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = { - table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && - (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table)) + object AsTempViewIdentifier { + def unapply(parts: Seq[String]): Option[TableIdentifier] = { + if (parts.nonEmpty && parts.length <= 2) { + Some(TableIdentifier(parts)).filter(catalog.isTemporaryTable) + } else { + None + } + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala new file mode 100644 index 000000000000..12c6f5bcd994 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/CatalogManager.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import scala.collection.mutable + +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs} +import org.apache.spark.sql.internal.SQLConf + +/** + * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allows + * the caller to look up a catalog by name. + */ +class CatalogManager(conf: SQLConf) { + + /** + * Tracks all the registered catalogs. 
+ */ + def getCatalog(name: String): CatalogPlugin = synchronized { + catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) + } + + def getDefaultCatalog(): Option[CatalogPlugin] = { + conf.defaultV2Catalog.map(getCatalog) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index deceec73dda3..9f79bdc7789e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -85,6 +85,15 @@ case class QualifiedTableName(database: String, name: String) { object TableIdentifier { def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) + + def apply(nameParts: Seq[String]): TableIdentifier = { + assert(nameParts.nonEmpty && nameParts.length <= 2) + if (nameParts.length == 1) { + TableIdentifier(nameParts.last) + } else { + TableIdentifier(nameParts.last, Some(nameParts.head)) + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala index 783751ff7986..bb2666b1f3e1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala @@ -16,12 +16,16 @@ */ package org.apache.spark.sql.catalyst.catalog.v2 +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, when} +import org.mockito.invocation.InvocationOnMock import org.scalatest.Inside import org.scalatest.Matchers._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogManager import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -33,10 +37,17 @@ private case class TestCatalogPlugin(override val name: String) extends CatalogP class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { import CatalystSqlParser._ - private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap + private val catalogs = Seq("prod", "test").map(x => x -> TestCatalogPlugin(x)).toMap - override def lookupCatalog(name: String): CatalogPlugin = - catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + override val catalogManager: CatalogManager = { + val manager = mock(classOf[CatalogManager]) + when(manager.getCatalog(any())).thenAnswer((invocation: InvocationOnMock) => { + val name = invocation.getArgument[String](0) + catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + }) + when(manager.getDefaultCatalog()).thenReturn(None) + manager + } test("catalog object identifier") { Seq( @@ -54,8 +65,9 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { case (sql, expectedCatalog, namespace, name) => inside(parseMultipartIdentifier(sql)) { case CatalogObjectIdentifier(catalog, ident) => - catalog shouldEqual expectedCatalog + Some(catalog) shouldEqual expectedCatalog ident shouldEqual Identifier.of(namespace.toArray, name) + case _ => assert(expectedCatalog.isEmpty) } } } @@ -78,10 +90,11 @@ class LookupCatalogSuite 
extends SparkFunSuite with LookupCatalog with Inside { "prod.func", "prod.db.tbl", "ns1.ns2.tbl").foreach { sql => - parseMultipartIdentifier(sql) match { - case AsTableIdentifier(_) => - fail(s"$sql should not be resolved as TableIdentifier") - case _ => + val nameParts = parseMultipartIdentifier(sql) + if (nameParts.head == "prod") { + intercept[IllegalStateException](AsTableIdentifier.unapply(nameParts)) + } else { + assert(AsTableIdentifier.unapply(nameParts).isEmpty) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 90d1b9205787..e0d0062e976c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -607,12 +607,6 @@ class SparkSession private( */ @transient lazy val catalog: Catalog = new CatalogImpl(self) - @transient private lazy val catalogs = new mutable.HashMap[String, CatalogPlugin]() - - private[sql] def catalog(name: String): CatalogPlugin = synchronized { - catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf)) - } - /** * Returns the specified table/view as a `DataFrame`. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 4d3eb11250c3..3bd7a3241060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -181,8 +181,6 @@ case class CreateViewCommand( * Permanent views are not allowed to reference temp objects, including temp function and views */ private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = { - import sparkSession.sessionState.analyzer.AsTableIdentifier - if (!isTemporary) { // This func traverses the unresolved plan `child`. Below are the reasons: // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding @@ -190,13 +188,17 @@ case class CreateViewCommand( // added/generated from a temporary view. // 2) The temp functions are represented by multiple classes. Most are inaccessible from this // package (e.g., HiveGenericUDF). - child.collect { + child.foreach { // Disallow creating permanent views based on temporary views. - case UnresolvedRelation(AsTableIdentifier(ident)) - if sparkSession.sessionState.catalog.isTemporaryTable(ident) => - // temporary views are only stored in the session catalog - throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temporary view $ident") + case UnresolvedRelation(parts) => + // The `DataSourceResolution` rule guarantees this. + assert(parts.nonEmpty && parts.length <= 2) + val tblIdent = TableIdentifier(parts) + if (sparkSession.sessionState.catalog.isTemporaryTable(tblIdent)) { + // temporary views are only stored in the session catalog + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temporary view $tblIdent") + } case other if !other.resolved => other.expressions.flatMap(_.collect { // Disallow creating permanent views based on temporary UDFs. 
case e: UnresolvedFunction @@ -204,6 +206,7 @@ case class CreateViewCommand( throw new AnalysisException(s"Not allowed to create a permanent view $name by " + s"referencing a temporary function `${e.name}`") }) + case _ => } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 26f7230c8fe8..a6c87ce197da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -22,11 +22,11 @@ import java.util.Locale import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} +import org.apache.spark.sql.catalog.v2.{Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogManager, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType} import org.apache.spark.sql.catalyst.rules.Rule @@ -37,61 +37,55 @@ import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBui case class DataSourceResolution( conf: SQLConf, - findCatalog: String => CatalogPlugin) + catalogManager: CatalogManager) extends Rule[LogicalPlan] with CastSupport with LookupCatalog { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name) + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case c: CreateTableStatement => + c.tableName match { + case CatalogObjectIdentifier(catalog, ident) => + convertCreateTable(catalog.asTableCatalog, ident, c) + + // TODO: The query will fail if the provider is v2. We can fix this by introducing v2 + // session catalog, see SPARK-27919. + case AsTableIdentifier(ident) => + val tableDesc = buildCatalogTable(ident, c.tableSchema, c.partitioning, c.bucketSpec, + c.properties, c.provider, c.options, c.location, c.comment, c.ifNotExists) + val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, None) + + case _ => + throw new AnalysisException( + s"No catalog specified for table ${c.tableName.quoted} and no default catalog is set") + } - def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog) + case c: CreateTableAsSelectStatement => + c.tableName match { + case CatalogObjectIdentifier(catalog, ident) => + convertCTAS(catalog.asTableCatalog, ident, c) + + // TODO: The query will fail if the provider is v2. We can fix this by introducing v2 + // session catalog, see SPARK-27919. 
+ case AsTableIdentifier(ident) => + val tableDesc = buildCatalogTable(ident, new StructType(), c.partitioning, c.bucketSpec, + c.properties, c.provider, c.options, c.location, c.comment, c.ifNotExists) + val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, Some(c.asSelect)) + + case _ => + throw new AnalysisException( + s"No catalog specified for table ${c.tableName.quoted} and no default catalog is set") + } - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case CreateTableStatement( - AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, - V1WriteProvider(provider), options, location, comment, ifNotExists) => - - val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, None) - - case create: CreateTableStatement => - // the provider was not a v1 source, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - val catalog = maybeCatalog.orElse(defaultCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog - convertCreateTable(catalog, identifier, create) - - case CreateTableAsSelectStatement( - AsTableIdentifier(table), query, partitionCols, bucketSpec, properties, - V1WriteProvider(provider), options, location, comment, ifNotExists) => - - val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, - properties, provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, Some(query)) - - case create: CreateTableAsSelectStatement => - // the provider was not a v1 source, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - val catalog = maybeCatalog.orElse(defaultCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog - convertCTAS(catalog, identifier, create) - - case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => + case DropTableStatement(CatalogObjectIdentifier(catalog, ident), ifExists, _) => DropTable(catalog.asTableCatalog, ident, ifExists) case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) => DropTableCommand(tableName, ifExists, isView = false, purge) - case DropViewStatement(CatalogObjectIdentifier(Some(catalog), ident), _) => + case DropViewStatement(CatalogObjectIdentifier(catalog, ident), _) => throw new AnalysisException( s"Can not specify catalog `${catalog.name}` for view $ident " + s"because view support in catalog has not been implemented yet") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 8dc30eaa3a31..3bad925b9659 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.internal import org.apache.spark.SparkConf import org.apache.spark.annotation.{Experimental, Unstable} import 
org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _} -import org.apache.spark.sql.catalog.v2.CatalogPlugin import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.catalog.{CatalogManager, SessionCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -170,7 +169,7 @@ abstract class BaseSessionStateBuilder( new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf, session.catalog(_)) +: + DataSourceResolution(conf, this.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = @@ -186,8 +185,6 @@ abstract class BaseSessionStateBuilder( V2WriteSupportCheck +: V2StreamingScanSupportCheck +: customCheckRules - - override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b962ab6feabc..24ac9cf9ca39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -77,6 +77,8 @@ private[sql] class SessionState( // The following fields are lazy to avoid creating the Hive client when creating SessionState. lazy val catalog: SessionCatalog = catalogBuilder() + lazy val catalogManager: CatalogManager = new CatalogManager(conf) + lazy val analyzer: Analyzer = analyzerBuilder() lazy val optimizer: Optimizer = optimizerBuilder() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 727160dafc5d..bb5c682f3d89 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -20,11 +20,15 @@ package org.apache.spark.sql.execution.command import java.net.URI import java.util.Locale +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, when} +import org.mockito.invocation.InvocationOnMock + import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog, TestTableCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, Identifier, TableCatalog, TestTableCatalog} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogManager, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} @@ -43,17 +47,26 @@ class PlanResolutionSuite extends AnalysisTest { newCatalog } - private val lookupCatalog: String => CatalogPlugin = { - 
case "testcat" => - testCat - case name => - throw new CatalogNotFoundException(s"No such catalog: $name") + private def createCatalogManager(hasDefaultCatalog: Boolean): CatalogManager = { + val manager = mock(classOf[CatalogManager]) + when(manager.getCatalog(any())).thenAnswer((invocation: InvocationOnMock) => { + val name = invocation.getArgument[String](0) + if (name == "testcat") { + testCat + } else { + throw new CatalogNotFoundException(s"No such catalog: $name") + } + }) + if (hasDefaultCatalog) { + when(manager.getDefaultCatalog()).thenReturn(Some(testCat)) + } else { + when(manager.getDefaultCatalog()).thenReturn(None) + } + manager } - def parseAndResolve(query: String): LogicalPlan = { - val newConf = conf.copy() - newConf.setConfString("spark.sql.default.catalog", "testcat") - DataSourceResolution(newConf, lookupCatalog).apply(parsePlan(query)) + def parseAndResolve(query: String, hasDefaultCatalog: Boolean = false): LogicalPlan = { + DataSourceResolution(conf, createCatalogManager(hasDefaultCatalog)).apply(parsePlan(query)) } private def parseResolveCompare(query: String, expected: LogicalPlan): Unit = @@ -84,7 +97,7 @@ class PlanResolutionSuite extends AnalysisTest { case CreateTable(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableCommand].getName} from query," + s"got ${other.getClass.getName}: $query") } } @@ -306,7 +319,7 @@ class PlanResolutionSuite extends AnalysisTest { | id bigint, | description string, | point struct) - |USING parquet + |USING xyz |COMMENT 'table comment' |TBLPROPERTIES ('p1'='v1', 'p2'='v2') |OPTIONS (path 's3://bucket/path/to/data', other 20) @@ -316,63 +329,76 @@ class PlanResolutionSuite extends AnalysisTest { "p1" -> "v1", "p2" -> "v2", "other" -> "20", - "provider" -> "parquet", + "provider" -> "xyz", "location" -> "s3://bucket/path/to/data", "comment" -> "table comment") - parseAndResolve(sql) match { - case create: CreateV2Table => - assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(create.tableSchema == new StructType() + Seq(true, false).foreach { hasDefaultCatalog => + // Since the identifier specifies a registered catalog, the default catalog doesn't matter. 
+ parseAndResolve(sql, hasDefaultCatalog) match { + case create: CreateV2Table => + assert(create.catalog.name == "testcat") + assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) - assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) - assert(create.ignoreIfExists) + assert(create.partitioning.isEmpty) + assert(create.properties == expectedProperties) + assert(create.ignoreIfExists) - case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + case other => + fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + s"got ${other.getClass.getName}: $sql") + } } } - test("Test v2 CreateTable with data source v2 provider") { + test("Test v2 CreateTable with default catalog") { val sql = - s""" - |CREATE TABLE IF NOT EXISTS mydb.page_view ( - | id bigint, - | description string, - | point struct) - |USING $orc2 - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + """ + |CREATE TABLE mydb.table_name(i int) USING xyz + |OPTIONS (a 1, b 0.1, c TRUE) """.stripMargin val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> orc2, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") + "a" -> "1", + "b" -> "0.1", + "c" -> "true", + "provider" -> "xyz") - parseAndResolve(sql) match { + parseAndResolve(sql, hasDefaultCatalog = true) match { case create: CreateV2Table => assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(create.tableSchema == new StructType() - .add("id", LongType) - .add("description", StringType) - .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) + assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.tableSchema == new StructType().add("i", IntegerType)) assert(create.partitioning.isEmpty) assert(create.properties == expectedProperties) - assert(create.ignoreIfExists) + assert(!create.ignoreIfExists) + } + } - case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("Test v2 CTAS with default catalog") { + val sql = + """ + |CREATE TABLE mydb.table_name USING xyz + |OPTIONS (a 1, b 0.1, c TRUE) + |AS SELECT * FROM src + """.stripMargin + + val expectedProperties = Map( + "a" -> "1", + "b" -> "0.1", + "c" -> "true", + "provider" -> "xyz") + + parseAndResolve(sql, hasDefaultCatalog = true) match { + case create: CreateTableAsSelect => + assert(create.catalog.name == "testcat") + assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.partitioning.isEmpty) + assert(create.properties == expectedProperties) + assert(!create.ignoreIfExists) } } @@ -380,7 +406,7 @@ class PlanResolutionSuite extends AnalysisTest { val sql = s""" |CREATE TABLE IF NOT EXISTS testcat.mydb.table_name - |USING parquet + |USING xyz |COMMENT 'table comment' |TBLPROPERTIES ('p1'='v1', 'p2'='v2') |OPTIONS (path 's3://bucket/path/to/data', other 20) @@ -391,55 +417,25 @@ class PlanResolutionSuite extends AnalysisTest { "p1" -> "v1", "p2" -> "v2", "other" -> "20", - "provider" -> "parquet", + "provider" -> "xyz", "location" -> "s3://bucket/path/to/data", "comment" -> 
"table comment") - parseAndResolve(sql) match { - case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) - assert(ctas.writeOptions == Map("other" -> "20")) - assert(ctas.partitioning.isEmpty) - assert(ctas.ignoreIfExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - - test("Test v2 CTAS with data source v2 provider") { - val sql = - s""" - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING $orc2 - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> orc2, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") - - parseAndResolve(sql) match { - case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(ctas.properties == expectedProperties) - assert(ctas.writeOptions.isEmpty) - assert(ctas.partitioning.isEmpty) - assert(ctas.ignoreIfExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + + Seq(true, false).foreach { hasDefaultCatalog => + // Since the identifier specifies a registered catalog, the default catalog doesn't matter. + parseAndResolve(sql, hasDefaultCatalog) match { + case ctas: CreateTableAsSelect => + assert(ctas.catalog.name == "testcat") + assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(ctas.properties == expectedProperties) + assert(ctas.writeOptions == Map("other" -> "20")) + assert(ctas.partitioning.isEmpty) + assert(ctas.ignoreIfExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + s"got ${other.getClass.getName}: $sql") + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 96345e22dbd5..082af271232a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -21,23 +21,18 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, Identifier} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { - - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - - private val orc2 = classOf[OrcDataSourceV2].getName - before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) 
spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) - spark.conf.set("spark.sql.default.catalog", "testcat") val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") @@ -45,46 +40,77 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn df2.createOrReplaceTempView("source2") } + private def getTestCatalog() = { + spark.sessionState.analyzer.catalogManager.getCatalog("testcat") + .asInstanceOf[TestInMemoryTableCatalog] + } + after { - spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() + getTestCatalog().clearTables() spark.sql("DROP TABLE source") spark.sql("DROP TABLE source2") } - test("CreateTable: use v2 plan because catalog is set") { - spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") - - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + test("CreateTable: basic") { + def checkTestCatalog(sql: String): Unit = { + spark.sql(sql) + val testCatalog = getTestCatalog() + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) - assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) - } + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) + } - test("CreateTable: use v2 plan because provider is v2") { - spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $orc2") + withClue("table identifier specifies catalog") { + withTable("testcat.table_name") { + checkTestCatalog("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + } + } - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + withClue("table identifier doesn't specify catalog") { + // This would create table in ExternalCatalog. + val e = intercept[Exception] { + spark.sql("CREATE TABLE table_name (id bigint, data string) USING foo") + } + assert(e.getMessage.contains("Failed to find data source: foo")) + + withTable("table_name") { + spark.sql("CREATE TABLE table_name (id bigint, data string) USING json") + intercept[NoSuchTableException] { + getTestCatalog().loadTable(Identifier.of(Array(), "table_name")) + } + val table = spark.sharedState.externalCatalog.getTable("default", "table_name") + assert(table.identifier == TableIdentifier("table_name", Some("default"))) + } + } - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> orc2).asJava) - assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) + withClue("table identifier doesn't specify catalog and has more than 2 name parts") { + // Spark tries to fallback to ExternalCatalog, which can't handle 3-part table name. 
+ val e = intercept[AnalysisException] { + spark.sql("CREATE TABLE ns1.ns2.table_name (id bigint, data string) USING json") + } + assert(e.message.contains("No catalog specified for table ns1.ns2.table_name")) + assert(e.message.contains("no default catalog is set")) + } - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("table_name") { + checkTestCatalog("CREATE TABLE table_name (id bigint, data string) USING foo") + } + } + } } test("CreateTable: fail if table exists") { spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -115,7 +141,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn spark.sql( "CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING foo") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -138,61 +164,75 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("CreateTable: fail analysis when default catalog is needed but missing") { - val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog") - try { - conf.unsetConf("spark.sql.default.catalog") - - val exc = intercept[AnalysisException] { - spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") - } - - assert(exc.getMessage.contains("No catalog specified for table")) - assert(exc.getMessage.contains("table_name")) - assert(exc.getMessage.contains("no default catalog is set")) - - } finally { - conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) + val exc = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE a.b.c USING foo") } - } - test("CreateTableAsSelect: use v2 plan because catalog is set") { - spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + assert(exc.getMessage.contains("No catalog specified for table")) + assert(exc.getMessage.contains("a.b.c")) + assert(exc.getMessage.contains("no default catalog is set")) + } - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + test("CreateTableAsSelect: basic") { + def checkTestCatalog(sql: String): Unit = { + spark.sql(sql) + val testCatalog = getTestCatalog() + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) - assert(table.schema == new StructType() - .add("id", LongType, nullable = false) - .add("data", StringType)) + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType().add("a", StringType, false)) - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), 
spark.table("source")) - } + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Row("1")) + } - test("CreateTableAsSelect: use v2 plan because provider is v2") { - spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") + withClue("table identifier specifies catalog") { + withTable("testcat.table_name") { + checkTestCatalog("CREATE TABLE testcat.table_name USING foo AS SELECT '1' AS a") + } + } - val testCatalog = spark.catalog("testcat").asTableCatalog - val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + withClue("table identifier doesn't specify catalog") { + // This would create table in ExternalCatalog. + val e = intercept[Exception] { + spark.sql("CREATE TABLE table_name USING foo AS SELECT '1' AS a") + } + assert(e.getMessage.contains("Failed to find data source: foo")) + + withTable("table_name") { + spark.sql("CREATE TABLE table_name USING json AS SELECT '1' AS a") + intercept[NoSuchTableException] { + getTestCatalog().loadTable(Identifier.of(Array(), "table_name")) + } + val table = spark.sharedState.externalCatalog.getTable("default", "table_name") + assert(table.schema == new StructType().add("a", StringType)) + } + } - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> orc2).asJava) - assert(table.schema == new StructType() - .add("id", LongType, nullable = false) - .add("data", StringType)) + withClue("table identifier doesn't specify catalog and has more than 2 name parts") { + // Spark tries to fallback to ExternalCatalog, which can't handle 3-part table name. + val e = intercept[AnalysisException] { + spark.sql("CREATE TABLE ns1.ns2.table_name USING foo AS SELECT '1' AS a") + } + assert(e.message.contains("No catalog specified for table ns1.ns2.table_name")) + assert(e.message.contains("no default catalog is set")) + } - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("table_name") { + checkTestCatalog("CREATE TABLE table_name USING foo AS SELECT '1' AS a") + } + } + } } test("CreateTableAsSelect: fail if table exists") { spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -230,7 +270,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn spark.sql( "CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source") - val testCatalog = spark.catalog("testcat").asTableCatalog + val testCatalog = getTestCatalog() val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") @@ -252,30 +292,22 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("CreateTableAsSelect: fail analysis when default catalog is needed but missing") { - val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog") - try { - conf.unsetConf("spark.sql.default.catalog") - - val exc = intercept[AnalysisException] { - 
spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") - } - - assert(exc.getMessage.contains("No catalog specified for table")) - assert(exc.getMessage.contains("table_name")) - assert(exc.getMessage.contains("no default catalog is set")) - - } finally { - conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) + val exc = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE a.b.c USING foo AS SELECT id, data FROM source") } + + assert(exc.getMessage.contains("No catalog specified for table")) + assert(exc.getMessage.contains("a.b.c")) + assert(exc.getMessage.contains("no default catalog is set")) } test("DropTable: basic") { val tableName = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source") - assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === true) + assert(getTestCatalog().tableExists(ident) === true) sql(s"DROP TABLE $tableName") - assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === false) + assert(getTestCatalog().tableExists(ident) === false) } test("DropTable: if exists") { @@ -286,11 +318,85 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("Relation: basic") { - val t1 = "testcat.ns1.ns2.tbl" - withTable(t1) { - sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") - checkAnswer(sql(s"TABLE $t1"), spark.table("source")) - checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source")) + def checkTableScan(tblNameParts: Array[String]): Unit = { + val tblName = tblNameParts.mkString(".") + withTable(tblName) { + sql(s"CREATE TABLE $tblName USING json AS SELECT '1' AS a") + checkAnswer(sql(s"TABLE $tblName"), Row("1")) + checkAnswer(sql(s"SELECT * FROM $tblName"), Row("1")) + } + } + + withClue("table identifier specifies catalog") { + checkTableScan(Array("testcat", "ns1", "ns2", "tbl")) + } + + withClue("table identifier doesn't specify catalog") { + checkTableScan(Array("tbl")) + } + + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + checkTableScan(Array("tbl")) + } + } + } + + test("Relation: table not found") { + withClue("table identifier specifies catalog") { + val e = intercept[AnalysisException] { + spark.sql("SELECT * FROM testcat.tbl") + } + assert(e.message.contains("Table or view not found: testcat.tbl")) + } + + withClue("table identifier doesn't specify catalog") { + val e = intercept[AnalysisException] { + spark.sql("SELECT * FROM tbl") + } + assert(e.message.contains("Table or view not found: tbl")) + } + + withClue("table identifier doesn't specify catalog and has more than 2 name parts") { + val e = intercept[CatalogNotFoundException] { + spark.sql("SELECT * FROM a.b.c") + } + assert(e.getMessage.contains("Catalog 'a' not found")) + } + + withClue("table identifier doesn't specify catalog but default catalog is set") { + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + val e = intercept[AnalysisException] { + spark.sql("SELECT * FROM tbl") + } + assert(e.message.contains("Table or view not found: tbl")) + } + } + } + + test("Relation: name conflicts with temp view") { + withTempView("v") { + spark.range(10).createOrReplaceTempView("v") + + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("v") { + spark.sql("CREATE TABLE v(i INT) USING foo") + checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF()) + } + } 
+ } + } + + test("Relation: name conflicts with global temp view") { + withTempView("v") { + spark.range(10).createOrReplaceGlobalTempView("v") + + withSQLConf(SQLConf.DEFAULT_V2_CATALOG.key -> "testcat") { + withTable("global_temp.v") { + spark.sql("CREATE TABLE global_temp.v(i INT) USING foo") + checkAnswer(sql("SELECT * FROM global_temp.v"), spark.range(10).toDF()) + } + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index b04b3f1031d7..3d3c7ac766f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import org.apache.spark.annotation.{Experimental, Unstable} import org.apache.spark.sql._ -import org.apache.spark.sql.catalog.v2.CatalogPlugin import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -74,7 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf, session.catalog(_)) +: + DataSourceResolution(conf, this.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = @@ -92,8 +91,6 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session V2WriteSupportCheck +: V2StreamingScanSupportCheck +: customCheckRules - - override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name) } /**
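
Usage sketch (illustrative only, not part of the patch): a minimal example of how the CatalogManager-backed LookupCatalog extractors introduced above resolve multi-part identifiers once a plugin is registered under spark.sql.catalog.<name>. DemoCatalogPlugin, LookupSketch, and the "prod" key are hypothetical names for this sketch; the imports follow the classes touched by the patch, and it assumes the patched classes are on the classpath.

import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical no-op plugin, standing in for a real TableCatalog implementation.
class DemoCatalogPlugin extends CatalogPlugin {
  private var pluginName: String = _
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    pluginName = name
  }
  override def name(): String = pluginName
}

object LookupSketch extends LookupCatalog {
  // Register the plugin the same way the suites above do, via spark.sql.catalog.<name>.
  private val conf = new SQLConf
  conf.setConfString("spark.sql.catalog.prod", classOf[DemoCatalogPlugin].getName)

  override val catalogManager: CatalogManager = new CatalogManager(conf)

  def main(args: Array[String]): Unit = {
    // "prod.db.tbl": the first part names a registered catalog, the rest form the identifier.
    Seq("prod", "db", "tbl") match {
      case CatalogObjectIdentifier(catalog, ident) =>
        assert(catalog.name == "prod")
        assert(ident == Identifier.of(Array("db"), "tbl"))
      case _ => throw new AssertionError("expected the catalog extractor to match")
    }
    // With no default v2 catalog configured, a single-part name does not match the extractor
    // and is left to the session catalog / temp view resolution in the Analyzer.
    assert(CatalogObjectIdentifier.unapply(Seq("tbl")).isEmpty)
  }
}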