-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-17402][SQL] separate the management of temp views and metastore tables/views in SessionCatalog #14962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.catalog | ||
|
|
||
| import javax.annotation.concurrent.GuardedBy | ||
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.analysis.TempViewAlreadyExistsException | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
| import org.apache.spark.sql.catalyst.util.StringUtils | ||
|
|
||
|
|
||
| /** | ||
| * A thread-safe manager for a list of temp views, providing atomic operations to manage temp views. | ||
|
||
| * Note that, the temp view name is always case-sensitive here, callers are responsible to format | ||
| * the view name w.r.t. case-sensitivity config. | ||
| */ | ||
| class TempViewManager { | ||
|
|
||
| /** List of temporary views, mapping from view name to logical plan. */ | ||
| @GuardedBy("this") | ||
| private val tempViews = new mutable.HashMap[String, LogicalPlan] | ||
|
|
||
| def get(name: String): Option[LogicalPlan] = synchronized { | ||
| tempViews.get(name) | ||
| } | ||
|
|
||
| def create( | ||
| name: String, | ||
| viewDefinition: LogicalPlan, | ||
| overrideIfExists: Boolean): Unit = synchronized { | ||
| if (!overrideIfExists && tempViews.contains(name)) { | ||
| throw new TempViewAlreadyExistsException(name) | ||
| } | ||
| tempViews.put(name, viewDefinition) | ||
| } | ||
|
|
||
| def update( | ||
| name: String, | ||
| viewDefinition: LogicalPlan): Boolean = synchronized { | ||
| // Only update it when the view with the given name exits. | ||
| if (tempViews.contains(name)) { | ||
| tempViews.put(name, viewDefinition) | ||
| true | ||
| } else { | ||
| false | ||
| } | ||
| } | ||
|
|
||
| def remove(name: String): Boolean = synchronized { | ||
| tempViews.remove(name).isDefined | ||
| } | ||
|
|
||
| def rename(oldName: String, newName: String): Boolean = synchronized { | ||
| if (tempViews.contains(oldName)) { | ||
| if (tempViews.contains(newName)) { | ||
| throw new AnalysisException( | ||
| s"rename temporary view from '$oldName' to '$newName': destination view already exists") | ||
| } | ||
|
|
||
| val viewDefinition = tempViews(oldName) | ||
| tempViews.remove(oldName) | ||
| tempViews.put(newName, viewDefinition) | ||
| true | ||
| } else { | ||
| false | ||
| } | ||
| } | ||
|
|
||
| def listNames(pattern: String): Seq[String] = synchronized { | ||
| StringUtils.filterPattern(tempViews.keys.toSeq, pattern) | ||
| } | ||
|
|
||
| def clear(): Unit = synchronized { | ||
| tempViews.clear() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -189,31 +189,39 @@ case class DropTableCommand( | |
|
|
||
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
| val catalog = sparkSession.sessionState.catalog | ||
| if (!catalog.tableExists(tableName)) { | ||
| if (!ifExists) { | ||
| val objectName = if (isView) "View" else "Table" | ||
| throw new AnalysisException(s"$objectName to drop '$tableName' does not exist") | ||
| } | ||
| } else { | ||
| // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view | ||
| // issue an exception. | ||
| catalog.getTableMetadataOption(tableName).map(_.tableType match { | ||
| case CatalogTableType.VIEW if !isView => | ||
| throw new AnalysisException( | ||
| "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") | ||
| case o if o != CatalogTableType.VIEW && isView => | ||
| throw new AnalysisException( | ||
| s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") | ||
| case _ => | ||
| }) | ||
| try { | ||
| sparkSession.sharedState.cacheManager.uncacheQuery( | ||
| sparkSession.table(tableName.quotedString)) | ||
| } catch { | ||
| case NonFatal(e) => log.warn(e.toString, e) | ||
|
|
||
| // If the table name contains database part, we should drop a metastore table directly, | ||
| // otherwise, try to drop a temp view first, if that not exist, drop metastore table. | ||
| val dropMetastoreTable = | ||
| tableName.database.isDefined || !catalog.dropTempView(tableName.table) | ||
|
||
|
|
||
| if (dropMetastoreTable) { | ||
| if (!catalog.tableExists(tableName)) { | ||
| if (!ifExists) { | ||
| val objectName = if (isView) "View" else "Table" | ||
| throw new AnalysisException(s"$objectName to drop '$tableName' does not exist") | ||
| } | ||
| } else { | ||
| // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view | ||
| // issue an exception. | ||
| catalog.getTableMetadataOption(tableName).map(_.tableType match { | ||
| case CatalogTableType.VIEW if !isView => | ||
| throw new AnalysisException( | ||
| "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") | ||
| case o if o != CatalogTableType.VIEW && isView => | ||
| throw new AnalysisException( | ||
| s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") | ||
| case _ => | ||
| }) | ||
| try { | ||
| sparkSession.sharedState.cacheManager.uncacheQuery( | ||
| sparkSession.table(tableName.quotedString)) | ||
| } catch { | ||
| case NonFatal(e) => log.warn(e.toString, e) | ||
| } | ||
| catalog.refreshTable(tableName) | ||
| catalog.dropTable(tableName, ifExists, purge) | ||
| } | ||
| catalog.refreshTable(tableName) | ||
| catalog.dropTable(tableName, ifExists, purge) | ||
| } | ||
| Seq.empty[Row] | ||
| } | ||
|
|
@@ -470,10 +478,6 @@ case class AlterTableRecoverPartitionsCommand( | |
| if (!catalog.tableExists(tableName)) { | ||
| throw new AnalysisException(s"Table $tableName in $cmd does not exist.") | ||
| } | ||
| if (catalog.isTemporaryTable(tableName)) { | ||
| throw new AnalysisException( | ||
| s"Operation not allowed: $cmd on temporary tables: $tableName") | ||
| } | ||
| val table = catalog.getTableMetadata(tableName) | ||
| if (DDLUtils.isDatasourceTable(table)) { | ||
| throw new AnalysisException( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also for view, right? Should we just keep the old name?