Skip to content

Commit 6efca68

Browse files
author
stczwd
committed
add AlterTableAddPartitionExec and AlterTableDropPartitionExec
Change-Id: Id4c4ee16dec31def7fbbb8609ef5ed41804c5402
1 parent a57aafc commit 6efca68

File tree

10 files changed

+300
-19
lines changed

10 files changed

+300
-19
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,24 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
228228

229229
case ShowCurrentNamespaceStatement() =>
230230
ShowCurrentNamespace(catalogManager)
231+
232+
case AlterTableAddPartitionStatement(
233+
NonSessionCatalogAndTable(catalog, tableName), partitionSpecsAndLocs, ifNotExists) =>
234+
AlterTableAddPartition(
235+
catalog.asTableCatalog,
236+
tableName.asIdentifier,
237+
partitionSpecsAndLocs,
238+
ifNotExists)
239+
240+
case AlterTableDropPartitionStatement(
241+
NonSessionCatalogAndTable(catalog, tableName), specs, ifExists, purge, retainData) =>
242+
AlterTableDropPartition(
243+
catalog.asTableCatalog,
244+
tableName.asIdentifier,
245+
specs,
246+
ifExists,
247+
purge,
248+
retainData)
231249
}
232250

233251
object NonSessionCatalogAndTable {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,3 +551,35 @@ case class ShowFunctions(
551551
pattern: Option[String]) extends Command {
552552
override def children: Seq[LogicalPlan] = child.toSeq
553553
}
554+
555+
/**
 * Logical plan for the ALTER TABLE ... ADD PARTITION command targeting a v2 table.
 *
 * Command syntax:
 * {{{
 *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
 *                                         PARTITION spec2 [LOCATION 'loc2']
 * }}}
 *
 * @param catalog the table catalog the table is resolved in
 * @param ident identifier of the table inside `catalog`
 * @param partitionSpecsAndLocs one entry per PARTITION clause: the partition spec paired
 *                              with its optional LOCATION
 * @param ignoreIfExists the IF NOT EXISTS flag of the statement
 */
case class AlterTableAddPartition(
    catalog: TableCatalog,
    ident: Identifier,
    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
    ignoreIfExists: Boolean)
  extends Command
569+
570+
/**
 * Logical plan for the ALTER TABLE ... DROP PARTITION command targeting a v2 table.
 * Running it may remove both the data and the metadata of the named partitions.
 *
 * Command syntax:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
 *
 * @param catalog the table catalog the table is resolved in
 * @param ident identifier of the table inside `catalog`
 * @param specs one partition spec per PARTITION clause
 * @param ignoreIfNotExists the IF EXISTS flag of the statement
 * @param purge the PURGE flag of the statement
 * @param retainData flag carried over unchanged from the parsed statement
 */
case class AlterTableDropPartition(
    catalog: TableCatalog,
    ident: Identifier,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean)
  extends Command

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ package org.apache.spark.sql.execution.datasources.v2
2020
import scala.collection.JavaConverters._
2121

2222
import org.apache.spark.sql.AnalysisException
23-
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
25+
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsPartitions, SupportsRead, SupportsWrite, Table, TableCapability}
26+
import org.apache.spark.sql.types.{ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructType}
2427
import org.apache.spark.sql.util.CaseInsensitiveStringMap
2528

2629
object DataSourceV2Implicits {
@@ -52,6 +55,15 @@ object DataSourceV2Implicits {
5255
}
5356
}
5457

58+
/**
 * Views `table` as a [[SupportsPartitions]].
 *
 * @return the same table instance, typed as `SupportsPartitions`
 * @throws AnalysisException if `table` does not implement `SupportsPartitions`
 */
def asPartitionable: SupportsPartitions = table match {
  case partitionable: SupportsPartitions => partitionable
  case _ =>
    throw new AnalysisException(s"Table does not support partitions: ${table.name}")
}
66+
5567
def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)
5668

5769
def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
@@ -62,4 +74,31 @@ object DataSourceV2Implicits {
6274
new CaseInsensitiveStringMap(options.asJava)
6375
}
6476
}
77+
78+
/**
 * Converts a string-valued partition spec into an [[InternalRow]] whose fields follow the
 * order of `partSchema`.
 *
 * Every field of `partSchema` is looked up by name in `partSpec` and parsed according to
 * the field's data type. A field that is absent from `partSpec` falls back to "0" for the
 * numeric types and to the empty string for strings.
 *
 * @param partSpec partition column name mapped to its raw string value
 * @param partSchema schema of the partition columns
 * @return a row holding one converted value per field of `partSchema`
 * @throws AnalysisException if a field's type is not supported for partitioning, or if a
 *                           value cannot be parsed as the field's numeric type
 */
