diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index f5b808197c9be..68fef3ed58dc4 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -139,8 +139,8 @@ statement
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
- | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
- | DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
+ | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
+ | DROP VIEW (IF EXISTS)? multipartIdentifier #dropTable
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList?
@@ -397,7 +397,7 @@ queryTerm
queryPrimary
: querySpecification #queryPrimaryDefault
- | TABLE tableIdentifier #table
+ | TABLE multipartIdentifier #table
| inlineTable #inlineTableDefault1
| '(' queryNoWith ')' #subquery
;
@@ -536,7 +536,7 @@ identifierComment
;
relationPrimary
- : tableIdentifier sample? tableAlias #tableName
+ : multipartIdentifier sample? tableAlias #tableName
| '(' queryNoWith ')' sample? tableAlias #aliasedQuery
| '(' relation ')' sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java
new file mode 100644
index 0000000000000..2d52e5395a83c
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.spark.sql.catalog.v2.Catalogs.classKey;
+import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey;
+import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix;
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+@Private
+public class CatalogManager {
+
+ private final SQLConf conf;
+
+ public CatalogManager(SQLConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Load a catalog.
+ *
+ * @param name a catalog name
+ * @return a catalog plugin
+ */
+ public CatalogPlugin load(String name) throws SparkException {
+ return Catalogs.load(name, conf);
+ }
+
+ /**
+ * Add a catalog.
+ *
+ * @param name a catalog name
+ * @param pluginClassName a catalog plugin class name
+ * @param options catalog options
+ */
+ public void add(
+ String name,
+ String pluginClassName,
+ CaseInsensitiveStringMap options) {
+ options.entrySet().stream()
+ .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue()));
+ conf.setConfString(classKey(name), pluginClassName);
+ }
+
+ /**
+   * Add a catalog without options.
+ *
+ * @param name a catalog name
+ * @param pluginClassName a catalog plugin class name
+ */
+ public void add(
+ String name,
+ String pluginClassName) {
+ add(name, pluginClassName, CaseInsensitiveStringMap.empty());
+ }
+
+ /**
+ * Remove a catalog.
+ *
+ * @param name a catalog name
+ */
+ public void remove(String name) {
+ conf.unsetConf(classKey(name));
+ mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream()
+ .filter(key -> isOptionKey(name, key))
+ .forEach(conf::unsetConf);
+ }
+}
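
Illustration (not part of the patch): a minimal Scala sketch of how the new `CatalogManager` is meant to be driven. The plugin class name `com.example.DemoCatalog` is a placeholder for any `CatalogPlugin` implementation on the classpath.

```scala
import org.apache.spark.sql.catalog.v2.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val conf = new SQLConf()
val manager = new CatalogManager(conf)

// add() only writes configuration: spark.sql.catalog.demo = <plugin class>,
// plus spark.sql.catalog.demo.<key> = <value> for each option.
// "com.example.DemoCatalog" is a placeholder and must implement CatalogPlugin
// before load() is called.
manager.add("demo", "com.example.DemoCatalog", CaseInsensitiveStringMap.empty())
assert(conf.getConfString("spark.sql.catalog.demo") == "com.example.DemoCatalog")

// load() would instantiate and initialize the plugin registered under the name:
// val plugin = manager.load("demo")

// remove() unsets the class key and every spark.sql.catalog.demo.* option key.
manager.remove("demo")
assert(conf.getAllConfs.keys.forall(!_.startsWith("spark.sql.catalog.demo")))
```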
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
index 851a6a9f6d165..d53e5bfb25142 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -23,10 +23,8 @@
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;
-import java.util.HashMap;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static scala.collection.JavaConverters.mapAsJavaMapConverter;
@@ -35,6 +33,23 @@ public class Catalogs {
private Catalogs() {
}
+ public static String classKey(String name) {
+ return "spark.sql.catalog." + name;
+ }
+
+ public static String optionKeyPrefix(String name) {
+ return "spark.sql.catalog." + name + ".";
+ }
+
+ public static boolean isOptionKey(String name, String keyName) {
+ return keyName.startsWith(optionKeyPrefix(name));
+ }
+
+ public static String optionName(String name, String keyName) {
+ assert(isOptionKey(name, keyName));
+ return keyName.substring(optionKeyPrefix(name).length());
+ }
+
/**
* Load and configure a catalog by name.
*
@@ -49,10 +64,10 @@ private Catalogs() {
*/
public static CatalogPlugin load(String name, SQLConf conf)
throws CatalogNotFoundException, SparkException {
- String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
+ String pluginClassName = conf.getConfString(classKey(name), null);
if (pluginClassName == null) {
throw new CatalogNotFoundException(String.format(
- "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
+ "Catalog '%s' plugin class not found: %s is not defined", name, classKey(name)));
}
ClassLoader loader = Utils.getContextOrSparkClassLoader();
@@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf)
* @return a case insensitive string map of options starting with spark.sql.catalog.(name).
*/
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
- Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
- Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
-
- HashMap<String, String> options = new HashMap<>();
- for (Map.Entry<String, String> entry : allConfs.entrySet()) {
- Matcher matcher = prefix.matcher(entry.getKey());
- if (matcher.matches() && matcher.groupCount() > 0) {
- options.put(matcher.group(1), entry.getValue());
- }
- }
-
+ Map<String, String> options =
+ mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream()
+ .filter(e -> isOptionKey(name, e.getKey()))
+ .collect(Collectors.toMap(
+ e -> optionName(name, e.getKey()),
+ e -> e.getValue()));
return new CaseInsensitiveStringMap(options);
}
}
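
Illustration (not part of the patch): the new key helpers centralize the `spark.sql.catalog.<name>` naming convention that was previously hard-coded and regex-matched. A quick Scala sketch of the expected behavior:

```scala
import org.apache.spark.sql.catalog.v2.Catalogs

// Class key and option-key prefix for a catalog named "testcat".
assert(Catalogs.classKey("testcat") == "spark.sql.catalog.testcat")
assert(Catalogs.optionKeyPrefix("testcat") == "spark.sql.catalog.testcat.")

// Option keys are recognized by prefix, and the option name is the remainder.
assert(Catalogs.isOptionKey("testcat", "spark.sql.catalog.testcat.url"))
assert(Catalogs.optionName("testcat", "spark.sql.catalog.testcat.url") == "url")
```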
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
index cd131432008a6..51c5a589dd0fe 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
@@ -22,6 +22,8 @@
import java.util.Arrays;
import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* An {@link Identifier} implementation.
@@ -49,6 +51,21 @@ public String name() {
return name;
}
+ private String quote(String part) {
+ if (part.contains("`")) {
+ return part.replace("`", "``");
+ } else {
+ return part;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return Stream.concat(Stream.of(namespace), Stream.of(name))
+ .map(part -> '`' + quote(part) + '`')
+ .collect(Collectors.joining("."));
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
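
Illustration (not part of the patch): expected output of the new `IdentifierImpl.toString`, assuming the quoting rules shown above (every part wrapped in backticks, embedded backticks doubled).

```scala
import org.apache.spark.sql.catalog.v2.Identifier

// Namespace part "na`me" contains a backtick, so it is doubled when quoted.
val ident = Identifier.of(Array("ns", "na`me"), "tbl")
assert(ident.toString == "`ns`.`na``me`.`tbl`")
```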
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index da41346d7ce71..2bf1b6c22929e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
/**
@@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) {
public Map<String, String> asCaseSensitiveMap() {
return Collections.unmodifiableMap(original);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
+ return delegate.equals(that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(delegate);
+ }
}
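
Illustration (not part of the patch): the new `equals`/`hashCode` delegate to the lower-cased key map, so key case does not affect equality. A small Scala sketch:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Same entries, different key case: the maps compare equal.
val a = new CaseInsensitiveStringMap(Map("PaTh" -> "/tmp/t").asJava)
val b = new CaseInsensitiveStringMap(Map("path" -> "/tmp/t").asJava)
assert(a == b && a.hashCode == b.hashCode)
```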
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
index 932d32022702b..c23aebeaa3026 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalog.v2
+import scala.util.{Failure, Success, Try}
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@Experimental
trait LookupCatalog {
- def lookupCatalog: Option[(String) => CatalogPlugin] = None
+ def lookupCatalog: Option[String => CatalogPlugin] = None
type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
@@ -34,19 +36,23 @@ trait LookupCatalog {
* Extract catalog plugin and identifier from a multi-part identifier.
*/
object CatalogObjectIdentifier {
- def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup =>
- parts match {
- case Seq(name) =>
- (None, Identifier.of(Array.empty, name))
- case Seq(catalogName, tail @ _*) =>
- try {
- val catalog = lookup(catalogName)
- (Some(catalog), Identifier.of(tail.init.toArray, tail.last))
- } catch {
- case _: CatalogNotFoundException =>
- (None, Identifier.of(parts.init.toArray, parts.last))
- }
- }
+ def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = parts match {
+ case Seq(name) =>
+ Some((None, Identifier.of(Array.empty, name)))
+ case Seq(catalogName, tail @ _*) =>
+ lookupCatalog match {
+ case Some(lookup) =>
+ Try(lookup(catalogName)) match {
+ case Success(catalog) =>
+ Some((Some(catalog), Identifier.of(tail.init.toArray, tail.last)))
+ case Failure(_: CatalogNotFoundException) =>
+ Some((None, Identifier.of(parts.init.toArray, parts.last)))
+ case Failure(ex) =>
+ throw ex
+ }
+ case None =>
+ Some((None, Identifier.of(parts.init.toArray, parts.last)))
+ }
}
}
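
Illustration (not part of the patch): with this change the `CatalogObjectIdentifier` extractor no longer requires a catalog lookup function; when none is provided, multi-part names fall back to a plain identifier whose leading parts become the namespace.

```scala
import org.apache.spark.sql.catalog.v2.LookupCatalog

// A LookupCatalog with the default lookupCatalog = None.
object NoCatalogs extends LookupCatalog

Seq("db", "tbl") match {
  case NoCatalogs.CatalogObjectIdentifier(maybeCatalog, ident) =>
    assert(maybeCatalog.isEmpty)                       // no catalog resolved
    assert(ident.namespace.sameElements(Array("db")))  // leading part becomes namespace
    assert(ident.name == "tbl")
}
```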
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala
new file mode 100644
index 0000000000000..9d371dc3b61b4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier, TableIdentifierLike}
+
+/**
+ * Resolves a multipart identifier to a [[CatalogTableIdentifier]] or a [[TableIdentifier]].
+ */
+trait TableIdentifierHelper extends LookupCatalog {
+ import CatalogV2Implicits._
+
+ implicit class TableIdentifierHelper(parts: Seq[String]) {
+ def asCatalogTableIdentifier: TableIdentifierLike = parts match {
+ case CatalogObjectIdentifier(Some(catalog), ident) =>
+ CatalogTableIdentifier(catalog.asTableCatalog, ident)
+
+ case AsTableIdentifier(tableIdentifier) =>
+ tableIdentifier
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f24d6f168dacf..5449c38d7dfa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -227,9 +227,8 @@ class Analyzer(
def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
plan resolveOperatorsDown {
- case u: UnresolvedRelation =>
- cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
- .map(_._2).getOrElse(u)
+ case u @ UnresolvedRelation(TableIdentifier(table, _)) =>
+ cteRelations.find(x => resolver(x._1, table)).map(_._2).getOrElse(u)
case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
@@ -721,7 +720,7 @@ class Analyzer(
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
case other => i.copy(table = other)
}
- case u: UnresolvedRelation => resolveRelation(u)
+ case u @ UnresolvedRelation(_: TableIdentifier) => resolveRelation(u)
}
// Look up the table with the given name from catalog. The database we used is decided by the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 9440a3f806b4e..544052d596261 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -22,6 +22,7 @@ import java.util.Locale
import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -71,17 +72,18 @@ object ResolveHints {
val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
- case ResolvedHint(u: UnresolvedRelation, hint)
- if relations.exists(resolver(_, u.tableIdentifier.table)) =>
- relations.remove(u.tableIdentifier.table)
+ case ResolvedHint(u @ UnresolvedRelation(TableIdentifier(table, _)), hint)
+ if relations.exists(resolver(_, table)) =>
+ relations.remove(table)
ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
case ResolvedHint(r: SubqueryAlias, hint)
if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
- case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
- relations.remove(u.tableIdentifier.table)
+ case UnresolvedRelation(TableIdentifier(table, _))
+ if relations.exists(resolver(_, table)) =>
+ relations.remove(table)
ResolvedHint(plan, createHintInfo(hintName))
case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index d44b42134f868..9272bdabc3b49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier, TableIdentifierLike}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -38,11 +38,13 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
/**
* Holds the name of a relation that has yet to be looked up in a catalog.
*
- * @param tableIdentifier table name
+ * @param table table name
*/
-case class UnresolvedRelation(tableIdentifier: TableIdentifier)
+case class UnresolvedRelation(table: TableIdentifierLike)
extends LeafNode {
+ def tableIdentifier: TableIdentifier = table.tableIdentifier
+
/** Returns a `.` separated name for this relation. */
def tableName: String = tableIdentifier.unquotedString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index deceec73dda30..afabca981ba02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+
/**
* An identifier that optionally specifies a database.
*
@@ -64,6 +66,27 @@ object AliasIdentifier {
def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier)
}
+/**
+ * An interface to ease transition from [[TableIdentifier]] to [[CatalogTableIdentifier]].
+ */
+sealed trait TableIdentifierLike {
+ def tableIdentifier: TableIdentifier
+}
+
+/**
+ * A data source V2 table identifier.
+ *
+ * @param catalog a catalog plugin
+ * @param ident an object identifier
+ */
+case class CatalogTableIdentifier(catalog: TableCatalog, ident: Identifier)
+ extends TableIdentifierLike {
+
+ override def tableIdentifier: TableIdentifier =
+ throw new UnsupportedOperationException(
+ s"$this should not be used on non-DSv2 code path")
+}
+
/**
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
@@ -71,7 +94,9 @@ object AliasIdentifier {
* unquotedString as the function name.
*/
case class TableIdentifier(table: String, database: Option[String])
- extends IdentifierWithDatabase {
+ extends IdentifierWithDatabase with TableIdentifierLike {
+
+ override def tableIdentifier: TableIdentifier = this
override val identifier: String = table
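
Illustration (not part of the patch): how the transition trait behaves on the two identifier kinds, per the definitions above.

```scala
import org.apache.spark.sql.catalyst.{TableIdentifier, TableIdentifierLike}

// A v1 TableIdentifier satisfies the transition trait trivially.
val v1: TableIdentifierLike = TableIdentifier("tbl", Some("db"))
assert(v1.tableIdentifier == TableIdentifier("tbl", Some("db")))

// A CatalogTableIdentifier deliberately throws from tableIdentifier, so a v1-only
// code path that is accidentally handed a v2 identifier fails fast instead of
// silently resolving against the wrong catalog.
```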
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8137db854cbcb..86d13dc0f39e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -29,6 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2
+import org.apache.spark.sql.catalog.v2.TableIdentifierHelper
import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -49,7 +50,8 @@ import org.apache.spark.util.random.RandomSampler
* The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or
* TableIdentifier.
*/
-class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging {
+class AstBuilder(conf: SQLConf)
+ extends SqlBaseBaseVisitor[AnyRef] with Logging with TableIdentifierHelper {
import ParserUtils._
def this() = this(new SQLConf())
@@ -844,14 +846,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
- UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
+ val tableId = visitMultipartIdentifier(ctx.multipartIdentifier).asCatalogTableIdentifier
+ UnresolvedRelation(tableId)
}
/**
* Create an aliased table reference. This is typically used in FROM clauses.
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
- val tableId = visitTableIdentifier(ctx.tableIdentifier)
+ val tableId = visitMultipartIdentifier(ctx.multipartIdentifier).asCatalogTableIdentifier
val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
table.optionalMap(ctx.sample)(withSample)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 42755fc274192..6bf12cff28f9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -499,6 +499,14 @@ object OverwritePartitionsDynamic {
}
}
+/**
+ * Drop a table.
+ */
+case class DropTable(
+ catalog: TableCatalog,
+ ident: Identifier,
+ ifExists: Boolean) extends Command
+
/**
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala
new file mode 100644
index 0000000000000..bc31e57ac1b2b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A DROP TABLE statement, as parsed from SQL.
+ */
+case class DropTableStatement(
+ tableName: Seq[String],
+ ifExists: Boolean,
+ isView: Boolean,
+ purge: Boolean) extends ParsedStatement {
+
+ override def output: Seq[Attribute] = Seq.empty
+
+ override def children: Seq[LogicalPlan] = Seq.empty
+}
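
Illustration (not part of the patch): the statement node carries the raw multipart name; catalog resolution happens later in `DataSourceResolution`. A sketch of the expected parse result, matching the updated `DDLParserSuite` cases below:

```scala
import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement

// Expected parse result for: DROP TABLE IF EXISTS testcat.ns.tbl PURGE
val stmt = DropTableStatement(
  tableName = Seq("testcat", "ns", "tbl"),
  ifExists = true,
  isView = false,
  purge = true)
assert(stmt.children.isEmpty && stmt.output.isEmpty)
```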
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java
new file mode 100644
index 0000000000000..880cb457fbef4
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class CatalogManagerSuite {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ CatalogManager catalogManager = new CatalogManager(new SQLConf());
+
+ @Test
+ public void testAdd() throws SparkException {
+ CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
+ new HashMap<String, String>() {{
+ put("option1", "value1");
+ put("option2", "value2");
+ }});
+ catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options);
+ CatalogPlugin catalogPlugin = catalogManager.load("testcat");
+ assertThat(catalogPlugin.name(), is("testcat"));
+ assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
+ assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options));
+ }
+
+ @Test
+ public void testAddWithoutOptions() throws SparkException {
+ catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
+ CatalogPlugin catalogPlugin = catalogManager.load("testcat");
+ assertThat(catalogPlugin.name(), is("testcat"));
+ assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
+ assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty()));
+ }
+
+ @Test
+ public void testRemove() throws SparkException {
+ catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
+ catalogManager.load("testcat");
+ catalogManager.remove("testcat");
+ exception.expect(CatalogNotFoundException.class);
+ catalogManager.load("testcat");
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala
new file mode 100644
index 0000000000000..7c35dbdf4ef92
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.sql.internal.SQLConf
+
+private[sql] trait CatalogV2TestUtils {
+
+ protected lazy val catalogManager: CatalogManager = new CatalogManager(SQLConf.get)
+
+ /**
+ * Loads a catalog.
+ */
+ protected def catalog(name: String): CatalogPlugin =
+ catalogManager.load(name)
+
+ /**
+ * Adds a catalog.
+ */
+ protected def addCatalog(name: String, pluginClassName: String): Unit =
+ catalogManager.add(name, pluginClassName)
+
+ /**
+ * Removes catalogs.
+ */
+ protected def removeCatalog(catalogNames: String*): Unit =
+ catalogNames.foreach { catalogName =>
+ catalogManager.remove(catalogName)
+ }
+
+ /**
+ * Removes catalogs after calling `f`.
+ */
+ protected def withCatalog(catalogNames: String*)(f: => Unit): Unit =
+ try f finally removeCatalog(catalogNames: _*)
+
+ /**
+ * Sets the default catalog.
+ *
+ * @param catalog the new default catalog
+ */
+ protected def setDefaultCatalog(catalog: String): Unit =
+ SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, catalog)
+
+ /**
+ * Returns the current default catalog.
+ */
+ protected def defaultCatalog: Option[String] = SQLConf.get.defaultV2Catalog
+
+ /**
+ * Restores the default catalog to the previously saved value.
+ */
+ protected def restoreDefaultCatalog(previous: Option[String]): Unit =
+ previous.foreach(SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, _))
+
+ /**
+ * Sets default catalog to `catalog` before executing `f`,
+ * then switches back to the previous default catalog after `f` returns.
+ */
+ protected def activateCatalog(catalog: String)(f: => Unit): Unit = {
+ val previous = defaultCatalog
+ setDefaultCatalog(catalog)
+ try f finally restoreDefaultCatalog(previous)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala
new file mode 100644
index 0000000000000..458b09629fe8d
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelperSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TableIdentifierHelperSuite extends SparkFunSuite with SQLHelper {
+ import CatalystSqlParser._
+
+ private val testCat = {
+ val newCatalog = new TestTableCatalog
+ newCatalog.initialize("testcat", CaseInsensitiveStringMap.empty())
+ newCatalog
+ }
+
+ private def findCatalog(name: String): CatalogPlugin = name match {
+ case "testcat" =>
+ testCat
+ case _ =>
+ throw new CatalogNotFoundException(s"$name not found")
+ }
+
+ test("with catalog lookup function") {
+ val helper = new TableIdentifierHelper {
+ override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog(_))
+ }
+ import helper._
+
+ assert(parseMultipartIdentifier("testcat.v2tbl").asCatalogTableIdentifier ===
+ CatalogTableIdentifier(testCat, Identifier.of(Array.empty, "v2tbl")))
+ assert(parseMultipartIdentifier("db.tbl").asCatalogTableIdentifier ===
+ TableIdentifier("tbl", Some("db")))
+ }
+
+ test("without catalog lookup function") {
+ val helper = new TableIdentifierHelper {
+ override def lookupCatalog: Option[String => CatalogPlugin] = None
+ }
+ import helper._
+
+ assert(parseMultipartIdentifier("testcat.v2tbl").asCatalogTableIdentifier ===
+ TableIdentifier("v2tbl", Some("testcat")))
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 78b4763484cc0..f0bb5739046ed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -89,6 +89,8 @@ class TestTableCatalog extends TableCatalog {
}
override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+
+ override def toString: String = name
}
object TestTableCatalog {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 0b5bf3f48b593..1de3d4d4d94ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
-import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs}
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs, TableIdentifierHelper}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders._
@@ -82,7 +82,7 @@ class SparkSession private(
@transient private val existingSharedState: Option[SharedState],
@transient private val parentSessionState: Option[SessionState],
@transient private[sql] val extensions: SparkSessionExtensions)
- extends Serializable with Closeable with Logging { self =>
+ extends Serializable with Closeable with Logging with TableIdentifierHelper { self =>
// The call site where this SparkSession was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -613,6 +613,8 @@ class SparkSession private(
catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
}
+ override def lookupCatalog: Option[String => CatalogPlugin] = Some(catalog(_))
+
/**
* Returns the specified table/view as a `DataFrame`.
*
@@ -624,10 +626,10 @@ class SparkSession private(
* @since 2.0.0
*/
def table(tableName: String): DataFrame = {
- table(sessionState.sqlParser.parseTableIdentifier(tableName))
+ table(sessionState.sqlParser.parseMultipartIdentifier(tableName).asCatalogTableIdentifier)
}
- private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
+ private[sql] def table(tableIdent: TableIdentifierLike): DataFrame = {
Dataset.ofRows(self, UnresolvedRelation(tableIdent))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f433cc8d32793..c292a8e301afc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -649,8 +649,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* Create a [[DropTableCommand]] command.
*/
override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
- DropTableCommand(
- visitTableIdentifier(ctx.tableIdentifier),
+ sql.DropTableStatement(
+ visitMultipartIdentifier(ctx.multipartIdentifier()),
ctx.EXISTS != null,
ctx.VIEW != null,
ctx.PURGE != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index cd34dfafd1320..ffc8c279da457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -190,10 +190,10 @@ case class CreateViewCommand(
// package (e.g., HiveGenericUDF).
child.collect {
// Disallow creating permanent views based on temporary views.
- case s: UnresolvedRelation
- if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
+ case UnresolvedRelation(tableIdentifier: TableIdentifier)
+ if sparkSession.sessionState.catalog.isTemporaryTable(tableIdentifier) =>
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
- s"referencing a temporary view ${s.tableIdentifier}")
+ s"referencing a temporary view ${tableIdentifier}")
case other if !other.resolved => other.expressions.flatMap(_.collect {
// Disallow creating permanent views based on temporary UDFs.
case e: UnresolvedFunction
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index acbe37349762b..d53c4852277a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -24,15 +24,18 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DropTableCommand
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class DataSourceResolution(
conf: SQLConf,
@@ -83,6 +86,16 @@ case class DataSourceResolution(
s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
.asTableCatalog
convertCTAS(catalog, identifier, create)
+
+ case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _, _) =>
+ DropTable(catalog.asTableCatalog, ident, ifExists)
+
+ case DropTableStatement(AsTableIdentifier(tableName), ifExists, isView, purge) =>
+ DropTableCommand(tableName, ifExists, isView, purge)
+
+ case UnresolvedRelation(CatalogTableIdentifier(catalog, ident)) =>
+ DataSourceV2Relation.create(catalog.loadTable(ident),
+ CaseInsensitiveStringMap.empty)
}
object V1WriteProvider {
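
Illustration (not part of the patch): end-to-end effect of the new resolution cases. When the leading name part is a registered v2 catalog, `DropTableStatement` resolves to the v2 `DropTable` plan (executed by `DropTableExec`); otherwise it falls back to the v1 `DropTableCommand`. The session below is hypothetical and reuses the test-only `TestInMemoryTableCatalog` from this patch as a stand-in for any `TableCatalog`.

```scala
import org.apache.spark.sql.SparkSession

// Local session with a v2 catalog registered under the name "testcat".
val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.catalog.testcat",
    "org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog")
  .getOrCreate()

// Leading part names a registered catalog: DropTableStatement -> DropTable (v2).
spark.sql("DROP TABLE IF EXISTS testcat.ns1.ns2.tbl")

// No catalog named "db": the statement falls back to the v1 DropTableCommand.
spark.sql("DROP TABLE IF EXISTS db.tbl")
```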
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 534e2fd0757f9..9e371cab250ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.util.Locale
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
@@ -41,7 +42,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case u: UnresolvedRelation if maybeSQLFile(u) =>
+ case u @ UnresolvedRelation(_: TableIdentifier) if maybeSQLFile(u) =>
try {
val dataSource = DataSource(
sparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index a1d547eb7e86d..27d87960edb3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -199,6 +199,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
Nil
}
+ case DropTable(catalog, ident, ifExists) =>
+ DropTableExec(catalog, ident, ifExists) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
new file mode 100644
index 0000000000000..d325e0205f9d8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for dropping a table.
+ */
+case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean)
+ extends LeafExecNode {
+
+ override def doExecute(): RDD[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ catalog.dropTable(ident)
+ } else if (!ifExists) {
+ throw new NoSuchTableException(ident)
+ }
+
+ sqlContext.sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index b2d065274b151..c192ed629e365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.internal
import org.apache.spark.SparkConf
import org.apache.spark.annotation.{Experimental, Unstable}
import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _}
+import org.apache.spark.sql.catalog.v2.CatalogPlugin
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlAstBuilder, SparkSqlParser}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
import org.apache.spark.sql.streaming.StreamingQueryManager
@@ -124,7 +125,11 @@ abstract class BaseSessionStateBuilder(
* Note: this depends on the `conf` field.
*/
protected lazy val sqlParser: ParserInterface = {
- extensions.buildParser(session, new SparkSqlParser(conf))
+ extensions.buildParser(session, new SparkSqlParser(conf) {
+ override val astBuilder: SparkSqlAstBuilder = new SparkSqlAstBuilder(conf) {
+ override def lookupCatalog: Option[String => CatalogPlugin] = Some(session.catalog(_))
+ }
+ })
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index d7bfbce73af05..e51d07f6587c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -32,13 +32,13 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
class DDLParserSuite extends PlanTest with SharedSQLContext {
private lazy val parser = new SparkSqlParser(new SQLConf)
@@ -906,59 +906,39 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
test("drop table") {
val tableName1 = "db.tab"
val tableName2 = "tab"
-
- val parsed = Seq(
- s"DROP TABLE $tableName1",
- s"DROP TABLE IF EXISTS $tableName1",
- s"DROP TABLE $tableName2",
- s"DROP TABLE IF EXISTS $tableName2",
- s"DROP TABLE $tableName2 PURGE",
- s"DROP TABLE IF EXISTS $tableName2 PURGE"
- ).map(parser.parsePlan)
-
- val expected = Seq(
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false,
- purge = false),
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false,
- purge = false),
- DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
- purge = false),
- DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
- purge = false),
- DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
- purge = true),
- DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
- purge = true))
-
- parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) }
+ Seq(
+ (s"DROP TABLE $tableName1",
+ DropTableStatement(Seq("db", "tab"), ifExists = false, isView = false, purge = false)),
+ (s"DROP TABLE IF EXISTS $tableName1",
+ DropTableStatement(Seq("db", "tab"), ifExists = true, isView = false, purge = false)),
+ (s"DROP TABLE $tableName2",
+ DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = false)),
+ (s"DROP TABLE IF EXISTS $tableName2",
+ DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = false)),
+ (s"DROP TABLE $tableName2 PURGE",
+ DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = true)),
+ (s"DROP TABLE IF EXISTS $tableName2 PURGE",
+ DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = true))).foreach {
+ case (sql, expected) =>
+ comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false)
+ }
}
test("drop view") {
val viewName1 = "db.view"
val viewName2 = "view"
-
- val parsed1 = parser.parsePlan(s"DROP VIEW $viewName1")
- val parsed2 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName1")
- val parsed3 = parser.parsePlan(s"DROP VIEW $viewName2")
- val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
-
- val expected1 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true,
- purge = false)
- val expected2 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true,
- purge = false)
- val expected3 =
- DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true,
- purge = false)
- val expected4 =
- DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true,
- purge = false)
-
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
- comparePlans(parsed3, expected3)
- comparePlans(parsed4, expected4)
+ Seq(
+ (s"DROP VIEW $viewName1",
+ DropTableStatement(Seq("db", "view"), ifExists = false, isView = true, purge = false)),
+ (s"DROP VIEW IF EXISTS $viewName1",
+ DropTableStatement(Seq("db", "view"), ifExists = true, isView = true, purge = false)),
+ (s"DROP VIEW $viewName2",
+ DropTableStatement(Seq("view"), ifExists = false, isView = true, purge = false)),
+ (s"DROP VIEW IF EXISTS $viewName2",
+ DropTableStatement(Seq("view"), ifExists = true, isView = true, purge = false))).foreach {
+ case (sql, expected) =>
+ comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false)
+ }
}
test("show columns") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 2424e6e1d2d1e..3b53d2f2ac508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -22,31 +22,39 @@ import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.{AnalysisException, QueryTest}
-import org.apache.spark.sql.catalog.v2.Identifier
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalog.v2.{CatalogV2TestUtils, Identifier}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructType}
-class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
+class DataSourceV2SQLSuite
+ extends QueryTest with SharedSQLContext with BeforeAndAfter with CatalogV2TestUtils {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
private val orc2 = classOf[OrcDataSourceV2].getName
- before {
- spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
- spark.conf.set("spark.sql.default.catalog", "testcat")
+ private val previousDefaultCatalog = defaultCatalog
+ before {
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
df2.createOrReplaceTempView("source2")
+
+ addCatalog("testcat", classOf[TestInMemoryTableCatalog].getName)
+ addCatalog("testcat2", classOf[TestInMemoryTableCatalog].getName)
+ setDefaultCatalog("testcat")
}
after {
+ restoreDefaultCatalog(previousDefaultCatalog)
+ removeCatalog("testcat", "testcat2")
+
spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
spark.sql("DROP TABLE source")
+ spark.sql("DROP TABLE source2")
}
test("CreateTable: use v2 plan because catalog is set") {
@@ -266,4 +274,80 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog)
}
}
+
+ test("DropTable: basic") {
+ val tableName = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source")
+ assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === true)
+ sql(s"DROP TABLE $tableName")
+ assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === false)
+ }
+
+ test("DropTable: if exists") {
+ intercept[NoSuchTableException] {
+ sql(s"DROP TABLE testcat.db.notbl")
+ }
+ sql(s"DROP TABLE IF EXISTS testcat.db.notbl")
+ }
+
+ test("Relation: basic") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+ checkAnswer(sql(s"TABLE $t1"), spark.table("source"))
+ checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source"))
+ }
+ }
+
+ test("Relation: SparkSession.table()") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+ checkAnswer(spark.table(s"$t1"), spark.table("source"))
+ }
+ }
+
+ test("Relation: CTE") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+ checkAnswer(
+ sql(s"""
+ |WITH cte AS (SELECT * FROM $t1)
+ |SELECT * FROM cte
+ """.stripMargin),
+ spark.table("source"))
+ }
+ }
+
+ test("Relation: view text") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ val v1 = "view1"
+ withView(v1) {
+ sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+ sql(s"CREATE VIEW $v1 AS SELECT * from $t1")
+ checkAnswer(sql(s"TABLE $v1"), spark.table("source"))
+ }
+ }
+ }
+
+ test("Relation: join tables from 2 catalogs") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ val t2 = "testcat2.v2tbl"
+ withTable(t1, t2) {
+ sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
+ sql(s"CREATE TABLE $t2 USING foo AS SELECT id, data FROM source2")
+ val df1 = spark.table("source")
+ val df2 = spark.table("source2")
+ val df_joined = df1.join(df2).where(df1("id") + 1 === df2("id"))
+ checkAnswer(
+ sql(s"""
+ |SELECT *
+ |FROM $t1 t1, $t2 t2
+ |WHERE t1.id + 1 = t2.id
+ """.stripMargin),
+ df_joined)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 2ecf1c2f184fb..b970d62bc30cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -102,6 +102,8 @@ class TestInMemoryTableCatalog extends TableCatalog {
def clearTables(): Unit = {
tables.clear()
}
+
+ override def toString: String = name
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 5e77cac450ac3..eb735ba42e6b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
@@ -595,7 +596,7 @@ private[hive] class TestHiveQueryExecution(
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
- logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table }
+ logical.collect { case UnresolvedRelation(TableIdentifier(table, _)) => table }
val resolver = sparkSession.sessionState.conf.resolver
val referencedTestTables = sparkSession.testTables.keys.filter { testTable =>
referencedTables.exists(resolver(_, testTable))