File: org.apache.spark.sql.execution.command.AlterTableAddPartitionSuiteBase

@@ -21,6 +21,7 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -184,4 +185,21 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
         "The spec (part0) must match the partition spec (part0, part1)"))
     }
   }
+
+  test("partition already exists") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
 }
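
Note: the value of hoisting this test into the shared trait is that every concrete suite mixing it in (the v1, v2, and Hive suites below) runs the identical body against its own catalog, with defaultUsing supplying the per-catalog source clause. A minimal, self-contained sketch of that inheritance pattern, using hypothetical names rather than Spark's real test hierarchy:

    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical stand-in for AlterTableAddPartitionSuiteBase: one shared test
    // body, parameterized by whatever each catalog implementation overrides.
    trait SharedBehaviorSuiteBase extends AnyFunSuite {
      def defaultUsing: String // e.g. "USING parquet" or "USING HIVE"

      test("shared behavior runs once per concrete suite") {
        assert(defaultUsing.startsWith("USING"))
      }
    }

    // A concrete suite only supplies its own configuration...
    class ParquetBehaviorSuite extends SharedBehaviorSuiteBase {
      override def defaultUsing: String = "USING parquet"
    }

    // ...and can even be bodyless, like the v1 suite after this change.
    class HiveBehaviorSuite extends SharedBehaviorSuiteBase {
      override def defaultUsing: String = "USING HIVE"
    }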
File: org.apache.spark.sql.execution.command.v1.AlterTableAddPartitionSuite

@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.command.v1
 
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
@@ -44,21 +43,4 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase
   }
 }
 
-class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-}
+class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession
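
Note: with the shared test in place, nothing suite-specific remains here, so the v1 class collapses to a bodyless declaration; it still inherits and runs every test from the base traits, including the new one.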
File: org.apache.spark.sql.execution.command.v2.AlterTableAddPartitionSuite

@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.v2
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, ResolvePartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
@@ -60,23 +60,6 @@ class AlterTableAddPartitionSuite
     assert(partMetadata.get("location") === expected)
   }
 
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-
   test("SPARK-33650: add partition into a table which doesn't support partition management") {
     withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
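
Note: the v2 suite drops its private copy the same way; judging by the imports kept above, the inherited test runs here against the in-memory V2 partition catalogs this suite configures.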
File: org.apache.spark.sql.hive.client.HiveClientImpl

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
+import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -48,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -598,7 +599,17 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = withHiveState {
-    shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    def replaceExistException(e: Throwable): Unit = e match {
+      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
+        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+      case _ => throw e
+    }
+    try {
+      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    } catch {
+      case e: InvocationTargetException => replaceExistException(e.getCause)
+      case e: Throwable => replaceExistException(e)
+    }
   }
 
   override def dropPartitions(
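
Note: the InvocationTargetException branch is the subtle part of this hunk. The shim layer reaches Hive through Java reflection, and a reflective call never lets the target's exception propagate directly; it arrives wrapped, with the real failure behind getCause. A small standalone demonstration of that wrapping (the Thrower class is illustrative, not Spark code):

    import java.lang.reflect.InvocationTargetException

    object ReflectionWrappingDemo {
      class Thrower {
        def boom(): Unit = throw new IllegalStateException("partition already exists")
      }

      def main(args: Array[String]): Unit = {
        val m = classOf[Thrower].getMethod("boom")
        try {
          m.invoke(new Thrower) // the reflective call path, like shim.createPartitions
        } catch {
          case e: InvocationTargetException =>
            // The IllegalStateException thrown by boom() is only reachable here,
            // which is why createPartitions unwraps e.getCause before matching.
            println(e.getCause.getMessage) // prints: partition already exists
        }
      }
    }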
File: org.apache.spark.sql.hive.client.VersionsSuite

@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -594,6 +594,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
    }
 
+    test(s"$version: createPartitions if already exists") {
+      val partitions = Seq(CatalogTablePartition(
+        Map("key1" -> "101", "key2" -> "102"),
+        storageFormat))
+      try {
+        client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        val errMsg = intercept[PartitionsAlreadyExistException] {
+          client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        }.getMessage
+        assert(errMsg.contains("partitions already exists"))
+      } finally {
+        client.dropPartitions(
+          "default",
+          "src_part",
+          partitions.map(_.spec),
+          ignoreIfNotExists = true,
+          purge = false,
+          retainData = false)
+      }
+    }
+
    ///////////////////////////////////////////////////////////////////////////
    // Function related API
    ///////////////////////////////////////////////////////////////////////////
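
Note: the try/finally shape matters because VersionsSuite replays the same tests against one long-lived metastore per Hive version; if the partitions created here leaked, later runs would fail before reaching the assertion under test. A generic sketch of the same intercept-then-clean-up pattern, assuming plain ScalaTest and a toy mutable resource in place of the Hive client:

    import org.scalatest.funsuite.AnyFunSuite

    class CreateTwiceSuite extends AnyFunSuite {
      // Toy stand-in for shared metastore state that outlives a single test.
      private val existing = scala.collection.mutable.Set.empty[String]

      private def createPartition(spec: String): Unit =
        if (!existing.add(spec)) throw new IllegalStateException(s"$spec already exists")

      test("second create fails, shared state is restored") {
        createPartition("key1=101")
        try {
          // intercept returns the thrown exception so its message can be checked.
          val err = intercept[IllegalStateException] { createPartition("key1=101") }
          assert(err.getMessage.contains("already exists"))
        } finally {
          existing -= "key1=101" // mirror of dropPartitions: leave nothing behind
        }
      }
    }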
File: org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite

@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.command.v1
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -26,21 +25,4 @@ class AlterTableAddPartitionSuite
     with TestHiveSingleton {
   override def version: String = "Hive V1"
   override def defaultUsing: String = "USING HIVE"
-
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
 }
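
Note: the net effect for the Hive suite is a stricter check, not just deduplication. The removed copy accepted any AnalysisException whose message contained "already exists", while the inherited base test pins the specific PartitionsAlreadyExistException (an AnalysisException subtype) that HiveClientImpl now raises, so the Hive path is verified as precisely as the v1 and v2 paths.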