Skip to content

Commit 9ff1c6c

Browse files
author
stczwd
committed
rename partition API
Change-Id: Ifa7655acf23f3ae6cfd70c41c91ee190ae78d4b8
1 parent 4a77db0 commit 9ff1c6c

File tree

7 files changed

+183
-459
lines changed

7 files changed

+183
-459
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java

Lines changed: 46 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,87 +19,78 @@
1919
import java.util.Map;
2020

2121
import org.apache.spark.annotation.Experimental;
22+
import org.apache.spark.sql.catalyst.InternalRow;
23+
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
24+
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
2225

2326
/**
24-
* Catalog methods for working with Partitions.
27+
* A partition interface of {@link Table} to indicate partition APIs.
28+
* A partition is composed of identifier and properties,
29+
* and properties contains metadata information of the partition.
30+
* <p>
31+
* These APIs are used to modify table partition identifier or partition metadata,
32+
* in some cases, they will change actual value of table data as well.
33+
*
34+
* @since 3.0.0
2535
*/
2636
@Experimental
27-
public interface SupportsPartitions extends TableCatalog {
28-
29-
/**
30-
* Create partitions in an existing table, assuming it exists.
31-
*
32-
* @param ident a table identifier
33-
* @param partitions transforms to use for partitioning data in the table
34-
* @param ignoreIfExists
35-
*/
36-
void createPartitions(
37-
Identifier ident,
38-
TablePartition[] partitions,
39-
Boolean ignoreIfExists);
37+
public interface SupportsPartitions extends Table {
4038

4139
/**
42-
* Drop partitions from a table, assuming they exist.
40+
* Create a partition in table.
4341
*
44-
* @param ident a table identifier
45-
* @param partitions a list of string map for existing partitions
46-
* @param ignoreIfNotExists
42+
* @param ident a new partition identifier
43+
* @param properties the metadata of a partition
44+
* @throws PartitionAlreadyExistsException If a partition already exists for the identifier
4745
*/
48-
void dropPartitions(
49-
Identifier ident,
50-
Map<String, String>[] partitions,
51-
Boolean ignoreIfNotExists);
46+
void createPartition(
47+
InternalRow ident,
48+
Map<String, String> properties) throws PartitionAlreadyExistsException;
5249

5350
/**
54-
* Override the specs of one or many existing table partitions, assuming they exist.
51+
* Drop a partition from table.
5552
*
56-
* @param ident a table identifier
57-
* @param oldpartitions a list of string map for existing partitions to be renamed
58-
* @param newPartitions a list of string map for new partitions
53+
* @param ident a partition identifier
54+
* @return true if a partition was deleted, false if no partition exists for the identifier
5955
*/
60-
void renamePartitions(
61-
Identifier ident,
62-
Map<String, String>[] oldpartitions,
63-
Map<String, String>[] newPartitions);
56+
Boolean dropPartition(InternalRow ident);
6457

6558
/**
66-
* Alter one or many table partitions whose specs that match those specified in `parts`,
67-
* assuming the partitions exist.
59+
* Rename a Partition from old identifier to new identifier with no metadata changed.
6860
*
69-
* @param ident a table identifier
70-
* @param partitions transforms to use for partitioning data in the table
61+
* @param oldIdent the partition identifier of the existing partition
62+
* @param newIdent the new partition identifier of the partition
63+
* @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
64+
* @throws PartitionAlreadyExistsException If the new partition identifier already exists
7165
*/
72-
void alterPartitions(
73-
Identifier ident,
74-
TablePartition[] partitions);
66+
void renamePartition(
67+
InternalRow oldIdent,
68+
InternalRow newIdent) throws NoSuchPartitionException, PartitionAlreadyExistsException;
7569

7670
/**
77-
* Retrieve the metadata of a table partition, assuming it exists.
71+
* Replace the partition metadata of the existing partition.
7872
*
79-
* @param ident a table identifier
80-
* @param partition a list of string map for existing partitions
73+
* @param ident the partition identifier of the existing partition
74+
* @param properties the new metadata of the partition
75+
* @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
8176
*/
82-
TablePartition getPartition(
83-
Identifier ident,
84-
Map<String, String> partition);
77+
void replacePartitionMetadata(
78+
InternalRow ident,
79+
Map<String, String> properties) throws NoSuchPartitionException;
8580

8681
/**
87-
* List the names of all partitions that belong to the specified table, assuming it exists.
82+
* Retrieve the partition metadata of the existing partition.
8883
*
89-
* @param ident a table identifier
90-
* @param partition a list of string map for existing partitions
84+
* @param ident a partition identifier
85+
* @return the metadata of the partition
9186
*/
92-
String[] listPartitionNames(
93-
Identifier ident,
94-
Map<String, String> partition);
87+
Map<String, String> getPartitionMetadata(InternalRow ident);
9588

9689
/**
97-
* List the metadata of all partitions that belong to the specified table, assuming it exists.
90+
* List the identifiers of all partitions that contains the ident in a table.
9891
*
99-
* @param ident a table identifier
100-
* @param partition a list of string map for existing partitions
92+
* @param ident a prefix of partition identifier
93+
* @return an array of Identifiers for the partitions
10194
*/
102-
TablePartition[] listPartitions(
103-
Identifier ident,
104-
Map<String, String> partition);
95+
InternalRow[] listPartitionIdentifiers(InternalRow ident);
10596
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.catalyst.InternalRow
2122
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
2223
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
2324
import org.apache.spark.sql.connector.catalog.Identifier
25+
import org.apache.spark.sql.types.StructType
2426

2527
/**
2628
* Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
@@ -48,14 +50,22 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
4850
class TempTableAlreadyExistsException(table: String)
4951
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")
5052

51-
class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
52-
extends AnalysisException(
53-
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
53+
class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
54+
def this(db: String, table: String, spec: TablePartitionSpec) = {
55+
this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
56+
}
5457

55-
class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
56-
extends AnalysisException(
57-
s"The following partitions already exists in table '$table' database '$db':\n"
58+
def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
59+
this(s"The following partitions already exists in table '$table' database '$db':\n"
5860
+ specs.mkString("\n===\n"))
61+
}
62+
63+
def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
64+
this(s"Partition " +
65+
s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
66+
.map( kv => s"${kv._1} -> ${kv._2}").mkString(",")} in $tableName exists")
67+
}
68+
}
5969

6070
class FunctionAlreadyExistsException(db: String, func: String)
6171
extends AnalysisException(s"Function '$func' already exists in database '$db'")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.catalyst.InternalRow
2122
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
2223
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
2324
import org.apache.spark.sql.connector.catalog.Identifier
25+
import org.apache.spark.sql.types.StructType
2426

2527

2628
/**
@@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) {
4648
}
4749
}
4850

49-
class NoSuchPartitionException(
50-
db: String,
51-
table: String,
52-
spec: TablePartitionSpec)
53-
extends AnalysisException(
54-
s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
51+
class NoSuchPartitionException(message: String) extends AnalysisException(message) {
52+
def this(db: String, table: String, spec: TablePartitionSpec) = {
53+
this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
54+
}
55+
56+
def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
57+
this(s"Partition not found in table $tableName: " +
58+
s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
59+
.map( kv => s"${kv._1} -> ${kv._2}").mkString(",")}")
60+
}
61+
}
5562

5663
class NoSuchPermanentFunctionException(db: String, func: String)
5764
extends AnalysisException(s"Function '$func' not found in database '$db'")

0 commit comments

Comments
 (0)