Skip to content

Commit 60fa8e3

Browse files
stczwd, cloud-fan
authored and committed
[SPARK-31694][SQL] Add SupportsPartitions APIs on DataSourceV2
### What changes were proposed in this pull request? DataSourceV2 does not support partition commands such as AlterTableAddPartition, even though they are widely used with MySQL, Hive, and other data sources. It is therefore necessary to define a partition API to support these commands. We define the partition API as part of the Table API, because it will sometimes change table data. A partition is composed of an identifier and properties: the identifier is defined as an InternalRow and the properties are defined as a Map. ### Does this PR introduce _any_ user-facing change? Yes. This PR enables users to use some partition commands. ### How was this patch tested? Ran all tests and added some partition API tests. Closes #28617 from stczwd/SPARK-31694. Authored-by: stczwd <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 643cd87 commit 60fa8e3

File tree

8 files changed

+688
-15
lines changed

8 files changed

+688
-15
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.catalog;
19+
20+
import java.util.Map;
21+
22+
import org.apache.spark.annotation.Experimental;
23+
import org.apache.spark.sql.catalyst.InternalRow;
24+
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
25+
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
26+
27+
/**
28+
* An atomic partition interface of {@link Table} to operate multiple partitions atomically.
29+
* <p>
30+
* These APIs are used to modify table partition or partition metadata,
31+
* they will change the table data as well.
32+
* ${@link #createPartitions}:
33+
* add an array of partitions and any data they contain to the table
34+
* ${@link #dropPartitions}:
35+
* remove an array of partitions and any data they contain from the table
36+
*
37+
* @since 3.1.0
38+
*/
39+
@Experimental
40+
public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement {
41+
42+
@Override
43+
default void createPartition(
44+
InternalRow ident,
45+
Map<String, String> properties)
46+
throws PartitionAlreadyExistsException, UnsupportedOperationException {
47+
try {
48+
createPartitions(new InternalRow[]{ident}, new Map[]{properties});
49+
} catch (PartitionsAlreadyExistException e) {
50+
throw new PartitionAlreadyExistsException(e.getMessage());
51+
}
52+
}
53+
54+
@Override
55+
default boolean dropPartition(InternalRow ident) {
56+
return dropPartitions(new InternalRow[]{ident});
57+
}
58+
59+
/**
60+
* Create an array of partitions atomically in table.
61+
* <p>
62+
* If any partition already exists,
63+
* the operation of createPartitions need to be safely rolled back.
64+
*
65+
* @param idents an array of new partition identifiers
66+
* @param properties the metadata of the partitions
67+
* @throws PartitionsAlreadyExistException If any partition already exists for the identifier
68+
* @throws UnsupportedOperationException If partition property is not supported
69+
*/
70+
void createPartitions(
71+
InternalRow[] idents,
72+
Map<String, String>[] properties)
73+
throws PartitionsAlreadyExistException, UnsupportedOperationException;
74+
75+
/**
76+
* Drop an array of partitions atomically from table.
77+
* <p>
78+
* If any partition doesn't exists,
79+
* the operation of dropPartitions need to be safely rolled back.
80+
*
81+
* @param idents an array of partition identifiers
82+
* @return true if partitions were deleted, false if any partition not exists
83+
*/
84+
boolean dropPartitions(InternalRow[] idents);
85+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.catalog;
19+
20+
import java.util.Map;
21+
22+
import org.apache.spark.annotation.Experimental;
23+
import org.apache.spark.sql.catalyst.InternalRow;
24+
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
25+
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
26+
import org.apache.spark.sql.types.StructType;
27+
28+
/**
29+
* A partition interface of {@link Table}.
30+
* A partition is composed of identifier and properties,
31+
* and properties contains metadata information of the partition.
32+
* <p>
33+
* These APIs are used to modify table partition identifier or partition metadata.
34+
* In some cases, they will change the table data as well.
35+
* ${@link #createPartition}:
36+
* add a partition and any data it contains to the table
37+
* ${@link #dropPartition}:
38+
* remove a partition and any data it contains from the table
39+
* ${@link #replacePartitionMetadata}:
40+
* point a partition to a new location, which will swap one location's data for the other
41+
*
42+
* @since 3.1.0
43+
*/
44+
@Experimental
45+
public interface SupportsPartitionManagement extends Table {
46+
47+
/**
48+
* Get the partition schema of table,
49+
* this must be consistent with ${@link Table#partitioning()}.
50+
* @return the partition schema of table
51+
*/
52+
StructType partitionSchema();
53+
54+
/**
55+
* Create a partition in table.
56+
*
57+
* @param ident a new partition identifier
58+
* @param properties the metadata of a partition
59+
* @throws PartitionAlreadyExistsException If a partition already exists for the identifier
60+
* @throws UnsupportedOperationException If partition property is not supported
61+
*/
62+
void createPartition(
63+
InternalRow ident,
64+
Map<String, String> properties)
65+
throws PartitionAlreadyExistsException, UnsupportedOperationException;
66+
67+
/**
68+
* Drop a partition from table.
69+
*
70+
* @param ident a partition identifier
71+
* @return true if a partition was deleted, false if no partition exists for the identifier
72+
*/
73+
boolean dropPartition(InternalRow ident);
74+
75+
/**
76+
* Test whether a partition exists using an {@link InternalRow ident} from the table.
77+
*
78+
* @param ident a partition identifier
79+
* @return true if the partition exists, false otherwise
80+
*/
81+
default boolean partitionExists(InternalRow ident) {
82+
return listPartitionIdentifiers(ident).length > 0;
83+
}
84+
85+
/**
86+
* Replace the partition metadata of the existing partition.
87+
*
88+
* @param ident the partition identifier of the existing partition
89+
* @param properties the new metadata of the partition
90+
* @throws NoSuchPartitionException If the partition identifier to alter doesn't exist
91+
* @throws UnsupportedOperationException If partition property is not supported
92+
*/
93+
void replacePartitionMetadata(
94+
InternalRow ident,
95+
Map<String, String> properties)
96+
throws NoSuchPartitionException, UnsupportedOperationException;
97+
98+
/**
99+
* Retrieve the partition metadata of the existing partition.
100+
*
101+
* @param ident a partition identifier
102+
* @return the metadata of the partition
103+
* @throws UnsupportedOperationException If partition property is not supported
104+
*/
105+
Map<String, String> loadPartitionMetadata(InternalRow ident)
106+
throws UnsupportedOperationException;
107+
108+
/**
109+
* List the identifiers of all partitions that contains the ident in a table.
110+
*
111+
* @param ident a prefix of partition identifier
112+
* @return an array of Identifiers for the partitions
113+
*/
114+
InternalRow[] listPartitionIdentifiers(InternalRow ident);
115+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.catalyst.InternalRow
2122
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
2223
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
2324
import org.apache.spark.sql.connector.catalog.Identifier
25+
import org.apache.spark.sql.types.StructType
2426

2527
/**
2628
* Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
@@ -48,14 +50,30 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
4850
class TempTableAlreadyExistsException(table: String)
4951
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")
5052

51-
class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
52-
extends AnalysisException(
53-
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
53+
/**
 * An [[AnalysisException]] reporting that a partition already exists in a table.
 *
 * @param message the complete error message
 */
class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
  /**
   * Builds the message from a database name, a table name and a partition spec;
   * the spec entries are rendered one per line.
   */
  def this(db: String, table: String, spec: TablePartitionSpec) = {
    this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
  }

  /**
   * Builds the message from a table name and a partition identifier row.
   *
   * The row's values are read according to `partitionSchema` and paired with the schema's
   * field names, then rendered as comma-separated `name -> value` entries. The field name
   * comes first so the output matches the `name -> value` rendering of the spec-based
   * constructor.
   */
  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
    this(s"Partition already exists in table $tableName:" +
      partitionSchema.map(_.name).zip(partitionIdent.toSeq(partitionSchema))
        .map { case (name, value) => s"$name -> $value" }.mkString(","))
  }
}
5464

