Skip to content

Commit ab5d21c

Browse files
committed
Implement SupportsNamespaces in TestTableCatalog, add tests.
1 parent 968ea3f commit ab5d21c

File tree

4 files changed

+296
-7
lines changed

4 files changed

+296
-7
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ default boolean namespaceExists(String[] namespace) {
112112
* @throws NamespaceAlreadyExistsException If the namespace already exists
113113
* @throws UnsupportedOperationException If create is not a supported operation
114114
*/
115-
void createNamespace(
115+
void createNamespaceMetadata(
116116
String[] namespace,
117117
Map<String, String> metadata) throws NamespaceAlreadyExistsException;
118118

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.Collections
2222

2323
import scala.collection.JavaConverters._
2424

25-
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
25+
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange}
2626
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
2727
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
2828
import org.apache.spark.sql.sources.v2.Table
@@ -31,6 +31,37 @@ import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
3131
object CatalogV2Util {
3232
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
3333

34+
/**
35+
* Apply properties changes to a map and return the result.
36+
*/
37+
def applyNamespaceChanges(
38+
properties: Map[String, String],
39+
changes: Seq[NamespaceChange]): Map[String, String] = {
40+
applyNamespaceChanges(properties.asJava, changes).asScala.toMap
41+
}
42+
43+
/**
44+
* Apply properties changes to a Java map and return the result.
45+
*/
46+
def applyNamespaceChanges(
47+
properties: util.Map[String, String],
48+
changes: Seq[NamespaceChange]): util.Map[String, String] = {
49+
val newProperties = new util.HashMap[String, String](properties)
50+
51+
changes.foreach {
52+
case set: NamespaceChange.SetProperty =>
53+
newProperties.put(set.property, set.value)
54+
55+
case unset: NamespaceChange.RemoveProperty =>
56+
newProperties.remove(unset.property)
57+
58+
case _ =>
59+
// ignore non-property changes
60+
}
61+
62+
Collections.unmodifiableMap(newProperties)
63+
}
64+
3465
/**
3566
* Apply properties changes to a map and return the result.
3667
*/

sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala

Lines changed: 198 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Collections
2323
import scala.collection.JavaConverters._
2424

2525
import org.apache.spark.SparkFunSuite
26-
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
26+
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
2727
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
2828
import org.apache.spark.sql.internal.SQLConf
2929
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
@@ -37,13 +37,14 @@ class TableCatalogSuite extends SparkFunSuite {
3737
.add("id", IntegerType)
3838
.add("data", StringType)
3939

40-
private def newCatalog(): TableCatalog = {
40+
private def newCatalog(): TableCatalog with SupportsNamespaces = {
4141
val newCatalog = new TestTableCatalog
4242
newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
4343
newCatalog
4444
}
4545

46-
private val testIdent = Identifier.of(Array("`", "."), "test_table")
46+
private val testNs = Array("`", ".")
47+
private val testIdent = Identifier.of(testNs, "test_table")
4748

4849
test("Catalogs can load the catalog") {
4950
val catalog = newCatalog()
@@ -654,4 +655,198 @@ class TableCatalogSuite extends SparkFunSuite {
654655
assert(!wasDropped)
655656
assert(!catalog.tableExists(testIdent))
656657
}
658+
659+
test("listNamespaces: list namespaces from metadata") {
660+
val catalog = newCatalog()
661+
catalog.createNamespaceMetadata(Array("ns1"), Map("property" -> "value").asJava)
662+
663+
assert(catalog.listNamespaces === Array(Array("ns1")))
664+
assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
665+
assert(catalog.listNamespaces(Array("ns1")) === Array())
666+
}
667+
668+
test("listNamespaces: list namespaces from tables") {
669+
val catalog = newCatalog()
670+
val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1")
671+
val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
672+
673+
catalog.createTable(ident1, schema, Array.empty, emptyProps)
674+
catalog.createTable(ident2, schema, Array.empty, emptyProps)
675+
676+
assert(catalog.listNamespaces === Array(Array("ns1")))
677+
assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
678+
assert(catalog.listNamespaces(Array("ns1")) === Array(Array("ns1", "ns2")))
679+
assert(catalog.listNamespaces(Array("ns1", "ns2")) === Array())
680+
}
681+
682+
test("listNamespaces: list namespaces from metadata and tables") {
683+
val catalog = newCatalog()
684+
val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1")
685+
val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
686+
687+
catalog.createNamespaceMetadata(Array("ns1"), Map("property" -> "value").asJava)
688+
catalog.createTable(ident1, schema, Array.empty, emptyProps)
689+
catalog.createTable(ident2, schema, Array.empty, emptyProps)
690+
691+
assert(catalog.listNamespaces === Array(Array("ns1")))
692+
assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
693+
assert(catalog.listNamespaces(Array("ns1")) === Array(Array("ns1", "ns2")))
694+
assert(catalog.listNamespaces(Array("ns1", "ns2")) === Array())
695+
}
696+
697+
test("loadNamespaceMetadata: fail if no metadata or tables exist") {
698+
val catalog = newCatalog()
699+
700+
val exc = intercept[NoSuchNamespaceException] {
701+
catalog.loadNamespaceMetadata(testNs)
702+
}
703+
704+
assert(exc.getMessage.contains(testNs.quoted))
705+
}
706+
707+
test("loadNamespaceMetadata: no metadata, table exists") {
708+
val catalog = newCatalog()
709+
710+
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
711+
712+
val metadata = catalog.loadNamespaceMetadata(testNs)
713+
714+
assert(metadata.asScala === Map.empty)
715+
}
716+
717+
test("loadNamespaceMetadata: metadata exists, no tables") {
718+
val catalog = newCatalog()
719+
720+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
721+
722+
val metadata = catalog.loadNamespaceMetadata(testNs)
723+
724+
assert(metadata.asScala === Map("property" -> "value"))
725+
}
726+
727+
test("loadNamespaceMetadata: metadata and table exist") {
728+
val catalog = newCatalog()
729+
730+
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
731+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
732+
733+
val metadata = catalog.loadNamespaceMetadata(testNs)
734+
735+
assert(metadata.asScala === Map("property" -> "value"))
736+
}
737+
738+
test("createNamespaceMetadata: basic behavior") {
739+
val catalog = newCatalog()
740+
741+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
742+
743+
assert(catalog.namespaceExists(testNs) === true)
744+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
745+
}
746+
747+
test("createNamespaceMetadata: fail if metadata already exists") {
748+
val catalog = newCatalog()
749+
750+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
751+
752+
val exc = intercept[NamespaceAlreadyExistsException] {
753+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
754+
}
755+
756+
assert(exc.getMessage.contains(testNs.quoted))
757+
assert(catalog.namespaceExists(testNs) === true)
758+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
759+
}
760+
761+
test("createNamespaceMetadata: table exists") {
762+
val catalog = newCatalog()
763+
764+
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
765+
766+
assert(catalog.namespaceExists(testNs) === true)
767+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty)
768+
769+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
770+
771+
assert(catalog.namespaceExists(testNs) === true)
772+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
773+
}
774+
775+
test("dropNamespace: drop missing namespace") {
776+
val catalog = newCatalog()
777+
778+
assert(catalog.namespaceExists(testNs) === false)
779+
780+
val ret = catalog.dropNamespace(testNs)
781+
782+
assert(ret === false)
783+
}
784+
785+
test("dropNamespace: drop empty namespace") {
786+
val catalog = newCatalog()
787+
788+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
789+
790+
assert(catalog.namespaceExists(testNs) === true)
791+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
792+
793+
val ret = catalog.dropNamespace(testNs)
794+
795+
assert(ret === true)
796+
assert(catalog.namespaceExists(testNs) === false)
797+
}
798+
799+
test("dropNamespace: fail if not empty") {
800+
val catalog = newCatalog()
801+
802+
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
803+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
804+
805+
val exc = intercept[IllegalStateException] {
806+
catalog.dropNamespace(testNs)
807+
}
808+
809+
assert(exc.getMessage.contains(testNs.quoted))
810+
assert(catalog.namespaceExists(testNs) === true)
811+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
812+
}
813+
814+
test("alterNamespace: basic behavior") {
815+
val catalog = newCatalog()
816+
817+
catalog.createNamespaceMetadata(testNs, Map("property" -> "value").asJava)
818+
819+
catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2"))
820+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map(
821+
"property" -> "value", "property2" -> "value2"))
822+
823+
catalog.alterNamespace(testNs,
824+
NamespaceChange.removeProperty("property2"),
825+
NamespaceChange.setProperty("property3", "value3"))
826+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map(
827+
"property" -> "value", "property3" -> "value3"))
828+
829+
catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3"))
830+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
831+
}
832+
833+
test("alterNamespace: create metadata if missing and table exists") {
834+
val catalog = newCatalog()
835+
836+
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
837+
838+
catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value"))
839+
840+
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
841+
}
842+
843+
test("alterNamespace: fail if no metadata or table exists") {
844+
val catalog = newCatalog()
845+
846+
val exc = intercept[NoSuchNamespaceException] {
847+
catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value"))
848+
}
849+
850+
assert(exc.getMessage.contains(testNs.quoted))
851+
}
657852
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,18 @@ import scala.collection.JavaConverters._
2424

2525
import org.apache.spark.sql.catalog.v2.expressions.Transform
2626
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
27-
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
27+
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
2828
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
2929
import org.apache.spark.sql.types.StructType
3030
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3131

32-
class TestTableCatalog extends TableCatalog {
32+
class TestTableCatalog extends TableCatalog with SupportsNamespaces {
3333
import CatalogV2Implicits._
3434

35+
override val defaultNamespace: Array[String] = Array()
36+
protected val namespaces: util.Map[List[String], Map[String, String]] =
37+
new ConcurrentHashMap[List[String], Map[String, String]]()
38+
3539
private val tables: util.Map[Identifier, Table] = new ConcurrentHashMap[Identifier, Table]()
3640
private var _name: Option[String] = None
3741

@@ -88,6 +92,65 @@ class TestTableCatalog extends TableCatalog {
8892
}
8993

9094
override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
95+
96+
private def allNamespaces: Seq[Seq[String]] = {
97+
(tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct
98+
}
99+
100+
override def namespaceExists(namespace: Array[String]): Boolean = {
101+
allNamespaces.exists(_.startsWith(namespace))
102+
}
103+
104+
override def listNamespaces: Array[Array[String]] = {
105+
allNamespaces.map(_.head).distinct.map(Array(_)).toArray
106+
}
107+
108+
override def listNamespaces(
109+
namespace: Array[String]): Array[Array[String]] = {
110+
allNamespaces
111+
.filter(_.size > namespace.length)
112+
.filter(_.startsWith(namespace))
113+
.map(_.take(namespace.length + 1))
114+
.distinct
115+
.map(_.toArray)
116+
.toArray
117+
}
118+
119+
override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
120+
Option(namespaces.get(namespace.toSeq)) match {
121+
case Some(metadata) =>
122+
metadata.asJava
123+
case _ if namespaceExists(namespace) =>
124+
util.Collections.emptyMap[String, String]
125+
case _ =>
126+
throw new NoSuchNamespaceException(namespace)
127+
}
128+
}
129+
130+
override def createNamespaceMetadata(
131+
namespace: Array[String],
132+
metadata: util.Map[String, String]): Unit = {
133+
Option(namespaces.putIfAbsent(namespace.toList, metadata.asScala.toMap)) match {
134+
case Some(_) =>
135+
throw new NamespaceAlreadyExistsException(namespace)
136+
case _ =>
137+
// created successfully
138+
}
139+
}
140+
141+
override def alterNamespace(
142+
namespace: Array[String],
143+
changes: NamespaceChange*): Unit = {
144+
val metadata = loadNamespaceMetadata(namespace).asScala.toMap
145+
namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes))
146+
}
147+
148+
override def dropNamespace(namespace: Array[String]): Boolean = {
149+
if (listTables(namespace).nonEmpty) {
150+
throw new IllegalStateException(s"Cannot delete non-empty namespace: ${namespace.quoted}")
151+
}
152+
Option(namespaces.remove(namespace.toList)).isDefined
153+
}
91154
}
92155

93156
case class InMemoryTable(

0 commit comments

Comments
 (0)