From a79cb84fa7cf8d424d584d109395e0a28731d832 Mon Sep 17 00:00:00 2001
From: John Zhuge
Date: Sun, 19 May 2019 23:23:36 -0700
Subject: [PATCH 1/7] [SPARK-26946][SQL][FOLLOWUP] Handle lookupCatalog function not defined

Treat it as catalog not found.
---
 .../spark/sql/catalog/v2/LookupCatalog.scala  | 34 +++++++++++--------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
index 932d32022702..c23aebeaa302 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalog.v2
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.TableIdentifier
 
@@ -26,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 @Experimental
 trait LookupCatalog {
 
-  def lookupCatalog: Option[(String) => CatalogPlugin] = None
+  def lookupCatalog: Option[String => CatalogPlugin] = None
 
   type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
 
@@ -34,19 +36,23 @@ trait LookupCatalog {
    * Extract catalog plugin and identifier from a multi-part identifier.
    */
   object CatalogObjectIdentifier {
-    def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup =>
-      parts match {
-        case Seq(name) =>
-          (None, Identifier.of(Array.empty, name))
-        case Seq(catalogName, tail @ _*) =>
-          try {
-            val catalog = lookup(catalogName)
-            (Some(catalog), Identifier.of(tail.init.toArray, tail.last))
-          } catch {
-            case _: CatalogNotFoundException =>
-              (None, Identifier.of(parts.init.toArray, parts.last))
-          }
-      }
+    def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = parts match {
+      case Seq(name) =>
+        Some((None, Identifier.of(Array.empty, name)))
+      case Seq(catalogName, tail @ _*) =>
+        lookupCatalog match {
+          case Some(lookup) =>
+            Try(lookup(catalogName)) match {
+              case Success(catalog) =>
+                Some((Some(catalog), Identifier.of(tail.init.toArray, tail.last)))
+              case Failure(_: CatalogNotFoundException) =>
+                Some((None, Identifier.of(parts.init.toArray, parts.last)))
+              case Failure(ex) =>
+                throw ex
+            }
+          case None =>
+            Some((None, Identifier.of(parts.init.toArray, parts.last)))
+        }
     }
   }
 

From db4a417c5fc3fb80648223d3860516dd49da3f3d Mon Sep 17 00:00:00 2001
From: John Zhuge
Date: Sun, 26 May 2019 12:14:08 -0700
Subject: [PATCH 2/7] [SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation

---
 .../spark/sql/catalyst/parser/SqlBase.g4      |  4 +-
 .../spark/sql/catalog/v2/CatalogManager.java  | 88 +++++++++++++++++++
 .../apache/spark/sql/catalog/v2/Catalogs.java | 42 +++++----
 .../spark/sql/catalog/v2/IdentifierImpl.java  | 17 ++++
 .../sql/util/CaseInsensitiveStringMap.java    | 14 +++
 .../plans/logical/basicLogicalOperators.scala |  8 ++
 .../logical/sql/DropTableStatement.scala      | 35 ++++++++
 .../sql/catalog/v2/CatalogManagerSuite.java   | 71 +++++++++++++++
 .../sql/catalog/v2/CatalogV2TestUtils.scala   | 58 ++++++++++++
 .../sql/catalog/v2/TestTableCatalog.scala     |  2 +
 .../spark/sql/execution/SparkSqlParser.scala  |  4 +-
 .../datasources/DataSourceResolution.scala    | 11 ++-
 .../datasources/v2/DataSourceV2Strategy.scala |  5 +-
 .../datasources/v2/DropTableExec.scala        | 44 ++++++++++
 .../execution/command/DDLParserSuite.scala    | 82 +++++++----------
 .../sql/sources/v2/DataSourceV2SQLSuite.scala |
34 +++++-- .../sources/v2/TestInMemoryTableCatalog.scala | 2 + 17 files changed, 441 insertions(+), 80 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala create mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index f5b808197c9b..08ce482a65cc 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -139,8 +139,8 @@ statement DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation | ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions - | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable - | DROP VIEW (IF EXISTS)? tableIdentifier #dropTable + | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable + | DROP VIEW (IF EXISTS)? multipartIdentifier #dropTable | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? VIEW (IF NOT EXISTS)? tableIdentifier identifierCommentList? diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java new file mode 100644 index 000000000000..2d52e5395a83 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.SparkException; +import org.apache.spark.annotation.Private; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +import static org.apache.spark.sql.catalog.v2.Catalogs.classKey; +import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey; +import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix; +import static scala.collection.JavaConverters.mapAsJavaMapConverter; + +@Private +public class CatalogManager { + + private final SQLConf conf; + + public CatalogManager(SQLConf conf) { + this.conf = conf; + } + + /** + * Load a catalog. 
+ * + * @param name a catalog name + * @return a catalog plugin + */ + public CatalogPlugin load(String name) throws SparkException { + return Catalogs.load(name, conf); + } + + /** + * Add a catalog. + * + * @param name a catalog name + * @param pluginClassName a catalog plugin class name + * @param options catalog options + */ + public void add( + String name, + String pluginClassName, + CaseInsensitiveStringMap options) { + options.entrySet().stream() + .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue())); + conf.setConfString(classKey(name), pluginClassName); + } + + /** + * Add a catalog without option. + * + * @param name a catalog name + * @param pluginClassName a catalog plugin class name + */ + public void add( + String name, + String pluginClassName) { + add(name, pluginClassName, CaseInsensitiveStringMap.empty()); + } + + /** + * Remove a catalog. + * + * @param name a catalog name + */ + public void remove(String name) { + conf.unsetConf(classKey(name)); + mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream() + .filter(key -> isOptionKey(name, key)) + .forEach(conf::unsetConf); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index 851a6a9f6d16..d53e5bfb2514 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -23,10 +23,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.util.Utils; -import java.util.HashMap; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.stream.Collectors; import static scala.collection.JavaConverters.mapAsJavaMapConverter; @@ -35,6 +33,23 @@ public class Catalogs { private Catalogs() { } + public static String classKey(String name) { + return "spark.sql.catalog." + name; + } + + public static String optionKeyPrefix(String name) { + return "spark.sql.catalog." + name + "."; + } + + public static boolean isOptionKey(String name, String keyName) { + return keyName.startsWith(optionKeyPrefix(name)); + } + + public static String optionName(String name, String keyName) { + assert(isOptionKey(name, keyName)); + return keyName.substring(optionKeyPrefix(name).length()); + } + /** * Load and configure a catalog by name. *

@@ -49,10 +64,10 @@ private Catalogs() { */ public static CatalogPlugin load(String name, SQLConf conf) throws CatalogNotFoundException, SparkException { - String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); + String pluginClassName = conf.getConfString(classKey(name), null); if (pluginClassName == null) { throw new CatalogNotFoundException(String.format( - "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); + "Catalog '%s' plugin class not found: %s is not defined", name, classKey(name))); } ClassLoader loader = Utils.getContextOrSparkClassLoader(); @@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf) * @return a case insensitive string map of options starting with spark.sql.catalog.(name). */ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) { - Map allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); - Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); - - HashMap options = new HashMap<>(); - for (Map.Entry entry : allConfs.entrySet()) { - Matcher matcher = prefix.matcher(entry.getKey()); - if (matcher.matches() && matcher.groupCount() > 0) { - options.put(matcher.group(1), entry.getValue()); - } - } - + Map options = + mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream() + .filter(e -> isOptionKey(name, e.getKey())) + .collect(Collectors.toMap( + e -> optionName(name, e.getKey()), + e -> e.getValue())); return new CaseInsensitiveStringMap(options); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java index cd131432008a..51c5a589dd0f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java @@ -22,6 +22,8 @@ import java.util.Arrays; import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * An {@link Identifier} implementation. 
@@ -49,6 +51,21 @@ public String name() { return name; } + private String quote(String part) { + if (part.contains("`")) { + return part.replace("`", "``"); + } else { + return part; + } + } + + @Override + public String toString() { + return Stream.concat(Stream.of(namespace), Stream.of(name)) + .map(part -> '`' + quote(part) + '`') + .collect(Collectors.joining(".")); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index da41346d7ce7..2bf1b6c22929 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) { public Map asCaseSensitiveMap() { return Collections.unmodifiableMap(original); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o; + return delegate.equals(that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(delegate); + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 42755fc27419..6bf12cff28f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -499,6 +499,14 @@ object OverwritePartitionsDynamic { } } +/** + * Drop a table. + */ +case class DropTable( + catalog: TableCatalog, + ident: Identifier, + ifExists: Boolean) extends Command + /** * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala new file mode 100644 index 000000000000..bc31e57ac1b2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * A DROP TABLE statement, as parsed from SQL. + */ +case class DropTableStatement( + tableName: Seq[String], + ifExists: Boolean, + isView: Boolean, + purge: Boolean) extends ParsedStatement { + + override def output: Seq[Attribute] = Seq.empty + + override def children: Seq[LogicalPlan] = Seq.empty +} diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java new file mode 100644 index 000000000000..880cb457fbef --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.SparkException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; + +public class CatalogManagerSuite { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + CatalogManager catalogManager = new CatalogManager(new SQLConf()); + + @Test + public void testAdd() throws SparkException { + CaseInsensitiveStringMap options = new CaseInsensitiveStringMap( + new HashMap() {{ + put("option1", "value1"); + put("option2", "value2"); + }}); + catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options); + CatalogPlugin catalogPlugin = catalogManager.load("testcat"); + assertThat(catalogPlugin.name(), is("testcat")); + assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class)); + assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options)); + } + + @Test + public void testAddWithOption() throws SparkException { + catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName()); + CatalogPlugin catalogPlugin = catalogManager.load("testcat"); + assertThat(catalogPlugin.name(), is("testcat")); + assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class)); + assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty())); + } + + @Test + public void testRemove() throws SparkException { + catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName()); + catalogManager.load("testcat"); + catalogManager.remove("testcat"); + 
exception.expect(CatalogNotFoundException.class); + catalogManager.load("testcat"); + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala new file mode 100644 index 000000000000..663eed6e109d --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import org.apache.spark.sql.internal.SQLConf + +private[sql] trait CatalogV2TestUtils { + + protected lazy val catalogManager: CatalogManager = new CatalogManager(SQLConf.get) + + /** + * Adds a catalog. + */ + protected def addCatalog(name: String, pluginClassName: String): Unit = + catalogManager.add(name, pluginClassName) + + /** + * Removes catalogs. + */ + protected def removeCatalog(catalogNames: String*): Unit = + catalogNames.foreach { catalogName => + catalogManager.remove(catalogName) + } + + /** + * Sets the default catalog. + * + * @param catalog the new default catalog + */ + protected def setDefaultCatalog(catalog: String): Unit = + SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, catalog) + + /** + * Returns the current default catalog. + */ + protected def defaultCatalog: Option[String] = SQLConf.get.defaultV2Catalog + + /** + * Restores the default catalog to the previously saved value. + */ + protected def restoreDefaultCatalog(previous: Option[String]): Unit = + previous.foreach(SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, _)) +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index 78b4763484cc..f0bb5739046e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -89,6 +89,8 @@ class TestTableCatalog extends TableCatalog { } override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined + + override def toString: String = name } object TestTableCatalog { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f433cc8d3279..c292a8e301af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -649,8 +649,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * Create a [[DropTableCommand]] command. 
*/ override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { - DropTableCommand( - visitTableIdentifier(ctx.tableIdentifier), + sql.DropTableStatement( + visitMultipartIdentifier(ctx.multipartIdentifier()), ctx.EXISTS != null, ctx.VIEW != null, ctx.PURGE != null) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index acbe37349762..bc874a0ac4de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -27,9 +27,10 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.DropTableCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.StructType @@ -83,6 +84,12 @@ case class DataSourceResolution( s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) .asTableCatalog convertCTAS(catalog, identifier, create) + + case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _, _) => + DropTable(catalog.asTableCatalog, ident, ifExists) + + case DropTableStatement(AsTableIdentifier(tableName), ifExists, isView, purge) => + DropTableCommand(tableName, ifExists, isView, purge) } object V1WriteProvider { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index a1d547eb7e86..27d87960edb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -199,6 +199,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { Nil } + case DropTable(catalog, ident, ifExists) => + DropTableExec(catalog, ident, ifExists) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala new file mode 100644 index 000000000000..d325e0205f9d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + +/** + * Physical plan node for dropping a table. 
+ */ +case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean) + extends LeafExecNode { + + override def doExecute(): RDD[InternalRow] = { + if (catalog.tableExists(ident)) { + catalog.dropTable(ident) + } else if (!ifExists) { + throw new NoSuchTableException(ident) + } + + sqlContext.sparkContext.parallelize(Seq.empty, 1) + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index d7bfbce73af0..e51d07f6587c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -32,13 +32,13 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class DDLParserSuite extends PlanTest with SharedSQLContext { private lazy val parser = new SparkSqlParser(new SQLConf) @@ -906,59 +906,39 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { test("drop table") { val tableName1 = "db.tab" val tableName2 = "tab" - - val parsed = Seq( - s"DROP TABLE $tableName1", - s"DROP TABLE IF EXISTS $tableName1", - s"DROP TABLE $tableName2", - s"DROP TABLE IF EXISTS $tableName2", - s"DROP TABLE $tableName2 PURGE", - s"DROP TABLE IF EXISTS $tableName2 PURGE" - ).map(parser.parsePlan) - - val expected = Seq( - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, - purge = false), - DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, - purge = true), - DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, - purge = true)) - - parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) } + Seq( + (s"DROP TABLE $tableName1", + DropTableStatement(Seq("db", "tab"), ifExists = false, isView = false, purge = false)), + (s"DROP TABLE IF EXISTS $tableName1", + DropTableStatement(Seq("db", "tab"), ifExists = true, isView = false, purge = false)), + (s"DROP TABLE $tableName2", + DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = false)), + (s"DROP TABLE IF EXISTS $tableName2", + DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge 
= false)), + (s"DROP TABLE $tableName2 PURGE", + DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = true)), + (s"DROP TABLE IF EXISTS $tableName2 PURGE", + DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = true))).foreach { + case (sql, expected) => + comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false) + } } test("drop view") { val viewName1 = "db.view" val viewName2 = "view" - - val parsed1 = parser.parsePlan(s"DROP VIEW $viewName1") - val parsed2 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName1") - val parsed3 = parser.parsePlan(s"DROP VIEW $viewName2") - val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2") - - val expected1 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true, - purge = false) - val expected2 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true, - purge = false) - val expected3 = - DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true, - purge = false) - val expected4 = - DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true, - purge = false) - - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) - comparePlans(parsed3, expected3) - comparePlans(parsed4, expected4) + Seq( + (s"DROP VIEW $viewName1", + DropTableStatement(Seq("db", "view"), ifExists = false, isView = true, purge = false)), + (s"DROP VIEW IF EXISTS $viewName1", + DropTableStatement(Seq("db", "view"), ifExists = true, isView = true, purge = false)), + (s"DROP VIEW $viewName2", + DropTableStatement(Seq("view"), ifExists = false, isView = true, purge = false)), + (s"DROP VIEW IF EXISTS $viewName2", + DropTableStatement(Seq("view"), ifExists = true, isView = true, purge = false))).foreach { + case (sql, expected) => + comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false) + } } test("show columns") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 2424e6e1d2d1..f3126e4cf893 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -22,29 +22,35 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalog.v2.Identifier -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalog.v2.{CatalogV2TestUtils, Identifier} +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} -class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { +class DataSourceV2SQLSuite + extends QueryTest with SharedSQLContext with BeforeAndAfter with CatalogV2TestUtils { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ private val orc2 = classOf[OrcDataSourceV2].getName - before { - spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) - spark.conf.set("spark.sql.default.catalog", "testcat") + private val previousDefaultCatalog = defaultCatalog + before { val df = 
spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") + + addCatalog("testcat", classOf[TestInMemoryTableCatalog].getName) + setDefaultCatalog("testcat") } after { + restoreDefaultCatalog(previousDefaultCatalog) + removeCatalog("testcat") + spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.sql("DROP TABLE source") } @@ -266,4 +272,20 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) } } + + test("DropTable: basic") { + val tableName = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source") + assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === true) + sql(s"DROP TABLE $tableName") + assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === false) + } + + test("DropTable: if exists") { + intercept[NoSuchTableException] { + sql(s"DROP TABLE testcat.db.notbl") + } + sql(s"DROP TABLE IF EXISTS testcat.db.notbl") + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 2ecf1c2f184f..b970d62bc30c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -102,6 +102,8 @@ class TestInMemoryTableCatalog extends TableCatalog { def clearTables(): Unit = { tables.clear() } + + override def toString: String = name } /** From 424df75742bb4c327df88520927a6f31f91c040e Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Wed, 29 May 2019 08:28:20 -0700 Subject: [PATCH 3/7] Revert "[SPARK-26946][SQL][FOLLOWUP] Handle lookupCatalog function not defined" This reverts commit a79cb84f --- .../spark/sql/catalog/v2/LookupCatalog.scala | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index c23aebeaa302..932d32022702 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalog.v2 -import scala.util.{Failure, Success, Try} - import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier @@ -28,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier @Experimental trait LookupCatalog { - def lookupCatalog: Option[String => CatalogPlugin] = None + def lookupCatalog: Option[(String) => CatalogPlugin] = None type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier) @@ -36,23 +34,19 @@ trait LookupCatalog { * Extract catalog plugin and identifier from a multi-part identifier. 
*/ object CatalogObjectIdentifier { - def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = parts match { - case Seq(name) => - Some((None, Identifier.of(Array.empty, name))) - case Seq(catalogName, tail @ _*) => - lookupCatalog match { - case Some(lookup) => - Try(lookup(catalogName)) match { - case Success(catalog) => - Some((Some(catalog), Identifier.of(tail.init.toArray, tail.last))) - case Failure(_: CatalogNotFoundException) => - Some((None, Identifier.of(parts.init.toArray, parts.last))) - case Failure(ex) => - throw ex - } - case None => - Some((None, Identifier.of(parts.init.toArray, parts.last))) - } + def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup => + parts match { + case Seq(name) => + (None, Identifier.of(Array.empty, name)) + case Seq(catalogName, tail @ _*) => + try { + val catalog = lookup(catalogName) + (Some(catalog), Identifier.of(tail.init.toArray, tail.last)) + } catch { + case _: CatalogNotFoundException => + (None, Identifier.of(parts.init.toArray, parts.last)) + } + } } } From 7e11ca6233af0eb11d24f6cc62258dde72ff48ce Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Mon, 27 May 2019 17:37:15 -0700 Subject: [PATCH 4/7] [SPARK-26946][SQL][FOLLOWUP] Require lookup function --- .../spark/sql/catalog/v2/LookupCatalog.scala | 28 +++--- .../sql/catalyst/analysis/Analyzer.scala | 11 +-- .../catalog/v2/LookupCatalogSuite.scala | 88 +++++++++++++++++ .../v2/ResolveMultipartIdentifierSuite.scala | 99 ------------------- .../datasources/DataSourceResolution.scala | 2 +- 5 files changed, 105 insertions(+), 123 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 932d32022702..5464a7496d23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier @Experimental trait LookupCatalog { - def lookupCatalog: Option[(String) => CatalogPlugin] = None + protected def lookupCatalog(name: String): CatalogPlugin type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier) @@ -34,27 +34,23 @@ trait LookupCatalog { * Extract catalog plugin and identifier from a multi-part identifier. 
*/ object CatalogObjectIdentifier { - def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup => - parts match { - case Seq(name) => - (None, Identifier.of(Array.empty, name)) - case Seq(catalogName, tail @ _*) => - try { - val catalog = lookup(catalogName) - (Some(catalog), Identifier.of(tail.init.toArray, tail.last)) - } catch { - case _: CatalogNotFoundException => - (None, Identifier.of(parts.init.toArray, parts.last)) - } - } + def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match { + case Seq(name) => + Some((None, Identifier.of(Array.empty, name))) + case Seq(catalogName, tail @ _*) => + try { + Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last))) + } catch { + case _: CatalogNotFoundException => + Some((None, Identifier.of(parts.init.toArray, parts.last))) + } } } /** * Extract legacy table identifier from a multi-part identifier. * - * For legacy support only. Please use - * [[org.apache.spark.sql.catalog.v2.LookupCatalog.CatalogObjectIdentifier]] in DSv2 code paths. + * For legacy support only. Please use [[CatalogObjectIdentifier]] instead on DSv2 code paths. */ object AsTableIdentifier { def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f24d6f168dac..91365fcbb91f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -96,18 +96,15 @@ object AnalysisContext { class Analyzer( catalog: SessionCatalog, conf: SQLConf, - maxIterations: Int, - override val lookupCatalog: Option[(String) => CatalogPlugin] = None) + maxIterations: Int) extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { def this(catalog: SessionCatalog, conf: SQLConf) = { this(catalog, conf, conf.optimizerMaxIterations) } - def this(lookupCatalog: Option[(String) => CatalogPlugin], catalog: SessionCatalog, - conf: SQLConf) = { - this(catalog, conf, conf.optimizerMaxIterations, lookupCatalog) - } + override protected def lookupCatalog(name: String): CatalogPlugin = + throw new CatalogNotFoundException("No catalog lookup function") def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala new file mode 100644 index 000000000000..783751ff7986 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.catalog.v2 + +import org.scalatest.Inside +import org.scalatest.Matchers._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +private case class TestCatalogPlugin(override val name: String) extends CatalogPlugin { + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = Unit +} + +class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { + import CatalystSqlParser._ + + private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap + + override def lookupCatalog(name: String): CatalogPlugin = + catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) + + test("catalog object identifier") { + Seq( + ("tbl", None, Seq.empty, "tbl"), + ("db.tbl", None, Seq("db"), "tbl"), + ("prod.func", catalogs.get("prod"), Seq.empty, "func"), + ("ns1.ns2.tbl", None, Seq("ns1", "ns2"), "tbl"), + ("prod.db.tbl", catalogs.get("prod"), Seq("db"), "tbl"), + ("test.db.tbl", catalogs.get("test"), Seq("db"), "tbl"), + ("test.ns1.ns2.ns3.tbl", catalogs.get("test"), Seq("ns1", "ns2", "ns3"), "tbl"), + ("`db.tbl`", None, Seq.empty, "db.tbl"), + ("parquet.`file:/tmp/db.tbl`", None, Seq("parquet"), "file:/tmp/db.tbl"), + ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", None, + Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json")).foreach { + case (sql, expectedCatalog, namespace, name) => + inside(parseMultipartIdentifier(sql)) { + case CatalogObjectIdentifier(catalog, ident) => + catalog shouldEqual expectedCatalog + ident shouldEqual Identifier.of(namespace.toArray, name) + } + } + } + + test("table identifier") { + Seq( + ("tbl", "tbl", None), + ("db.tbl", "tbl", Some("db")), + ("`db.tbl`", "db.tbl", None), + ("parquet.`file:/tmp/db.tbl`", "file:/tmp/db.tbl", Some("parquet")), + ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", "s3://buck/tmp/abc.json", + Some("org.apache.spark.sql.json"))).foreach { + case (sql, table, db) => + inside (parseMultipartIdentifier(sql)) { + case AsTableIdentifier(ident) => + ident shouldEqual TableIdentifier(table, db) + } + } + Seq( + "prod.func", + "prod.db.tbl", + "ns1.ns2.tbl").foreach { sql => + parseMultipartIdentifier(sql) match { + case AsTableIdentifier(_) => + fail(s"$sql should not be resolved as TableIdentifier") + case _ => + } + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala deleted file mode 100644 index 0f2d67eaa9b2..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst.catalog.v2 - -import org.scalatest.Matchers._ - -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer} -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -private class TestCatalogPlugin(override val name: String) extends CatalogPlugin { - - override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = Unit -} - -class ResolveMultipartIdentifierSuite extends AnalysisTest { - import CatalystSqlParser._ - - private val analyzer = makeAnalyzer(caseSensitive = false) - - private val catalogs = Seq("prod", "test").map(name => name -> new TestCatalogPlugin(name)).toMap - - private def lookupCatalog(catalog: String): CatalogPlugin = - catalogs.getOrElse(catalog, throw new CatalogNotFoundException("Not found")) - - private def makeAnalyzer(caseSensitive: Boolean) = { - val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive) - new Analyzer(Some(lookupCatalog _), null, conf) - } - - override protected def getAnalyzer(caseSensitive: Boolean) = analyzer - - private def checkResolution(sqlText: String, expectedCatalog: Option[CatalogPlugin], - expectedNamespace: Array[String], expectedName: String): Unit = { - - import analyzer.CatalogObjectIdentifier - val CatalogObjectIdentifier(catalog, ident) = parseMultipartIdentifier(sqlText) - catalog shouldEqual expectedCatalog - ident.namespace shouldEqual expectedNamespace - ident.name shouldEqual expectedName - } - - private def checkTableResolution(sqlText: String, - expectedIdent: Option[TableIdentifier]): Unit = { - - import analyzer.AsTableIdentifier - parseMultipartIdentifier(sqlText) match { - case AsTableIdentifier(ident) => - assert(Some(ident) === expectedIdent) - case _ => - assert(None === expectedIdent) - } - } - - test("resolve multipart identifier") { - checkResolution("tbl", None, Array.empty, "tbl") - checkResolution("db.tbl", None, Array("db"), "tbl") - checkResolution("prod.func", catalogs.get("prod"), Array.empty, "func") - checkResolution("ns1.ns2.tbl", None, Array("ns1", "ns2"), "tbl") - checkResolution("prod.db.tbl", catalogs.get("prod"), Array("db"), "tbl") - checkResolution("test.db.tbl", 
catalogs.get("test"), Array("db"), "tbl") - checkResolution("test.ns1.ns2.ns3.tbl", - catalogs.get("test"), Array("ns1", "ns2", "ns3"), "tbl") - checkResolution("`db.tbl`", None, Array.empty, "db.tbl") - checkResolution("parquet.`file:/tmp/db.tbl`", None, Array("parquet"), "file:/tmp/db.tbl") - checkResolution("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", None, - Array("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json") - } - - test("resolve table identifier") { - checkTableResolution("tbl", Some(TableIdentifier("tbl"))) - checkTableResolution("db.tbl", Some(TableIdentifier("tbl", Some("db")))) - checkTableResolution("prod.func", None) - checkTableResolution("ns1.ns2.tbl", None) - checkTableResolution("prod.db.tbl", None) - checkTableResolution("`db.tbl`", Some(TableIdentifier("db.tbl"))) - checkTableResolution("parquet.`file:/tmp/db.tbl`", - Some(TableIdentifier("file:/tmp/db.tbl", Some("parquet")))) - checkTableResolution("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", - Some(TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index bc874a0ac4de..fd101d4b79e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -42,7 +42,7 @@ case class DataSourceResolution( import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog) + override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name) def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog) From 780f7ff5111097af4877ce79dbabf77225d41a03 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Wed, 29 May 2019 00:08:13 -0700 Subject: [PATCH 5/7] Review comments --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../spark/sql/catalog/v2/CatalogManager.java | 88 ------------------- .../apache/spark/sql/catalog/v2/Catalogs.java | 42 ++++----- .../spark/sql/catalog/v2/IdentifierImpl.java | 17 ---- .../sql/util/CaseInsensitiveStringMap.java | 14 --- .../sql/catalyst/parser/AstBuilder.scala | 18 ++++ .../logical/sql/DropTableStatement.scala | 1 - .../plans/logical/sql/DropViewStatement.scala | 33 +++++++ .../sql/catalog/v2/CatalogManagerSuite.java | 71 --------------- .../sql/catalog/v2/CatalogV2TestUtils.scala | 58 ------------ .../sql/catalog/v2/TestTableCatalog.scala | 2 - .../sql/catalyst/parser/DDLParserSuite.scala | 34 ++++++- .../spark/sql/execution/SparkSqlParser.scala | 11 --- .../datasources/DataSourceResolution.scala | 11 ++- .../execution/command/DDLParserSuite.scala | 39 -------- .../command/PlanResolutionSuite.scala | 64 +++++++++++++- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 15 ++-- .../sources/v2/TestInMemoryTableCatalog.scala | 2 - 18 files changed, 176 insertions(+), 346 deletions(-) delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala delete mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java delete mode 100644 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 08ce482a65cc..91beb5e639af 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -140,7 +140,7 @@ statement | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation | ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable - | DROP VIEW (IF EXISTS)? multipartIdentifier #dropTable + | DROP VIEW (IF EXISTS)? multipartIdentifier #dropView | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? VIEW (IF NOT EXISTS)? tableIdentifier identifierCommentList? diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java deleted file mode 100644 index 2d52e5395a83..000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalog.v2; - -import org.apache.spark.SparkException; -import org.apache.spark.annotation.Private; -import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; - -import static org.apache.spark.sql.catalog.v2.Catalogs.classKey; -import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey; -import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix; -import static scala.collection.JavaConverters.mapAsJavaMapConverter; - -@Private -public class CatalogManager { - - private final SQLConf conf; - - public CatalogManager(SQLConf conf) { - this.conf = conf; - } - - /** - * Load a catalog. - * - * @param name a catalog name - * @return a catalog plugin - */ - public CatalogPlugin load(String name) throws SparkException { - return Catalogs.load(name, conf); - } - - /** - * Add a catalog. - * - * @param name a catalog name - * @param pluginClassName a catalog plugin class name - * @param options catalog options - */ - public void add( - String name, - String pluginClassName, - CaseInsensitiveStringMap options) { - options.entrySet().stream() - .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue())); - conf.setConfString(classKey(name), pluginClassName); - } - - /** - * Add a catalog without option. 
- * - * @param name a catalog name - * @param pluginClassName a catalog plugin class name - */ - public void add( - String name, - String pluginClassName) { - add(name, pluginClassName, CaseInsensitiveStringMap.empty()); - } - - /** - * Remove a catalog. - * - * @param name a catalog name - */ - public void remove(String name) { - conf.unsetConf(classKey(name)); - mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream() - .filter(key -> isOptionKey(name, key)) - .forEach(conf::unsetConf); - } -} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index d53e5bfb2514..851a6a9f6d16 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -23,8 +23,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.util.Utils; +import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static scala.collection.JavaConverters.mapAsJavaMapConverter; @@ -33,23 +35,6 @@ public class Catalogs { private Catalogs() { } - public static String classKey(String name) { - return "spark.sql.catalog." + name; - } - - public static String optionKeyPrefix(String name) { - return "spark.sql.catalog." + name + "."; - } - - public static boolean isOptionKey(String name, String keyName) { - return keyName.startsWith(optionKeyPrefix(name)); - } - - public static String optionName(String name, String keyName) { - assert(isOptionKey(name, keyName)); - return keyName.substring(optionKeyPrefix(name).length()); - } - /** * Load and configure a catalog by name. *

@@ -64,10 +49,10 @@ public static String optionName(String name, String keyName) { */ public static CatalogPlugin load(String name, SQLConf conf) throws CatalogNotFoundException, SparkException { - String pluginClassName = conf.getConfString(classKey(name), null); + String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); if (pluginClassName == null) { throw new CatalogNotFoundException(String.format( - "Catalog '%s' plugin class not found: %s is not defined", name, classKey(name))); + "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); } ClassLoader loader = Utils.getContextOrSparkClassLoader(); @@ -111,12 +96,17 @@ public static CatalogPlugin load(String name, SQLConf conf) * @return a case insensitive string map of options starting with spark.sql.catalog.(name). */ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) { - Map<String, String> options = - mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream() - .filter(e -> isOptionKey(name, e.getKey())) - .collect(Collectors.toMap( - e -> optionName(name, e.getKey()), - e -> e.getValue())); + Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); + Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); + + HashMap<String, String> options = new HashMap<>(); + for (Map.Entry<String, String> entry : allConfs.entrySet()) { + Matcher matcher = prefix.matcher(entry.getKey()); + if (matcher.matches() && matcher.groupCount() > 0) { + options.put(matcher.group(1), entry.getValue()); + } + } + return new CaseInsensitiveStringMap(options); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java index 51c5a589dd0f..cd131432008a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java @@ -22,8 +22,6 @@ import java.util.Arrays; import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * An {@link Identifier} implementation. 
@@ -51,21 +49,6 @@ public String name() { return name; } - private String quote(String part) { - if (part.contains("`")) { - return part.replace("`", "``"); - } else { - return part; - } - } - - @Override - public String toString() { - return Stream.concat(Stream.of(namespace), Stream.of(name)) - .map(part -> '`' + quote(part) + '`') - .collect(Collectors.joining(".")); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index 2bf1b6c22929..da41346d7ce7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Set; /** @@ -179,17 +178,4 @@ public double getDouble(String key, double defaultValue) { public Map<String, String> asCaseSensitiveMap() { return Collections.unmodifiableMap(original); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o; - return delegate.equals(that.delegate); - } - - @Override - public int hashCode() { - return Objects.hash(delegate); - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8137db854cbc..7092868517f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2195,4 +2195,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a [[sql.DropTableStatement]] command. + */ + override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { + sql.DropTableStatement( + visitMultipartIdentifier(ctx.multipartIdentifier()), + ctx.EXISTS != null, + ctx.PURGE != null) + } + + /** + * Create a [[sql.DropViewStatement]] command. 
+ */ + override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) { + sql.DropViewStatement( + visitMultipartIdentifier(ctx.multipartIdentifier()), + ctx.EXISTS != null) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala index bc31e57ac1b2..d41e8a501025 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan case class DropTableStatement( tableName: Seq[String], ifExists: Boolean, - isView: Boolean, purge: Boolean) extends ParsedStatement { override def output: Seq[Attribute] = Seq.empty diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala new file mode 100644 index 000000000000..7a12db000d3e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * A DROP VIEW statement, as parsed from SQL. + */ +case class DropViewStatement( + tableName: Seq[String], + ifExists: Boolean) extends ParsedStatement { + + override def output: Seq[Attribute] = Seq.empty + + override def children: Seq[LogicalPlan] = Seq.empty +} diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java deleted file mode 100644 index 880cb457fbef..000000000000 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalog.v2; - -import org.apache.spark.SparkException; -import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.HashMap; - -import static org.hamcrest.core.Is.is; -import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertThat; - -public class CatalogManagerSuite { - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - CatalogManager catalogManager = new CatalogManager(new SQLConf()); - - @Test - public void testAdd() throws SparkException { - CaseInsensitiveStringMap options = new CaseInsensitiveStringMap( - new HashMap<String, String>() {{ - put("option1", "value1"); - put("option2", "value2"); - }}); - catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options); - CatalogPlugin catalogPlugin = catalogManager.load("testcat"); - assertThat(catalogPlugin.name(), is("testcat")); - assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class)); - assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options)); - } - - @Test - public void testAddWithOption() throws SparkException { - catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName()); - CatalogPlugin catalogPlugin = catalogManager.load("testcat"); - assertThat(catalogPlugin.name(), is("testcat")); - assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class)); - assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty())); - } - - @Test - public void testRemove() throws SparkException { - catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName()); - catalogManager.load("testcat"); - catalogManager.remove("testcat"); - exception.expect(CatalogNotFoundException.class); - catalogManager.load("testcat"); - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala deleted file mode 100644 index 663eed6e109d..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.catalog.v2 - -import org.apache.spark.sql.internal.SQLConf - -private[sql] trait CatalogV2TestUtils { - - protected lazy val catalogManager: CatalogManager = new CatalogManager(SQLConf.get) - - /** - * Adds a catalog. - */ - protected def addCatalog(name: String, pluginClassName: String): Unit = - catalogManager.add(name, pluginClassName) - - /** - * Removes catalogs. - */ - protected def removeCatalog(catalogNames: String*): Unit = - catalogNames.foreach { catalogName => - catalogManager.remove(catalogName) - } - - /** - * Sets the default catalog. - * - * @param catalog the new default catalog - */ - protected def setDefaultCatalog(catalog: String): Unit = - SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, catalog) - - /** - * Returns the current default catalog. - */ - protected def defaultCatalog: Option[String] = SQLConf.get.defaultV2Catalog - - /** - * Restores the default catalog to the previously saved value. - */ - protected def restoreDefaultCatalog(previous: Option[String]): Unit = - previous.foreach(SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, _)) -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index f0bb5739046e..78b4763484cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -89,8 +89,6 @@ class TestTableCatalog extends TableCatalog { } override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined - - override def toString: String = name } object TestTableCatalog { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 08baebbf140e..35cd813ae65c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement} import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -34,6 +35,10 @@ class DDLParserSuite extends AnalysisTest { } } + private def parseCompare(sql: String, expected: LogicalPlan): Unit = { + comparePlans(parsePlan(sql), expected, checkAnalysis = false) + } + test("create table using - schema") { val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" @@ -362,4 +367,31 @@ class DDLParserSuite extends AnalysisTest { } } } + + test("drop table") { + parseCompare("DROP TABLE testcat.ns1.ns2.tbl", + DropTableStatement(Seq("testcat", "ns1", "ns2", "tbl"), ifExists = false, 
purge = false)) + parseCompare(s"DROP TABLE db.tab", + DropTableStatement(Seq("db", "tab"), ifExists = false, purge = false)) + parseCompare(s"DROP TABLE IF EXISTS db.tab", + DropTableStatement(Seq("db", "tab"), ifExists = true, purge = false)) + parseCompare(s"DROP TABLE tab", + DropTableStatement(Seq("tab"), ifExists = false, purge = false)) + parseCompare(s"DROP TABLE IF EXISTS tab", + DropTableStatement(Seq("tab"), ifExists = true, purge = false)) + parseCompare(s"DROP TABLE tab PURGE", + DropTableStatement(Seq("tab"), ifExists = false, purge = true)) + parseCompare(s"DROP TABLE IF EXISTS tab PURGE", + DropTableStatement(Seq("tab"), ifExists = true, purge = true)) + } + + test("drop view") { + parseCompare(s"DROP VIEW testcat.db.view", + DropViewStatement(Seq("testcat", "db", "view"), ifExists = false)) + parseCompare(s"DROP VIEW db.view", DropViewStatement(Seq("db", "view"), ifExists = false)) + parseCompare(s"DROP VIEW IF EXISTS db.view", + DropViewStatement(Seq("db", "view"), ifExists = true)) + parseCompare(s"DROP VIEW view", DropViewStatement(Seq("view"), ifExists = false)) + parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c292a8e301af..1f1b41b7c440 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -645,17 +645,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { ctx.TEMPORARY != null) } - /** - * Create a [[DropTableCommand]] command. - */ - override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { - sql.DropTableStatement( - visitMultipartIdentifier(ctx.multipartIdentifier()), - ctx.EXISTS != null, - ctx.VIEW != null, - ctx.PURGE != null) - } - /** * Create a [[AlterTableRenameCommand]] command. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index fd101d4b79e0..ec31b79fd56f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DropTableCommand import org.apache.spark.sql.internal.SQLConf @@ -85,11 +85,14 @@ case class DataSourceResolution( .asTableCatalog convertCTAS(catalog, identifier, create) - case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _, _) => + case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => DropTable(catalog.asTableCatalog, ident, ifExists) - case DropTableStatement(AsTableIdentifier(tableName), ifExists, isView, purge) => - DropTableCommand(tableName, ifExists, isView, purge) + case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) => + DropTableCommand(tableName, ifExists, false, purge) + + case DropViewStatement(AsTableIdentifier(tableName), ifExists) => + DropTableCommand(tableName, ifExists, true, false) } object V1WriteProvider { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index e51d07f6587c..a5bc73b20992 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation} -import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -903,44 +902,6 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assert(e.contains("Found an empty partition key 'b'")) } - test("drop table") { - val tableName1 = "db.tab" - val tableName2 = "tab" - Seq( - (s"DROP TABLE $tableName1", - DropTableStatement(Seq("db", "tab"), ifExists = false, isView = false, purge = false)), - (s"DROP TABLE IF EXISTS $tableName1", - DropTableStatement(Seq("db", "tab"), ifExists = true, isView = false, purge = false)), - (s"DROP TABLE $tableName2", - DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = false)), - (s"DROP TABLE IF 
EXISTS $tableName2", - DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = false)), - (s"DROP TABLE $tableName2 PURGE", - DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = true)), - (s"DROP TABLE IF EXISTS $tableName2 PURGE", - DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = true))).foreach { - case (sql, expected) => - comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false) - } - } - - test("drop view") { - val viewName1 = "db.view" - val viewName2 = "view" - Seq( - (s"DROP VIEW $viewName1", - DropTableStatement(Seq("db", "view"), ifExists = false, isView = true, purge = false)), - (s"DROP VIEW IF EXISTS $viewName1", - DropTableStatement(Seq("db", "view"), ifExists = true, isView = true, purge = false)), - (s"DROP VIEW $viewName2", - DropTableStatement(Seq("view"), ifExists = false, isView = true, purge = false)), - (s"DROP VIEW IF EXISTS $viewName2", - DropTableStatement(Seq("view"), ifExists = true, isView = true, purge = false))).foreach { - case (sql, expected) => - comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false) - } - } - test("show columns") { val sql1 = "SHOW COLUMNS FROM t1" val sql2 = "SHOW COLUMNS IN db1.t1" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 1b4fcab9173f..d62ae616e539 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType} @@ -55,6 +55,9 @@ class PlanResolutionSuite extends AnalysisTest { DataSourceResolution(newConf, lookupCatalog).apply(parsePlan(query)) } + private def parseResolveCompare(query: String, expected: LogicalPlan): Unit = + comparePlans(parseAndResolve(query), expected, checkAnalysis = true) + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parseAndResolve(sql).collect { case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) @@ -438,4 +441,63 @@ class PlanResolutionSuite extends AnalysisTest { s"got ${other.getClass.getName}: $sql") } } + + test("drop table") { + val tableName1 = "db.tab" + val tableIdent1 = TableIdentifier("tab", Option("db")) + val tableName2 = "tab" + val tableIdent2 = TableIdentifier("tab", None) + + parseResolveCompare(s"DROP TABLE $tableName1", + DropTableCommand(tableIdent1, ifExists = false, isView = false, purge = false)) + parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1", + DropTableCommand(tableIdent1, ifExists = true, isView = false, purge = false)) + parseResolveCompare(s"DROP TABLE $tableName2", + DropTableCommand(tableIdent2, 
ifExists = false, isView = false, purge = false)) + parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2", + DropTableCommand(tableIdent2, ifExists = true, isView = false, purge = false)) + parseResolveCompare(s"DROP TABLE $tableName2 PURGE", + DropTableCommand(tableIdent2, ifExists = false, isView = false, purge = true)) + parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2 PURGE", + DropTableCommand(tableIdent2, ifExists = true, isView = false, purge = true)) + } + + test("drop table v2") { + val tableName1 = "testcat.db.tab" + val tableIdent1 = Identifier.of(Array("db"), "tab") + val tableName2 = "testcat.tab" + val tableIdent2 = Identifier.of(Array.empty, "tab") + + parseResolveCompare(s"DROP TABLE $tableName1", + DropTable(testCat, tableIdent1, ifExists = false)) + parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1", + DropTable(testCat, tableIdent1, ifExists = true)) + parseResolveCompare(s"DROP TABLE $tableName2", + DropTable(testCat, tableIdent2, ifExists = false)) + parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2", + DropTable(testCat, tableIdent2, ifExists = true)) + } + + test("drop view") { + val viewName1 = "db.view" + val viewIdent1 = TableIdentifier("view", Option("db")) + val viewName2 = "view" + val viewIdent2 = TableIdentifier("view") + + parseResolveCompare(s"DROP VIEW $viewName1", + DropTableCommand(viewIdent1, ifExists = false, isView = true, purge = false)) + parseResolveCompare(s"DROP VIEW IF EXISTS $viewName1", + DropTableCommand(viewIdent1, ifExists = true, isView = true, purge = false)) + parseResolveCompare(s"DROP VIEW $viewName2", + DropTableCommand(viewIdent2, ifExists = false, isView = true, purge = false)) + parseResolveCompare(s"DROP VIEW IF EXISTS $viewName2", + DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false)) + } + + test("drop view v2") { + assertAnalysisError( + parseAndResolve("DROP VIEW testcat.db.view"), + Seq("unresolved operator 'DropViewStatement"), + caseSensitive = false) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index f3126e4cf893..2cccd62edf3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -22,35 +22,30 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, QueryTest} -import org.apache.spark.sql.catalog.v2.{CatalogV2TestUtils, Identifier} +import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, StringType, StructType} class DataSourceV2SQLSuite - extends QueryTest with SharedSQLContext with BeforeAndAfter with CatalogV2TestUtils { + extends QueryTest with SharedSQLContext with BeforeAndAfter { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ private val orc2 = classOf[OrcDataSourceV2].getName - private val previousDefaultCatalog = defaultCatalog - before { + spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set("spark.sql.default.catalog", "testcat") + val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", 
"data") df.createOrReplaceTempView("source") val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") df2.createOrReplaceTempView("source2") - - addCatalog("testcat", classOf[TestInMemoryTableCatalog].getName) - setDefaultCatalog("testcat") } after { - restoreDefaultCatalog(previousDefaultCatalog) - removeCatalog("testcat") - spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.sql("DROP TABLE source") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index b970d62bc30c..2ecf1c2f184f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -102,8 +102,6 @@ class TestInMemoryTableCatalog extends TableCatalog { def clearTables(): Unit = { tables.clear() } - - override def toString: String = name } /** From dd5b799da03c6cfd0d08b98c0fbae59f74406fc7 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Wed, 29 May 2019 09:00:48 -0700 Subject: [PATCH 6/7] Revert "[SPARK-26946][SQL][FOLLOWUP] Require lookup function" This reverts commit 7e11ca62 --- .../spark/sql/catalog/v2/LookupCatalog.scala | 28 +++--- .../sql/catalyst/analysis/Analyzer.scala | 11 ++- .../catalog/v2/LookupCatalogSuite.scala | 88 ----------------- .../v2/ResolveMultipartIdentifierSuite.scala | 99 +++++++++++++++++++ .../datasources/DataSourceResolution.scala | 2 +- 5 files changed, 123 insertions(+), 105 deletions(-) delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 5464a7496d23..932d32022702 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier @Experimental trait LookupCatalog { - protected def lookupCatalog(name: String): CatalogPlugin + def lookupCatalog: Option[(String) => CatalogPlugin] = None type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier) @@ -34,23 +34,27 @@ trait LookupCatalog { * Extract catalog plugin and identifier from a multi-part identifier. 
*/ object CatalogObjectIdentifier { - def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match { - case Seq(name) => - Some((None, Identifier.of(Array.empty, name))) - case Seq(catalogName, tail @ _*) => - try { - Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last))) - } catch { - case _: CatalogNotFoundException => - Some((None, Identifier.of(parts.init.toArray, parts.last))) - } + def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup => + parts match { + case Seq(name) => + (None, Identifier.of(Array.empty, name)) + case Seq(catalogName, tail @ _*) => + try { + val catalog = lookup(catalogName) + (Some(catalog), Identifier.of(tail.init.toArray, tail.last)) + } catch { + case _: CatalogNotFoundException => + (None, Identifier.of(parts.init.toArray, parts.last)) + } + } } } /** * Extract legacy table identifier from a multi-part identifier. * - * For legacy support only. Please use [[CatalogObjectIdentifier]] instead on DSv2 code paths. + * For legacy support only. Please use + * [[org.apache.spark.sql.catalog.v2.LookupCatalog.CatalogObjectIdentifier]] in DSv2 code paths. */ object AsTableIdentifier { def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 91365fcbb91f..f24d6f168dac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -96,15 +96,18 @@ object AnalysisContext { class Analyzer( catalog: SessionCatalog, conf: SQLConf, - maxIterations: Int) + maxIterations: Int, + override val lookupCatalog: Option[(String) => CatalogPlugin] = None) extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { def this(catalog: SessionCatalog, conf: SQLConf) = { this(catalog, conf, conf.optimizerMaxIterations) } - override protected def lookupCatalog(name: String): CatalogPlugin = - throw new CatalogNotFoundException("No catalog lookup function") + def this(lookupCatalog: Option[(String) => CatalogPlugin], catalog: SessionCatalog, + conf: SQLConf) = { + this(catalog, conf, conf.optimizerMaxIterations, lookupCatalog) + } def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala deleted file mode 100644 index 783751ff7986..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst.catalog.v2 - -import org.scalatest.Inside -import org.scalatest.Matchers._ - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -private case class TestCatalogPlugin(override val name: String) extends CatalogPlugin { - - override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = Unit -} - -class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside { - import CatalystSqlParser._ - - private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap - - override def lookupCatalog(name: String): CatalogPlugin = - catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found")) - - test("catalog object identifier") { - Seq( - ("tbl", None, Seq.empty, "tbl"), - ("db.tbl", None, Seq("db"), "tbl"), - ("prod.func", catalogs.get("prod"), Seq.empty, "func"), - ("ns1.ns2.tbl", None, Seq("ns1", "ns2"), "tbl"), - ("prod.db.tbl", catalogs.get("prod"), Seq("db"), "tbl"), - ("test.db.tbl", catalogs.get("test"), Seq("db"), "tbl"), - ("test.ns1.ns2.ns3.tbl", catalogs.get("test"), Seq("ns1", "ns2", "ns3"), "tbl"), - ("`db.tbl`", None, Seq.empty, "db.tbl"), - ("parquet.`file:/tmp/db.tbl`", None, Seq("parquet"), "file:/tmp/db.tbl"), - ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", None, - Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json")).foreach { - case (sql, expectedCatalog, namespace, name) => - inside(parseMultipartIdentifier(sql)) { - case CatalogObjectIdentifier(catalog, ident) => - catalog shouldEqual expectedCatalog - ident shouldEqual Identifier.of(namespace.toArray, name) - } - } - } - - test("table identifier") { - Seq( - ("tbl", "tbl", None), - ("db.tbl", "tbl", Some("db")), - ("`db.tbl`", "db.tbl", None), - ("parquet.`file:/tmp/db.tbl`", "file:/tmp/db.tbl", Some("parquet")), - ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", "s3://buck/tmp/abc.json", - Some("org.apache.spark.sql.json"))).foreach { - case (sql, table, db) => - inside (parseMultipartIdentifier(sql)) { - case AsTableIdentifier(ident) => - ident shouldEqual TableIdentifier(table, db) - } - } - Seq( - "prod.func", - "prod.db.tbl", - "ns1.ns2.tbl").foreach { sql => - parseMultipartIdentifier(sql) match { - case AsTableIdentifier(_) => - fail(s"$sql should not be resolved as TableIdentifier") - case _ => - } - } - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala new file mode 100644 index 000000000000..0f2d67eaa9b2 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/ResolveMultipartIdentifierSuite.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.catalog.v2 + +import org.scalatest.Matchers._ + +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +private class TestCatalogPlugin(override val name: String) extends CatalogPlugin { + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = Unit +} + +class ResolveMultipartIdentifierSuite extends AnalysisTest { + import CatalystSqlParser._ + + private val analyzer = makeAnalyzer(caseSensitive = false) + + private val catalogs = Seq("prod", "test").map(name => name -> new TestCatalogPlugin(name)).toMap + + private def lookupCatalog(catalog: String): CatalogPlugin = + catalogs.getOrElse(catalog, throw new CatalogNotFoundException("Not found")) + + private def makeAnalyzer(caseSensitive: Boolean) = { + val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive) + new Analyzer(Some(lookupCatalog _), null, conf) + } + + override protected def getAnalyzer(caseSensitive: Boolean) = analyzer + + private def checkResolution(sqlText: String, expectedCatalog: Option[CatalogPlugin], + expectedNamespace: Array[String], expectedName: String): Unit = { + + import analyzer.CatalogObjectIdentifier + val CatalogObjectIdentifier(catalog, ident) = parseMultipartIdentifier(sqlText) + catalog shouldEqual expectedCatalog + ident.namespace shouldEqual expectedNamespace + ident.name shouldEqual expectedName + } + + private def checkTableResolution(sqlText: String, + expectedIdent: Option[TableIdentifier]): Unit = { + + import analyzer.AsTableIdentifier + parseMultipartIdentifier(sqlText) match { + case AsTableIdentifier(ident) => + assert(Some(ident) === expectedIdent) + case _ => + assert(None === expectedIdent) + } + } + + test("resolve multipart identifier") { + checkResolution("tbl", None, Array.empty, "tbl") + checkResolution("db.tbl", None, Array("db"), "tbl") + checkResolution("prod.func", catalogs.get("prod"), Array.empty, "func") + checkResolution("ns1.ns2.tbl", None, Array("ns1", "ns2"), "tbl") + checkResolution("prod.db.tbl", catalogs.get("prod"), Array("db"), "tbl") + checkResolution("test.db.tbl", 
catalogs.get("test"), Array("db"), "tbl") + checkResolution("test.ns1.ns2.ns3.tbl", + catalogs.get("test"), Array("ns1", "ns2", "ns3"), "tbl") + checkResolution("`db.tbl`", None, Array.empty, "db.tbl") + checkResolution("parquet.`file:/tmp/db.tbl`", None, Array("parquet"), "file:/tmp/db.tbl") + checkResolution("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", None, + Array("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json") + } + + test("resolve table identifier") { + checkTableResolution("tbl", Some(TableIdentifier("tbl"))) + checkTableResolution("db.tbl", Some(TableIdentifier("tbl", Some("db")))) + checkTableResolution("prod.func", None) + checkTableResolution("ns1.ns2.tbl", None) + checkTableResolution("prod.db.tbl", None) + checkTableResolution("`db.tbl`", Some(TableIdentifier("db.tbl"))) + checkTableResolution("parquet.`file:/tmp/db.tbl`", + Some(TableIdentifier("file:/tmp/db.tbl", Some("parquet")))) + checkTableResolution("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", + Some(TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index ec31b79fd56f..96e077931e66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -42,7 +42,7 @@ case class DataSourceResolution( import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name) + override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog) def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog) From d5b0d4ad5f276a9f9fd52154c782418b8fa80a28 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Wed, 29 May 2019 20:02:25 -0700 Subject: [PATCH 7/7] Review comments --- .../spark/sql/catalog/v2/IdentifierImpl.java | 17 +++++++++++++++++ .../spark/sql/catalyst/parser/AstBuilder.scala | 10 +++++----- .../plans/logical/sql/DropViewStatement.scala | 2 +- .../datasources/DataSourceResolution.scala | 9 +++++++-- .../execution/command/PlanResolutionSuite.scala | 13 +++++++------ .../sql/sources/v2/DataSourceV2SQLSuite.scala | 3 +-- 6 files changed, 38 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java index cd131432008a..34f3882c9c41 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java @@ -22,6 +22,8 @@ import java.util.Arrays; import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * An {@link Identifier} implementation. 
@@ -49,6 +51,21 @@ public String name() { return name; } + private String escapeQuote(String part) { + if (part.contains("`")) { + return part.replace("`", "``"); + } else { + return part; + } + } + + @Override + public String toString() { + return Stream.concat(Stream.of(namespace), Stream.of(name)) + .map(part -> '`' + escapeQuote(part) + '`') + .collect(Collectors.joining(".")); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7092868517f1..fa05efebf9c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -2196,20 +2196,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a [[sql.DropTableStatement]] command. + * Create a [[DropTableStatement]] command. */ override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { - sql.DropTableStatement( + DropTableStatement( visitMultipartIdentifier(ctx.multipartIdentifier()), ctx.EXISTS != null, ctx.PURGE != null) } /** - * Create a [[sql.DropViewStatement]] command. + * Create a [[DropViewStatement]] command. */ override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) { - sql.DropViewStatement( + DropViewStatement( visitMultipartIdentifier(ctx.multipartIdentifier()), ctx.EXISTS != null) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala index 7a12db000d3e..523158788e83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropViewStatement.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * A DROP VIEW statement, as parsed from SQL. 
*/ case class DropViewStatement( - tableName: Seq[String], + viewName: Seq[String], ifExists: Boolean) extends ParsedStatement { override def output: Seq[Attribute] = Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 96e077931e66..58b9276869ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -89,10 +89,15 @@ case class DataSourceResolution( DropTable(catalog.asTableCatalog, ident, ifExists) case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) => - DropTableCommand(tableName, ifExists, false, purge) + DropTableCommand(tableName, ifExists, isView = false, purge) + + case DropViewStatement(CatalogObjectIdentifier(Some(catalog), ident), _) => + throw new AnalysisException( + s"Can not specify catalog `${catalog.name}` for view $ident " + + s"because view support in catalog has not been implemented yet") case DropViewStatement(AsTableIdentifier(tableName), ifExists) => - DropTableCommand(tableName, ifExists, true, false) + DropTableCommand(tableName, ifExists, isView = true, purge = false) } object V1WriteProvider { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index d62ae616e539..06f733208637 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.command import java.net.URI +import java.util.Locale import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog, TestTableCatalog} @@ -462,7 +463,7 @@ class PlanResolutionSuite extends AnalysisTest { DropTableCommand(tableIdent2, ifExists = true, isView = false, purge = true)) } - test("drop table v2") { + test("drop table in v2 catalog") { val tableName1 = "testcat.db.tab" val tableIdent1 = Identifier.of(Array("db"), "tab") val tableName2 = "testcat.tab" @@ -494,10 +495,10 @@ class PlanResolutionSuite extends AnalysisTest { DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false)) } - test("drop view v2") { - assertAnalysisError( - parseAndResolve("DROP VIEW testcat.db.view"), - Seq("unresolved operator 'DropViewStatement"), - caseSensitive = false) + test("drop view in v2 catalog") { + intercept[AnalysisException] { + parseAndResolve("DROP VIEW testcat.db.view") + }.getMessage.toLowerCase(Locale.ROOT).contains( + "view support in catalog has not been implemented") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 2cccd62edf3a..5b9071b59b9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -28,8 +28,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{LongType, 
StringType, StructType} -class DataSourceV2SQLSuite - extends QueryTest with SharedSQLContext with BeforeAndAfter { +class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._