@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -324,38 +324,47 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
-val tableIdentwithDB = table.identifier.quotedString
+val tableIdentWithDB = table.identifier.quotedString

if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentwithDB")
s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableIdentwithDB")
s"for tables that are not partitioned: $tableIdentWithDB")
}
if (partitionSpec.isDefined) {
DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
}

+val partCols = table.partitionColumnNames
val locations =
-if (table.partitionColumnNames.isEmpty) {
+if (partCols.isEmpty) {
Seq(table.storage.locationUri)
} else {
-// Here we diverge from Hive when the given partition spec contains all partition columns
-// but no partition is matched: Hive will throw an exception and we just do nothing.
val normalizedSpec = partitionSpec.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
-table.partitionColumnNames,
+partCols,
table.identifier.quotedString,
spark.sessionState.conf.resolver)
}
-catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
+val partLocations =
+catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
+
+// Fail if the partition spec is fully specified (not partial) and the partition does not
+// exist.
+for (spec <- partitionSpec if partLocations.isEmpty && spec.size == partCols.length) {
+throw new NoSuchPartitionException(table.database, table.identifier.table, spec)
+}
+
+partLocations
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
@@ -368,7 +377,7 @@
} catch {
case NonFatal(e) =>
throw new AnalysisException(
s"Failed to truncate table $tableIdentwithDB when removing data of the path: $path " +
s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -381,7 +390,7 @@
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table $tableIdentwithDB", e)
log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
}
Seq.empty[Row]
}
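To make the new control flow easy to check at a glance, here is a minimal standalone sketch of the guard added in TruncateTableCommand above. The object and method names are illustrative only (not part of the patch); the predicate mirrors `partLocations.isEmpty && spec.size == partCols.length`.

```scala
// Standalone sketch: fail only when the spec names every partition column and nothing matched.
object TruncateGuardSketch {
  def shouldFail(
      partCols: Seq[String],
      partitionSpec: Option[Map[String, String]],
      partLocations: Seq[String]): Boolean = {
    // Mirrors the for-comprehension guard in the hunk above.
    partitionSpec.exists(spec => partLocations.isEmpty && spec.size == partCols.length)
  }

  def main(args: Array[String]): Unit = {
    val partCols = Seq("width", "length")
    // Fully specified spec, nothing matched -> the command now throws NoSuchPartitionException.
    assert(shouldFail(partCols, Some(Map("width" -> "100", "length" -> "100")), Seq.empty))
    // Partial spec, nothing matched -> still a silent no-op.
    assert(!shouldFail(partCols, Some(Map("width" -> "100")), Seq.empty))
    // Plain TRUNCATE TABLE without a PARTITION clause is unaffected by this check.
    assert(!shouldFail(partCols, None, Seq.empty))
  }
}
```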
@@ -1673,11 +1673,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("TRUNCATE TABLE partTable PARTITION (width=100)")
assert(spark.table("partTable").count() == data.count())

-// do nothing if no partition is matched for the given non-partial partition spec
-// TODO: This behaviour is different from Hive, we should decide whether we need to follow
-// Hive's behaviour or stick with our existing behaviour later.
-sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
-assert(spark.table("partTable").count() == data.count())
+// throw exception if no partition is matched for the given non-partial partition spec.
+intercept[NoSuchPartitionException] {
+sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+}

// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {
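At the SQL level, the updated DDLSuite test pins down the user-visible change. A hedged spark-shell style sketch, assuming a Hive-enabled SparkSession named `spark` (table and column names are illustrative):

```scala
// Requires Hive support so that TRUNCATE TABLE ... PARTITION is permitted on this table.
spark.sql("CREATE TABLE partTable (id INT) PARTITIONED BY (width INT, length INT)")

// Partial spec that matches no partition: unchanged behaviour, a silent no-op.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=100)")

// Fully specified spec that matches no partition: with this patch it throws
// NoSuchPartitionException instead of silently doing nothing.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
```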
@@ -23,11 +23,10 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -1149,11 +1148,10 @@ class HiveDDLSuite
sql("TRUNCATE TABLE partTable PARTITION (width=100)")
assert(spark.table("partTable").count() == data.count())

-// do nothing if no partition is matched for the given non-partial partition spec
-// TODO: This behaviour is different from Hive, we should decide whether we need to follow
-// Hive's behaviour or stick with our existing behaviour later.
-sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
-assert(spark.table("partTable").count() == data.count())
+// throw exception if no partition is matched for the given non-partial partition spec.
+intercept[NoSuchPartitionException] {
+sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+}

// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {
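Callers that relied on the old silent no-op for fully specified specs can probe the catalog first. A hypothetical caller-side guard (identifiers are illustrative; `SessionCatalog.listPartitions` is an internal API, used here the same way the test suites use the session catalog):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Assumes a SparkSession `spark` and the partitioned table `partTable` from the tests above.
val spec = Map("width" -> "100", "length" -> "100")
val matched = spark.sessionState.catalog.listPartitions(TableIdentifier("partTable"), Some(spec))
if (matched.nonEmpty) {
  spark.sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
} else {
  // No matching partition; truncating would now raise NoSuchPartitionException.
}
```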