
Commit 7c1a8e5 ("fix")
1 parent: 6029a95

6 files changed: +61 −54 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 17 additions & 3 deletions
@@ -246,9 +246,10 @@ class SessionCatalog(
   }
 
   /**
-   * Retrieve the metadata of an existing metastore table.
-   * If no database is specified, assume the table is in the current database.
-   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
+   * Retrieve the metadata of an existing metastore table/view or a temporary view.
+   * If no database is specified, we check whether the corresponding temporary view exists.
+   * If the temporary view does not exist, we assume the table/view is in the current database.
+   * If still not found in the database then a [[NoSuchTableException]] is thrown.
    */
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
@@ -269,6 +270,19 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
+   * assume the table/view is in the current database. If the specified table/view is not found
+   * in the database then a [[NoSuchTableException]] is thrown.
+   */
+  def getNonTempTableMetadata(name: TableIdentifier): CatalogTable = {
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(name.table)
+    requireDbExists(db)
+    requireTableExists(TableIdentifier(table, Some(db)))
+    externalCatalog.getTable(db, table)
+  }
+
   /**
    * Retrieve the metadata of an existing metastore table.
    * If no database is specified, assume the table is in the current database.
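Taken together, `getTableMetadata` becomes temp-view-aware while the new `getNonTempTableMetadata` deliberately resolves against the external catalog only. A minimal sketch of the difference after this patch — not part of the commit — assuming code that can reach the private[sql] session internals (e.g. a Spark SQL test) and a hypothetical name `t` that exists both as a temp view and as a permanent table:

import org.apache.spark.sql.catalyst.TableIdentifier

val catalog = spark.sessionState.catalog

spark.range(3).createOrReplaceTempView("t")         // temporary view `t`
spark.sql("CREATE TABLE t(id INT) USING parquet")   // permanent table `default`.`t`

catalog.getTableMetadata(TableIdentifier("t"))         // checks the temporary view first
catalog.getNonTempTableMetadata(TableIdentifier("t"))  // always resolves `default`.`t`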

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 15 additions & 17 deletions
@@ -265,15 +265,13 @@ case class AlterTableUnsetPropertiesCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
-    val db = tableName.database.getOrElse(catalog.getCurrentDatabase)
-    val qualifiedName = TableIdentifier(tableName.table, Some(db))
-    val table = catalog.getTableMetadata(qualifiedName)
+    val table = catalog.getNonTempTableMetadata(tableName)
 
     if (!ifExists) {
       propKeys.foreach { k =>
         if (!table.properties.contains(k)) {
           throw new AnalysisException(
-            s"Attempted to unset non-existent property '$k' in table '$tableName'")
+            s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
         }
       }
     }
@@ -307,7 +305,7 @@ case class AlterTableSerDePropertiesCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableName)
+    val table = catalog.getNonTempTableMetadata(tableName)
     // For datasource tables, disallow setting serde or specifying partition
     if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
@@ -325,11 +323,11 @@ case class AlterTableSerDePropertiesCommand(
       catalog.alterTable(newTable)
     } else {
       val spec = partSpec.get
-      val part = catalog.getPartition(tableName, spec)
+      val part = catalog.getPartition(table.identifier, spec)
       val newPart = part.copy(storage = part.storage.copy(
         serde = serdeClassName.orElse(part.storage.serde),
         properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
-      catalog.alterPartitions(tableName, Seq(newPart))
+      catalog.alterPartitions(table.identifier, Seq(newPart))
     }
     Seq.empty[Row]
   }
@@ -356,7 +354,7 @@ case class AlterTableAddPartitionCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableName)
+    val table = catalog.getNonTempTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
@@ -365,7 +363,7 @@ case class AlterTableAddPartitionCommand(
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(spec, table.storage.copy(locationUri = location))
     }
-    catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+    catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
     Seq.empty[Row]
   }
 
@@ -416,12 +414,12 @@ case class AlterTableDropPartitionCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableName)
+    val table = catalog.getNonTempTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
+    catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
     Seq.empty[Row]
   }
 
@@ -470,9 +468,9 @@ case class AlterTableRecoverPartitionsCommand(
 
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
-    val db = tableName.database.getOrElse(catalog.getCurrentDatabase)
-    val qualifiedName = TableIdentifier(tableName.table, Some(db))
-    val table = catalog.getTableMetadata(qualifiedName)
+    val table = catalog.getNonTempTableMetadata(tableName)
+    val qualifiedName = table.identifier.quotedString
+
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd on datasource tables: $qualifiedName")
@@ -647,11 +645,11 @@ case class AlterTableSetLocationCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableName)
+    val table = catalog.getNonTempTableMetadata(tableName)
     partitionSpec match {
       case Some(spec) =>
         // Partition spec is specified, so we set the location only for this partition
-        val part = catalog.getPartition(tableName, spec)
+        val part = catalog.getPartition(table.identifier, spec)
         val newPart =
           if (DDLUtils.isDatasourceTable(table)) {
             throw new AnalysisException(
@@ -660,7 +658,7 @@ case class AlterTableSetLocationCommand(
           } else {
             part.copy(storage = part.storage.copy(locationUri = Some(location)))
           }
-        catalog.alterPartitions(tableName, Seq(newPart))
+        catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
         val newTable =
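Every command in this file follows the same replacement pattern: the three-line manual qualification is collapsed into one resolving call, and the identifier carried by the returned metadata (always database-qualified) feeds both the follow-up catalog calls and the error messages. A condensed before/after sketch, with `catalog` and `tableName` as assumed bindings:

// Before: each command re-derived the database by hand.
val db = tableName.database.getOrElse(catalog.getCurrentDatabase)
val qualified = TableIdentifier(tableName.table, Some(db))
val tableBefore = catalog.getTableMetadata(qualified)

// After: one call validates that the database and table exist, and the
// returned metadata carries a qualified identifier for all later uses.
val tableAfter = catalog.getNonTempTableMetadata(tableName)
val qualifiedName = tableAfter.identifier.quotedString  // e.g. `default`.`tbl`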

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 18 additions & 22 deletions
@@ -214,9 +214,8 @@ case class LoadDataCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val db = table.database.getOrElse(catalog.getCurrentDatabase)
-    val qualifiedName = TableIdentifier(table.table, Some(db))
-    val targetTable = catalog.getTableMetadata(qualifiedName)
+    val targetTable = catalog.getNonTempTableMetadata(table)
+    val qualifiedName = targetTable.identifier.quotedString
 
     if (targetTable.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $qualifiedName")
@@ -334,9 +333,8 @@ case class TruncateTableCommand(
 
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
-    val db = tableName.database.getOrElse(catalog.getCurrentDatabase)
-    val qualifiedName = TableIdentifier(tableName.table, Some(db))
-    val table = catalog.getTableMetadata(qualifiedName)
+    val table = catalog.getNonTempTableMetadata(tableName)
+    val qualifiedName = table.identifier.quotedString
 
     if (table.tableType == CatalogTableType.EXTERNAL) {
       throw new AnalysisException(
@@ -363,7 +361,7 @@ case class TruncateTableCommand(
     } else if (table.partitionColumnNames.isEmpty) {
       Seq(table.storage.locationUri)
     } else {
-      catalog.listPartitions(qualifiedName, partitionSpec).map(_.storage.locationUri)
+      catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     locations.foreach { location =>
@@ -386,7 +384,7 @@ case class TruncateTableCommand(
     spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.uncacheQuery(spark.table(qualifiedName.quotedString))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
     } catch {
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table $qualifiedName", e)
@@ -622,7 +620,7 @@ case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
  * }}}
  */
 case class ShowPartitionsCommand(
-    table: TableIdentifier,
+    tableName: TableIdentifier,
     spec: Option[TablePartitionSpec]) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     AttributeReference("partition", StringType, nullable = false)() :: Nil
@@ -636,29 +634,27 @@ case class ShowPartitionsCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val db = table.database.getOrElse(catalog.getCurrentDatabase)
-    val qualifiedName = TableIdentifier(table.table, Some(db))
-    val tab = catalog.getTableMetadata(qualifiedName)
+    val table = catalog.getNonTempTableMetadata(tableName)
+    val qualifiedName = table.identifier.quotedString
 
     /**
      * Validate and throws an [[AnalysisException]] exception under the following conditions:
      * 1. If the table is not partitioned.
      * 2. If it is a datasource table.
      * 3. If it is a view.
      */
-    if (tab.tableType == VIEW) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a view: ${tab.qualifiedName}")
+    if (table.tableType == VIEW) {
+      throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $qualifiedName")
     }
 
-    if (tab.partitionColumnNames.isEmpty) {
+    if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $qualifiedName")
     }
 
-    if (DDLUtils.isDatasourceTable(tab)) {
+    if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+        s"SHOW PARTITIONS is not allowed on a datasource table: $qualifiedName")
     }
 
     /**
@@ -667,16 +663,16 @@ case class ShowPartitionsCommand(
      * thrown if the partitioning spec is invalid.
      */
     if (spec.isDefined) {
-      val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
+      val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
       if (badColumns.nonEmpty) {
         val badCols = badColumns.mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Non-partitioning column(s) $badCols are specified for SHOW PARTITIONS")
       }
     }
 
-    val partNames = catalog.listPartitions(table, spec).map { p =>
-      getPartName(p.spec, tab.partitionColumnNames)
+    val partNames = catalog.listPartitions(tableName, spec).map { p =>
+      getPartName(p.spec, table.partitionColumnNames)
     }
 
     partNames.map(Row(_))
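Renaming the `ShowPartitionsCommand` constructor parameter from `table` to `tableName` frees the name `table` for the resolved `CatalogTable` inside `run`, matching the other commands. A minimal sketch of that shape — a hypothetical stand-in class, not the real command, and it assumes this patch's `getNonTempTableMetadata`:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}

case class ShowPartitionsSketch(tableName: TableIdentifier) {
  def run(catalog: SessionCatalog): Seq[String] = {
    // Under the old parameter name `table`, this binding would have shadowed it.
    val table: CatalogTable = catalog.getNonTempTableMetadata(tableName)
    require(table.partitionColumnNames.nonEmpty,
      s"${table.identifier.quotedString} is not partitioned")
    // Render each partition spec as col1=v1/col2=v2, Hive-style.
    catalog.listPartitions(tableName, None).map { p =>
      p.spec.map { case (k, v) => s"$k=$v" }.mkString("/")
    }
  }
}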

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala

Lines changed: 8 additions & 9 deletions
@@ -406,25 +406,24 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
         |USING org.apache.spark.sql.parquet.DefaultSource
       """.stripMargin)
     // An empty sequence of rows is returned for a session temporary table.
-    val message1 = intercept[AnalysisException] {
+    intercept[NoSuchTableException] {
       sql("SHOW PARTITIONS parquet_temp")
-    }.getMessage
-    assert(message1.contains("Operation not allowed: SHOW PARTITIONS on temporary tables"))
+    }
 
-    val message2 = intercept[AnalysisException] {
+    val message1 = intercept[AnalysisException] {
       sql("SHOW PARTITIONS parquet_tab3")
     }.getMessage
-    assert(message2.contains("not allowed on a table that is not partitioned"))
+    assert(message1.contains("not allowed on a table that is not partitioned"))
 
-    val message3 = intercept[AnalysisException] {
+    val message2 = intercept[AnalysisException] {
       sql("SHOW PARTITIONS parquet_tab4 PARTITION(abcd=2015, xyz=1)")
     }.getMessage
-    assert(message3.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
+    assert(message2.contains("Non-partitioning column(s) [abcd, xyz] are specified"))
 
-    val message4 = intercept[AnalysisException] {
+    val message3 = intercept[AnalysisException] {
       sql("SHOW PARTITIONS parquet_view1")
     }.getMessage
-    assert(message4.contains("is not allowed on a view"))
+    assert(message3.contains("is not allowed on a view"))
   }
 }
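The first expectation changes because the lookup now fails earlier: `getNonTempTableMetadata` calls `requireTableExists` against the external catalog, where a temporary view is never registered, so the error is a `NoSuchTableException` instead of the old "Operation not allowed" message. A hypothetical repro of the observable change, assuming a SparkSession `spark` built with this patch:

spark.range(3).createOrReplaceTempView("parquet_temp_demo")

// Before this commit: AnalysisException
//   "Operation not allowed: SHOW PARTITIONS on temporary tables"
// After this commit: NoSuchTableException, since `parquet_temp_demo` is
// resolved against the external catalog, where temp views never exist.
spark.sql("SHOW PARTITIONS parquet_temp_demo")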

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -300,7 +300,7 @@ class HiveDDLSuite
       sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
     }.getMessage
     assert(message.contains(
-      "Attempted to unset non-existent property 'p' in table '`view1`'"))
+      "Attempted to unset non-existent property 'p' in table '`default`.`view1`'"))
     }
   }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -145,12 +145,12 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     e = intercept[AnalysisException] {
       sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
     }.getMessage
-    assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`$viewName`"))
+    assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`"))
 
     e = intercept[AnalysisException] {
       sql(s"TRUNCATE TABLE $viewName")
     }.getMessage
-    assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `default`.`$viewName`"))
+    assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `default`.`testview`"))
   }
 }
