From 4b3329b33824fc9d8559e431afe846ccedb03689 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 3 May 2019 15:29:52 -0700 Subject: [PATCH 1/4] Add SupportsNamespaces API. --- .../spark/sql/catalog/v2/NamespaceChange.java | 97 +++++++++++++ .../sql/catalog/v2/SupportsNamespaces.java | 131 ++++++++++++++++++ 2 files changed, 228 insertions(+) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java new file mode 100644 index 000000000000..56c39c347e80 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * NamespaceChange subclasses represent requested changes to a namespace. These are passed to + * {@link SupportsNamespaces#alterNamespace}. For example, + *
+ *   import NamespaceChange._
+ *   val catalog = Catalogs.load(name)
+ *   catalog.namespaces.alterNamespace(ident,
+ *       setProperty("prop", "value"),
+ *       removeProperty("other_prop")
+ *     )
+ * 
+ */ +public interface NamespaceChange { + /** + * Create a NamespaceChange for setting a namespace property. + *

+ * If the property already exists, it will be replaced with the new value. + * + * @param property the property name + * @param value the new property value + * @return a NamespaceChange for the addition + */ + static NamespaceChange setProperty(String property, String value) { + return new SetProperty(property, value); + } + + /** + * Create a NamespaceChange for removing a namespace property. + *

+ * If the property does not exist, the change will succeed. + * + * @param property the property name + * @return a NamespaceChange for the removal + */ + static NamespaceChange removeProperty(String property) { + return new RemoveProperty(property); + } + + /** + * A NamespaceChange to set a namespace property. + *

+ * If the property already exists, it must be replaced with the new value. + */ + final class SetProperty implements NamespaceChange { + private final String property; + private final String value; + + private SetProperty(String property, String value) { + this.property = property; + this.value = value; + } + + public String property() { + return property; + } + + public String value() { + return value; + } + } + + /** + * A NamespaceChange to remove a namespace property. + *

+ * If the property does not exist, the change should succeed. + */ + final class RemoveProperty implements NamespaceChange { + private final String property; + + private RemoveProperty(String property) { + this.property = property; + } + + public String property() { + return property; + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java new file mode 100644 index 000000000000..ea789d7e2c57 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; + +import java.util.Map; + +/** + * Catalog methods for working with namespaces. + *

+ * If an object such as a table, view, or function exists, its parent namespaces must also exist + * and must be returned by the discovery methods {@link #listNamespaces()} and + * {@link #listNamespaces(String[])}. + *

+ * Catalog implementations are not required to maintain the existence of namespaces independent of + * objects in a namespace. For example, a function catalog that loads functions using reflection + * and uses Java packages as namespaces is not required to support the methods to create, alter, or + * drop a namespace. Implementations are allowed to discover the existence of objects or namespaces + * without throwing {@link NoSuchNamespaceException} when no namespace is found. + */ +public interface SupportsNamespaces extends CatalogPlugin { + + /** + * List top-level namespaces from the catalog. + *

+ * If an object such as a table, view, or function exists, its parent namespaces must also exist + * and must be returned by this discovery method. For example, if table a.b.t exists, this method + * must return ["a"] in the result array. + * + * @return an array of multi-part namespace names + */ + String[][] listNamespaces() throws NoSuchNamespaceException; + + /** + * List namespaces in a namespace. + *

+ * If an object such as a table, view, or function exists, its parent namespaces must also exist + * and must be returned by this discovery method. For example, if table a.b.t exists, this method + * invoked as listNamespaces(["a"]) must return ["a", "b"] in the result array. + * + * @param namespace a multi-part namespace + * @return an array of multi-part namespace names + * @throws NoSuchNamespaceException If the namespace does not exist (optional) + */ + String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException; + + /** + * Test whether a namespace exists. + *

+ * If an object such as a table, view, or function exists, its parent namespaces must also exist. + * For example, if table a.b.t exists, this method invoked as namespaceExists(["a"]) or + * namespaceExists(["a", "b"]) must return true. + * + * @param namespace a multi-part namespace + * @return true if the namespace exists, false otherwise + */ + default boolean namespaceExists(String[] namespace) { + try { + loadNamespaceMetadata(namespace); + return true; + } catch (NoSuchNamespaceException e) { + return false; + } + } + + /** + * Load metadata properties for a namespace. + * + * @param namespace a multi-part namespace + * @return a string map of properties for the given namespace + * @throws NoSuchNamespaceException If the namespace does not exist (optional) + * @throws UnsupportedOperationException If namespace properties are not supported + */ + Map loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException; + + /** + * Create a namespace in the catalog. + * + * @param namespace a multi-part namespace + * @param metadata a string map of properties for the given namespace + * @throws NamespaceAlreadyExistsException If the namespace already exists + * @throws UnsupportedOperationException If create is not a supported operation + */ + void createNamespace( + String[] namespace, + Map metadata) throws NamespaceAlreadyExistsException; + + /** + * Apply a set of metadata changes to a namespace in the catalog. + * + * @param namespace a multi-part namespace + * @param changes a collection of changes to apply to the namespace + * @throws NoSuchNamespaceException If the namespace does not exist (optional) + * @throws UnsupportedOperationException If namespace properties are not supported + */ + void alterNamespace( + String[] namespace, + NamespaceChange... changes) throws NoSuchNamespaceException; + + /** + * Drop a namespace from the catalog. + *

+ * This operation may be rejected by the catalog implementation if the namespace is not empty by + * throwing {@link IllegalStateException}. If the catalog implementation does not support this + * operation, it may throw {@link UnsupportedOperationException}. + * + * @param namespace a multi-part namespace + * @return true if the namespace was dropped + * @throws NoSuchNamespaceException If the namespace does not exist (optional) + * @throws IllegalStateException If the namespace is not empty + * @throws UnsupportedOperationException If drop is not a supported operation + */ + boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException; +} From 6ff538a32f78398ab358d851ca568fe7eb34e401 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 25 Jul 2019 09:15:22 -0700 Subject: [PATCH 2/4] Fix doc typo, add defaultNamespace method. --- .../spark/sql/catalog/v2/NamespaceChange.java | 2 +- .../spark/sql/catalog/v2/SupportsNamespaces.java | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java index 56c39c347e80..6f5895bcc380 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java @@ -23,7 +23,7 @@ *

  *   import NamespaceChange._
  *   val catalog = Catalogs.load(name)
- *   catalog.namespaces.alterNamespace(ident,
+ *   catalog.alterNamespace(namespace,
  *       setProperty("prop", "value"),
  *       removeProperty("other_prop")
  *     )
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
index ea789d7e2c57..12c2e511f33f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
@@ -37,6 +37,20 @@
  */
 public interface SupportsNamespaces extends CatalogPlugin {
 
+  /**
+   * Return a default namespace for the catalog.
+   * 

+ * When this catalog is set as the current catalog, the namespace returned by this method will be + * set as the current namespace. + *

+ * The namespace returned by this method is not required to exist. + * + * @return a multi-part namespace + */ + default String[] defaultNamespace() { + return new String[0]; + } + /** * List top-level namespaces from the catalog. *

From 347c999504e1781c135248405712b3bc0202a120 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 25 Jul 2019 14:19:40 -0700 Subject: [PATCH 3/4] Implement SupportsNamespaces in TestTableCatalog, add tests. --- .../sql/catalog/v2/SupportsNamespaces.java | 2 +- .../sql/catalog/v2/utils/CatalogV2Util.scala | 33 ++- .../sql/catalog/v2/TableCatalogSuite.scala | 201 +++++++++++++++++- .../sql/catalog/v2/TestTableCatalog.scala | 67 +++++- 4 files changed, 296 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java index 12c2e511f33f..1844437ff2e2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java @@ -112,7 +112,7 @@ default boolean namespaceExists(String[] namespace) { * @throws NamespaceAlreadyExistsException If the namespace already exists * @throws UnsupportedOperationException If create is not a supported operation */ - void createNamespace( + void createNamespaceMetadata( String[] namespace, Map metadata) throws NamespaceAlreadyExistsException; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala index 7cc80c41a901..cd9bcc0f44f7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala @@ -22,7 +22,7 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange} import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, 
RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.sources.v2.Table @@ -31,6 +31,37 @@ import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} object CatalogV2Util { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + /** + * Apply properties changes to a map and return the result. + */ + def applyNamespaceChanges( + properties: Map[String, String], + changes: Seq[NamespaceChange]): Map[String, String] = { + applyNamespaceChanges(properties.asJava, changes).asScala.toMap + } + + /** + * Apply properties changes to a Java map and return the result. + */ + def applyNamespaceChanges( + properties: util.Map[String, String], + changes: Seq[NamespaceChange]): util.Map[String, String] = { + val newProperties = new util.HashMap[String, String](properties) + + changes.foreach { + case set: NamespaceChange.SetProperty => + newProperties.put(set.property, set.value) + + case unset: NamespaceChange.RemoveProperty => + newProperties.remove(unset.property) + + case _ => + // ignore non-property changes + } + + Collections.unmodifiableMap(newProperties) + } + /** * Apply properties changes to a map and return the result. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala index 9c1b9a3e53de..2734e34a3cc7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala @@ -23,7 +23,7 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} @@ -37,13 +37,14 @@ class TableCatalogSuite extends SparkFunSuite { .add("id", IntegerType) .add("data", StringType) - private def newCatalog(): TableCatalog = { + private def newCatalog(): TableCatalog with SupportsNamespaces = { val newCatalog = new TestTableCatalog newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) newCatalog } - private val testIdent = Identifier.of(Array("`", "."), "test_table") + private val testNs = Array("`", ".") + private val testIdent = Identifier.of(testNs, "test_table") test("Catalogs can load the catalog") { val catalog = newCatalog() @@ -654,4 +655,198 @@ class TableCatalogSuite extends SparkFunSuite { assert(!wasDropped) assert(!catalog.tableExists(testIdent)) } + + test("listNamespaces: list namespaces from metadata") { + val catalog = newCatalog() + catalog.createNamespaceMetadata(Array("ns1"), Map("property" -> "value").asJava) + + assert(catalog.listNamespaces === Array(Array("ns1"))) + 
assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) + assert(catalog.listNamespaces(Array("ns1")) === Array()) + } + + test("listNamespaces: list namespaces from tables") { + val catalog = newCatalog() + val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1") + val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") + + catalog.createTable(ident1, schema, Array.empty, emptyProps) + catalog.createTable(ident2, schema, Array.empty, emptyProps) + + assert(catalog.listNamespaces === Array(Array("ns1"))) + assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) + assert(catalog.listNamespaces(Array("ns1")) === Array(Array("ns1", "ns2"))) + assert(catalog.listNamespaces(Array("ns1", "ns2")) === Array()) + } + + test("listNamespaces: list namespaces from metadata and tables") { + val catalog = newCatalog() + val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1") + val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") + + catalog.createNamespaceMetadata(Array("ns1"), Map("property" -> "value").asJava) + catalog.createTable(ident1, schema, Array.empty, emptyProps) + catalog.createTable(ident2, schema, Array.empty, emptyProps) + + assert(catalog.listNamespaces === Array(Array("ns1"))) + assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) + assert(catalog.listNamespaces(Array("ns1")) === Array(Array("ns1", "ns2"))) + assert(catalog.listNamespaces(Array("ns1", "ns2")) === Array()) + } + + test("loadNamespaceMetadata: fail if no metadata or tables exist") { + val catalog = newCatalog() + + val exc = intercept[NoSuchNamespaceException] { + catalog.loadNamespaceMetadata(testNs) + } + + assert(exc.getMessage.contains(testNs.quoted)) + } + + test("loadNamespaceMetadata: no metadata, table exists") { + val catalog = newCatalog() + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(metadata.asScala === Map.empty) + } + + 
test("loadNamespaceMetadata: metadata exists, no tables") { + val catalog = newCatalog() + + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(metadata.asScala === Map("property" -> "value")) + } + + test("loadNamespaceMetadata: metadata and table exist") { + val catalog = newCatalog() + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(metadata.asScala === Map("property" -> "value")) + } + + test("createNamespaceMetadata: basic behavior") { + val catalog = newCatalog() + + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + assert(catalog.namespaceExists(testNs) === true) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + } + + test("createNamespaceMetadata: fail if metadata already exists") { + val catalog = newCatalog() + + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + val exc = intercept[NamespaceAlreadyExistsException] { + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === true) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + } + + test("createNamespaceMetadata: table exists") { + val catalog = newCatalog() + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(catalog.namespaceExists(testNs) === true) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty) + + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + assert(catalog.namespaceExists(testNs) === true) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + } + + test("dropNamespace: drop missing 
namespace") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val ret = catalog.dropNamespace(testNs) + + assert(ret === false) + } + + test("dropNamespace: drop empty namespace") { + val catalog = newCatalog() + + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + assert(catalog.namespaceExists(testNs) === true) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + + val ret = catalog.dropNamespace(testNs) + + assert(ret === true) + assert(catalog.namespaceExists(testNs) === false) + } + + test("dropNamespace: fail if not empty") { + val catalog = newCatalog() + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + val exc = intercept[IllegalStateException] { + catalog.dropNamespace(testNs) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === true) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + } + + test("alterNamespace: basic behavior") { + val catalog = newCatalog() + + catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2")) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map( + "property" -> "value", "property2" -> "value2")) + + catalog.alterNamespace(testNs, + NamespaceChange.removeProperty("property2"), + NamespaceChange.setProperty("property3", "value3")) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map( + "property" -> "value", "property3" -> "value3")) + + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3")) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + } + + test("alterNamespace: create metadata if missing and table exists") { + val catalog = newCatalog() + + 
catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value")) + + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + } + + test("alterNamespace: fail if no metadata or table exists") { + val catalog = newCatalog() + + val exc = intercept[NoSuchNamespaceException] { + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value")) + } + + assert(exc.getMessage.contains(testNs.quoted)) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index 6ba140fa5dde..ff0ca5429fa4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -24,14 +24,18 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.sources.v2.{Table, TableCapability} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -class TestTableCatalog extends TableCatalog { +class TestTableCatalog extends TableCatalog with SupportsNamespaces { import CatalogV2Implicits._ + override val defaultNamespace: Array[String] = Array() + protected val namespaces: util.Map[List[String], Map[String, String]] = + new ConcurrentHashMap[List[String], Map[String, String]]() + private val tables: util.Map[Identifier, Table] = new ConcurrentHashMap[Identifier, Table]() private var _name: 
Option[String] = None @@ -88,6 +92,65 @@ class TestTableCatalog extends TableCatalog { } override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined + + private def allNamespaces: Seq[Seq[String]] = { + (tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct + } + + override def namespaceExists(namespace: Array[String]): Boolean = { + allNamespaces.exists(_.startsWith(namespace)) + } + + override def listNamespaces: Array[Array[String]] = { + allNamespaces.map(_.head).distinct.map(Array(_)).toArray + } + + override def listNamespaces( + namespace: Array[String]): Array[Array[String]] = { + allNamespaces + .filter(_.size > namespace.length) + .filter(_.startsWith(namespace)) + .map(_.take(namespace.length + 1)) + .distinct + .map(_.toArray) + .toArray + } + + override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = { + Option(namespaces.get(namespace.toSeq)) match { + case Some(metadata) => + metadata.asJava + case _ if namespaceExists(namespace) => + util.Collections.emptyMap[String, String] + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def createNamespaceMetadata( + namespace: Array[String], + metadata: util.Map[String, String]): Unit = { + Option(namespaces.putIfAbsent(namespace.toList, metadata.asScala.toMap)) match { + case Some(_) => + throw new NamespaceAlreadyExistsException(namespace) + case _ => + // created successfully + } + } + + override def alterNamespace( + namespace: Array[String], + changes: NamespaceChange*): Unit = { + val metadata = loadNamespaceMetadata(namespace).asScala.toMap + namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes)) + } + + override def dropNamespace(namespace: Array[String]): Boolean = { + if (listTables(namespace).nonEmpty) { + throw new IllegalStateException(s"Cannot delete non-empty namespace: ${namespace.quoted}") + } + 
Option(namespaces.remove(namespace.toList)).isDefined + } } case class InMemoryTable( From 44afb5ca97a02f2353669bfb16cb7733c9d53b17 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sat, 3 Aug 2019 16:02:36 -0700 Subject: [PATCH 4/4] Restore createNamespace call and behavior. --- .../sql/catalog/v2/SupportsNamespaces.java | 2 +- .../sql/catalog/v2/TableCatalogSuite.scala | 33 ++++++++++--------- .../sql/catalog/v2/TestTableCatalog.scala | 11 ++++--- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java index 1844437ff2e2..12c2e511f33f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java @@ -112,7 +112,7 @@ default boolean namespaceExists(String[] namespace) { * @throws NamespaceAlreadyExistsException If the namespace already exists * @throws UnsupportedOperationException If create is not a supported operation */ - void createNamespaceMetadata( + void createNamespace( String[] namespace, Map metadata) throws NamespaceAlreadyExistsException; diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala index 2734e34a3cc7..089b4c5ed94f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala @@ -658,7 +658,7 @@ class TableCatalogSuite extends SparkFunSuite { test("listNamespaces: list namespaces from metadata") { val catalog = newCatalog() - catalog.createNamespaceMetadata(Array("ns1"), Map("property" -> "value").asJava) + catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava) 
assert(catalog.listNamespaces === Array(Array("ns1"))) assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) @@ -684,7 +684,7 @@ class TableCatalogSuite extends SparkFunSuite { val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1") val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") - catalog.createNamespaceMetadata(Array("ns1"), Map("property" -> "value").asJava) + catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava) catalog.createTable(ident1, schema, Array.empty, emptyProps) catalog.createTable(ident2, schema, Array.empty, emptyProps) @@ -717,7 +717,7 @@ class TableCatalogSuite extends SparkFunSuite { test("loadNamespaceMetadata: metadata exists, no tables") { val catalog = newCatalog() - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + catalog.createNamespace(testNs, Map("property" -> "value").asJava) val metadata = catalog.loadNamespaceMetadata(testNs) @@ -727,30 +727,30 @@ class TableCatalogSuite extends SparkFunSuite { test("loadNamespaceMetadata: metadata and table exist") { val catalog = newCatalog() + catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) val metadata = catalog.loadNamespaceMetadata(testNs) assert(metadata.asScala === Map("property" -> "value")) } - test("createNamespaceMetadata: basic behavior") { + test("createNamespace: basic behavior") { val catalog = newCatalog() - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + catalog.createNamespace(testNs, Map("property" -> "value").asJava) assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) } - test("createNamespaceMetadata: fail if metadata already exists") { + test("createNamespace: fail if metadata already exists") { val catalog = newCatalog() - 
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + catalog.createNamespace(testNs, Map("property" -> "value").asJava) val exc = intercept[NamespaceAlreadyExistsException] { - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + catalog.createNamespace(testNs, Map("property" -> "value").asJava) } assert(exc.getMessage.contains(testNs.quoted)) @@ -758,7 +758,7 @@ class TableCatalogSuite extends SparkFunSuite { assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) } - test("createNamespaceMetadata: table exists") { + test("createNamespace: fail if namespace already exists from table") { val catalog = newCatalog() catalog.createTable(testIdent, schema, Array.empty, emptyProps) @@ -766,10 +766,13 @@ class TableCatalogSuite extends SparkFunSuite { assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty) - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + val exc = intercept[NamespaceAlreadyExistsException] { + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + } + assert(exc.getMessage.contains(testNs.quoted)) assert(catalog.namespaceExists(testNs) === true) - assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) + assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty) } test("dropNamespace: drop missing namespace") { @@ -785,7 +788,7 @@ class TableCatalogSuite extends SparkFunSuite { test("dropNamespace: drop empty namespace") { val catalog = newCatalog() - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + catalog.createNamespace(testNs, Map("property" -> "value").asJava) assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) @@ -799,8 +802,8 @@ class TableCatalogSuite extends SparkFunSuite { test("dropNamespace: fail if not empty") { val 
catalog = newCatalog() + catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) val exc = intercept[IllegalStateException] { catalog.dropNamespace(testNs) @@ -814,7 +817,7 @@ class TableCatalogSuite extends SparkFunSuite { test("alterNamespace: basic behavior") { val catalog = newCatalog() - catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava) + catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2")) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index ff0ca5429fa4..6fdd6e30e1ee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -105,8 +105,7 @@ class TestTableCatalog extends TableCatalog with SupportsNamespaces { allNamespaces.map(_.head).distinct.map(Array(_)).toArray } - override def listNamespaces( - namespace: Array[String]): Array[Array[String]] = { + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { allNamespaces .filter(_.size > namespace.length) .filter(_.startsWith(namespace)) @@ -127,14 +126,18 @@ class TestTableCatalog extends TableCatalog with SupportsNamespaces { } } - override def createNamespaceMetadata( + override def createNamespace( namespace: Array[String], metadata: util.Map[String, String]): Unit = { + if (namespaceExists(namespace)) { + throw new NamespaceAlreadyExistsException(namespace) + } + Option(namespaces.putIfAbsent(namespace.toList, metadata.asScala.toMap)) match { case Some(_) => throw new 
NamespaceAlreadyExistsException(namespace) case _ => - // created successfully + // created successfully } }