55-
class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
56-
extends AnalysisException(
57-
s"The following partitions already exists in table '$table' database '$db':\n"
65+
/**
 * An [[AnalysisException]] reporting that one or more partitions already exist in a table.
 *
 * @param message the complete error message
 */
class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) {
  /**
   * Builds the message from a database name, a table name and a list of partition specs;
   * the specs are separated by `===` lines.
   */
  def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
    this(s"The following partitions already exists in table '$table' database '$db':\n"
      + specs.mkString("\n===\n"))
  }

  /**
   * Builds the message from a table name and a list of partition identifier rows.
   *
   * Each row's values are read according to `partitionSchema` and paired with the schema's
   * field names, then rendered as comma-separated `name -> value` entries (field name
   * first); the rendered rows are separated by `===` lines.
   */
  def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
    this(s"The following partitions already exists in table $tableName:" +
      partitionIdents.map { ident =>
        partitionSchema.map(_.name).zip(ident.toSeq(partitionSchema))
          .map { case (name, value) => s"$name -> $value" }.mkString(",")
      }.mkString("\n===\n"))
  }
}
5977

6078
class FunctionAlreadyExistsException(db: String, func: String)
6179
extends AnalysisException(s"Function '$func' already exists in database '$db'")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.catalyst.InternalRow
2122
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
2223
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
2324
import org.apache.spark.sql.connector.catalog.Identifier
25+
import org.apache.spark.sql.types.StructType
2426

2527

2628
/**
@@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) {
4648
}
4749
}
4850

49-
class NoSuchPartitionException(
50-
db: String,
51-
table: String,
52-
spec: TablePartitionSpec)
53-
extends AnalysisException(
54-
s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
51+
/**
 * An [[AnalysisException]] reporting that a partition was not found in a table.
 *
 * @param message the complete error message
 */
class NoSuchPartitionException(message: String) extends AnalysisException(message) {
  /**
   * Builds the message from a database name, a table name and a partition spec;
   * the spec entries are rendered one per line.
   */
  def this(db: String, table: String, spec: TablePartitionSpec) = {
    this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
  }

