diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java
new file mode 100644
index 0000000000000..754203125cdc2
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
+
+/**
+ * An atomic partition interface of {@link Table} to operate on multiple partitions atomically.
+ * <p>
+ * These APIs are used to modify table partitions or partition metadata,
+ * and they will change the table data as well.
+ * {@link #createPartitions}:
+ *     add an array of partitions and any data they contain to the table
+ * {@link #dropPartitions}:
+ *     remove an array of partitions and any data they contain from the table
+ *
+ * @since 3.1.0
+ */
+@Experimental
+public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement {
+
+    @Override
+    default void createPartition(
+        InternalRow ident,
+        Map<String, String> properties)
+        throws PartitionAlreadyExistsException, UnsupportedOperationException {
+        try {
+            createPartitions(new InternalRow[]{ident}, new Map[]{properties});
+        } catch (PartitionsAlreadyExistException e) {
+            throw new PartitionAlreadyExistsException(e.getMessage());
+        }
+    }
+
+    @Override
+    default boolean dropPartition(InternalRow ident) {
+        return dropPartitions(new InternalRow[]{ident});
+    }
+
+    /**
+     * Create an array of partitions atomically in the table.
+     * <p>
+     * If any partition already exists,
+     * the createPartitions operation needs to be safely rolled back.
+     *
+     * @param idents an array of new partition identifiers
+     * @param properties the metadata of the partitions
+     * @throws PartitionsAlreadyExistException If any partition already exists for its identifier
+     * @throws UnsupportedOperationException If partition properties are not supported
+     */
+    void createPartitions(
+        InternalRow[] idents,
+        Map<String, String>[] properties)
+        throws PartitionsAlreadyExistException, UnsupportedOperationException;
+
+    /**
+     * Drop an array of partitions atomically from the table.
+     * <p>
+     * If any partition doesn't exist,
+     * the dropPartitions operation needs to be safely rolled back.
+     *
+     * @param idents an array of partition identifiers
+     * @return true if partitions were deleted, false if any partition does not exist
+     */
+    boolean dropPartitions(InternalRow[] idents);
+}
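The rollback contract is the whole point of this interface, so it is worth seeing from a caller's side. Below is a minimal Scala sketch against the InMemoryAtomicPartitionTable test implementation added later in this patch; the single "dt" partition column and the table name are assumed here purely for illustration.

  import java.util.Collections

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
  import org.apache.spark.sql.connector.InMemoryAtomicPartitionTable
  import org.apache.spark.sql.connector.expressions.LogicalExpressions
  import org.apache.spark.sql.types.{StringType, StructType}

  val partTable = new InMemoryAtomicPartitionTable(
    "test.ns.test_table",
    new StructType().add("id", StringType).add("dt", StringType),
    Array(LogicalExpressions.identity(LogicalExpressions.parseReference("dt"))),
    Collections.emptyMap[String, String])

  // Seed one partition, then try to create a batch that collides with it.
  partTable.createPartition(InternalRow("4"), Collections.emptyMap[String, String])
  try {
    partTable.createPartitions(
      Array(InternalRow("3"), InternalRow("4")),
      Array(Collections.emptyMap[String, String], Collections.emptyMap[String, String]))
  } catch {
    case _: PartitionsAlreadyExistException =>
      // Atomicity: the non-conflicting "3" must not survive the failed batch.
      assert(!partTable.partitionExists(InternalRow("3")))
  }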
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
new file mode 100644
index 0000000000000..446ea1463309f
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A partition interface of {@link Table}.
+ * A partition is composed of an identifier and properties,
+ * and the properties contain the metadata of the partition.
+ * <p>
+ * These APIs are used to modify table partition identifiers or partition metadata.
+ * In some cases, they will change the table data as well.
+ * {@link #createPartition}:
+ *     add a partition and any data it contains to the table
+ * {@link #dropPartition}:
+ *     remove a partition and any data it contains from the table
+ * {@link #replacePartitionMetadata}:
+ *     point a partition to a new location, which will swap one location's data for the other
+ *
+ * @since 3.1.0
+ */
+@Experimental
+public interface SupportsPartitionManagement extends Table {
+
+    /**
+     * Get the partition schema of the table,
+     * which must be consistent with {@link Table#partitioning()}.
+     * @return the partition schema of the table
+     */
+    StructType partitionSchema();
+
+    /**
+     * Create a partition in the table.
+     *
+     * @param ident a new partition identifier
+     * @param properties the metadata of a partition
+     * @throws PartitionAlreadyExistsException If a partition already exists for the identifier
+     * @throws UnsupportedOperationException If partition properties are not supported
+     */
+    void createPartition(
+        InternalRow ident,
+        Map<String, String> properties)
+        throws PartitionAlreadyExistsException, UnsupportedOperationException;
+
+    /**
+     * Drop a partition from the table.
+     *
+     * @param ident a partition identifier
+     * @return true if a partition was deleted, false if no partition exists for the identifier
+     */
+    boolean dropPartition(InternalRow ident);
+
+    /**
+     * Test whether a partition exists using an {@link InternalRow ident} from the table.
+     *
+     * @param ident a partition identifier
+     * @return true if the partition exists, false otherwise
+     */
+    default boolean partitionExists(InternalRow ident) {
+        return listPartitionIdentifiers(ident).length > 0;
+    }
+
+    /**
+     * Replace the partition metadata of the existing partition.
+     *
+     * @param ident the partition identifier of the existing partition
+     * @param properties the new metadata of the partition
+     * @throws NoSuchPartitionException If the partition identifier to alter doesn't exist
+     * @throws UnsupportedOperationException If partition properties are not supported
+     */
+    void replacePartitionMetadata(
+        InternalRow ident,
+        Map<String, String> properties)
+        throws NoSuchPartitionException, UnsupportedOperationException;
+
+    /**
+     * Retrieve the partition metadata of the existing partition.
+     *
+     * @param ident a partition identifier
+     * @return the metadata of the partition
+     * @throws UnsupportedOperationException If partition properties are not supported
+     */
+    Map<String, String> loadPartitionMetadata(InternalRow ident)
+        throws UnsupportedOperationException;
+
+    /**
+     * List the identifiers of all partitions that contain the given ident prefix in the table.
+     *
+     * @param ident a partition identifier prefix
+     * @return an array of identifiers of the matching partitions
+     */
+    InternalRow[] listPartitionIdentifiers(InternalRow ident);
+}
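Read as a whole, the interface supports a simple round trip: create a partition with metadata, probe it, replace the metadata, and read it back. A hedged Scala sketch against the InMemoryPartitionTable test implementation added later in this patch; the "dt" column and the location values are assumptions for illustration.

  import java.util.Collections

  import scala.collection.JavaConverters._

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.connector.InMemoryPartitionTable
  import org.apache.spark.sql.connector.expressions.LogicalExpressions
  import org.apache.spark.sql.types.{StringType, StructType}

  val partTable = new InMemoryPartitionTable(
    "test.ns.test_table",
    new StructType().add("id", StringType).add("dt", StringType),
    Array(LogicalExpressions.identity(LogicalExpressions.parseReference("dt"))),
    Collections.emptyMap[String, String])

  val partIdent = InternalRow("2020-10-01")
  partTable.createPartition(partIdent, Map("location" -> "/tmp/dt=2020-10-01").asJava)

  // partitionExists defaults to listPartitionIdentifiers(ident).length > 0
  // (the in-memory test table overrides it with a plain map lookup).
  assert(partTable.partitionExists(partIdent))

  // Swap the partition's metadata in place, then read it back.
  partTable.replacePartitionMetadata(partIdent, Map("location" -> "/tmp/dt=2020-10-01.v2").asJava)
  assert(partTable.loadPartitionMetadata(partIdent).get("location") == "/tmp/dt=2020-10-01.v2")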
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index 7e5d56a7d1196..bfc3b3d0ac966 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.types.StructType
 
 /**
  * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
@@ -48,14 +50,30 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
 class TempTableAlreadyExistsException(table: String)
   extends TableAlreadyExistsException(s"Temporary view '$table' already exists")
 
-class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
-  extends AnalysisException(
-    s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
+class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
+  def this(db: String, table: String, spec: TablePartitionSpec) = {
+    this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
+  }
+
+  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
+    this(s"Partition already exists in table $tableName: " +
+      partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
+        .map(kv => s"${kv._2} -> ${kv._1}").mkString(","))
+  }
+}
 
-class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
-  extends AnalysisException(
-    s"The following partitions already exists in table '$table' database '$db':\n"
+class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) {
+  def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
+    this(s"The following partitions already exist in table '$table' database '$db':\n"
       + specs.mkString("\n===\n"))
+  }
+
+  def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
+    this(s"The following partitions already exist in table $tableName: " +
+      partitionIdents.map(_.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
+        .map(kv => s"${kv._2} -> ${kv._1}").mkString(",")).mkString("\n===\n"))
+  }
+}
 
 class FunctionAlreadyExistsException(db: String, func: String)
   extends AnalysisException(s"Function '$func' already exists in database '$db'")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 9b5b059908c00..88be441d808db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.types.StructType
 
 /**
@@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) {
   }
 }
 
-class NoSuchPartitionException(
-    db: String,
-    table: String,
-    spec: TablePartitionSpec)
-  extends AnalysisException(
-    s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
+class NoSuchPartitionException(message: String) extends AnalysisException(message) {
+  def this(db: String, table: String, spec: TablePartitionSpec) = {
+    this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
+  }
+
+  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
+    this(s"Partition not found in table $tableName: " +
+      partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
+        .map(kv => s"${kv._2} -> ${kv._1}").mkString(","))
+  }
+}
 
 class NoSuchPermanentFunctionException(db: String, func: String)
   extends AnalysisException(s"Function '$func' not found in database '$db'")
@@ -61,10 +68,18 @@ class NoSuchFunctionException(db: String, func: String, cause: Option[Throwable]
     s"Undefined function: '$func'. This function is neither a registered temporary function nor " +
     s"a permanent function registered in the database '$db'.", cause = cause)
 
-class NoSuchPartitionsException(db: String, table: String, specs: Seq[TablePartitionSpec])
-  extends AnalysisException(
-    s"The following partitions not found in table '$table' database '$db':\n"
+class NoSuchPartitionsException(message: String) extends AnalysisException(message) {
+  def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
+    this(s"The following partitions were not found in table '$table' database '$db':\n"
       + specs.mkString("\n===\n"))
+  }
+
+  def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
+    this(s"The following partitions were not found in table $tableName: " +
+      partitionIdents.map(_.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
+        .map(kv => s"${kv._2} -> ${kv._1}").mkString(",")).mkString("\n===\n"))
+  }
+}
 
 class NoSuchTempFunctionException(func: String)
   extends AnalysisException(s"Temporary function '$func' not found")
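With these overloads, DSv2 code can raise partition errors directly from an InternalRow plus the partition schema, instead of a TablePartitionSpec. A small hedged sketch of the rendered message; the table name and schema are assumed, and the layout simply follows the strings built above.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
  import org.apache.spark.sql.types.{StringType, StructType}

  val partitionSchema = new StructType().add("dt", StringType)
  val e = new NoSuchPartitionException("test.ns.test_table", InternalRow("2020-10-01"), partitionSchema)
  // Zips the schema's field names against the identifier's values, giving
  // roughly: "Partition not found in table test.ns.test_table: dt -> 2020-10-01"
  println(e.getMessage)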
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
new file mode 100644
index 0000000000000..c2a95cc3b8b07
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This class is used to test the SupportsAtomicPartitionManagement API.
+ */
+class InMemoryAtomicPartitionTable(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: util.Map[String, String])
+  extends InMemoryPartitionTable(name, schema, partitioning, properties)
+  with SupportsAtomicPartitionManagement {
+
+  override def createPartition(
+      ident: InternalRow,
+      properties: util.Map[String, String]): Unit = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
+    } else {
+      memoryTablePartitions.put(ident, properties)
+    }
+  }
+
+  override def dropPartition(ident: InternalRow): Boolean = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      memoryTablePartitions.remove(ident)
+      true
+    } else {
+      false
+    }
+  }
+
+  override def createPartitions(
+      idents: Array[InternalRow],
+      properties: Array[util.Map[String, String]]): Unit = {
+    if (idents.exists(partitionExists)) {
+      throw new PartitionsAlreadyExistException(
+        name, idents.filter(partitionExists), partitionSchema)
+    }
+    idents.zip(properties).foreach { case (ident, property) =>
+      createPartition(ident, property)
+    }
+  }
+
+  override def dropPartitions(idents: Array[InternalRow]): Boolean = {
+    if (!idents.forall(partitionExists)) {
+      return false
+    }
+    idents.forall(dropPartition)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
new file mode 100644
index 0000000000000..1c96bdf3afa20
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This class is used to test the SupportsPartitionManagement API.
+ */
+class InMemoryPartitionTable(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: util.Map[String, String])
+  extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitionManagement {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  protected val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] =
+    new ConcurrentHashMap[InternalRow, util.Map[String, String]]()
+
+  def partitionSchema: StructType = {
+    val partitionColumnNames = partitioning.toSeq.asPartitionColumns
+    new StructType(schema.filter(p => partitionColumnNames.contains(p.name)).toArray)
+  }
+
+  def createPartition(
+      ident: InternalRow,
+      properties: util.Map[String, String]): Unit = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
+    } else {
+      memoryTablePartitions.put(ident, properties)
+    }
+  }
+
+  def dropPartition(ident: InternalRow): Boolean = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      memoryTablePartitions.remove(ident)
+      true
+    } else {
+      false
+    }
+  }
+
+  def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      memoryTablePartitions.put(ident, properties)
+    } else {
+      throw new NoSuchPartitionException(name, ident, partitionSchema)
+    }
+  }
+
+  def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      memoryTablePartitions.get(ident)
+    } else {
+      throw new NoSuchPartitionException(name, ident, partitionSchema)
+    }
+  }
+
+  def listPartitionIdentifiers(ident: InternalRow): Array[InternalRow] = {
+    val prefixPartCols =
+      new StructType(partitionSchema.dropRight(partitionSchema.length - ident.numFields).toArray)
+    val prefixPart = ident.toSeq(prefixPartCols)
+    memoryTablePartitions.keySet().asScala
+      .filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray
+  }
+
+  override def partitionExists(ident: InternalRow): Boolean =
+    memoryTablePartitions.containsKey(ident)
+}
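Note the prefix semantics of listPartitionIdentifiers above: the argument row is matched against a leading slice of partitionSchema. The suites below only exercise a single partition column, so here is a hedged Scala sketch with an assumed two-column (dt, region) layout.

  import java.util.Collections

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.connector.InMemoryPartitionTable
  import org.apache.spark.sql.connector.expressions.LogicalExpressions
  import org.apache.spark.sql.types.{StringType, StructType}

  def ref(name: String) = LogicalExpressions.parseReference(name)

  val partTable = new InMemoryPartitionTable(
    "test.ns.events",
    new StructType().add("id", StringType).add("dt", StringType).add("region", StringType),
    Array(LogicalExpressions.identity(ref("dt")), LogicalExpressions.identity(ref("region"))),
    Collections.emptyMap[String, String])

  Seq(("2020-10-01", "us"), ("2020-10-01", "eu"), ("2020-10-02", "us")).foreach { case (dt, region) =>
    partTable.createPartition(InternalRow(dt, region), Collections.emptyMap[String, String])
  }

  // An empty row is a prefix of everything; a one-field row constrains only dt.
  assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 3)
  assert(partTable.listPartitionIdentifiers(InternalRow("2020-10-01")).length == 2)
  assert(partTable.listPartitionIdentifiers(InternalRow("2020-10-01", "us")).length == 1)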
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
new file mode 100644
index 0000000000000..6f7c30653110b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
+import org.apache.spark.sql.connector.{InMemoryAtomicPartitionTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
+
+  private val ident: Identifier = Identifier.of(Array("ns"), "test_table")
+
+  def ref(name: String): NamedReference = LogicalExpressions.parseReference(name)
+
+  private val catalog: InMemoryTableCatalog = {
+    val newCatalog = new InMemoryTableCatalog
+    newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+    newCatalog.createTable(
+      ident,
+      new StructType()
+        .add("id", IntegerType)
+        .add("data", StringType)
+        .add("dt", StringType),
+      Array(LogicalExpressions.identity(ref("dt"))),
+      util.Collections.emptyMap[String, String])
+    newCatalog
+  }
+
+  test("createPartitions") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryAtomicPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
+    partTable.createPartitions(
+      partIdents,
+      Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(InternalRow.apply("3")))
+    assert(partTable.partitionExists(InternalRow.apply("4")))
+
+    partTable.dropPartition(InternalRow.apply("3"))
+    partTable.dropPartition(InternalRow.apply("4"))
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("createPartitions fails if a partition already exists") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryAtomicPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("4")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(partIdent))
+
+    val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
+    assertThrows[PartitionsAlreadyExistException](
+      partTable.createPartitions(
+        partIdents,
+        Array(new util.HashMap[String, String](), new util.HashMap[String, String]())))
+    assert(!partTable.partitionExists(InternalRow.apply("3")))
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("dropPartitions") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryAtomicPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
+    partTable.createPartitions(
+      partIdents,
+      Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(InternalRow.apply("3")))
+    assert(partTable.partitionExists(InternalRow.apply("4")))
+
+    partTable.dropPartitions(partIdents)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("dropPartitions fails if a partition does not exist") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryAtomicPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("4")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
+
+    val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
+    assert(!partTable.dropPartitions(partIdents))
+    assert(partTable.partitionExists(partIdent))
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
new file mode 100644
index 0000000000000..e8e28e3422f27
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class SupportsPartitionManagementSuite extends SparkFunSuite {
+
+  private val ident: Identifier = Identifier.of(Array("ns"), "test_table")
+
+  def ref(name: String): NamedReference = LogicalExpressions.parseReference(name)
+
+  private val catalog: InMemoryTableCatalog = {
+    val newCatalog = new InMemoryTableCatalog
+    newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+    newCatalog.createTable(
+      ident,
+      new StructType()
+        .add("id", IntegerType)
+        .add("data", StringType)
+        .add("dt", StringType),
+      Array(LogicalExpressions.identity(ref("dt"))),
+      util.Collections.emptyMap[String, String])
+    newCatalog
+  }
+
+  test("createPartition") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("3")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(partIdent))
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("dropPartition") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("3")
+    val partIdent1 = InternalRow.apply("4")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    partTable.createPartition(partIdent1, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2)
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
+    partTable.dropPartition(partIdent1)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("replacePartitionMetadata") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("3")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(partIdent))
+    assert(partTable.loadPartitionMetadata(partIdent).isEmpty)
+
+    partTable.replacePartitionMetadata(partIdent, Map("paramKey" -> "paramValue").asJava)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(partIdent))
+    assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
+    assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("loadPartitionMetadata") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("3")
+    partTable.createPartition(partIdent, Map("paramKey" -> "paramValue").asJava)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
+    assert(partTable.partitionExists(partIdent))
+    assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
+    assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+
+  test("listPartitionIdentifiers") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+
+    val partIdent = InternalRow.apply("3")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
+
+    val partIdent1 = InternalRow.apply("4")
+    partTable.createPartition(partIdent1, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2)
+    assert(partTable.listPartitionIdentifiers(partIdent1).length == 1)
+
+    partTable.dropPartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
+    partTable.dropPartition(partIdent1)
+    assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
+  }
+}