Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case AlterTableDropPartition(ResolvedTable(_, _, table), parts, _, _) =>
checkAlterTablePartition(table, parts)

case AlterTableRenamePartition(ResolvedTable(_, _, table), from, _) =>
checkAlterTablePartition(table, Seq(from))

case showPartitions: ShowPartitions => checkShowPartitions(showPartitions)

case _ => // Falls back to the following checks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan, ShowPartitions}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, AlterTableRenamePartition, LogicalPlan, ShowPartitions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
Expand Down Expand Up @@ -51,6 +51,15 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
partitionSchema,
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))

case r @ AlterTableRenamePartition(
ResolvedTable(_, _, table: SupportsPartitionManagement), from, _) =>
val partitionSchema = table.partitionSchema()
r.copy(from = resolvePartitionSpecs(
table.name,
Seq(from),
partitionSchema,
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)).head)

case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
r.copy(pattern = resolvePartitionSpecs(
table.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3761,7 +3761,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Create an [[AlterTableRenamePartitionStatement]]
* Create an [[AlterTableRenamePartition]]
*
* For example:
* {{{
Expand All @@ -3770,9 +3770,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitRenameTablePartition(
ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenamePartitionStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
visitNonOptionalPartitionSpec(ctx.from),
AlterTableRenamePartition(
UnresolvedTable(
visitMultipartIdentifier(ctx.multipartIdentifier),
"ALTER TABLE ... RENAME TO PARTITION"),
UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(ctx.from)),
visitNonOptionalPartitionSpec(ctx.to))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,6 @@ case class AlterTableSetLocationStatement(
partitionSpec: Option[TablePartitionSpec],
location: String) extends ParsedStatement

/**
* ALTER TABLE ... RENAME PARTITION command, as parsed from SQL.
*/
case class AlterTableRenamePartitionStatement(
tableName: Seq[String],
from: TablePartitionSpec,
to: TablePartitionSpec) extends ParsedStatement

/**
* An INSERT INTO statement, as parsed from SQL.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,19 @@ case class AlterTableDropPartition(
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the ALTER TABLE ... RENAME TO PARTITION command.
*/
case class AlterTableRenamePartition(
child: LogicalPlan,
from: PartitionSpec,
to: TablePartitionSpec) extends Command {
override lazy val resolved: Boolean =
childrenResolved && from.isInstanceOf[ResolvedPartitionSpec]

override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the ALTER TABLE ... RECOVER PARTITIONS command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -2106,9 +2106,9 @@ class DDLParserSuite extends AnalysisTest {
|RENAME TO PARTITION (dt='2008-09-09', country='uk')
""".stripMargin
val parsed1 = parsePlan(sql1)
val expected1 = AlterTableRenamePartitionStatement(
Seq("table_name"),
Map("dt" -> "2008-08-08", "country" -> "us"),
val expected1 = AlterTableRenamePartition(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME TO PARTITION"),
UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")),
Map("dt" -> "2008-09-09", "country" -> "uk"))
comparePlans(parsed1, expected1)

Expand All @@ -2118,9 +2118,9 @@ class DDLParserSuite extends AnalysisTest {
|RENAME TO PARTITION (ds='2018-06-10')
""".stripMargin
val parsed2 = parsePlan(sql2)
val expected2 = AlterTableRenamePartitionStatement(
Seq("a", "b", "c"),
Map("ds" -> "2017-06-10"),
val expected2 = AlterTableRenamePartition(
UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... RENAME TO PARTITION"),
UnresolvedPartitionSpec(Map("ds" -> "2017-06-10")),
Map("ds" -> "2018-06-10"))
comparePlans(parsed2, expected2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,10 @@ class ResolveSessionCatalog(
partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
ifNotExists)

case AlterTableRenamePartitionStatement(tbl, from, to) =>
val v1TableName = parseV1Table(tbl, "ALTER TABLE RENAME PARTITION")
case AlterTableRenamePartition(
ResolvedV1TableIdentifier(ident), UnresolvedPartitionSpec(from, _), to) =>
AlterTableRenamePartitionCommand(
v1TableName.asTableIdentifier,
ident.asTableIdentifier,
from,
to)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
AlterTableDropPartitionExec(
table, parts.asResolvedPartitionSpecs, ignoreIfNotExists) :: Nil

case AlterTableRenamePartition(_: ResolvedTable, _: ResolvedPartitionSpec, _) =>
throw new AnalysisException(
"ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables.")

case AlterTableRecoverPartitions(_: ResolvedTable) =>
throw new AnalysisException(
"ALTER TABLE ... RECOVER PARTITIONS is not supported for v2 tables.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,21 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
}

test("ALTER TABLE RENAME PARTITION") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t PARTITION (id=1) RENAME TO PARTITION (id=2)")
val nonPartTbl = "testcat.ns1.ns2.tbl"
val partTbl = "testpart.ns1.ns2.tbl"
withTable(nonPartTbl, partTbl) {
spark.sql(s"CREATE TABLE $nonPartTbl (id bigint, data string) USING foo PARTITIONED BY (id)")
val e1 = intercept[AnalysisException] {
sql(s"ALTER TABLE $nonPartTbl PARTITION (id=1) RENAME TO PARTITION (id=2)")
}
assert(e1.message.contains(s"Table $nonPartTbl can not alter partitions"))

spark.sql(s"CREATE TABLE $partTbl (id bigint, data string) USING foo PARTITIONED BY (id)")
val e2 = intercept[AnalysisException] {
sql(s"ALTER TABLE $partTbl PARTITION (id=1) RENAME TO PARTITION (id=2)")
}
assert(e.message.contains("ALTER TABLE RENAME PARTITION is only supported with v1 tables"))
assert(e2.message.contains(
"ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables."))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
assertAnalysisError(
s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')",
s"$viewName is a temp view. 'ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]' expects a table")
assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')")
assertAnalysisError(
s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')",
s"$viewName is a temp view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table")
assertAnalysisError(
s"ALTER TABLE $viewName RECOVER PARTITIONS",
s"$viewName is a temp view. 'ALTER TABLE ... RECOVER PARTITIONS' expects a table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
Expand Down Expand Up @@ -1642,9 +1642,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))

// table to alter does not exist
intercept[NoSuchTableException] {
val e = intercept[AnalysisException] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you replace NoSuchTableException ?

Copy link
Contributor Author

@imback82 imback82 Dec 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unresolved table is now handled here:

case u: UnresolvedTable =>
u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

Copy link
Member

@MaxGekk MaxGekk Dec 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case, I believe we should throw NoSuchTableException here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise with such approach, you will lift all specific exception to the general one - AnalysisException.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's fix it separately.

Copy link
Contributor Author

@imback82 imback82 Dec 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I will do a follow-up PR.

sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
assert(e.getMessage.contains("Table not found: does_not_exist"))

// partition to rename does not exist
intercept[NoSuchPartitionException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,8 +896,9 @@ class HiveDDLSuite
s"ALTER TABLE $oldViewName RECOVER PARTITIONS",
s"$oldViewName is a view. 'ALTER TABLE ... RECOVER PARTITIONS' expects a table.")

assertErrorForAlterTableOnView(
s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')")
assertAnalysisError(
s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')",
s"$oldViewName is a view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table.")

assertAnalysisError(
s"ALTER TABLE $oldViewName ADD IF NOT EXISTS PARTITION (a='4', b='8')",
Expand Down