  /**
   * Builds the message from a table name and a partition identifier row.
   *
   * The row's values are read according to `partitionSchema` and paired with the schema's
   * field names, then rendered as comma-separated `name -> value` entries. The field name
   * comes first so the output matches the `name -> value` rendering of the spec-based
   * constructor.
   */
  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
    this(s"Partition not found in table $tableName: " +
      partitionSchema.map(_.name).zip(partitionIdent.toSeq(partitionSchema))
        .map { case (name, value) => s"$name -> $value" }.mkString(","))
  }
}
5562

5663
class NoSuchPermanentFunctionException(db: String, func: String)
5764
extends AnalysisException(s"Function '$func' not found in database '$db'")
@@ -61,10 +68,18 @@ class NoSuchFunctionException(db: String, func: String, cause: Option[Throwable]
6168
s"Undefined function: '$func'. This function is neither a registered temporary function nor " +
6269
s"a permanent function registered in the database '$db'.", cause = cause)
6370

64-
class NoSuchPartitionsException(db: String, table: String, specs: Seq[TablePartitionSpec])
65-
extends AnalysisException(
66-
s"The following partitions not found in table '$table' database '$db':\n"
71+
/**
 * An [[AnalysisException]] reporting that one or more partitions were not found in a table.
 *
 * @param message the complete error message
 */
class NoSuchPartitionsException(message: String) extends AnalysisException(message) {
  /**
   * Builds the message from a database name, a table name and a list of partition specs;
   * the specs are separated by `===` lines.
   */
  def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
    this(s"The following partitions not found in table '$table' database '$db':\n"
      + specs.mkString("\n===\n"))
  }

  /**
   * Builds the message from a table name and a list of partition identifier rows.
   *
   * Each row's values are read according to `partitionSchema` and paired with the schema's
   * field names, then rendered as comma-separated `name -> value` entries (field name
   * first); the rendered rows are separated by `===` lines.
   */
  def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
    this(s"The following partitions not found in table $tableName: " +
      partitionIdents.map { ident =>
        partitionSchema.map(_.name).zip(ident.toSeq(partitionSchema))
          .map { case (name, value) => s"$name -> $value" }.mkString(",")
      }.mkString("\n===\n"))
  }
}
6883

6984
class NoSuchTempFunctionException(func: String)
7085
extends AnalysisException(s"Temporary function '$func' not found")
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector
19+
20+
import java.util
21+
22+
import org.apache.spark.sql.catalyst.InternalRow
23+
import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
24+
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
25+
import org.apache.spark.sql.connector.expressions.Transform
26+
import org.apache.spark.sql.types.StructType
27+
28+
/**
 * This class is used to test the SupportsAtomicPartitionManagement API.
 *
 * Partitions are kept in `memoryTablePartitions` (inherited from [[InMemoryPartitionTable]]),
 * keyed by partition identifier and mapped to the partition's properties.
 */
class InMemoryAtomicPartitionTable(
    name: String,
    schema: StructType,
    partitioning: Array[Transform],
    properties: util.Map[String, String])
  extends InMemoryPartitionTable(name, schema, partitioning, properties)
  with SupportsAtomicPartitionManagement {

  /**
   * Registers a single partition with its properties.
   *
   * @throws PartitionAlreadyExistsException if `ident` is already registered
   */
  override def createPartition(
      ident: InternalRow,
      properties: util.Map[String, String]): Unit = {
    if (memoryTablePartitions.containsKey(ident)) {
      throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
    } else {
      memoryTablePartitions.put(ident, properties)
    }
  }

  /**
   * Removes a single partition.
   *
   * @return true if the partition was registered and has been removed, false otherwise
   */
  override def dropPartition(ident: InternalRow): Boolean = {
    val existed = memoryTablePartitions.containsKey(ident)
    if (existed) {
      memoryTablePartitions.remove(ident)
    }
    existed
  }

  /**
   * Registers all given partitions, or none of them if any already exists.
   *
   * Existence of every identifier is checked up front, so nothing is inserted when the
   * exception is thrown.
   *
   * @throws PartitionsAlreadyExistException listing the identifiers that already exist
   */
  override def createPartitions(
      idents: Array[InternalRow],
      properties: Array[util.Map[String, String]]): Unit = {
    val alreadyPresent = idents.filter(partitionExists)
    if (alreadyPresent.nonEmpty) {
      throw new PartitionsAlreadyExistException(name, alreadyPresent, partitionSchema)
    }
    // NOTE: zip pairs elements positionally and stops at the shorter array, so callers are
    // expected to pass `idents` and `properties` of equal length.
    idents.zip(properties).foreach { case (ident, property) =>
      createPartition(ident, property)
    }
  }

  /**
   * Removes all given partitions, or none of them if any is missing.
   *
   * @return false (without removing anything) if some identifier does not exist; otherwise
   *         whether every individual `dropPartition` call succeeded
   */
  override def dropPartitions(idents: Array[InternalRow]): Boolean = {
    // Short-circuiting && keeps the "check everything first, then drop" order.
    idents.forall(partitionExists) && idents.forall(dropPartition)
  }
}

0 commit comments

Comments
 (0)