From 2d4212085448d2d1350da8db1f3a1585a8812be3 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 14:27:43 +0300 Subject: [PATCH 1/6] Add listPartitionByNames() to the SupportsPartitionManagement interface --- .../catalog/SupportsPartitionManagement.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java index 446ea1463309f..380717d2e0e9b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java @@ -106,10 +106,19 @@ Map loadPartitionMetadata(InternalRow ident) throws UnsupportedOperationException; /** - * List the identifiers of all partitions that contains the ident in a table. + * List the identifiers of all partitions that have the ident prefix in a table. * * @param ident a prefix of partition identifier * @return an array of Identifiers for the partitions */ InternalRow[] listPartitionIdentifiers(InternalRow ident); + + /** + * List the identifiers of all partitions that match to the ident by names. + * + * @param names the names of partition values in the identifier. + * @param ident a partition identifier values. + * @return an array of Identifiers for the partitions + */ + InternalRow[] listPartitionByNames(String[] names, InternalRow ident); } From d1cbc9228af0c0f8ab50487c0bbb0aebebfa62c6 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 15:15:52 +0300 Subject: [PATCH 2/6] First implementation of listPartitionByNames() --- .../sql/connector/InMemoryPartitionTable.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index 23987e909aa70..71f676c8270e6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -21,9 +21,11 @@ import java.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -96,4 +98,19 @@ class InMemoryPartitionTable( override protected def addPartitionKey(key: Seq[Any]): Unit = { memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava) } + + override def listPartitionByNames( + names: Array[String], + ident: InternalRow): Array[InternalRow] = { + val schema = partitionSchema + val indexes = names.map(schema.fieldIndex) + val dataTypes = names.map(schema(_).dataType) + val currentRow = new GenericInternalRow(new Array[Any](names.length)) + memoryTablePartitions.keySet().asScala.filter { key => + for (i <- 0 until names.length) { + currentRow.values(i) = key.get(indexes(i), dataTypes(i)) + } + currentRow == ident + }.toArray + } } From 20d9121defb1a58e89f36b14067343637e28ad36 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 17:10:30 +0300 Subject: [PATCH 3/6] Add a test --- .../SupportsPartitionManagementSuite.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index e8e28e3422f27..18e4825b85c20 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog} +import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -140,4 +140,33 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { partTable.dropPartition(partIdent1) assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) } + + test("listPartitionByNames") { + val partCatalog = new InMemoryPartitionTableCatalog + partCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + val table = partCatalog.createTable( + ident, + new StructType() + .add("col0", IntegerType) + .add("part0", IntegerType) + .add("part1", StringType), + Array(LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))), + util.Collections.emptyMap[String, String]) + val partTable = table.asInstanceOf[InMemoryPartitionTable] + + Seq( + InternalRow(0, "abc"), + InternalRow(0, "def"), + InternalRow(1, "abc")).foreach { partIdent => + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + } + + Seq( + (Array("part0", "part1"), InternalRow(0, "abc")) -> Set(InternalRow(0, "abc")), + (Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")), + (Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")) + ).foreach { case ((names, idents), expected) => + assert(partTable.listPartitionByNames(names, idents).toSet === expected) + } + } } From 75c4903808446eda4bad847c986f3634a81a0d8e Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 17:15:12 +0300 Subject: [PATCH 4/6] Gets all partitions --- .../connector/catalog/SupportsPartitionManagementSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index 18e4825b85c20..686c56b3389cb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -164,7 +164,9 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { Seq( (Array("part0", "part1"), InternalRow(0, "abc")) -> Set(InternalRow(0, "abc")), (Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")), - (Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")) + (Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")), + (Array.empty[String], InternalRow.empty) -> + Set(InternalRow(0, "abc"), InternalRow(0, "def"), InternalRow(1, "abc")) ).foreach { case ((names, idents), expected) => assert(partTable.listPartitionByNames(names, idents).toSet === expected) } From 373e22ef769090389a9d9eeb218283af43fc6659 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 17:33:16 +0300 Subject: [PATCH 5/6] Add asserts --- .../spark/sql/connector/InMemoryPartitionTable.scala | 7 ++++++- .../catalog/SupportsPartitionManagementSuite.scala | 8 ++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index 71f676c8270e6..ba762a58b1e52 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -21,7 +21,6 @@ import java.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} @@ -102,7 +101,13 @@ class InMemoryPartitionTable( override def listPartitionByNames( names: Array[String], ident: InternalRow): Array[InternalRow] = { + assert(names.length == ident.numFields, + s"Number of partition names (${names.length}) must be equal to " + + s"the number of partition values (${ident.numFields}).") val schema = partitionSchema + assert(names.forall(fieldName => schema.fieldNames.contains(fieldName)), + s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " + + s"the partition schema '${schema.sql}'.") val indexes = names.map(schema.fieldIndex) val dataTypes = names.map(schema(_).dataType) val currentRow = new GenericInternalRow(new Array[Any](names.length)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index 686c56b3389cb..2c6d70ff58032 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -170,5 +170,13 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { ).foreach { case ((names, idents), expected) => assert(partTable.listPartitionByNames(names, idents).toSet === expected) } + // Check invalid parameters + Seq( + (Array("part0", "part1"), InternalRow(0)), + (Array("col0", "part1"), InternalRow(0, 1)), + (Array("wrong"), InternalRow("invalid")) + ).foreach { case (names, idents) => + intercept[AssertionError](partTable.listPartitionByNames(names, idents)) + } } } From 3f20ee8289107f10a65165e4a6dbb69250039efa Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 17:38:00 +0300 Subject: [PATCH 6/6] Nothing matches to parameters --- .../connector/catalog/SupportsPartitionManagementSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index 2c6d70ff58032..caf7e91612563 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -166,7 +166,9 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { (Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")), (Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")), (Array.empty[String], InternalRow.empty) -> - Set(InternalRow(0, "abc"), InternalRow(0, "def"), InternalRow(1, "abc")) + Set(InternalRow(0, "abc"), InternalRow(0, "def"), InternalRow(1, "abc")), + (Array("part0", "part1"), InternalRow(3, "xyz")) -> Set(), + (Array("part1"), InternalRow(3.14f)) -> Set() ).foreach { case ((names, idents), expected) => assert(partTable.listPartitionByNames(names, idents).toSet === expected) }