
Commit b50b34f (1 parent: 8bde03b)

[SPARK-17609][SQL] SessionCatalog.tableExists should not check temp view
## What changes were proposed in this pull request?

After #15054, there is no place left in Spark SQL that needs `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` check only permanent tables/views and removes some hacks. This PR also improves `getTempViewOrPermanentTableMetadata`, introduced in #15054, to make the code simpler.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #15160 from cloud-fan/exists.
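To make the behavior change concrete, here is a minimal sketch in the style of the `SessionCatalogSuite` diff below. It assumes `newBasicCatalog()` (that suite's test fixture) is in scope; treat it as an illustrative test fragment, not code from the patch itself:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.Range

val catalog = new SessionCatalog(newBasicCatalog())  // suite fixture, assumed in scope
catalog.createTempView("view1", Range(1, 10, 2, 10), overrideIfExists = false)

// After this patch, tableExists consults only the external catalog, so a
// matching temp view no longer makes it return true:
assert(!catalog.tableExists(TableIdentifier("view1")))

// Temp-view visibility is checked through the dedicated API instead:
assert(catalog.isTemporaryTable(TableIdentifier("view1")))
```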

File tree: 9 files changed (+81, -115 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 34 additions & 36 deletions

@@ -245,6 +245,16 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
 
+  /**
+   * Return whether a table/view with the specified name exists. If no database is specified, check
+   * with current database.
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    externalCatalog.tableExists(db, table)
+  }
+
   /**
    * Retrieve the metadata of an existing permanent table/view. If no database is specified,
    * assume the table/view is in the current database. If the specified table/view is not found

@@ -270,24 +280,6 @@ class SessionCatalog(
     externalCatalog.getTableOption(db, table)
   }
 
-  /**
-   * Retrieve the metadata of an existing temporary view or permanent table/view.
-   * If the temporary view does not exist, tries to get the metadata an existing permanent
-   * table/view. If no database is specified, assume the table/view is in the current database.
-   * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
-   * thrown.
-   */
-  def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
-    val table = formatTableName(name)
-    getTempView(table).map { plan =>
-      CatalogTable(
-        identifier = TableIdentifier(table),
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = plan.output.toStructType)
-    }.getOrElse(getTableMetadata(TableIdentifier(name)))
-  }
-
   /**
    * Load files stored in given path into an existing metastore table.
    * If no database is specified, assume the table is in the current database.

@@ -368,6 +360,30 @@ class SessionCatalog(
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
 
+  /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   *
+   * If a database is specified in `name`, this will return the metadata of table/view in that
+   * database.
+   * If no database is specified, this will first attempt to get the metadata of a temporary view
+   * with the same name, then, if that does not exist, return the metadata of table/view in the
+   * current database.
+   */
+  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined) {
+      getTableMetadata(name)
+    } else {
+      getTempView(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(getTableMetadata(name))
+    }
+  }
+
   /**
    * Rename a table.
    *

@@ -449,24 +465,6 @@ class SessionCatalog(
     }
   }
 
-  /**
-   * Return whether a table/view with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table/view
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table/view.
-   */
-  def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
-      true
-    } else {
-      externalCatalog.tableExists(db, table)
-    }
-  }
-
   /**
    * Return whether a table with the specified name is a temporary table.
    *

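Continuing the sketch above, the resolution rules of the relocated `getTempViewOrPermanentTableMetadata` can be exercised like this (still an illustrative fragment; `CatalogTableType`, `NoSuchTableException`, and ScalaTest's `intercept` are assumed imported):

```scala
// No database part: the temp view wins and is surfaced as a VIEW-typed
// CatalogTable with empty storage.
val meta = catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
assert(meta.tableType == CatalogTableType.VIEW)
assert(meta.identifier.table == "view1")

// Explicit database part: temp views are skipped entirely, so this throws
// unless default.view1 exists as a permanent table/view.
intercept[NoSuchTableException] {
  catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
}
```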
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 16 additions & 14 deletions

@@ -425,35 +425,37 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
     val tempTable = Range(1, 10, 1, 10)
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
     catalog.setCurrentDatabase("db2")
     assert(catalog.tableExists(TableIdentifier("tbl1")))
     assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
 
-  test("tableExists on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    assert(!catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+    // tableExists should not check temp view.
+    assert(!catalog.tableExists(TableIdentifier("tbl3")))
   }
 
   test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
     intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata("view1")
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+    }.getMessage
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
-      TableIdentifier("view1"), "the temporary view `view1` should exist")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).identifier.table == "view1")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).schema(0).name == "id")
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
   }
 
   test("list tables without pattern") {

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 2 additions & 7 deletions

@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>

@@ -392,7 +387,7 @@
       bucketSpec = getBucketSpec
     )
     val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
-    sessionState.executePlan(cmd).toRdd
+    df.sparkSession.sessionState.executePlan(cmd).toRdd
   }
 }

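The deleted workaround existed only because `tableExists` used to consult temp views; with the new semantics the raw identifier is safe to pass through. A hedged end-to-end sketch of the effect, assuming a live `SparkSession` named `spark`:

```scala
val df = spark.range(10).toDF("value")
df.createOrReplaceTempView("t")  // temp view sharing the target table name

// The temp view no longer counts as an existing table, so ErrorIfExists mode
// should proceed and create the permanent table default.t rather than fail.
df.write.mode("error").saveAsTable("t")
```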
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 5 additions & 10 deletions

@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
     assert(table.provider.isDefined)
 
     val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }

@@ -146,8 +142,6 @@
 
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {

@@ -172,8 +166,9 @@
     // TODO: Check that options from the resolved relation match the relation that we are
     // inserting into (i.e. using the same compression).
 
-    EliminateSubqueryAliases(
-      sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+    // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+    // views unexpectedly.
+    EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
       case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
         // check if the file formats match
         l.relation match {

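A side effect worth noting: the duplicate-table error now echoes the identifier exactly as the user supplied it, without force-qualifying it with the current database (the `MetastoreDataSourcesSuite` change below adjusts for this). A sketch under the same assumed `spark` session:

```scala
spark.sql("CREATE TABLE t1 (i INT) USING parquet")
// A second attempt should now fail with "Table t1 already exists.";
// note that the message no longer carries the "default." prefix, since
// the identifier was written unqualified.
spark.sql("CREATE TABLE t1 (i INT) USING parquet")
```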
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 18 additions & 25 deletions

@@ -183,32 +183,25 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      if (!ifExists) {
-        val objectName = if (isView) "View" else "Table"
-        throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
-      }
-    } else {
-      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-      // issue an exception.
-      catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-        case _ =>
-      })
-      try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName.quotedString))
-      } catch {
-        case NonFatal(e) => log.warn(e.toString, e)
-      }
-      catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+    // issue an exception.
+    catalog.getTableMetadataOption(tableName).map(_.tableType match {
+      case CatalogTableType.VIEW if !isView =>
+        throw new AnalysisException(
+          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+      case o if o != CatalogTableType.VIEW && isView =>
+        throw new AnalysisException(
+          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+      case _ =>
+    })
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
     }
+    catalog.refreshTable(tableName)
+    catalog.dropTable(tableName, ifExists, purge)
     Seq.empty[Row]
   }
 }

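With the pre-check removed, a missing table simply yields `None` from `getTableMetadataOption` (so the view-versus-table type check is a no-op), and `catalog.dropTable` itself enforces the `ifExists` flag. Expected behavior, sketched under the same assumed session:

```scala
spark.sql("DROP TABLE IF EXISTS no_such_table")  // silent no-op
spark.sql("DROP TABLE no_such_table")            // should still fail: table does not exist
```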
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 2 additions & 15 deletions

@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
-    }
-
-    val sourceTableDesc = if (sourceTable.database.isDefined) {
-      catalog.getTableMetadata(sourceTable)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
-    }
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
     // Storage format
     val newStorage =

@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableCommand
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = if (tableName.database.isDefined) {
-      catalog.getTableMetadata(tableName)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
-    }
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
     table.schema.map { c =>
       Row(c.name)
     }

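Both commands now funnel through `getTempViewOrPermanentTableMetadata`, so an unqualified name may resolve to a temp view. An illustrative sketch (same assumed session; `CREATE TABLE ... LIKE` may additionally require Hive support, as in the `HiveDDLSuite` test below):

```scala
spark.range(5).toDF("id").createOrReplaceTempView("src")
spark.sql("SHOW COLUMNS IN src").show()  // lists the temp view's single column: id
spark.sql("CREATE TABLE tgt LIKE src")   // copies the temp view's schema
```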
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 1 addition & 5 deletions

@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = if (tableIdentifier.database.isDefined) {
-      sessionCatalog.getTableMetadata(tableIdentifier)
-    } else {
-      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
-    }
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
 
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
    val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet

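The same unification reaches the public `Catalog` API, since `listColumns` delegates to this helper. A one-line sketch:

```scala
// Works for an unqualified temp view as well as a permanent table:
spark.catalog.listColumns("src").show()
```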
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     assert(
       intercept[AnalysisException] {
         sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
-      }.getMessage.contains("Table default.createdJsonTable already exists."),
+      }.getMessage.contains("Table createdJsonTable already exists."),
       "We should complain that createdJsonTable already exists")
   }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -678,8 +678,8 @@ class HiveDDLSuite
       .createTempView(sourceViewName)
     sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
 
-    val sourceTable =
-      spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
+    val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier(sourceViewName))
     val targetTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier(targetTabName, Some("default")))