def convertPartitionIndentifers(
    partSpec: TablePartitionSpec,
    partSchema: StructType): InternalRow = {
  val partValues: Seq[Any] = partSchema.map { part =>
    // Raw value for this column, or the type-specific default when the spec omits it.
    def rawValue(default: String): String = partSpec.getOrElse(part.name, default)

    try {
      part.dataType match {
        case _: ByteType => rawValue("0").toByte
        case _: ShortType => rawValue("0").toShort
        case _: IntegerType => rawValue("0").toInt
        case _: LongType => rawValue("0").toLong
        case _: FloatType => rawValue("0").toFloat
        case _: DoubleType => rawValue("0").toDouble
        case _: StringType =>
          // InternalRow represents StringType values as UTF8String; a java.lang.String
          // here would break readers that call getUTF8String on the row.
          org.apache.spark.unsafe.types.UTF8String.fromString(rawValue(""))
        case _ =>
          throw new AnalysisException(
            s"Type ${part.dataType.typeName} is not supported for partition.")
      }
    } catch {
      // Report malformed user input the same way as the other failures in this method
      // instead of leaking a bare NumberFormatException.
      case e: NumberFormatException =>
        throw new AnalysisException(
          s"Partition value '${rawValue("0")}' of column ${part.name} cannot be " +
            s"converted to ${part.dataType.typeName}: ${e.getMessage}")
    }
  }
  InternalRow.fromSeq(partValues)
}
65104
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,11 @@ class InMemoryPartitionTable(
9191
.filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray
9292
}
9393

94-
def partitionExists(ident: InternalRow): Boolean = memoryTablePartitions.containsKey(ident)
94+
/** Returns whether `memoryTablePartitions` contains an entry keyed by `ident`. */
def partitionExists(ident: InternalRow): Boolean = memoryTablePartitions.containsKey(ident)
97+
98+
/** Removes every entry from `memoryTablePartitions`. */
def clearPartitions(): Unit = memoryTablePartitions.clear()
95101
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector
19+
import java.util
20+
21+
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
22+
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, Table}
23+
import org.apache.spark.sql.connector.expressions.Transform
24+
import org.apache.spark.sql.types.StructType
25+
26+
/**
 * An [[InMemoryTableCatalog]] whose `createTable` registers [[InMemoryPartitionTable]]
 * instances.
 */
class InMemoryPartitionTableCatalog extends InMemoryTableCatalog {
  import CatalogV2Implicits._

  /**
   * Creates an [[InMemoryPartitionTable]] named `<catalog name>.<quoted ident>`, records it
   * in `tables`, and makes sure the identifier's namespace has an entry in `namespaces`.
   *
   * @throws TableAlreadyExistsException if `tables` already holds `ident`
   */
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    if (tables.containsKey(ident)) throw new TableAlreadyExistsException(ident)
    InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)

    val qualifiedName = s"$name.${ident.quoted}"
    val partitionTable =
      new InMemoryPartitionTable(qualifiedName, schema, partitions, properties)
    tables.put(ident, partitionTable)
    namespaces.putIfAbsent(ident.namespace.toList, Map())
    partitionTable
  }
}

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,10 @@ class ResolveSessionCatalog(
497497
v1TableName.asTableIdentifier,
498498
"ALTER TABLE RECOVER PARTITIONS")
499499

500-
case AlterTableAddPartitionStatement(tbl, partitionSpecsAndLocs, ifNotExists) =>
501-
val v1TableName = parseV1Table(tbl, "ALTER TABLE ADD PARTITION")
500+
case AlterTableAddPartitionStatement(
501+
SessionCatalogAndTable(_, tbl), partitionSpecsAndLocs, ifNotExists) =>
502502
AlterTableAddPartitionCommand(
503-
v1TableName.asTableIdentifier,
503+
tbl.asTableIdentifier,
504504
partitionSpecsAndLocs,
505505
ifNotExists)
506506

@@ -511,10 +511,10 @@ class ResolveSessionCatalog(
511511
from,
512512
to)
513513

514-
case AlterTableDropPartitionStatement(tbl, specs, ifExists, purge, retainData) =>
515-
val v1TableName = parseV1Table(tbl, "ALTER TABLE DROP PARTITION")
514+
case AlterTableDropPartitionStatement(
515+
SessionCatalogAndTable(_, tbl), specs, ifExists, purge, retainData) =>
516516
AlterTableDropPartitionCommand(
517-
v1TableName.asTableIdentifier,
517+
tbl.asTableIdentifier,
518518
specs,
519519
ifExists,
520520
purge,
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources.v2
19+
20+
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
23+
import org.apache.spark.sql.catalyst.expressions.Attribute
24+
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, TableCatalog}
25+
26+
/**
 * Physical plan node for adding partitions to a v2 table.
 *
 * All partition specs are validated and converted before the first partition is created,
 * so an unknown partition key or an unparsable value in any spec fails the command without
 * having created some of the requested partitions.
 *
 * @param catalog catalog used to load the table
 * @param ident identifier of the table to alter
 * @param partitionSpecsAndLocs partition specs, each paired with an optional location
 * @param ignoreIfExists forwarded to the table as the "ignoreIfExists" partition property
 */
