diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java new file mode 100644 index 000000000000..6f5895bcc380 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * NamespaceChange subclasses represent requested changes to a namespace. These are passed to + * {@link SupportsNamespaces#alterNamespace}. For example, + *
+ * import NamespaceChange._
+ * val catalog = Catalogs.load(name)
+ * catalog.alterNamespace(namespace,
+ * setProperty("prop", "value"),
+ * removeProperty("other_prop")
+ * )
+ *
+ */
+public interface NamespaceChange {
+ /**
+ * Create a NamespaceChange for setting a namespace property.
+ * + * If the property already exists, it will be replaced with the new value. + * + * @param property the property name + * @param value the new property value + * @return a NamespaceChange for the addition + */ + static NamespaceChange setProperty(String property, String value) { + return new SetProperty(property, value); + } + + /** + * Create a NamespaceChange for removing a namespace property. + *
+ * If the property does not exist, the change will succeed. + * + * @param property the property name + * @return a NamespaceChange for the removal + */ + static NamespaceChange removeProperty(String property) { + return new RemoveProperty(property); + } + + /** + * A NamespaceChange to set a namespace property. + *
+ * If the property already exists, it must be replaced with the new value. + */ + final class SetProperty implements NamespaceChange { + private final String property; + private final String value; + + private SetProperty(String property, String value) { + this.property = property; + this.value = value; + } + + public String property() { + return property; + } + + public String value() { + return value; + } + } + + /** + * A NamespaceChange to remove a namespace property. + *
+ * If the property does not exist, the change should succeed. + */ + final class RemoveProperty implements NamespaceChange { + private final String property; + + private RemoveProperty(String property) { + this.property = property; + } + + public String property() { + return property; + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java new file mode 100644 index 000000000000..12c2e511f33f --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; + +import java.util.Map; + +/** + * Catalog methods for working with namespaces. + *
+ * If an object such as a table, view, or function exists, its parent namespaces must also exist + * and must be returned by the discovery methods {@link #listNamespaces()} and + * {@link #listNamespaces(String[])}. + *
+ * Catalog implementations are not required to maintain the existence of namespaces independent of + * objects in a namespace. For example, a function catalog that loads functions using reflection + * and uses Java packages as namespaces is not required to support the methods to create, alter, or + * drop a namespace. Implementations are allowed to discover the existence of objects or namespaces + * without throwing {@link NoSuchNamespaceException} when no namespace is found. + */ +public interface SupportsNamespaces extends CatalogPlugin { + + /** + * Return a default namespace for the catalog. + *
+ * When this catalog is set as the current catalog, the namespace returned by this method will be + * set as the current namespace. + *
+ * The namespace returned by this method is not required to exist. + * + * @return a multi-part namespace + */ + default String[] defaultNamespace() { + return new String[0]; + } + + /** + * List top-level namespaces from the catalog. + *
+ * If an object such as a table, view, or function exists, its parent namespaces must also exist + * and must be returned by this discovery method. For example, if table a.b.t exists, this method + * must return ["a"] in the result array. + * + * @return an array of multi-part namespace names + */ + String[][] listNamespaces() throws NoSuchNamespaceException; + + /** + * List namespaces in a namespace. + *
+ * If an object such as a table, view, or function exists, its parent namespaces must also exist + * and must be returned by this discovery method. For example, if table a.b.t exists, this method + * invoked as listNamespaces(["a"]) must return ["a", "b"] in the result array. + * + * @param namespace a multi-part namespace + * @return an array of multi-part namespace names + * @throws NoSuchNamespaceException If the namespace does not exist (optional) + */ + String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException; + + /** + * Test whether a namespace exists. + *
+ * If an object such as a table, view, or function exists, its parent namespaces must also exist.
+ * For example, if table a.b.t exists, this method invoked as namespaceExists(["a"]) or
+ * namespaceExists(["a", "b"]) must return true.
+ *
+ * @param namespace a multi-part namespace
+ * @return true if the namespace exists, false otherwise
+ */
+ default boolean namespaceExists(String[] namespace) {
+   // Existence is probed by loading the namespace metadata. Only NoSuchNamespaceException is
+   // translated to false; any other exception from loadNamespaceMetadata propagates.
+   boolean exists = true;
+   try {
+     loadNamespaceMetadata(namespace);
+   } catch (NoSuchNamespaceException e) {
+     exists = false;
+   }
+   return exists;
+ }
+
+ /**
+ * Load metadata properties for a namespace.
+ *
+ * @param namespace a multi-part namespace
+ * @return a string map of properties for the given namespace
+ * @throws NoSuchNamespaceException If the namespace does not exist (optional)
+ * @throws UnsupportedOperationException If namespace properties are not supported
+ */
+ Map
+ * This operation may be rejected by the catalog implementation if the namespace is not empty by
+ * throwing {@link IllegalStateException}. If the catalog implementation does not support this
+ * operation, it may throw {@link UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @return true if the namespace was dropped
+ * @throws NoSuchNamespaceException If the namespace does not exist (optional)
+ * @throws IllegalStateException If the namespace is not empty
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
+ boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException;
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
index 7cc80c41a901..cd9bcc0f44f7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
@@ -22,7 +22,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.sources.v2.Table
@@ -31,6 +31,37 @@ import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
object CatalogV2Util {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+ /**
+  * Apply namespace property changes to a Scala map and return the result.
+  *
+  * Delegates to the Java-map overload and converts the outcome back to an immutable Scala map.
+  *
+  * @param properties the current namespace properties
+  * @param changes the changes to apply
+  * @return a new map with the changes applied
+  */
+ def applyNamespaceChanges(
+     properties: Map[String, String],
+     changes: Seq[NamespaceChange]): Map[String, String] = {
+   val updated = applyNamespaceChanges(properties.asJava, changes)
+   updated.asScala.toMap
+ }
+
+ /**
+  * Apply namespace property changes to a Java map and return the result.
+  *
+  * The input map is copied, so it is never modified. Changes are applied in sequence order:
+  * SetProperty adds or replaces an entry, RemoveProperty deletes one, and any other kind of
+  * change is ignored.
+  *
+  * @param properties the current namespace properties
+  * @param changes the changes to apply
+  * @return an unmodifiable map with the changes applied
+  */
+ def applyNamespaceChanges(
+     properties: util.Map[String, String],
+     changes: Seq[NamespaceChange]): util.Map[String, String] = {
+   val updated = new util.HashMap[String, String](properties)
+
+   for (change <- changes) {
+     change match {
+       case setChange: NamespaceChange.SetProperty =>
+         updated.put(setChange.property, setChange.value)
+       case removeChange: NamespaceChange.RemoveProperty =>
+         updated.remove(removeChange.property)
+       case _ =>
+         // ignore non-property changes
+     }
+   }
+
+   Collections.unmodifiableMap(updated)
+ }
+
/**
* Apply properties changes to a map and return the result.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
index 9c1b9a3e53de..089b4c5ed94f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
@@ -23,7 +23,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
@@ -37,13 +37,14 @@ class TableCatalogSuite extends SparkFunSuite {
.add("id", IntegerType)
.add("data", StringType)
- private def newCatalog(): TableCatalog = {
+ private def newCatalog(): TableCatalog with SupportsNamespaces = { // namespace API used by tests
val newCatalog = new TestTableCatalog
newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
newCatalog
}
- private val testIdent = Identifier.of(Array("`", "."), "test_table")
+ private val testNs = Array("`", ".") // parts are a backtick and a dot; tests check testNs.quoted
+ private val testIdent = Identifier.of(testNs, "test_table")
test("Catalogs can load the catalog") {
val catalog = newCatalog()
@@ -654,4 +655,201 @@ class TableCatalogSuite extends SparkFunSuite {
assert(!wasDropped)
assert(!catalog.tableExists(testIdent))
}
+
+ test("listNamespaces: list namespaces from metadata") {
+   // A namespace that exists only through explicit metadata must be listed at the top level.
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+   catalog.createNamespace(Array("ns1"), props.asJava)
+
+   val expectedTopLevel = Array(Array("ns1"))
+   assert(catalog.listNamespaces === expectedTopLevel)
+   assert(catalog.listNamespaces(Array()) === expectedTopLevel)
+   assert(catalog.listNamespaces(Array("ns1")) === Array())
+ }
+
+ test("listNamespaces: list namespaces from tables") {
+   // Namespaces are implied by the tables created in them, without any explicit metadata.
+   val catalog = newCatalog()
+   val nested = Array("ns1", "ns2")
+
+   Seq("test_table_1", "test_table_2").foreach { tableName =>
+     catalog.createTable(Identifier.of(nested, tableName), schema, Array.empty, emptyProps)
+   }
+
+   assert(catalog.listNamespaces === Array(Array("ns1")))
+   assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
+   assert(catalog.listNamespaces(Array("ns1")) === Array(nested))
+   assert(catalog.listNamespaces(nested) === Array())
+ }
+
+ test("listNamespaces: list namespaces from metadata and tables") {
+   // Explicit metadata for ns1 and tables under ns1.ns2 must show up as a single listing.
+   val catalog = newCatalog()
+   val nested = Array("ns1", "ns2")
+
+   catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
+   Seq("test_table_1", "test_table_2").foreach { tableName =>
+     catalog.createTable(Identifier.of(nested, tableName), schema, Array.empty, emptyProps)
+   }
+
+   assert(catalog.listNamespaces === Array(Array("ns1")))
+   assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
+   assert(catalog.listNamespaces(Array("ns1")) === Array(nested))
+   assert(catalog.listNamespaces(nested) === Array())
+ }
+
+ test("loadNamespaceMetadata: fail if no metadata or tables exist") {
+   val catalog = newCatalog()
+
+   // Nothing was created, so the lookup must fail and name the quoted namespace.
+   val thrown = intercept[NoSuchNamespaceException](catalog.loadNamespaceMetadata(testNs))
+
+   assert(thrown.getMessage.contains(testNs.quoted))
+ }
+
+ test("loadNamespaceMetadata: no metadata, table exists") {
+   val catalog = newCatalog()
+
+   catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+   // The namespace exists only because of the table, so its metadata is empty.
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty)
+ }
+
+ test("loadNamespaceMetadata: metadata exists, no tables") {
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+
+   catalog.createNamespace(testNs, props.asJava)
+
+   // The stored properties come back unchanged.
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === props)
+ }
+
+ test("loadNamespaceMetadata: metadata and table exist") {
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+
+   catalog.createNamespace(testNs, props.asJava)
+   catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+   // Creating a table in the namespace must not disturb the explicit metadata.
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === props)
+ }
+
+ test("createNamespace: basic behavior") {
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+
+   catalog.createNamespace(testNs, props.asJava)
+
+   // The new namespace is visible and carries the properties it was created with.
+   assert(catalog.namespaceExists(testNs))
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === props)
+ }
+
+ test("createNamespace: fail if metadata already exists") {
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+
+   catalog.createNamespace(testNs, props.asJava)
+
+   // A second create for the same namespace must be rejected and leave the metadata untouched.
+   val thrown = intercept[NamespaceAlreadyExistsException] {
+     catalog.createNamespace(testNs, props.asJava)
+   }
+
+   assert(thrown.getMessage.contains(testNs.quoted))
+   assert(catalog.namespaceExists(testNs))
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === props)
+ }
+
+ test("createNamespace: fail if namespace already exists from table") {
+   val catalog = newCatalog()
+   def currentMetadata = catalog.loadNamespaceMetadata(testNs).asScala
+
+   catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+   // The table alone makes the namespace exist, with empty metadata.
+   assert(catalog.namespaceExists(testNs))
+   assert(currentMetadata === Map.empty)
+
+   val thrown = intercept[NamespaceAlreadyExistsException] {
+     catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+   }
+
+   // The failed create must not have attached any metadata.
+   assert(thrown.getMessage.contains(testNs.quoted))
+   assert(catalog.namespaceExists(testNs))
+   assert(currentMetadata === Map.empty)
+ }
+
+ test("dropNamespace: drop missing namespace") {
+   val catalog = newCatalog()
+
+   assert(!catalog.namespaceExists(testNs))
+
+   // Dropping a namespace that was never created reports false.
+   assert(!catalog.dropNamespace(testNs))
+ }
+
+ test("dropNamespace: drop empty namespace") {
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+
+   catalog.createNamespace(testNs, props.asJava)
+
+   assert(catalog.namespaceExists(testNs))
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === props)
+
+   // A namespace without tables can be dropped and is gone afterwards.
+   assert(catalog.dropNamespace(testNs))
+   assert(!catalog.namespaceExists(testNs))
+ }
+
+ test("dropNamespace: fail if not empty") {
+   val catalog = newCatalog()
+   val props = Map("property" -> "value")
+
+   catalog.createNamespace(testNs, props.asJava)
+   catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+   // The namespace still holds a table, so the drop must be refused.
+   val thrown = intercept[IllegalStateException](catalog.dropNamespace(testNs))
+
+   assert(thrown.getMessage.contains(testNs.quoted))
+   assert(catalog.namespaceExists(testNs))
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === props)
+ }
+
+ test("alterNamespace: basic behavior") {
+   val catalog = newCatalog()
+   val initial = Map("property" -> "value")
+   def currentMetadata = catalog.loadNamespaceMetadata(testNs).asScala
+
+   catalog.createNamespace(testNs, initial.asJava)
+
+   // Add a property.
+   catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2"))
+   assert(currentMetadata === initial + ("property2" -> "value2"))
+
+   // Remove one property and add another in a single call.
+   catalog.alterNamespace(testNs,
+     NamespaceChange.removeProperty("property2"),
+     NamespaceChange.setProperty("property3", "value3"))
+   assert(currentMetadata === initial + ("property3" -> "value3"))
+
+   // Remove the last added property, leaving the initial metadata.
+   catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3"))
+   assert(currentMetadata === initial)
+ }
+
+ test("alterNamespace: create metadata if missing and table exists") {
+   val catalog = newCatalog()
+   val change = NamespaceChange.setProperty("property", "value")
+
+   catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+   // The namespace exists only through the table; altering it creates its metadata.
+   catalog.alterNamespace(testNs, change)
+
+   assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
+ }
+
+ test("alterNamespace: fail if no metadata or table exists") {
+   val catalog = newCatalog()
+   val change = NamespaceChange.setProperty("property", "value")
+
+   // Nothing was created, so the alter must fail and name the quoted namespace.
+   val thrown = intercept[NoSuchNamespaceException](catalog.alterNamespace(testNs, change))
+
+   assert(thrown.getMessage.contains(testNs.quoted))
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 6ba140fa5dde..6fdd6e30e1ee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -24,14 +24,18 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class TestTableCatalog extends TableCatalog {
+class TestTableCatalog extends TableCatalog with SupportsNamespaces {
import CatalogV2Implicits._
+ // The default namespace of this catalog is the empty namespace.
+ override val defaultNamespace: Array[String] = Array.empty[String]
+
+ // Explicitly created namespace metadata, keyed by the namespace parts.
+ protected val namespaces: util.Map[List[String], Map[String, String]] =
+   new ConcurrentHashMap[List[String], Map[String, String]]()
+
private val tables: util.Map[Identifier, Table] = new ConcurrentHashMap[Identifier, Table]()
private var _name: Option[String] = None
@@ -88,6 +92,68 @@ class TestTableCatalog extends TableCatalog {
}
override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+
+ /**
+  * All known namespaces without duplicates: the namespaces of every stored table plus every
+  * namespace that has explicit metadata.
+  */
+ private def allNamespaces: Seq[Seq[String]] = {
+   val fromTables = tables.keySet.asScala.map(_.namespace.toSeq)
+   val fromMetadata = namespaces.keySet.asScala
+   (fromTables ++ fromMetadata).toSeq.distinct
+ }
+
+ /**
+  * A namespace exists when it equals, or is a prefix of, some known namespace, so the parents
+  * of a table's namespace exist as well.
+  */
+ override def namespaceExists(namespace: Array[String]): Boolean = {
+   allNamespaces.exists { known =>
+     known.startsWith(namespace)
+   }
+ }
+
+ /**
+  * List the top-level namespaces: the distinct first parts of all known namespaces.
+  *
+  * An empty namespace has no first part and is skipped, so a table or metadata entry stored
+  * under the empty namespace does not make this method throw. This matches
+  * `listNamespaces(Array())`, which also skips empty namespaces.
+  *
+  * @return one single-part namespace per distinct top-level name
+  */
+ override def listNamespaces: Array[Array[String]] = {
+   // headOption rather than head: allNamespaces may contain an empty namespace.
+   allNamespaces.flatMap(_.headOption).distinct.map(Array(_)).toArray
+ }
+
+ /**
+  * List the direct children of `namespace`: every known namespace that is strictly longer than
+  * `namespace` and starts with it, truncated to one extra part, without duplicates.
+  *
+  * @param namespace the parent namespace
+  * @return the child namespaces, each including the parent parts
+  */
+ override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
+   val depth = namespace.length
+   val children = allNamespaces.collect {
+     case known if known.size > depth && known.startsWith(namespace) => known.take(depth + 1)
+   }
+   // De-duplicate while the children are still Seqs; arrays compare by reference.
+   children.distinct.map(_.toArray).toArray
+ }
+
+ /**
+  * Load the metadata of a namespace.
+  *
+  * Returns the stored properties when the namespace has explicit metadata, and an empty map
+  * when the namespace exists only according to `namespaceExists`.
+  *
+  * @param namespace the namespace to look up
+  * @return the namespace properties, possibly empty
+  * @throws NoSuchNamespaceException if the namespace has no metadata and does not exist
+  */
+ override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
+   // Look up with a List key, the declared key type of `namespaces`, like every other access.
+   Option(namespaces.get(namespace.toList)) match {
+     case Some(metadata) =>
+       metadata.asJava
+     case None if namespaceExists(namespace) =>
+       util.Collections.emptyMap[String, String]
+     case None =>
+       throw new NoSuchNamespaceException(namespace)
+   }
+ }
+
+ /**
+  * Create a namespace with the given metadata.
+  *
+  * @param namespace the namespace to create
+  * @param metadata the properties to store for the namespace
+  * @throws NamespaceAlreadyExistsException if the namespace already exists, either through
+  *                                         stored metadata or through a known namespace that
+  *                                         starts with it
+  */
+ override def createNamespace(
+     namespace: Array[String],
+     metadata: util.Map[String, String]): Unit = {
+   if (namespaceExists(namespace)) {
+     throw new NamespaceAlreadyExistsException(namespace)
+   }
+
+   // putIfAbsent returns the existing value when an entry for the key is already present.
+   val existing = Option(namespaces.putIfAbsent(namespace.toList, metadata.asScala.toMap))
+   if (existing.isDefined) {
+     throw new NamespaceAlreadyExistsException(namespace)
+   }
+ }
+
+ /**
+  * Apply property changes to a namespace and store the resulting metadata.
+  *
+  * The current metadata is read through `loadNamespaceMetadata`, so a namespace that does not
+  * exist fails with that method's NoSuchNamespaceException.
+  *
+  * @param namespace the namespace to alter
+  * @param changes the changes to apply
+  */
+ override def alterNamespace(
+     namespace: Array[String],
+     changes: NamespaceChange*): Unit = {
+   val current = loadNamespaceMetadata(namespace).asScala.toMap
+   val updated = CatalogV2Util.applyNamespaceChanges(current, changes)
+   namespaces.put(namespace.toList, updated)
+ }
+
+ /**
+  * Drop the stored metadata of a namespace.
+  *
+  * @param namespace the namespace to drop
+  * @return true if metadata was stored for the namespace and has been removed
+  * @throws IllegalStateException if `listTables` still returns tables for the namespace
+  */
+ override def dropNamespace(namespace: Array[String]): Boolean = {
+   val containsTables = listTables(namespace).nonEmpty
+   if (containsTables) {
+     throw new IllegalStateException(s"Cannot delete non-empty namespace: ${namespace.quoted}")
+   }
+
+   // remove returns the previous mapping, or null when nothing was stored.
+   Option(namespaces.remove(namespace.toList)).nonEmpty
+ }
}
case class InMemoryTable(