From d5559be15f784bb0f46de55b1b657b7e71e00399 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Sun, 19 May 2019 23:23:36 -0700 Subject: [PATCH 1/3] [SPARK-26946][SQL][FOLLOWUP] Handle lookupCatalog function not defined Treat it as catalog not found. --- .../spark/sql/catalog/v2/LookupCatalog.scala | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 932d32022702b..c23aebeaa3026 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalog.v2 +import scala.util.{Failure, Success, Try} + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier @@ -26,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier @Experimental trait LookupCatalog { - def lookupCatalog: Option[(String) => CatalogPlugin] = None + def lookupCatalog: Option[String => CatalogPlugin] = None type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier) @@ -34,19 +36,23 @@ trait LookupCatalog { * Extract catalog plugin and identifier from a multi-part identifier. */ object CatalogObjectIdentifier { - def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup => - parts match { - case Seq(name) => - (None, Identifier.of(Array.empty, name)) - case Seq(catalogName, tail @ _*) => - try { - val catalog = lookup(catalogName) - (Some(catalog), Identifier.of(tail.init.toArray, tail.last)) - } catch { - case _: CatalogNotFoundException => - (None, Identifier.of(parts.init.toArray, parts.last)) - } - } + def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = parts match { + case Seq(name) => + Some((None, Identifier.of(Array.empty, name))) + case Seq(catalogName, tail @ _*) => + lookupCatalog match { + case Some(lookup) => + Try(lookup(catalogName)) match { + case Success(catalog) => + Some((Some(catalog), Identifier.of(tail.init.toArray, tail.last))) + case Failure(_: CatalogNotFoundException) => + Some((None, Identifier.of(parts.init.toArray, parts.last))) + case Failure(ex) => + throw ex + } + case None => + Some((None, Identifier.of(parts.init.toArray, parts.last))) + } } } From 2085f8134df6221acb70ec8de250cb157a0cf2ba Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Sun, 26 May 2019 12:14:08 -0700 Subject: [PATCH 2/3] [SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../spark/sql/catalog/v2/CatalogManager.java | 88 +++++++++++++++++++ .../apache/spark/sql/catalog/v2/Catalogs.java | 42 +++++---- .../spark/sql/catalog/v2/IdentifierImpl.java | 17 ++++ .../sql/util/CaseInsensitiveStringMap.java | 14 +++ .../plans/logical/basicLogicalOperators.scala | 8 ++ .../logical/sql/DropTableStatement.scala | 35 ++++++++ .../sql/catalog/v2/CatalogManagerSuite.java | 71 +++++++++++++++ .../sql/catalog/v2/CatalogV2TestUtils.scala | 58 ++++++++++++ .../sql/catalog/v2/TestTableCatalog.scala | 2 + .../spark/sql/execution/SparkSqlParser.scala | 4 +- .../datasources/DataSourceResolution.scala | 11 ++- .../datasources/v2/DataSourceV2Strategy.scala | 5 +- .../datasources/v2/DropTableExec.scala | 44 ++++++++++ .../execution/command/DDLParserSuite.scala | 82 +++++++---------- .../sql/sources/v2/DataSourceV2SQLSuite.scala 
| 34 +++++-- .../sources/v2/TestInMemoryTableCatalog.scala | 2 + 17 files changed, 441 insertions(+), 80 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala create mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index f5b808197c9be..08ce482a65cc6 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -139,8 +139,8 @@ statement DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation | ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions - | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable - | DROP VIEW (IF EXISTS)? tableIdentifier #dropTable + | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable + | DROP VIEW (IF EXISTS)? multipartIdentifier #dropTable | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? VIEW (IF NOT EXISTS)? tableIdentifier identifierCommentList? diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java new file mode 100644 index 0000000000000..2d52e5395a83c --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.SparkException; +import org.apache.spark.annotation.Private; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import static org.apache.spark.sql.catalog.v2.Catalogs.classKey; +import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey; +import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix; +import static scala.collection.JavaConverters.mapAsJavaMapConverter; + +@Private +public class CatalogManager { + + private final SQLConf conf; + + public CatalogManager(SQLConf conf) { + this.conf = conf; + } + + /** + * Load a catalog. 
+ * + * @param name a catalog name + * @return a catalog plugin + */ + public CatalogPlugin load(String name) throws SparkException { + return Catalogs.load(name, conf); + } + + /** + * Add a catalog. + * + * @param name a catalog name + * @param pluginClassName a catalog plugin class name + * @param options catalog options + */ + public void add( + String name, + String pluginClassName, + CaseInsensitiveStringMap options) { + options.entrySet().stream() + .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue())); + conf.setConfString(classKey(name), pluginClassName); + } + + /** + * Add a catalog without option. + * + * @param name a catalog name + * @param pluginClassName a catalog plugin class name + */ + public void add( + String name, + String pluginClassName) { + add(name, pluginClassName, CaseInsensitiveStringMap.empty()); + } + + /** + * Remove a catalog. + * + * @param name a catalog name + */ + public void remove(String name) { + conf.unsetConf(classKey(name)); + mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream() + .filter(key -> isOptionKey(name, key)) + .forEach(conf::unsetConf); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index 851a6a9f6d165..d53e5bfb25142 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -23,10 +23,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.util.Utils; -import java.util.HashMap; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.stream.Collectors; import static scala.collection.JavaConverters.mapAsJavaMapConverter; @@ -35,6 +33,23 @@ public class Catalogs { private Catalogs() { } + public static String classKey(String name) { + return "spark.sql.catalog." + name; + } + + public static String optionKeyPrefix(String name) { + return "spark.sql.catalog." + name + "."; + } + + public static boolean isOptionKey(String name, String keyName) { + return keyName.startsWith(optionKeyPrefix(name)); + } + + public static String optionName(String name, String keyName) { + assert(isOptionKey(name, keyName)); + return keyName.substring(optionKeyPrefix(name).length()); + } + /** * Load and configure a catalog by name. *

@@ -49,10 +64,10 @@ private Catalogs() { */ public static CatalogPlugin load(String name, SQLConf conf) throws CatalogNotFoundException, SparkException { - String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); + String pluginClassName = conf.getConfString(classKey(name), null); if (pluginClassName == null) { throw new CatalogNotFoundException(String.format( - "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); + "Catalog '%s' plugin class not found: %s is not defined", name, classKey(name))); } ClassLoader loader = Utils.getContextOrSparkClassLoader(); @@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf) * @return a case insensitive string map of options starting with spark.sql.catalog.(name). */ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) { - Map allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); - Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); - - HashMap options = new HashMap<>(); - for (Map.Entry entry : allConfs.entrySet()) { - Matcher matcher = prefix.matcher(entry.getKey()); - if (matcher.matches() && matcher.groupCount() > 0) { - options.put(matcher.group(1), entry.getValue()); - } - } - + Map options = + mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream() + .filter(e -> isOptionKey(name, e.getKey())) + .collect(Collectors.toMap( + e -> optionName(name, e.getKey()), + e -> e.getValue())); return new CaseInsensitiveStringMap(options); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java index cd131432008a6..51c5a589dd0fe 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java @@ -22,6 +22,8 @@ import java.util.Arrays; import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * An {@link Identifier} implementation. 
@@ -49,6 +51,21 @@ public String name() { return name; } + private String quote(String part) { + if (part.contains("`")) { + return part.replace("`", "``"); + } else { + return part; + } + } + + @Override + public String toString() { + return Stream.concat(Stream.of(namespace), Stream.of(name)) + .map(part -> '`' + quote(part) + '`') + .collect(Collectors.joining(".")); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index da41346d7ce71..2bf1b6c22929e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) { public Map asCaseSensitiveMap() { return Collections.unmodifiableMap(original); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o; + return delegate.equals(that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(delegate); + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 42755fc274192..6bf12cff28f9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -499,6 +499,14 @@ object OverwritePartitionsDynamic { } } +/** + * Drop a table. + */ +case class DropTable( + catalog: TableCatalog, + ident: Identifier, + ifExists: Boolean) extends Command + /** * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala new file mode 100644 index 0000000000000..bc31e57ac1b2b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * A DROP TABLE statement, as parsed from SQL. + */ +case class DropTableStatement( + tableName: Seq[String], + ifExists: Boolean, + isView: Boolean, + purge: Boolean) extends ParsedStatement { + + override def output: Seq[Attribute] = Seq.empty + + override def children: Seq[LogicalPlan] = Seq.empty +} diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java new file mode 100644 index 0000000000000..880cb457fbef4 --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.SparkException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; + +public class CatalogManagerSuite { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + CatalogManager catalogManager = new CatalogManager(new SQLConf()); + + @Test + public void testAdd() throws SparkException { + CaseInsensitiveStringMap options = new CaseInsensitiveStringMap( + new HashMap() {{ + put("option1", "value1"); + put("option2", "value2"); + }}); + catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options); + CatalogPlugin catalogPlugin = catalogManager.load("testcat"); + assertThat(catalogPlugin.name(), is("testcat")); + assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class)); + assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options)); + } + + @Test + public void testAddWithOption() throws SparkException { + catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName()); + CatalogPlugin catalogPlugin = catalogManager.load("testcat"); + assertThat(catalogPlugin.name(), is("testcat")); + assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class)); + assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty())); + } + + @Test + public void testRemove() throws SparkException { + catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName()); + catalogManager.load("testcat"); + catalogManager.remove("testcat"); + 
exception.expect(CatalogNotFoundException.class); + catalogManager.load("testcat"); + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala new file mode 100644 index 0000000000000..663eed6e109da --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import org.apache.spark.sql.internal.SQLConf + +private[sql] trait CatalogV2TestUtils { + + protected lazy val catalogManager: CatalogManager = new CatalogManager(SQLConf.get) + + /** + * Adds a catalog. + */ + protected def addCatalog(name: String, pluginClassName: String): Unit = + catalogManager.add(name, pluginClassName) + + /** + * Removes catalogs. + */ + protected def removeCatalog(catalogNames: String*): Unit = + catalogNames.foreach { catalogName => + catalogManager.remove(catalogName) + } + + /** + * Sets the default catalog. + * + * @param catalog the new default catalog + */ + protected def setDefaultCatalog(catalog: String): Unit = + SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, catalog) + + /** + * Returns the current default catalog. + */ + protected def defaultCatalog: Option[String] = SQLConf.get.defaultV2Catalog + + /** + * Restores the default catalog to the previously saved value. + */ + protected def restoreDefaultCatalog(previous: Option[String]): Unit = + previous.foreach(SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, _)) +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index 78b4763484cc0..f0bb5739046ed 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -89,6 +89,8 @@ class TestTableCatalog extends TableCatalog { } override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined + + override def toString: String = name } object TestTableCatalog { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f433cc8d32793..c292a8e301afc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -649,8 +649,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * Create a [[DropTableCommand]] command. 
*/ override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { - DropTableCommand( - visitTableIdentifier(ctx.tableIdentifier), + sql.DropTableStatement( + visitMultipartIdentifier(ctx.multipartIdentifier()), ctx.EXISTS != null, ctx.VIEW != null, ctx.PURGE != null) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index acbe37349762b..bc874a0ac4deb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -27,9 +27,10 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.DropTableCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.StructType @@ -83,6 +84,12 @@ case class DataSourceResolution( s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) .asTableCatalog convertCTAS(catalog, identifier, create) + + case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _, _) => + DropTable(catalog.asTableCatalog, ident, ifExists) + + case DropTableStatement(AsTableIdentifier(tableName), ifExists, isView, purge) => + DropTableCommand(tableName, ifExists, isView, purge) } object V1WriteProvider { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index a1d547eb7e86d..27d87960edb3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -199,6 +199,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { Nil } + case DropTable(catalog, ident, ifExists) => + DropTableExec(catalog, ident, ifExists) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala new file mode 100644 index 0000000000000..d325e0205f9d8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + +/** + * Physical plan node for dropping a table. 
+ */ +case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean) + extends LeafExecNode { + + override def doExecute(): RDD[InternalRow] = { + if (catalog.tableExists(ident)) { + catalog.dropTable(ident) + } else if (!ifExists) { + throw new NoSuchTableException(ident) + } + + sqlContext.sparkContext.parallelize(Seq.empty, 1) + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index d7bfbce73af05..e51d07f6587c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -32,13 +32,13 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class DDLParserSuite extends PlanTest with SharedSQLContext { private lazy val parser = new SparkSqlParser(new SQLConf) @@ -906,59 +906,39 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { test("drop table") { val tableName1 = "db.tab" val tableName2 = "tab" - - val parsed = Seq( - s"DROP TABLE $tableName1", - s"DROP TABLE IF EXISTS $tableName1", - s"DROP TABLE $tableName2", - s"DROP TABLE IF EXISTS $tableName2", - s"DROP TABLE $tableName2 PURGE", - s"DROP TABLE IF EXISTS $tableName2 PURGE" - ).map(parser.parsePlan) - - val expected = Seq( - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, - purge = true), - DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, - purge = true)) - - parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) } + Seq( + (s"DROP TABLE $tableName1", + DropTableStatement(Seq("db", "tab"), ifExists = false, isView = false, purge = false)), + (s"DROP TABLE IF EXISTS $tableName1", + DropTableStatement(Seq("db", "tab"), ifExists = true, isView = false, purge = false)), + (s"DROP TABLE $tableName2", + DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = false)), + (s"DROP TABLE IF EXISTS $tableName2", + DropTableStatement(Seq("tab"), ifExists = true, isView = false, 
purge = false)), + (s"DROP TABLE $tableName2 PURGE", + DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = true)), + (s"DROP TABLE IF EXISTS $tableName2 PURGE", + DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = true))).foreach { + case (sql, expected) => + comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false) + } } test("drop view") { val viewName1 = "db.view" val viewName2 = "view" - - val parsed1 = parser.parsePlan(s"DROP VIEW $viewName1") - val parsed2 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName1") - val parsed3 = parser.parsePlan(s"DROP VIEW $viewName2") - val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2") - - val expected1 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true, - purge = false) - val expected2 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true, - purge = false) - val expected3 = - DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true, - purge = false) - val expected4 = - DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true, - purge = false) - - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) - comparePlans(parsed3, expected3) - comparePlans(parsed4, expected4) + Seq( + (s"DROP VIEW $viewName1", + DropTableStatement(Seq("db", "view"), ifExists = false, isView = true, purge = false)), + (s"DROP VIEW IF EXISTS $viewName1", + DropTableStatement(Seq("db", "view"), ifExists = true, isView = true, purge = false)), + (s"DROP VIEW $viewName2", + DropTableStatement(Seq("view"), ifExists = false, isView = true, purge = false)), + (s"DROP VIEW IF EXISTS $viewName2", + DropTableStatement(Seq("view"), ifExists = true, isView = true, purge = false))).foreach { + case (sql, expected) => + comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false) + } } test("show columns") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 2424e6e1d2d1e..f3126e4cf893a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -22,29 +22,35 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalog.v2.Identifier -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalog.v2.{CatalogV2TestUtils, Identifier} +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} -class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { +class DataSourceV2SQLSuite + extends QueryTest with SharedSQLContext with BeforeAndAfter with CatalogV2TestUtils { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ private val orc2 = classOf[OrcDataSourceV2].getName - before { - spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) - spark.conf.set("spark.sql.default.catalog", "testcat") + private val previousDefaultCatalog = defaultCatalog + before { val df = 
spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") + + addCatalog("testcat", classOf[TestInMemoryTableCatalog].getName) + setDefaultCatalog("testcat") } after { + restoreDefaultCatalog(previousDefaultCatalog) + removeCatalog("testcat") + spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.sql("DROP TABLE source") } @@ -266,4 +272,20 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) } } + + test("DropTable: basic") { + val tableName = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source") + assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === true) + sql(s"DROP TABLE $tableName") + assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === false) + } + + test("DropTable: if exists") { + intercept[NoSuchTableException] { + sql(s"DROP TABLE testcat.db.notbl") + } + sql(s"DROP TABLE IF EXISTS testcat.db.notbl") + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 2ecf1c2f184fb..b970d62bc30cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -102,6 +102,8 @@ class TestInMemoryTableCatalog extends TableCatalog { def clearTables(): Unit = { tables.clear() } + + override def toString: String = name } /** From 70c2e687167de6f66464789de10f073cab93e6df Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Sat, 25 May 2019 23:28:37 -0700 Subject: [PATCH 3/3] [SPARK-27322][SQL] DataSourceV2: Select from multiple catalogs --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../catalog/v2/TableIdentifierHelper.scala | 37 +++++++++++ .../sql/catalyst/analysis/Analyzer.scala | 7 +- .../sql/catalyst/analysis/ResolveHints.scala | 12 ++-- .../sql/catalyst/analysis/unresolved.scala | 8 ++- .../spark/sql/catalyst/identifiers.scala | 27 +++++++- .../sql/catalyst/parser/AstBuilder.scala | 9 ++- .../sql/catalog/v2/CatalogV2TestUtils.scala | 22 +++++++ .../v2/TableIdentifierHelperSuite.scala | 63 ++++++++++++++++++ .../org/apache/spark/sql/SparkSession.scala | 10 +-- .../spark/sql/execution/command/views.scala | 6 +- .../datasources/DataSourceResolution.scala | 10 ++- .../sql/execution/datasources/rules.scala | 3 +- .../internal/BaseSessionStateBuilder.scala | 9 ++- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 64 ++++++++++++++++++- .../apache/spark/sql/hive/test/TestHive.scala | 3 +- 16 files changed, 262 insertions(+), 32 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 08ce482a65cc6..68fef3ed58dc4 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -397,7 +397,7 @@ queryTerm queryPrimary : querySpecification #queryPrimaryDefault - | TABLE tableIdentifier #table + | TABLE multipartIdentifier #table | inlineTable #inlineTableDefault1 | '(' queryNoWith ')' #subquery ; @@ -536,7 +536,7 @@ identifierComment ; relationPrimary - : tableIdentifier sample? tableAlias #tableName + : multipartIdentifier sample? tableAlias #tableName | '(' queryNoWith ')' sample? tableAlias #aliasedQuery | '(' relation ')' sample? tableAlias #aliasedRelation | inlineTable #inlineTableDefault2 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala new file mode 100644 index 0000000000000..9d371dc3b61b4 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier, TableIdentifierLike} + +/** + * Resolve multipart identifier to [[CatalogTableIdentifier]] or [[TableIdentifier]]. + */ +trait TableIdentifierHelper extends LookupCatalog { + import CatalogV2Implicits._ + + implicit class TableIdentifierHelper(parts: Seq[String]) { + def asCatalogTableIdentifier: TableIdentifierLike = parts match { + case CatalogObjectIdentifier(Some(catalog), ident) => + CatalogTableIdentifier(catalog.asTableCatalog, ident) + + case AsTableIdentifier(tableIdentifier) => + tableIdentifier + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f24d6f168dacf..5449c38d7dfa8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -227,9 +227,8 @@ class Analyzer( def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = { plan resolveOperatorsDown { - case u: UnresolvedRelation => - cteRelations.find(x => resolver(x._1, u.tableIdentifier.table)) - .map(_._2).getOrElse(u) + case u @ UnresolvedRelation(TableIdentifier(table, _)) => + cteRelations.find(x => resolver(x._1, table)).map(_._2).getOrElse(u) case other => // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE. other transformExpressions { @@ -721,7 +720,7 @@ class Analyzer( u.failAnalysis(s"Inserting into a view is not allowed. 
View: ${v.desc.identifier}.") case other => i.copy(table = other) } - case u: UnresolvedRelation => resolveRelation(u) + case u @ UnresolvedRelation(_: TableIdentifier) => resolveRelation(u) } // Look up the table with the given name from catalog. The database we used is decided by the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 9440a3f806b4e..544052d596261 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -22,6 +22,7 @@ import java.util.Locale import scala.collection.mutable import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.IntegerLiteral import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule @@ -71,17 +72,18 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { - case ResolvedHint(u: UnresolvedRelation, hint) - if relations.exists(resolver(_, u.tableIdentifier.table)) => - relations.remove(u.tableIdentifier.table) + case ResolvedHint(u @ UnresolvedRelation(TableIdentifier(table, _)), hint) + if relations.exists(resolver(_, table)) => + relations.remove(table) ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo)) case ResolvedHint(r: SubqueryAlias, hint) if relations.exists(resolver(_, r.alias)) => relations.remove(r.alias) ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo)) - case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) => - relations.remove(u.tableIdentifier.table) + case UnresolvedRelation(TableIdentifier(table, _)) + if relations.exists(resolver(_, table)) => + relations.remove(table) ResolvedHint(plan, createHintInfo(hintName)) case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) => relations.remove(r.alias) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index d44b42134f868..9272bdabc3b49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier, TableIdentifierLike} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} @@ -38,11 +38,13 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str /** * Holds the name of a relation that has yet to be looked up in a catalog. * - * @param tableIdentifier table name + * @param table table name */ -case class UnresolvedRelation(tableIdentifier: TableIdentifier) +case class UnresolvedRelation(table: TableIdentifierLike) extends LeafNode { + def tableIdentifier: TableIdentifier = table.tableIdentifier + /** Returns a `.` separated name for this relation. 
*/ def tableName: String = tableIdentifier.unquotedString diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index deceec73dda30..afabca981ba02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst +import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} + /** * An identifier that optionally specifies a database. * @@ -64,6 +66,27 @@ object AliasIdentifier { def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier) } +/** + * An interface to ease transition from [[TableIdentifier]] to [[CatalogTableIdentifier]]. + */ +sealed trait TableIdentifierLike { + def tableIdentifier: TableIdentifier +} + +/** + * A data source V2 table identifier. + * + * @param catalog a catalog plugin + * @param ident an object identifier + */ +case class CatalogTableIdentifier(catalog: TableCatalog, ident: Identifier) + extends TableIdentifierLike { + + override def tableIdentifier: TableIdentifier = + throw new UnsupportedOperationException( + s"$this should not be used on non-DSv2 code path") +} + /** * Identifies a table in a database. * If `database` is not defined, the current database is used. @@ -71,7 +94,9 @@ object AliasIdentifier { * unquotedString as the function name. */ case class TableIdentifier(table: String, database: Option[String]) - extends IdentifierWithDatabase { + extends IdentifierWithDatabase with TableIdentifierLike { + + override def tableIdentifier: TableIdentifier = this override val identifier: String = table diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8137db854cbcb..86d13dc0f39e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -29,6 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalog.v2 +import org.apache.spark.sql.catalog.v2.TableIdentifierHelper import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -49,7 +50,8 @@ import org.apache.spark.util.random.RandomSampler * The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or * TableIdentifier. 
*/ -class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging { +class AstBuilder(conf: SQLConf) + extends SqlBaseBaseVisitor[AnyRef] with Logging with TableIdentifierHelper { import ParserUtils._ def this() = this(new SQLConf()) @@ -844,14 +846,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) { - UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier)) + val tableId = visitMultipartIdentifier(ctx.multipartIdentifier).asCatalogTableIdentifier + UnresolvedRelation(tableId) } /** * Create an aliased table reference. This is typically used in FROM clauses. */ override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { - val tableId = visitTableIdentifier(ctx.tableIdentifier) + val tableId = visitMultipartIdentifier(ctx.multipartIdentifier).asCatalogTableIdentifier val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId)) table.optionalMap(ctx.sample)(withSample) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala index 663eed6e109da..7c35dbdf4ef92 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala @@ -23,6 +23,12 @@ private[sql] trait CatalogV2TestUtils { protected lazy val catalogManager: CatalogManager = new CatalogManager(SQLConf.get) + /** + * Loads a catalog. + */ + protected def catalog(name: String): CatalogPlugin = + catalogManager.load(name) + /** * Adds a catalog. */ @@ -37,6 +43,12 @@ private[sql] trait CatalogV2TestUtils { catalogManager.remove(catalogName) } + /** + * Removes catalogs after calling `f`. + */ + protected def withCatalog(catalogNames: String*)(f: => Unit): Unit = + try f finally removeCatalog(catalogNames: _*) + /** * Sets the default catalog. * @@ -55,4 +67,14 @@ private[sql] trait CatalogV2TestUtils { */ protected def restoreDefaultCatalog(previous: Option[String]): Unit = previous.foreach(SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, _)) + + /** + * Sets default catalog to `catalog` before executing `f`, + * then switches back to the previous default catalog after `f` returns. + */ + protected def activateCatalog(catalog: String)(f: => Unit): Unit = { + val previous = defaultCatalog + setDefaultCatalog(catalog) + try f finally restoreDefaultCatalog(previous) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala new file mode 100644 index 0000000000000..458b09629fe8d --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalog.v2 + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class TableIdentifierHelperSuite extends SparkFunSuite with SQLHelper { + import CatalystSqlParser._ + + private val testCat = { + val newCatalog = new TestTableCatalog + newCatalog.initialize("testcat", CaseInsensitiveStringMap.empty()) + newCatalog + } + + private def findCatalog(name: String): CatalogPlugin = name match { + case "testcat" => + testCat + case _ => + throw new CatalogNotFoundException(s"$name not found") + } + + test("with catalog lookup function") { + val helper = new TableIdentifierHelper { + override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog(_)) + } + import helper._ + + assert(parseMultipartIdentifier("testcat.v2tbl").asCatalogTableIdentifier === + CatalogTableIdentifier(testCat, Identifier.of(Array.empty, "v2tbl"))) + assert(parseMultipartIdentifier("db.tbl").asCatalogTableIdentifier === + TableIdentifier("tbl", Some("db"))) + + } + + test("without catalog lookup function") { + val helper = new TableIdentifierHelper { + override def lookupCatalog: Option[String => CatalogPlugin] = None + } + import helper._ + + assert(parseMultipartIdentifier("testcat.v2tbl").asCatalogTableIdentifier === + TableIdentifier("v2tbl", Some("testcat"))) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0b5bf3f48b593..1de3d4d4d94ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalog.Catalog -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs, TableIdentifierHelper} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders._ @@ -82,7 +82,7 @@ class SparkSession private( @transient private val existingSharedState: Option[SharedState], @transient private val parentSessionState: Option[SessionState], @transient private[sql] val extensions: SparkSessionExtensions) - extends Serializable with Closeable with Logging { self => + extends Serializable with Closeable with Logging with TableIdentifierHelper { self => // The call site where this SparkSession was constructed. 
private val creationSite: CallSite = Utils.getCallSite() @@ -613,6 +613,8 @@ class SparkSession private( catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf)) } + override def lookupCatalog: Option[String => CatalogPlugin] = Some(catalog(_)) + /** * Returns the specified table/view as a `DataFrame`. * @@ -624,10 +626,10 @@ class SparkSession private( * @since 2.0.0 */ def table(tableName: String): DataFrame = { - table(sessionState.sqlParser.parseTableIdentifier(tableName)) + table(sessionState.sqlParser.parseMultipartIdentifier(tableName).asCatalogTableIdentifier) } - private[sql] def table(tableIdent: TableIdentifier): DataFrame = { + private[sql] def table(tableIdent: TableIdentifierLike): DataFrame = { Dataset.ofRows(self, UnresolvedRelation(tableIdent)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index cd34dfafd1320..ffc8c279da457 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -190,10 +190,10 @@ case class CreateViewCommand( // package (e.g., HiveGenericUDF). child.collect { // Disallow creating permanent views based on temporary views. - case s: UnresolvedRelation - if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) => + case UnresolvedRelation(tableIdentifier: TableIdentifier) + if sparkSession.sessionState.catalog.isTemporaryTable(tableIdentifier) => throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temporary view ${s.tableIdentifier}") + s"referencing a temporary view ${tableIdentifier}") case other if !other.resolved => other.expressions.flatMap(_.collect { // Disallow creating permanent views based on temporary UDFs. 
case e: UnresolvedFunction diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index bc874a0ac4deb..d53c4852277a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -24,16 +24,18 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DropTableCommand +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap case class DataSourceResolution( conf: SQLConf, @@ -90,6 +92,10 @@ case class DataSourceResolution( case DropTableStatement(AsTableIdentifier(tableName), ifExists, isView, purge) => DropTableCommand(tableName, ifExists, isView, purge) + + case UnresolvedRelation(CatalogTableIdentifier(catalog, ident)) => + DataSourceV2Relation.create(catalog.loadTable(ident), + CaseInsensitiveStringMap.empty) } object V1WriteProvider { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 534e2fd0757f9..9e371cab250ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering} @@ -41,7 +42,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case u: UnresolvedRelation if maybeSQLFile(u) => + case u @ UnresolvedRelation(_: TableIdentifier) if maybeSQLFile(u) => try { val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.internal
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.{Experimental, Unstable}
 import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _}
+import org.apache.spark.sql.catalog.v2.CatalogPlugin
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlAstBuilder, SparkSqlParser}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
 import org.apache.spark.sql.streaming.StreamingQueryManager
@@ -124,7 +125,11 @@ abstract class BaseSessionStateBuilder(
    * Note: this depends on the `conf` field.
    */
   protected lazy val sqlParser: ParserInterface = {
-    extensions.buildParser(session, new SparkSqlParser(conf))
+    extensions.buildParser(session, new SparkSqlParser(conf) {
+      override val astBuilder: SparkSqlAstBuilder = new SparkSqlAstBuilder(conf) {
+        override def lookupCatalog: Option[String => CatalogPlugin] = Some(session.catalog(_))
+      }
+    })
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index f3126e4cf893a..3b53d2f2ac508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -44,15 +44,17 @@
     df2.createOrReplaceTempView("source2")
 
     addCatalog("testcat", classOf[TestInMemoryTableCatalog].getName)
+    addCatalog("testcat2", classOf[TestInMemoryTableCatalog].getName)
     setDefaultCatalog("testcat")
   }
 
   after {
     restoreDefaultCatalog(previousDefaultCatalog)
-    removeCatalog("testcat")
+    removeCatalog("testcat", "testcat2")
 
     spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
     spark.sql("DROP TABLE source")
+    spark.sql("DROP TABLE source2")
   }
 
   test("CreateTable: use v2 plan because catalog is set") {
@@ -288,4 +290,64 @@
     }
     sql(s"DROP TABLE IF EXISTS testcat.db.notbl")
   }
+
+  test("Relation: basic") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+      checkAnswer(sql(s"TABLE $t1"), spark.table("source"))
+      checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source"))
+    }
+  }
+
+  test("Relation: SparkSession.table()") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+      checkAnswer(spark.table(s"$t1"), spark.table("source"))
+    }
+  }
+
+  test("Relation: CTE") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+      checkAnswer(
+        sql(s"""
+          |WITH cte AS (SELECT * FROM $t1)
+          |SELECT * FROM cte
+        """.stripMargin),
+        spark.table("source"))
+    }
+  }
+
+  test("Relation: view text") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      withView("view1") {
+        sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+        sql(s"CREATE VIEW view1 AS SELECT * FROM $t1")
+        checkAnswer(sql("TABLE view1"), spark.table("source"))
+      }
+    }
+  }
+
+  test("Relation: join tables from 2 catalogs") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    val t2 = "testcat2.v2tbl"
+    withTable(t1, t2) {
+      sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+      sql(s"CREATE TABLE $t2 USING foo AS SELECT id, data FROM source2")
+      val df1 = spark.table("source")
+      val df2 = spark.table("source2")
+      val df_joined = df1.join(df2).where(df1("id") + 1 === df2("id"))
+      checkAnswer(
+        sql(s"""
+          |SELECT *
+          |FROM $t1 t1, $t2 t2
+          |WHERE t1.id + 1 = t2.id
+        """.stripMargin),
+        df_joined)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 5e77cac450ac3..eb735ba42e6b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
@@ -595,7 +596,7 @@ private[hive] class TestHiveQueryExecution(
     // Make sure any test tables referenced are loaded.
     val referencedTables =
       describedTables ++
-        logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table }
+        logical.collect { case UnresolvedRelation(TableIdentifier(table, _)) => table }
     val resolver = sparkSession.sessionState.conf.resolver
     val referencedTestTables = sparkSession.testTables.keys.filter { testTable =>
       referencedTables.exists(resolver(_, testTable))
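
The DataSourceV2SQLSuite additions above drive table resolution through named v2 catalogs via SQL and SparkSession.table(). As a hedged usage sketch, not part of the patch: assuming a session `spark` whose catalogs are registered the way the suite's before block does with addCatalog, and a temp view named source, the same path can be exercised directly.

    // Illustrative sketch only; the catalog name "testcat", the namespace ns1.ns2,
    // the provider "foo", and the temp view "source" mirror the tests above and are
    // assumptions, not APIs introduced by this patch.
    val t = "testcat.ns1.ns2.tbl"
    spark.sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
    spark.sql(s"SELECT * FROM $t").show()  // SELECT resolves through the named v2 catalog
    spark.table(t).show()                  // table() now accepts a multi-part identifier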