case class AlterTableAddPartitionExec(
    catalog: TableCatalog,
    ident: Identifier,
    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
    ignoreIfExists: Boolean) extends V2CommandExec {
  import DataSourceV2Implicits._
  import CatalogV2Implicits._

  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    val table = catalog.loadTable(ident).asPartitionable
    val partSchema = table.partitionSchema()
    val partNames = partSchema.map(_.name).toSet

    // Phase 1 (no side effects): reject unknown keys and convert every spec. `toList`
    // forces strict evaluation so nothing is deferred into the creation phase.
    val partIdentsAndLocs = partitionSpecsAndLocs.toList.map { case (spec, location) =>
      val conflictKeys = spec.keys.filterNot(partNames.contains)
      if (conflictKeys.nonEmpty) {
        throw new AnalysisException(
          s"Partition key ${conflictKeys.mkString(",")} " +
            s"not exists in ${ident.namespace().quoted}.${ident.name()}")
      }
      (convertPartitionIndentifers(spec, partSchema), location)
    }

    // Phase 2: create the partitions. Each one starts from a copy of the table properties,
    // then adds its own location (if any) and the ignoreIfExists flag.
    partIdentsAndLocs.foreach { case (partIdent, location) =>
      val partParams = new java.util.HashMap[String, String](table.properties())
      location.foreach(locationUri => partParams.put("location", locationUri))
      partParams.put("ignoreIfExists", ignoreIfExists.toString)
      table.createPartition(partIdent, partParams)
    }
    Seq.empty
  }
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources.v2
19+
20+
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
23+
import org.apache.spark.sql.catalyst.expressions.Attribute
24+
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, TableCatalog}
25+
26+
/**
 * Physical plan node for dropping partitions of a v2 table.
 *
 * All partition specs are validated and converted before the first partition is dropped,
 * so an unknown partition key or an unparsable value in any spec fails the command without
 * having dropped some of the requested partitions.
 *
 * NOTE(review): `ignoreIfNotExists`, `purge` and `retainData` are accepted but are not
 * consulted by `run()`; confirm against the partition API whether they should be honored.
 *
 * @param catalog catalog used to load the table
 * @param ident identifier of the table to alter
 * @param specs partition specs naming the partitions to drop
 * @param ignoreIfNotExists currently unused by `run()`
 * @param purge currently unused by `run()`
 * @param retainData currently unused by `run()`
 */
case class AlterTableDropPartitionExec(
    catalog: TableCatalog,
    ident: Identifier,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean) extends V2CommandExec {
  import DataSourceV2Implicits._
  import CatalogV2Implicits._

  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    val table = catalog.loadTable(ident).asPartitionable
    val partSchema = table.partitionSchema()
    val partNames = partSchema.map(_.name).toSet

    // Phase 1 (no side effects): reject unknown keys and convert every spec. `toList`
    // forces strict evaluation so nothing is deferred into the drop phase.
    val partIdents = specs.toList.map { partSpec =>
      val conflictKeys = partSpec.keys.filterNot(partNames.contains)
      if (conflictKeys.nonEmpty) {
        throw new AnalysisException(
          s"Partition key ${conflictKeys.mkString(",")} " +
            s"not exists in ${ident.namespace().quoted}.${ident.name()}")
      }
      convertPartitionIndentifers(partSpec, partSchema)
    }

    // Phase 2: drop the partitions.
    partIdents.foreach { partIdent =>
      table.dropPartition(partIdent)
    }
    Seq.empty
  }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
276276
case r @ ShowTableProperties(rt: ResolvedTable, propertyKey) =>
277277
ShowTablePropertiesExec(r.output, rt.table, propertyKey) :: Nil
278278

279+
case AlterTableAddPartition(catalog, ident, partitionSpecsAndLocs, ignoreIfExists) =>
280+
AlterTableAddPartitionExec(catalog, ident, partitionSpecsAndLocs, ignoreIfExists) :: Nil
281+
282+
case AlterTableDropPartition(catalog, ident, specs, ignoreIfNotExists, purge, retainData) =>
283+
AlterTableDropPartitionExec(
284+
catalog, ident, specs, ignoreIfNotExists, purge, retainData) :: Nil
285+
279286
case _ => Nil
280287
}
281288
}

0 commit comments

Comments
 (0)