diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 2aca10f1bfbc..2bb930280018 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -167,6 +167,7 @@ statement
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
+ | DELETE FROM catalogTableIdentifier WHERE expression #deleteFrom
| unsupportedHiveNativeCommands .*? #failNativeCommand
;
@@ -215,7 +216,6 @@ unsupportedHiveNativeCommands
| kw1=COMMIT
| kw1=ROLLBACK
| kw1=DFS
- | kw1=DELETE kw2=FROM
;
createTableHeader
@@ -526,6 +526,10 @@ rowFormat
(NULL DEFINED AS nullDefinedAs=STRING)? #rowFormatDelimited
;
+catalogTableIdentifier
+ : ((catalog=identifier '.')? db=identifier '.')? table=identifier
+ ;
+
tableIdentifier
: (db=identifier '.')? table=identifier
;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java
new file mode 100644
index 000000000000..a4ad1f6994f9
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Case-insensitive map of string keys to string values.
+ *
+ * This is used to pass options to v2 implementations to ensure consistent case insensitivity.
+ *
+ * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
+ * keys converted to lower case.
+ */
+public class CaseInsensitiveStringMap implements Map<String, String> {
+
+ public static CaseInsensitiveStringMap empty() {
+ return new CaseInsensitiveStringMap();
+ }
+
+ private final Map<String, String> delegate;
+
+ private CaseInsensitiveStringMap() {
+ this.delegate = new HashMap<>();
+ }
+
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return delegate.containsValue(value);
+ }
+
+ @Override
+ public String get(Object key) {
+ return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ public String put(String key, String value) {
+ return delegate.put(key.toLowerCase(Locale.ROOT), value);
+ }
+
+ @Override
+ public String remove(Object key) {
+ return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ public void putAll(Map<? extends String, ? extends String> m) {
+ for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
+ delegate.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
+ }
+ }
+
+ @Override
+ public void clear() {
+ delegate.clear();
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return delegate.keySet();
+ }
+
+ @Override
+ public Collection<String> values() {
+ return delegate.values();
+ }
+
+ @Override
+ public Set<Map.Entry<String, String>> entrySet() {
+ return delegate.entrySet();
+ }
+}
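
A minimal sketch of the case-insensitivity contract above: keys are lower-cased on both write and lookup, so callers can use any casing (the keys and values here are illustrative).

```scala
import org.apache.spark.sql.catalog.v2.CaseInsensitiveStringMap

val options = CaseInsensitiveStringMap.empty()
options.put("Spark.Option", "value")

assert(options.get("spark.option") == "value")    // lower-case lookup
assert(options.get("SPARK.OPTION") == "value")    // upper-case lookup
assert(options.containsKey("sPaRk.oPtIoN"))
assert(options.keySet().contains("spark.option")) // keys are returned lower-cased
```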
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java
new file mode 100644
index 000000000000..03831b7aa915
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.internal.SQLConf;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ *
+ * Implementations can provide catalog functions by implementing additional interfaces, like
+ * {@link TableCatalog} to expose table operations.
+ *
+ * Catalog implementations must implement this marker interface to be loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
+ * required public no-arg constructor. After creating an instance, it will be configured by calling
+ * {@link #initialize(CaseInsensitiveStringMap)}.
+ *
+ * Catalog implementations are registered to a name by adding a configuration option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
+ * in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)}, are passed to the catalog in the
+ * case-insensitive string map of options during initialization, with the prefix removed. An
+ * additional property, {@code name}, is also added to the options and contains the catalog's
+ * name; in this case, "catalog-name".
+ */
+public interface CatalogProvider {
+ /**
+ * Called to initialize configuration.
+ *
+ * This method is called once, just after the provider is instantiated.
+ *
+ * @param options a case-insensitive string map of configuration
+ */
+ void initialize(CaseInsensitiveStringMap options);
+}
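
As a sketch of the contract described above, here is a hypothetical provider that needs only the public no-arg constructor and `initialize`; the class and property names are illustrative and not part of this patch.

```scala
import org.apache.spark.sql.catalog.v2.{CaseInsensitiveStringMap, CatalogProvider}

// Hypothetical catalog provider.
class ExampleCatalog extends CatalogProvider {
  private var config: CaseInsensitiveStringMap = _

  override def initialize(options: CaseInsensitiveStringMap): Unit = {
    // Receives the spark.sql.catalog.<name>.* properties with the prefix removed,
    // plus the reserved "name" entry added by the loader.
    config = options
  }
}

// Registered with configuration such as:
//   spark.sql.catalog.example     = com.example.ExampleCatalog
//   spark.sql.catalog.example.uri = thrift://metastore:9083
```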
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
new file mode 100644
index 000000000000..71ab9f528dbe
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.util.Utils;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+public class Catalogs {
+ private Catalogs() {
+ }
+
+ /**
+ * Load and configure a catalog by name.
+ *
+ * This loads, instantiates, and initializes the catalog provider for each call; it does not
+ * cache or reuse instances.
+ *
+ * @param name a String catalog name
+ * @param conf a SQLConf
+ * @return an initialized CatalogProvider
+ * @throws SparkException If the provider class cannot be found or instantiated
+ */
+ public static CatalogProvider load(String name, SQLConf conf) throws SparkException {
+ String providerClassName = conf.getConfString("spark.sql.catalog." + name, null);
+ if (providerClassName == null) {
+ throw new SparkException(String.format(
+ "Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name));
+ }
+
+ ClassLoader loader = Utils.getContextOrSparkClassLoader();
+
+ try {
+ Class<?> providerClass = loader.loadClass(providerClassName);
+
+ if (!CatalogProvider.class.isAssignableFrom(providerClass)) {
+ throw new SparkException(String.format(
+ "Provider class for catalog '%s' does not implement CatalogProvider: %s",
+ name, providerClassName));
+ }
+
+ CatalogProvider provider = CatalogProvider.class.cast(providerClass.newInstance());
+
+ provider.initialize(catalogOptions(name, conf));
+
+ return provider;
+
+ } catch (ClassNotFoundException e) {
+ throw new SparkException(String.format(
+ "Cannot find catalog provider class for catalog '%s': %s", name, providerClassName));
+
+ } catch (IllegalAccessException e) {
+ throw new SparkException(String.format(
+ "Failed to call public no-arg constructor for catalog '%s': %s", name, providerClassName),
+ e);
+
+ } catch (InstantiationException e) {
+ throw new SparkException(String.format(
+ "Failed while instantiating provider for catalog '%s': %s", name, providerClassName),
+ e.getCause());
+ }
+ }
+
+ /**
+ * Extracts a named catalog's configuration from a SQLConf.
+ *
+ * @param name a catalog name
+ * @param conf a SQLConf
+ * @return a case-insensitive string map of the options found under spark.sql.catalog.(name)., with that prefix removed
+ */
+ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
+ Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
+ Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
+
+ CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ for (Map.Entry<String, String> entry : allConfs.entrySet()) {
+ Matcher matcher = prefix.matcher(entry.getKey());
+ if (matcher.matches() && matcher.groupCount() > 0) {
+ options.put(matcher.group(1), entry.getValue());
+ }
+ }
+
+ // add name last to ensure it overwrites any conflicting options
+ options.put("name", name);
+
+ return options;
+ }
+}
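
A short sketch of how `Catalogs.load` behaves for the hypothetical `ExampleCatalog` above: the provider class is resolved from the un-prefixed key, instantiated, and initialized with the prefixed options before being returned.

```scala
import org.apache.spark.sql.catalog.v2.Catalogs
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf()
conf.setConfString("spark.sql.catalog.prod", "com.example.ExampleCatalog")
conf.setConfString("spark.sql.catalog.prod.uri", "thrift://metastore:9083")

val provider = Catalogs.load("prod", conf)
// initialize(...) has already been called with:
//   uri  -> thrift://metastore:9083   (prefix stripped, key lower-cased)
//   name -> prod                      (always added last)
```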
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java
new file mode 100644
index 000000000000..30a20f27b8c6
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table sources.
+ */
+public interface Table {
+ /**
+ * Return the table properties.
+ * @return this table's map of string properties
+ */
+ Map<String, String> properties();
+
+ /**
+ * Return the table schema.
+ * @return this table's schema as a struct type
+ */
+ StructType schema();
+
+ /**
+ * Return the table partitioning expressions.
+ * @return this table's partitioning expressions
+ */
+ List<Expression> partitionExpressions();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
new file mode 100644
index 000000000000..539beb0c39c5
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog extends CatalogProvider {
+ /**
+ * Load table metadata by {@link TableIdentifier identifier} from the catalog.
+ *
+ * @param ident a table identifier
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist.
+ */
+ Table loadTable(TableIdentifier ident) throws NoSuchTableException;
+
+ /**
+ * Test whether a table exists using an {@link TableIdentifier identifier} from the catalog.
+ *
+ * @param ident a table identifier
+ * @return true if the table exists, false otherwise
+ */
+ default boolean tableExists(TableIdentifier ident) {
+ try {
+ return loadTable(ident) != null;
+ } catch (NoSuchTableException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Create a table in the catalog.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @return metadata for the new table
+ * @throws TableAlreadyExistsException If a table already exists for the identifier
+ */
+ default Table createTable(TableIdentifier ident,
+ StructType schema) throws TableAlreadyExistsException {
+ return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap());
+ }
+
+ /**
+ * Create a table in the catalog.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @param properties a string map of table properties
+ * @return metadata for the new table
+ * @throws TableAlreadyExistsException If a table already exists for the identifier
+ */
+ default Table createTable(TableIdentifier ident,
+ StructType schema,
+ Map<String, String> properties) throws TableAlreadyExistsException {
+ return createTable(ident, schema, Collections.emptyList(), properties);
+ }
+
+ /**
+ * Create a table in the catalog.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @param partitions a list of expressions to use for partitioning data in the table
+ * @param properties a string map of table properties
+ * @return metadata for the new table
+ * @throws TableAlreadyExistsException If a table already exists for the identifier
+ */
+ Table createTable(TableIdentifier ident,
+ StructType schema,
+ List<Expression> partitions,
+ Map<String, String> properties) throws TableAlreadyExistsException;
+
+ /**
+ * Apply a list of {@link TableChange changes} to a table in the catalog.
+ *
+ * Implementations may reject the requested changes. If any change is rejected, none of the
+ * changes should be applied to the table.
+ *
+ * @param ident a table identifier
+ * @param changes a list of changes to apply to the table
+ * @return updated metadata for the table
+ * @throws NoSuchTableException If the table doesn't exist.
+ * @throws IllegalArgumentException If any change is rejected by the implementation.
+ */
+ Table alterTable(TableIdentifier ident,
+ List<TableChange> changes) throws NoSuchTableException;
+
+ /**
+ * Apply {@link TableChange changes} to a table in the catalog.
+ *
+ * Implementations may reject the requested changes. If any change is rejected, none of the
+ * changes should be applied to the table.
+ *
+ * @param ident a table identifier
+ * @param changes a list of changes to apply to the table
+ * @return updated metadata for the table
+ * @throws NoSuchTableException If the table doesn't exist.
+ * @throws IllegalArgumentException If any change is rejected by the implementation.
+ */
+ default Table alterTable(TableIdentifier ident,
+ TableChange... changes) throws NoSuchTableException {
+ return alterTable(ident, Arrays.asList(changes));
+ }
+
+ /**
+ * Drop a table in the catalog.
+ *
+ * @param ident a table identifier
+ * @return true if a table was deleted, false if no table exists for the identifier
+ */
+ boolean dropTable(TableIdentifier ident);
+}
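
A caller-side sketch of the interface above, assuming some `TableCatalog` implementation (none ships with this patch); it relies on the default `createTable(ident, schema)` overload delegating to the full method with no partitions or properties.

```scala
import org.apache.spark.sql.catalog.v2.TableCatalog
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StringType, StructType}

def ensureTable(catalog: TableCatalog): Unit = {
  val ident = new TableIdentifier("events", Some("db"))
  val schema = new StructType().add("id", LongType).add("data", StringType)

  if (!catalog.tableExists(ident)) {
    // Two-argument default: no partitions, no properties.
    catalog.createTable(ident, schema)
  }
}
```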
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
new file mode 100644
index 000000000000..3a8ba5e00b39
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ * <pre>
+ * import TableChange._
+ * val catalog = source.asInstanceOf[TableSupport].catalog()
+ * catalog.alterTable(ident,
+ * addColumn("x", IntegerType),
+ * renameColumn("a", "b"),
+ * deleteColumn("c")
+ * )
+ * </pre>
+ */
+public interface TableChange {
+
+ /**
+ * Create a TableChange for adding a top-level column to a table.
+ *
+ * Because "." may be interpreted as a field path separator or may be used in field names, it is
+ * not allowed in names passed to this method. To add to nested types or to add fields with
+ * names that contain ".", use {@link #addColumn(String, String, DataType)}.
+ *
+ * @param name the new top-level column name
+ * @param dataType the new column's data type
+ * @return a TableChange for the addition
+ */
+ static TableChange addColumn(String name, DataType dataType) {
+ return new AddColumn(null, name, dataType);
+ }
+
+ /**
+ * Create a TableChange for adding a nested column to a table.
+ *
+ * The parent name is used to find the parent struct type where the nested field will be added.
+ * If the parent name is null, the new column will be added to the root as a top-level column.
+ * If parent identifies a struct, a new column is added to that struct. If it identifies a list,
+ * the column is added to the list element struct, and if it identifies a map, the new column is
+ * added to the map's value struct.
+ *
+ * The given name is used to name the new column and names containing "." are not handled
+ * differently.
+ *
+ * @param parent the new field's parent
+ * @param name the new field name
+ * @param dataType the new field's data type
+ * @return a TableChange for the addition
+ */
+ static TableChange addColumn(String parent, String name, DataType dataType) {
+ return new AddColumn(parent, name, dataType);
+ }
+
+ /**
+ * Create a TableChange for renaming a field.
+ *
+ * The name is used to find the field to rename. The new name replaces only the leaf field name.
+ * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
+ *
+ * @param name the current field name
+ * @param newName the new name
+ * @return a TableChange for the rename
+ */
+ static TableChange renameColumn(String name, String newName) {
+ return new RenameColumn(name, newName);
+ }
+
+ /**
+ * Create a TableChange for updating the type of a field.
+ *
+ * The name is used to find the field to update.
+ *
+ * @param name the field name
+ * @param newDataType the new data type
+ * @return a TableChange for the update
+ */
+ static TableChange updateColumn(String name, DataType newDataType) {
+ return new UpdateColumn(name, newDataType);
+ }
+
+ /**
+ * Create a TableChange for deleting a field from a table.
+ *
+ * @param name the name of the field to delete
+ * @return a TableChange for the delete
+ */
+ static TableChange deleteColumn(String name) {
+ return new DeleteColumn(name);
+ }
+
+ final class AddColumn implements TableChange {
+ private final String parent;
+ private final String name;
+ private final DataType dataType;
+
+ private AddColumn(String parent, String name, DataType dataType) {
+ this.parent = parent;
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ public String parent() {
+ return parent;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public DataType type() {
+ return dataType;
+ }
+ }
+
+ final class RenameColumn implements TableChange {
+ private final String name;
+ private final String newName;
+
+ private RenameColumn(String name, String newName) {
+ this.name = name;
+ this.newName = newName;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String newName() {
+ return newName;
+ }
+ }
+
+ final class UpdateColumn implements TableChange {
+ private final String name;
+ private final DataType newDataType;
+
+ private UpdateColumn(String name, DataType newDataType) {
+ this.name = name;
+ this.newDataType = newDataType;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public DataType newDataType() {
+ return newDataType;
+ }
+ }
+
+ final class DeleteColumn implements TableChange {
+ private final String name;
+
+ private DeleteColumn(String name) {
+ this.name = name;
+ }
+
+ public String name() {
+ return name;
+ }
+ }
+
+}
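
A sketch of how a `TableCatalog.alterTable` implementation might interpret the change objects by matching on the nested classes; the `println` calls stand in for real metadata updates.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumn}

def applyChanges(changes: java.util.List[TableChange]): Unit = {
  changes.asScala.foreach {
    case add: AddColumn       => println(s"add ${add.name}: ${add.`type`()}")
    case rename: RenameColumn => println(s"rename ${rename.name} -> ${rename.newName}")
    case update: UpdateColumn => println(s"update ${update.name} to ${update.newDataType}")
    case delete: DeleteColumn => println(s"delete ${delete.name}")
    case other =>
      // Reject unknown changes so that none of the requested changes are applied.
      throw new IllegalArgumentException(s"Unsupported change: $other")
  }
}
```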
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4f474f4987dc..da4571792f80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -175,6 +175,7 @@ class Analyzer(
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
+ ResolveOutputRelation ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
@@ -695,13 +696,14 @@ class Analyzer(
}
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
- case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+ case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _)
+ if child.resolved && u.table.catalog.isEmpty =>
EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
case v: View =>
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
case other => i.copy(table = other)
}
- case u: UnresolvedRelation => resolveRelation(u)
+ case u @ UnresolvedRelation(table) if table.catalog.isEmpty => resolveRelation(u)
}
// Look up the table with the given name from catalog. The database we used is decided by the
@@ -2226,6 +2228,100 @@ class Analyzer(
}
}
+ /**
+ * Resolves columns of an output table from the data in a logical plan. This rule will:
+ *
+ * - Reorder columns when the write is by name
+ * - Insert safe casts when data types do not match
+ * - Insert aliases when column names do not match
+ * - Detect plans that are not compatible with the output table and throw AnalysisException
+ */
+ object ResolveOutputRelation extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case append @ AppendData(table, query, isByName)
+ if table.resolved && query.resolved && !append.resolved =>
+ val projection = resolveOutputColumns(table.name, table.output, query, isByName)
+
+ if (projection != query) {
+ append.copy(query = projection)
+ } else {
+ append
+ }
+ }
+
+ def resolveOutputColumns(
+ tableName: String,
+ expected: Seq[Attribute],
+ query: LogicalPlan,
+ byName: Boolean): LogicalPlan = {
+
+ if (expected.size < query.output.size) {
+ throw new AnalysisException(
+ s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
+ }
+
+ val errors = new mutable.ArrayBuffer[String]()
+ val resolved: Seq[NamedExpression] = if (byName) {
+ expected.flatMap { outAttr =>
+ query.resolveQuoted(outAttr.name, resolver) match {
+ case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+ errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
+ None
+
+ case Some(inAttr) if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) =>
+ Some(upcast(inAttr, outAttr))
+
+ case Some(inAttr) =>
+ Some(inAttr) // matches nullability, datatype, and name
+
+ case _ =>
+ errors += s"Cannot find data for output column '${outAttr.name}'"
+ None
+ }
+ }
+
+ } else {
+ if (expected.size > query.output.size) {
+ throw new AnalysisException(
+ s"""Cannot write to '$tableName', not enough data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
+ }
+
+ query.output.zip(expected).flatMap {
+ case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
+ errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
+ None
+
+ case (inAttr, outAttr)
+ if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) ||
+ inAttr.name != outAttr.name =>
+ Some(upcast(inAttr, outAttr))
+
+ case (inAttr, _) =>
+ Some(inAttr) // matches nullability, datatype, and name
+ }
+ }
+
+ if (errors.nonEmpty) {
+ throw new AnalysisException(
+ s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
+ }
+
+ Project(resolved, query)
+ }
+
+ private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = {
+ Alias(
+ UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
+ )(
+ explicitMetadata = Option(outAttr.metadata)
+ )
+ }
+ }
+
private def commonNaturalJoinProcessing(
left: LogicalPlan,
right: LogicalPlan,
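
The rule description above can be illustrated with a standalone toy model (not Spark's code) of the by-name checks: every output column must be found in the query, and a nullable query column can never feed a non-null table column.

```scala
case class Col(name: String, nullable: Boolean)

def byNameErrors(tableCols: Seq[Col], queryCols: Seq[Col]): Seq[String] =
  tableCols.flatMap { out =>
    queryCols.find(_.name.equalsIgnoreCase(out.name)) match {
      case Some(in) if in.nullable && !out.nullable =>
        Some(s"Cannot write nullable values to non-null column '${out.name}'")
      case Some(_) => None
      case None    => Some(s"Cannot find data for output column '${out.name}'")
    }
  }

byNameErrors(
  tableCols = Seq(Col("id", nullable = false), Col("data", nullable = true)),
  queryCols = Seq(Col("data", nullable = true)))
// Seq("Cannot find data for output column 'id'")
```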
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala
new file mode 100644
index 000000000000..2e72d13cb923
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+trait NamedRelation extends LogicalPlan {
+ def name: String
+
+ def output: Seq[AttributeReference]
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 71e23175168e..ff8874cf7435 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -38,15 +38,19 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
/**
* Holds the name of a relation that has yet to be looked up in a catalog.
*
- * @param tableIdentifier table name
+ * @param table a [[CatalogTableIdentifier]]
*/
-case class UnresolvedRelation(tableIdentifier: TableIdentifier)
- extends LeafNode {
+case class UnresolvedRelation(table: CatalogTableIdentifier) extends LeafNode with NamedRelation {
/** Returns a `.` separated name for this relation. */
- def tableName: String = tableIdentifier.unquotedString
+ def tableName: String = table.unquotedString
- override def output: Seq[Attribute] = Nil
+ def name: String = tableName
+
+ /** Returns the table identifier without the catalog element */
+ def tableIdentifier: TableIdentifier = table.asTableIdentifier
+
+ override def output: Seq[AttributeReference] = Nil
override lazy val resolved = false
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index a3cc4529b545..6b0bd57d1756 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -18,30 +18,32 @@
package org.apache.spark.sql.catalyst
/**
- * An identifier that optionally specifies a database.
+ * An identifier that optionally specifies a database and catalog.
*
* Format (unquoted): "name" or "db.name"
* Format (quoted): "`name`" or "`db`.`name`"
*/
-sealed trait IdentifierWithDatabase {
+sealed trait IdentifierWithOptionalDatabaseAndCatalog {
val identifier: String
def database: Option[String]
+ def catalog: Option[String]
+
/*
* Escapes back-ticks within the identifier name with double-back-ticks.
*/
private def quoteIdentifier(name: String): String = name.replace("`", "``")
def quotedString: String = {
- val replacedId = quoteIdentifier(identifier)
- val replacedDb = database.map(quoteIdentifier(_))
-
- if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else s"`$replacedId`"
+ // database is required if catalog is present
+ assert(database.isDefined || catalog.isEmpty)
+ def q(s: String): String = s"`${quoteIdentifier(s)}`"
+ Seq(catalog.map(q), database.map(q), Some(q(identifier))).flatten.mkString(".")
}
def unquotedString: String = {
- if (database.isDefined) s"${database.get}.$identifier" else identifier
+ Seq(catalog, database, Some(identifier)).flatten.mkString(".")
}
override def toString: String = quotedString
@@ -49,17 +51,62 @@ sealed trait IdentifierWithDatabase {
/**
- * Identifies a table in a database.
- * If `database` is not defined, the current database is used.
- * When we register a permanent function in the FunctionRegistry, we use
- * unquotedString as the function name.
+ * Identifies a table in a database and catalog.
+ * If `database` is not defined, the current catalog's default database is used.
+ * If `catalog` is not defined, the current catalog is used.
*/
-case class TableIdentifier(table: String, database: Option[String])
- extends IdentifierWithDatabase {
+case class CatalogTableIdentifier(table: String, database: Option[String], catalog: Option[String])
+ extends IdentifierWithOptionalDatabaseAndCatalog {
+
+ // ensure database is present if catalog is defined
+ assert(database.isDefined || catalog.isEmpty)
override val identifier: String = table
+ /**
+ * Returns this as a TableIdentifier if its catalog is not set; fails otherwise.
+ *
+ * This is used to provide a TableIdentifier for paths that do not support the catalog element. To
+ * ensure that the identifier is compatible, this asserts that the catalog element is not defined.
+ */
+ lazy val asTableIdentifier: TableIdentifier = {
+ assert(catalog.isEmpty, s"Cannot convert to TableIdentifier: catalog is ${catalog.get} != None")
+ new TableIdentifier(table, database)
+ }
+
+ /**
+ * Returns this CatalogTableIdentifier without the catalog.
+ *
+ * This is used for code paths where the catalog has already been used.
+ */
+ lazy val dropCatalog: CatalogTableIdentifier = catalog match {
+ case Some(_) => CatalogTableIdentifier(table, database, None)
+ case _ => this
+ }
+}
+
+
+/**
+ * Identifies a table in a database.
+ * If `database` is not defined, the current database is used.
+ *
+ * This class is used instead of CatalogTableIdentifier in paths that do not yet support table
+ * identifiers with catalogs.
+ */
+class TableIdentifier(name: String, db: Option[String])
+ extends CatalogTableIdentifier(name, db, None) {
+
def this(table: String) = this(table, None)
+
+ override lazy val asTableIdentifier: TableIdentifier = this
+
+ override def copy(
+ name: String = this.name,
+ database: Option[String] = this.db,
+ catalog: Option[String] = None): TableIdentifier = {
+ assert(catalog.isEmpty, "Cannot add catalog to a TableIdentifier using copy")
+ new TableIdentifier(name, database)
+ }
}
/** A fully qualified identifier for a table (i.e., database.tableName) */
@@ -69,18 +116,28 @@ case class QualifiedTableName(database: String, name: String) {
object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+
+ def apply(
+ tableName: String,
+ database: Option[String] = None,
+ catalog: Option[String] = None): CatalogTableIdentifier =
+ CatalogTableIdentifier(tableName, database, catalog)
}
/**
* Identifies a function in a database.
* If `database` is not defined, the current database is used.
+ * When we register a permanent function in the FunctionRegistry, we use
+ * unquotedString as the function name.
*/
case class FunctionIdentifier(funcName: String, database: Option[String])
- extends IdentifierWithDatabase {
+ extends IdentifierWithOptionalDatabaseAndCatalog {
override val identifier: String = funcName
+ override val catalog: Option[String] = None
+
def this(funcName: String) = this(funcName, None)
override def toString: String = unquotedString
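
A short sketch of the identifier behavior added above, using placeholder names; quoting, catalog dropping, and the `asTableIdentifier` assertion follow the definitions in this file.

```scala
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier}

val full = CatalogTableIdentifier("events", Some("db"), Some("prod"))
full.quotedString    // `prod`.`db`.`events`
full.unquotedString  // prod.db.events

// Dropping the catalog first makes the conversion to the legacy identifier safe.
val legacy: TableIdentifier = full.dropCatalog.asTableIdentifier

// full.asTableIdentifier would fail the assertion because the catalog is still set.
```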
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 49f578a24aae..a929f3b9368d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -29,7 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
@@ -547,6 +547,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}
+ /**
+ * Delete data from a table that matches a boolean expression.
+ */
+ override def visitDeleteFrom(ctx: DeleteFromContext): LogicalPlan = withOrigin(ctx) {
+ val tableIdent = visitCatalogTableIdentifier(ctx.catalogTableIdentifier())
+ DeleteFrom(UnresolvedRelation(tableIdent), expression(ctx.expression()))
+ }
+
/**
* Add a [[WithWindowDefinition]] operator to a logical plan.
*/
@@ -929,6 +937,17 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))
}
+ /**
+ * Create a [[CatalogTableIdentifier]] from an optionally catalog- and database-qualified table name.
+ */
+ override def visitCatalogTableIdentifier(
+ ctx: CatalogTableIdentifierContext): CatalogTableIdentifier = withOrigin(ctx) {
+ TableIdentifier(
+ ctx.table.getText,
+ Option(ctx.db).map(_.getText),
+ Option(ctx.catalog).map(_.getText))
+ }
+
/**
* Create a [[FunctionIdentifier]] from a 'functionName' or 'databaseName'.'functionName' pattern.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala
index 75240d219622..b37d3e25386c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala
@@ -46,6 +46,12 @@ trait ParserInterface {
@throws[ParseException]("Text cannot be parsed to a TableIdentifier")
def parseTableIdentifier(sqlText: String): TableIdentifier
+// /**
+// * Parse a string to a [[TableIdentifier]].
+// */
+// @throws[ParseException]("Text cannot be parsed to a TableIdentifier")
+// def parseCatalogTableIdentifier(sqlText: String): TableIdentifier
+
/**
* Parse a string to a [[FunctionIdentifier]].
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ea5a9b8ed554..7d918cd50397 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalog.v2.TableCatalog
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
- RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler
@@ -344,6 +345,73 @@ case class Join(
}
}
+case class DeleteFrom(table: LogicalPlan, expr: Expression) extends LogicalPlan {
+ override def output: Seq[Attribute] = Seq.empty
+
+ override def children: Seq[LogicalPlan] = Seq.empty
+}
+
+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+ table: NamedRelation,
+ query: LogicalPlan,
+ isByName: Boolean) extends LogicalPlan {
+ override def children: Seq[LogicalPlan] = Seq(query)
+ override def output: Seq[Attribute] = Seq.empty
+
+ override lazy val resolved: Boolean = {
+ query.output.size == table.output.size && query.output.zip(table.output).forall {
+ case (inAttr, outAttr) =>
+ inAttr.name == outAttr.name && // names must match
+ outAttr.dataType.sameType(inAttr.dataType) && // types must match
+ (outAttr.nullable || !inAttr.nullable) // must accept null or never produce nulls
+ }
+ }
+}
+
+object AppendData {
+ def byName(table: NamedRelation, df: LogicalPlan): AppendData = {
+ new AppendData(table, df, true)
+ }
+
+ def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = {
+ new AppendData(table, query, false)
+ }
+}
+
+/**
+ * Create a new table from a select query.
+ */
+case class CreateTableAsSelect(
+ catalog: TableCatalog,
+ table: TableIdentifier,
+ partitioning: Seq[Expression],
+ query: LogicalPlan,
+ writeOptions: Map[String, String],
+ ignoreIfExists: Boolean) extends LogicalPlan {
+
+ override def children: Seq[LogicalPlan] = Seq(query)
+ override def output: Seq[Attribute] = Seq.empty
+ override lazy val resolved = true
+}
+
+/**
+ * Replace a table with the results of a select query.
+ */
+case class ReplaceTableAsSelect(
+ catalog: TableCatalog,
+ table: TableIdentifier,
+ partitioning: Seq[Expression],
+ query: LogicalPlan,
+ writeOptions: Map[String, String]) extends LogicalPlan {
+
+ override def children: Seq[LogicalPlan] = Seq(query)
+ override def output: Seq[Attribute] = Seq.empty
+ override lazy val resolved = true
+}
+
/**
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
* concrete implementations during analysis.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index fd40741cfb5f..a63c1717d169 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -27,6 +27,7 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.InterfaceStability
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf
@@ -336,4 +337,65 @@ object DataType {
case (fromDataType, toDataType) => fromDataType == toDataType
}
}
+
+ /**
+ * Returns true if the write data type can be read using the read data type.
+ *
+ * The write type is compatible with the read type if:
+ * - Both types are arrays, the array element types are compatible, and element nullability is
+ * compatible (read allows nulls or write does not contain nulls).
+ * - Both types are maps and the map key and value types are compatible, and value nullability
+ * is compatible (read allows nulls or write does not contain nulls).
+ * - Both types are structs and each field in the read struct is present in the write struct and
+ * compatible (including nullability), or is nullable if the write struct does not contain the
+ * field. Write-side structs are not compatible if they contain fields that are not present in
+ * the read-side struct.
+ * - Both types are atomic and are the same type
+ *
+ * This method does not allow type promotion, such as an int write type with a long read type.
+ *
+ * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
+ * the read schema will not read, and to ensure map key equality is not changed when data is read.
+ *
+ * @param write a write-side data type to validate against the read type
+ * @param read a read-side data type
+ * @return true if data written with the write type can be read using the read type
+ */
+ def canWrite(
+ write: DataType,
+ read: DataType,
+ resolver: Resolver): Boolean = {
+ (write, read) match {
+ case (wArr: ArrayType, rArr: ArrayType) =>
+ canWrite(wArr.elementType, rArr.elementType, resolver) &&
+ (rArr.containsNull || !wArr.containsNull)
+
+ case (wMap: MapType, rMap: MapType) =>
+ // map keys cannot include data fields not in the read schema without changing equality when
+ // read. map keys can be missing fields as long as they are nullable in the read schema.
+ canWrite(wMap.keyType, rMap.keyType, resolver) &&
+ canWrite(wMap.valueType, rMap.valueType, resolver) &&
+ (rMap.valueContainsNull || !wMap.valueContainsNull)
+
+ case (StructType(writeFields), StructType(readFields)) =>
+ lazy val hasNoExtraFields =
+ (writeFields.map(_.name).toSet -- readFields.map(_.name)).isEmpty
+
+ readFields.forall { readField =>
+ writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
+ case Some(writeField) =>
+ canWrite(writeField.dataType, readField.dataType, resolver) &&
+ (readField.nullable || !writeField.nullable)
+
+ case None =>
+ // the write schema doesn't have the field, so it must be nullable in the read schema
+ readField.nullable
+ }
+ } && hasNoExtraFields
+
+ case (w, r) =>
+ // the write and read types are atomic and must be the same
+ w.sameType(r)
+ }
+ }
}
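
A quick sketch of `canWrite` under the rules above, with a simple case-insensitive resolver; the struct example passes because the extra read-side field is nullable, while the atomic example fails because promotion is not allowed.

```scala
import org.apache.spark.sql.types._

val resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

val writeType = new StructType()
  .add("id", LongType, nullable = false)
  .add("data", StringType)

val readType = new StructType()
  .add("id", LongType)
  .add("data", StringType)
  .add("extra", StringType, nullable = true)

DataType.canWrite(writeType, readType, resolver)   // true
DataType.canWrite(IntegerType, LongType, resolver) // false: int -> long promotion is not allowed
```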
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java
new file mode 100644
index 000000000000..0d869108fa7d
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CaseInsensitiveStringMapSuite {
+ @Test
+ public void testPutAndGet() {
+ CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ options.put("kEy", "valUE");
+
+ Assert.assertEquals("Should return correct value for lower-case key",
+ "valUE", options.get("key"));
+ Assert.assertEquals("Should return correct value for upper-case key",
+ "valUE", options.get("KEY"));
+ }
+
+ @Test
+ public void testKeySet() {
+ CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ options.put("kEy", "valUE");
+
+ Set<String> expectedKeySet = new HashSet<>();
+ expectedKeySet.add("key");
+
+ Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet());
+ }
+}
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
new file mode 100644
index 000000000000..62e26af7f0c6
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+
+public class CatalogLoadingSuite {
+ @Test
+ public void testLoad() throws SparkException {
+ SQLConf conf = new SQLConf();
+ conf.setConfString("spark.sql.catalog.test-name", TestCatalogProvider.class.getCanonicalName());
+
+ CatalogProvider provider = Catalogs.load("test-name", conf);
+ Assert.assertNotNull("Should instantiate a non-null provider", provider);
+ Assert.assertEquals("Provider should have correct implementation",
+ TestCatalogProvider.class, provider.getClass());
+
+ TestCatalogProvider testProvider = (TestCatalogProvider) provider;
+ Assert.assertEquals("Options should contain only one key", 1, testProvider.options.size());
+ Assert.assertEquals("Options should contain correct catalog name",
+ "test-name", testProvider.options.get("name"));
+ }
+
+ @Test
+ public void testInitializationOptions() throws SparkException {
+ SQLConf conf = new SQLConf();
+ conf.setConfString("spark.sql.catalog.test-name", TestCatalogProvider.class.getCanonicalName());
+ conf.setConfString("spark.sql.catalog.test-name.name", "overwritten");
+ conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE");
+
+ CatalogProvider provider = Catalogs.load("test-name", conf);
+ Assert.assertNotNull("Should instantiate a non-null provider", provider);
+ Assert.assertEquals("Provider should have correct implementation",
+ TestCatalogProvider.class, provider.getClass());
+
+ TestCatalogProvider testProvider = (TestCatalogProvider) provider;
+
+ Assert.assertEquals("Options should contain only two keys", 2, testProvider.options.size());
+ Assert.assertEquals("Options should contain correct catalog name",
+ "test-name", testProvider.options.get("name"));
+ Assert.assertEquals("Options should contain correct value for key",
+ "valUE", testProvider.options.get("key"));
+ }
+
+ @Test
+ public void testLoadWithoutConfig() {
+ SQLConf conf = new SQLConf();
+
+ SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
+
+ Assert.assertTrue("Should complain that implementation is not configured",
+ exc.getMessage().contains("provider not found: spark.sql.catalog.missing is not defined"));
+ Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("missing"));
+ }
+
+ @Test
+ public void testLoadMissingClass() {
+ SQLConf conf = new SQLConf();
+ conf.setConfString("spark.sql.catalog.missing", "com.example.NoSuchCatalogProvider");
+
+ SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
+
+ Assert.assertTrue("Should complain that the class is not found",
+ exc.getMessage().contains("Cannot find catalog provider class"));
+ Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("missing"));
+ Assert.assertTrue("Should identify the missing class",
+ exc.getMessage().contains("com.example.NoSuchCatalogProvider"));
+ }
+
+ @Test
+ public void testLoadNonCatalogProvider() {
+ SQLConf conf = new SQLConf();
+ String invalidClassName = InvalidCatalogProvider.class.getCanonicalName();
+ conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+ SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
+
+ Assert.assertTrue("Should complain that class does not implement CatalogProvider",
+ exc.getMessage().contains("does not implement CatalogProvider"));
+ Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("invalid"));
+ Assert.assertTrue("Should identify the class", exc.getMessage().contains(invalidClassName));
+ }
+
+ @Test
+ public void testLoadConstructorFailureCatalogProvider() {
+ SQLConf conf = new SQLConf();
+ String invalidClassName = ConstructorFailureCatalogProvider.class.getCanonicalName();
+ conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+ RuntimeException exc = intercept(RuntimeException.class, () -> Catalogs.load("invalid", conf));
+
+ Assert.assertTrue("Should have expected error message",
+ exc.getMessage().contains("Expected failure"));
+ }
+
+ @Test
+ public void testLoadAccessErrorCatalogProvider() {
+ SQLConf conf = new SQLConf();
+ String invalidClassName = AccessErrorCatalogProvider.class.getCanonicalName();
+ conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+ SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf));
+
+ Assert.assertTrue("Should complain that no public constructor is provided",
+ exc.getMessage().contains("Failed to call public no-arg constructor for catalog"));
+ Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("invalid"));
+ Assert.assertTrue("Should identify the class", exc.getMessage().contains(invalidClassName));
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <E extends Exception> E intercept(Class<E> expected, Callable<?> callable) {
+ try {
+ callable.call();
+ Assert.fail("No exception was thrown, expected: " +
+ expected.getName());
+ } catch (Exception actual) {
+ try {
+ Assert.assertEquals(expected, actual.getClass());
+ return (E) actual;
+ } catch (AssertionError e) {
+ e.addSuppressed(actual);
+ throw e;
+ }
+ }
+ // Compiler doesn't catch that Assert.fail will always throw an exception.
+ throw new UnsupportedOperationException("[BUG] Should not reach this statement");
+ }
+}
+
+class TestCatalogProvider implements CatalogProvider {
+ CaseInsensitiveStringMap options = null;
+
+ TestCatalogProvider() {
+ }
+
+ @Override
+ public void initialize(CaseInsensitiveStringMap options) {
+ this.options = options;
+ }
+}
+
+class ConstructorFailureCatalogProvider implements CatalogProvider { // fails in its constructor
+ ConstructorFailureCatalogProvider() {
+ throw new RuntimeException("Expected failure.");
+ }
+
+ @Override
+ public void initialize(CaseInsensitiveStringMap options) {
+ }
+}
+
+class AccessErrorCatalogProvider implements CatalogProvider { // no public constructor
+ private AccessErrorCatalogProvider() {
+ }
+
+ @Override
+ public void initialize(CaseInsensitiveStringMap options) {
+ }
+}
+
+class InvalidCatalogProvider { // doesn't implement CatalogProvider
+ public void initialize(CaseInsensitiveStringMap options) {
+ }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java
new file mode 100644
index 000000000000..731f32d3603f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.catalog.v2.Table;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+
+/**
+ * A mix-in interface for {@link DataSourceV2} delete support. Data sources can implement this
+ * interface to provide the ability to delete data that matches filter expressions from a table.
+ *
+ * Data sources must implement this interface to support logical operations that combine writing
+ * data with deleting data, like overwriting partitions.
+ */
+public interface DeleteSupport extends Table {
+ /**
+ * Delete data from a data source table that matches filter expressions.
+ *
+ * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+ * expressions must be interpreted as a set of filters that are ANDed together.
+ *
+ * Implementations may reject a delete operation if the delete isn't possible without significant
+ * effort. For example, partitioned data sources may reject deletes that do not filter by
+ * partition columns, because such a delete may require rewriting files without the deleted
+ * records. To reject a delete, implementations should throw {@link IllegalArgumentException}
+ * with a clear error message that identifies which expression was rejected.
+ *
+ * Implementations may throw {@link UnsupportedOperationException} if the delete operation is not
+ * supported because one of the filter expressions is not supported. Implementations should throw
+ * this exception with a clear error message that identifies the unsupported expression.
+ *
+ * @param filters filter expressions, used to select rows to delete when all expressions match
+ * @throws UnsupportedOperationException If one or more filter expressions is not supported
+ * @throws IllegalArgumentException If the delete is rejected due to required effort
+ */
+ void deleteWhere(Expression[] filters);
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
index b2526ded53d9..d6ab6ceddd8b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
@@ -25,7 +25,7 @@
* provide data reading ability and scan the data from the data source.
*/
@InterfaceStability.Evolving
-public interface ReadSupport extends DataSourceV2 {
+public interface ReadSupport {
/**
* Creates a {@link DataSourceReader} to scan the data from this data source.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
index f31659904cc5..d2901064a38f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
@@ -30,7 +30,7 @@
* supports both schema inference and user-specified schema.
*/
@InterfaceStability.Evolving
-public interface ReadSupportWithSchema extends DataSourceV2 {
+public interface ReadSupportWithSchema {
/**
* Create a {@link DataSourceReader} to scan the data from this data source.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
index 83aeec0c4785..16484d4b8439 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
@@ -29,7 +29,7 @@
* provide data writing ability and save the data to the data source.
*/
@InterfaceStability.Evolving
-public interface WriteSupport extends DataSourceV2 {
+public interface WriteSupport {
/**
* Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
@@ -38,15 +38,16 @@ public interface WriteSupport extends DataSourceV2 {
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*
- * @param jobId A unique string for the writing job. It's possible that there are many writing
- * jobs running at the same time, and the returned {@link DataSourceWriter} can
- * use this job id to distinguish itself from other jobs.
+ * @param writeUUID A unique string for the writing job. It's possible that there are many writing
+ * jobs running at the same time, and the returned {@link DataSourceWriter} can
+ * use this id to distinguish itself from other jobs.
* @param schema the schema of the data to be written.
* @param mode the save mode which determines what to do when the data are already in this data
* source, please refer to {@link SaveMode} for more details.
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
+ * @return a writer to append data to this data source
*/
   Optional<DataSourceWriter> createWriter(
- String jobId, StructType schema, SaveMode mode, DataSourceOptions options);
+ String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ec9352a7fa05..4fe2ebbfc240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, DataSourceV2Implicits, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
@@ -191,6 +191,21 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
"read files of Hive data source directly.")
}
+ import DataSourceV2Implicits._
+
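+ // Illustrative usage of the catalog path below (assuming DataSourceOptions.TABLE_KEY is the
+ // "table" option):
+ //   spark.read.option("catalog", "testcat").option("table", "db.tbl").load()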
+ extraOptions.get("catalog") match {
+ case Some(catalogName) if extraOptions.get(DataSourceOptions.TABLE_KEY).isDefined =>
+ val catalog = sparkSession.catalog(catalogName).asTableCatalog
+ val options = extraOptions.toMap
+ val identifier = options.table.get
+
+ return Dataset.ofRows(sparkSession,
+ DataSourceV2Relation.create(
+ catalogName, identifier, catalog.loadTable(identifier), options))
+
+ case _ =>
+ }
+
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance().asInstanceOf[DataSourceV2]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 39c0e102b69b..b1e102fd90f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Properties, UUID}
+import java.util.{Locale, Properties, UUID}
import scala.collection.JavaConverters._
@@ -26,12 +25,11 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.StructType
@@ -238,23 +236,74 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")
+ import DataSourceV2Implicits._
+
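+ // Illustrative usage of the catalog path below (assuming DataSourceOptions.TABLE_KEY is the
+ // "table" option):
+ //   df.write.option("catalog", "testcat").option("table", "db.tbl").mode("append").save()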
+ extraOptions.get("catalog") match {
+ case Some(catalogName) if extraOptions.get(DataSourceOptions.TABLE_KEY).isDefined =>
+ val catalog = df.sparkSession.catalog(catalogName).asTableCatalog
+ val options = extraOptions.toMap
+ val identifier = options.table.get
+ val exists = catalog.tableExists(identifier)
+
+ (exists, mode) match {
+ case (true, SaveMode.ErrorIfExists) =>
+ throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")
+
+ case (true, SaveMode.Overwrite) =>
+ runCommand(df.sparkSession, "save") {
+ ReplaceTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options)
+ }
+
+ case (true, SaveMode.Append) =>
+ val relation = DataSourceV2Relation.create(
+ catalogName, identifier, catalog.loadTable(identifier), options)
+
+ runCommand(df.sparkSession, "save") {
+ AppendData.byName(relation, df.logicalPlan)
+ }
+
+ case (false, SaveMode.Append) =>
+ throw new AnalysisException(s"Table does not exist: ${identifier.quotedString}")
+
+ case (false, SaveMode.ErrorIfExists) |
+ (false, SaveMode.Ignore) |
+ (false, SaveMode.Overwrite) =>
+
+ runCommand(df.sparkSession, "save") {
+ CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
+ ignoreIfExists = mode == SaveMode.Ignore)
+ }
+
+ case _ => // table exists and mode is ignore: nothing to write
+ }
+
+ // the catalog path above has handled this write; don't fall through to the source-based path
+ return
+
+ case _ =>
+ }
+
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
- val ds = cls.newInstance()
- ds match {
+ val source = cls.newInstance().asInstanceOf[DataSourceV2]
+ source match {
case ws: WriteSupport =>
- val options = new DataSourceOptions((extraOptions ++
- DataSourceV2Utils.extractSessionConfigs(
- ds = ds.asInstanceOf[DataSourceV2],
- conf = df.sparkSession.sessionState.conf)).asJava)
- // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
- // enough as there won't be tons of writing jobs created at the same second.
- val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- .format(new Date()) + "-" + UUID.randomUUID()
- val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
- if (writer.isPresent) {
+ val options = extraOptions ++
+ DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+
+ val relation = DataSourceV2Relation.create(source, options.toMap)
+ if (mode == SaveMode.Append) {
runCommand(df.sparkSession, "save") {
- WriteToDataSourceV2(writer.get(), df.planWithBarrier)
+ AppendData.byName(relation, df.planWithBarrier)
+ }
+
+ } else {
+ val writer = ws.createWriter(
+ UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
+ new DataSourceOptions(options.asJava))
+
+ if (writer.isPresent) {
+ runCommand(df.sparkSession, "save") {
+ WriteToDataSourceV2(writer.get, df.planWithBarrier)
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 565042fcf762..4937a794c812 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -21,6 +21,7 @@ import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
@@ -31,6 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
+import org.apache.spark.sql.catalog.v2.{CatalogProvider, Catalogs}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders._
@@ -609,6 +611,12 @@ class SparkSession private(
*/
@transient lazy val catalog: Catalog = new CatalogImpl(self)
+ @transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]()
+
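+ // Returns the v2 catalog registered under the spark.sql.catalog.<name> configuration,
+ // loading it on first use and caching it for this session.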
+ private[sql] def catalog(name: String): CatalogProvider = synchronized {
+ catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
+ }
+
/**
* Returns the specified table/view as a `DataFrame`.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
new file mode 100644
index 000000000000..57564cb3e2ed
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+private[sql] object CatalogV2Implicits {
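+ // Illustrative usage: sparkSession.catalog("testcat").asTableCatalog.loadTable(ident)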
+ implicit class CatalogHelper(catalog: CatalogProvider) {
+ def asTableCatalog: TableCatalog = catalog match {
+ case tableCatalog: TableCatalog =>
+ tableCatalog
+ case _ =>
+ throw new UnsupportedOperationException(s"Catalog $catalog does not support tables")
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/ResolveCatalogV2Relations.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/ResolveCatalogV2Relations.scala
new file mode 100644
index 000000000000..77aa94600bf4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/ResolveCatalogV2Relations.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFrom, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+case class ResolveCatalogV2Relations(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
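+ // Resolves relations that name an explicit catalog; e.g. for
+ //   DELETE FROM testcat.db.tbl WHERE id = 1
+ // this rule loads db.tbl from the catalog registered as testcat and substitutes a v2 relation.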
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ case d @ DeleteFrom(u: UnresolvedRelation, _) if u.table.catalog.isDefined =>
+ val catalogName = u.table.catalog.get
+ val catalog = sparkSession.catalog(catalogName).asTableCatalog
+ val table = catalog.loadTable(u.table.asTableIdentifier)
+ d.copy(table = DataSourceV2Relation.create(catalogName, u.table.dropCatalog, table))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7613eb210c65..0192f53dad2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -17,16 +17,17 @@
package org.apache.spark.sql.execution.datasources.v2
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalog.v2.Table
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
/**
* A logical plan representing a data source v2 scan.
@@ -40,10 +41,15 @@ case class DataSourceV2Relation(
source: DataSourceV2,
output: Seq[AttributeReference],
options: Map[String, String],
- userSpecifiedSchema: Option[StructType])
- extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
+ tableIdent: Option[TableIdentifier] = None,
+ userSpecifiedSchema: Option[StructType] = None)
+ extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat {
- import DataSourceV2Relation._
+ override def sourceName: String = source.name
+
+ override def name: String = {
+ tableIdent.map(_.unquotedString).getOrElse(s"$sourceName:unknown")
+ }
override def pushedFilters: Seq[Expression] = Seq.empty
@@ -51,7 +57,9 @@ case class DataSourceV2Relation(
def newReader(): DataSourceReader = source.createReader(options, userSpecifiedSchema)
- override def computeStats(): Statistics = newReader match {
+ def newWriter(): DataSourceWriter = source.createWriter(options, schema)
+
+ override def computeStats(): Statistics = newReader() match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
@@ -63,6 +71,43 @@ case class DataSourceV2Relation(
}
}
+/**
+ * A logical plan representing a data source v2 table.
+ *
+ * @param ident The table's TableIdentifier.
+ * @param table The table.
+ * @param output The output attributes of the table.
+ * @param options The options for this scan or write.
+ */
+case class TableV2Relation(
+ catalogName: String,
+ ident: TableIdentifier,
+ table: Table,
+ output: Seq[AttributeReference],
+ options: Map[String, String])
+ extends LeafNode with MultiInstanceRelation with NamedRelation {
+
+ import org.apache.spark.sql.sources.v2.DataSourceV2Implicits._
+
+ override def name: String = ident.unquotedString
+
+ override def simpleString: String =
+ s"RelationV2 $name ${Utils.truncatedString(output, "[", ", ", "]")}"
+
+ def newReader(): DataSourceReader = table.createReader(options)
+
+ override def computeStats(): Statistics = newReader() match {
+ case r: SupportsReportStatistics =>
+ Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+ case _ =>
+ Statistics(sizeInBytes = conf.defaultSizeInBytes)
+ }
+
+ override def newInstance(): TableV2Relation = {
+ copy(output = output.map(_.newInstance()))
+ }
+}
+
/**
* A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
*
@@ -77,6 +122,8 @@ case class StreamingDataSourceV2Relation(
reader: DataSourceReader)
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
+ override def sourceName: String = source.name
+
override def isStreaming: Boolean = true
override def simpleString: String = "Streaming RelationV2 " + metadataString
@@ -105,60 +152,22 @@ case class StreamingDataSourceV2Relation(
}
object DataSourceV2Relation {
- private implicit class SourceHelpers(source: DataSourceV2) {
- def asReadSupport: ReadSupport = {
- source match {
- case support: ReadSupport =>
- support
- case _: ReadSupportWithSchema =>
- // this method is only called if there is no user-supplied schema. if there is no
- // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
- throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
- case _ =>
- throw new AnalysisException(s"Data source is not readable: $name")
- }
- }
-
- def asReadSupportWithSchema: ReadSupportWithSchema = {
- source match {
- case support: ReadSupportWithSchema =>
- support
- case _: ReadSupport =>
- throw new AnalysisException(
- s"Data source does not support user-supplied schema: $name")
- case _ =>
- throw new AnalysisException(s"Data source is not readable: $name")
- }
- }
-
- def name: String = {
- source match {
- case registered: DataSourceRegister =>
- registered.shortName()
- case _ =>
- source.getClass.getSimpleName
- }
- }
-
- def createReader(
- options: Map[String, String],
- userSpecifiedSchema: Option[StructType]): DataSourceReader = {
- val v2Options = new DataSourceOptions(options.asJava)
- userSpecifiedSchema match {
- case Some(s) =>
- asReadSupportWithSchema.createReader(s, v2Options)
- case _ =>
- asReadSupport.createReader(v2Options)
- }
- }
- }
-
def create(
source: DataSourceV2,
options: Map[String, String],
- userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
+ tableIdent: Option[TableIdentifier] = None,
+ userSpecifiedSchema: Option[StructType] = None): NamedRelation = {
val reader = source.createReader(options, userSpecifiedSchema)
+ val ident = tableIdent.orElse(options.table)
DataSourceV2Relation(
- source, reader.readSchema().toAttributes, options, userSpecifiedSchema)
+ source, reader.readSchema().toAttributes, options, ident, userSpecifiedSchema)
+ }
+
+ def create(
+ catalogName: String,
+ ident: TableIdentifier,
+ table: Table,
+ options: Map[String, String] = Map.empty): NamedRelation = {
+ TableV2Relation(catalogName, ident, table, table.schema.toAttributes, options)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index b030b9a929b0..643feeb57e50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType
@@ -39,7 +38,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
- @transient source: DataSourceV2,
+ @transient sourceName: String,
@transient options: Map[String, String],
@transient pushedFilters: Seq[Expression],
@transient reader: DataSourceReader)
@@ -55,7 +54,7 @@ case class DataSourceV2ScanExec(
}
override def hashCode(): Int = {
- Seq(output, source, options).hashCode()
+ Seq(output, sourceName, options).hashCode()
}
override def outputPartitioning: physical.Partitioning = reader match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 9414e68155b9..b67567e55074 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable
import org.apache.spark.sql.{sources, Strategy}
+import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, DeleteFrom, LogicalPlan, Repartition, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -31,6 +32,8 @@ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
object DataSourceV2Strategy extends Strategy {
+ import org.apache.spark.sql.sources.v2.DataSourceV2Implicits._
+
/**
* Pushes down filters to the data source reader
*
@@ -81,7 +84,7 @@ object DataSourceV2Strategy extends Strategy {
// TODO: nested column pruning.
private def pruneColumns(
reader: DataSourceReader,
- relation: DataSourceV2Relation,
+ relation: NamedRelation,
exprs: Seq[Expression]): Seq[AttributeReference] = {
reader match {
case r: SupportsPushDownRequiredColumns =>
@@ -102,10 +105,15 @@ object DataSourceV2Strategy extends Strategy {
}
}
-
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
- val reader = relation.newReader()
+ case PhysicalOperation(project, filters, relation: NamedRelation)
+ if relation.isInstanceOf[DataSourceV2Relation] || relation.isInstanceOf[TableV2Relation] =>
+
+ val (reader, options, sourceName) = relation match {
+ case r: DataSourceV2Relation => (r.newReader(), r.options, r.sourceName)
+ case r: TableV2Relation => (r.newReader(), r.options, r.catalogName)
+ }
+
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
@@ -113,14 +121,14 @@ object DataSourceV2Strategy extends Strategy {
val output = pruneColumns(reader, relation, project ++ postScanFilters)
logInfo(
s"""
- |Pushing operators to ${relation.source.getClass}
+ |Pushing operators to $sourceName
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
""".stripMargin)
val scan = DataSourceV2ScanExec(
- output, relation.source, relation.options, pushedFilters, reader)
+ output, sourceName, options, pushedFilters, reader)
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
@@ -131,11 +139,22 @@ object DataSourceV2Strategy extends Strategy {
case r: StreamingDataSourceV2Relation =>
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
- DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader)) :: Nil
+ DataSourceV2ScanExec(r.output, r.source.name, r.options, r.pushedFilters, r.reader)) :: Nil
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
+ case AppendData(r: TableV2Relation, query, _) =>
+ AppendDataExec(r.table, r.options, planLater(query)) :: Nil
+
+ case CreateTableAsSelect(catalog, ident, partitioning, query, writeOptions, ignoreIfExists) =>
+ CreateTableAsSelectExec(catalog, ident, partitioning, Map.empty, writeOptions,
+ planLater(query), ignoreIfExists) :: Nil
+
+ case ReplaceTableAsSelect(catalog, ident, partitioning, query, writeOptions) =>
+ ReplaceTableAsSelectExec(catalog, ident, partitioning, Map.empty, writeOptions,
+ planLater(query)) :: Nil
+
case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
@@ -150,6 +169,9 @@ object DataSourceV2Strategy extends Strategy {
Nil
}
+ case DeleteFrom(rel: TableV2Relation, expr) =>
+ DeleteFromV2Exec(rel, expr) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
index 97e6c6d702ac..5c5a51c99c77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.util.Utils
/**
@@ -34,7 +32,7 @@ trait DataSourceV2StringFormat {
* The instance of this data source implementation. Note that we only consider its class in
* equals/hashCode, not the instance itself.
*/
- def source: DataSourceV2
+ def sourceName: String
/**
* The output of the data source reader, w.r.t. column pruning.
@@ -51,13 +49,6 @@ trait DataSourceV2StringFormat {
*/
def pushedFilters: Seq[Expression]
- private def sourceName: String = source match {
- case registered: DataSourceRegister => registered.shortName()
- // source.getClass.getSimpleName can cause Malformed class name error,
- // call safer `Utils.getSimpleName` instead
- case _ => Utils.getSimpleName(source.getClass)
- }
-
def metadataString: String = {
val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromV2Exec.scala
new file mode 100644
index 000000000000..9065ea425530
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromV2Exec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PredicateHelper}
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.sources.v2.DeleteSupport
+
+case class DeleteFromV2Exec(rel: TableV2Relation, expr: Expression)
+ extends LeafExecNode with PredicateHelper {
+
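+ // Splits the delete condition into conjunctive predicates before handing them to the table's
+ // DeleteSupport, e.g. `a = 1 AND b < 2` is passed as Array(a = 1, b < 2).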
+ override protected def doExecute(): RDD[InternalRow] = {
+ rel.table match {
+ case d: DeleteSupport =>
+ d.deleteWhere(splitConjunctivePredicates(expr).toArray)
+ case _ =>
+ throw new SparkException(
+ s"Cannot delete from a table that does not support deletes: ${rel.table}")
+ }
+
+ sqlContext.sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
similarity index 68%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index b1148c0f62f7..95f8e78899a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.v2
+import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -24,9 +25,11 @@ import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalog.v2.{Table, TableCatalog}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.MicroBatchExecution
@@ -35,21 +38,101 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
/**
- * The logical plan for writing data into data source v2.
+ * Deprecated logical plan for writing data into data source v2. This is being replaced by more
+ * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
*/
+@deprecated("Use specific logical plans like AppendData instead", "2.4.0")
case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
}
+case class AppendDataExec(
+ table: Table,
+ writeOptions: Map[String, String],
+ plan: SparkPlan) extends V2TableWriteExec(writeOptions, plan) {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ appendToTable(table)
+ }
+}
+
+case class CreateTableAsSelectExec(
+ catalog: TableCatalog,
+ ident: TableIdentifier,
+ partitioning: Seq[Expression],
+ properties: Map[String, String],
+ writeOptions: Map[String, String],
+ plan: SparkPlan,
+ ifNotExists: Boolean) extends V2TableWriteExec(writeOptions, plan) {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ if (ifNotExists) {
+ return sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ throw new TableAlreadyExistsException(ident.database.getOrElse("null"), ident.table)
+ }
+
+ val table = catalog.createTable(ident, plan.schema, partitioning.asJava, properties.asJava)
+ try {
+ appendToTable(table)
+ } catch {
+ case NonFatal(e) =>
+ // the write failed: roll back by dropping the table that was just created
+ catalog.dropTable(ident)
+ throw e
+ }
+ }
+}
+
+case class ReplaceTableAsSelectExec(
+ catalog: TableCatalog,
+ ident: TableIdentifier,
+ partitioning: Seq[Expression],
+ properties: Map[String, String],
+ writeOptions: Map[String, String],
+ plan: SparkPlan) extends V2TableWriteExec(writeOptions, plan) {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (!catalog.tableExists(ident)) {
+ throw new NoSuchTableException(ident.database.getOrElse("null"), ident.table)
+ }
+
+ catalog.dropTable(ident)
+
+ val table = catalog.createTable(ident, plan.schema, partitioning.asJava, properties.asJava)
+ try {
+ appendToTable(table)
+ } catch {
+ case NonFatal(e) =>
+ // the write failed: roll back by dropping the table that was just created
+ catalog.dropTable(ident)
+ throw e
+ }
+ }
+}
+
+case class WriteToDataSourceV2Exec(
+ writer: DataSourceWriter,
+ plan: SparkPlan) extends V2TableWriteExec(Map.empty, plan) {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ doAppend(writer)
+ }
+}
+
/**
- * The physical plan for writing data into data source v2.
+ * The base physical plan for writing data into data source v2.
*/
-case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan {
+abstract class V2TableWriteExec(
+ options: Map[String, String],
+ query: SparkPlan) extends SparkPlan {
+ import org.apache.spark.sql.sources.v2.DataSourceV2Implicits._
+
override def children: Seq[SparkPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
- override protected def doExecute(): RDD[InternalRow] = {
+ protected def appendToTable(table: Table): RDD[InternalRow] = {
+ doAppend(table.createWriter(options, query.schema))
+ }
+
+ protected def doAppend(writer: DataSourceWriter): RDD[InternalRow] = {
val writeTask = writer match {
case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 3a0db7e16c23..4c2e1fd8a8d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
import org.apache.spark.SparkConf
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _}
+import org.apache.spark.sql.catalog.v2.ResolveCatalogV2Relations
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
@@ -156,7 +157,8 @@ abstract class BaseSessionStateBuilder(
*/
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new FindDataSourceTable(session) +:
+ ResolveCatalogV2Relations(session) +:
+ new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
customResolutionRules
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/v2/DataSourceV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/v2/DataSourceV2Implicits.scala
new file mode 100644
index 000000000000..68feb9876b6a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/v2/DataSourceV2Implicits.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalog.v2.{CaseInsensitiveStringMap, CatalogProvider, Table, TableCatalog}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Implicit helper classes to make working with the v2 API in Scala easier.
+ */
+private[sql] object DataSourceV2Implicits {
+ implicit class CatalogHelper(catalog: CatalogProvider) {
+ def asTableCatalog: TableCatalog = catalog match {
+ case tableCatalog: TableCatalog =>
+ tableCatalog
+ case _ =>
+ throw new UnsupportedOperationException(s"Catalog $catalog does not support tables")
+ }
+ }
+
+ implicit class TableHelper(table: Table) {
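+ // Adapts a catalog Table to the v2 read/write mix-ins: createReader prefers
+ // ReadSupportWithSchema (passing the table's own schema) and falls back to ReadSupport;
+ // createWriter always appends.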
+ def asReadSupport: ReadSupport = {
+ table match {
+ case support: ReadSupport =>
+ support
+ case _: ReadSupportWithSchema =>
+ // this method is only called if there is no user-supplied schema. if there is no
+ // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
+ throw new AnalysisException(s"Table requires a user-supplied schema: $table")
+ case _ =>
+ throw new AnalysisException(s"Table is not readable: $table")
+ }
+ }
+
+ def asReadSupportWithSchema: ReadSupportWithSchema = {
+ table match {
+ case support: ReadSupportWithSchema =>
+ support
+ case _: ReadSupport =>
+ throw new AnalysisException(
+ s"Table does not support user-supplied schema: $table")
+ case _ =>
+ throw new AnalysisException(s"Table is not readable: $table")
+ }
+ }
+
+ def asWriteSupport: WriteSupport = {
+ table match {
+ case support: WriteSupport =>
+ support
+ case _ =>
+ throw new AnalysisException(s"Table is not writable: $table")
+ }
+ }
+
+ def createReader(
+ options: Map[String, String]): DataSourceReader = {
+ table match {
+ case supportWithSchema: ReadSupportWithSchema =>
+ supportWithSchema.createReader(table.schema(), options.asDataSourceOptions)
+ case support: ReadSupport =>
+ support.createReader(options.asDataSourceOptions)
+ case _ =>
+ throw new AnalysisException(s"Table is not readable: $table")
+ }
+ }
+
+ def createWriter(
+ options: Map[String, String],
+ schema: StructType): DataSourceWriter = {
+ asWriteSupport.createWriter(
+ UUID.randomUUID.toString, schema, SaveMode.Append, options.asDataSourceOptions).get
+ }
+ }
+
+ implicit class SourceHelper(source: DataSourceV2) {
+ def asReadSupport: ReadSupport = {
+ source match {
+ case support: ReadSupport =>
+ support
+ case _: ReadSupportWithSchema =>
+ // this method is only called if there is no user-supplied schema. if there is no
+ // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
+ throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
+ case _ =>
+ throw new AnalysisException(s"Data source is not readable: $name")
+ }
+ }
+
+ def asReadSupportWithSchema: ReadSupportWithSchema = {
+ source match {
+ case support: ReadSupportWithSchema =>
+ support
+ case _: ReadSupport =>
+ throw new AnalysisException(
+ s"Data source does not support user-supplied schema: $name")
+ case _ =>
+ throw new AnalysisException(s"Data source is not readable: $name")
+ }
+ }
+
+ def asWriteSupport: WriteSupport = {
+ source match {
+ case support: WriteSupport =>
+ support
+ case _ =>
+ throw new AnalysisException(s"Data source is not writable: $name")
+ }
+ }
+
+ def name: String = {
+ source match {
+ case registered: DataSourceRegister =>
+ registered.shortName()
+ case _ =>
+ source.getClass.getSimpleName
+ }
+ }
+
+ def createReader(
+ options: Map[String, String],
+ userSpecifiedSchema: Option[StructType]): DataSourceReader = {
+ val v2Options = new DataSourceOptions(options.asJava)
+ userSpecifiedSchema match {
+ case Some(s) =>
+ asReadSupportWithSchema.createReader(s, v2Options)
+ case _ =>
+ asReadSupport.createReader(v2Options)
+ }
+ }
+
+ def createWriter(
+ options: Map[String, String],
+ schema: StructType): DataSourceWriter = {
+ val v2Options = new DataSourceOptions(options.asJava)
+ asWriteSupport.createWriter(UUID.randomUUID.toString, schema, SaveMode.Append, v2Options).get
+ }
+ }
+
+ implicit class OptionsHelper(options: Map[String, String]) {
+ def asDataSourceOptions: DataSourceOptions = {
+ new DataSourceOptions(options.asJava)
+ }
+
+ def asCaseInsensitiveMap: CaseInsensitiveStringMap = {
+ val map = CaseInsensitiveStringMap.empty()
+ map.putAll(options.asJava)
+ map
+ }
+
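+ // Illustrative (assuming TABLE_KEY and DATABASE_KEY are the "table" and "database" options):
+ //   Map("table" -> "t", "database" -> "db").table == Some(TableIdentifier("t", Some("db")))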
+ def table: Option[TableIdentifier] = {
+ val map = asCaseInsensitiveMap
+ Option(map.get(DataSourceOptions.TABLE_KEY))
+ .map(TableIdentifier(_, Option(map.get(DataSourceOptions.DATABASE_KEY))))
+ }
+
+ def paths: Array[String] = {
+ asDataSourceOptions.paths()
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index d73eebbc84b7..09757b0e8a6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -204,33 +204,33 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
val path = file.getCanonicalPath
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
- spark.range(10).select('id, -'id).write.format(cls.getName)
+ spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).select('id, -'id))
// test with different save modes
- spark.range(10).select('id, -'id).write.format(cls.getName)
+ spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("append").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).union(spark.range(10)).select('id, -'id))
- spark.range(5).select('id, -'id).write.format(cls.getName)
+ spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("overwrite").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(5).select('id, -'id))
- spark.range(5).select('id, -'id).write.format(cls.getName)
+ spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("ignore").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(5).select('id, -'id))
val e = intercept[Exception] {
- spark.range(5).select('id, -'id).write.format(cls.getName)
+ spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("error").save()
}
assert(e.getMessage.contains("data already exists"))
@@ -247,7 +247,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}
// this input data will fail to read middle way.
- val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i)
+ val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
val e2 = intercept[SparkException] {
input.write.format(cls.getName).option("path", path).mode("overwrite").save()
}
@@ -256,7 +256,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
// test internal row writer
- spark.range(5).select('id, -'id).write.format(cls.getName)
+ spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).option("internal", "true").mode("overwrite").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
@@ -272,7 +272,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
val numPartition = 6
- spark.range(0, 10, 1, numPartition).select('id, -'id).write.format(cls.getName)
+ spark.range(0, 10, 1, numPartition).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),