diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d536cc5097b2..0896e68eca7d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,13 @@ package object config {
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
+ // Note: This is a SQL config, but it needs to be in core because it's cross-session and cannot
+ // be put in SQLConf.
+ private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
+ .internal()
+ .stringConf
+ .createWithDefault("global_temp")
+
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 71bdd19c16db..835cb6981f5b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -220,6 +220,41 @@ The `sql` function enables applications to run SQL queries programmatically and
+## Global Temporary View
+
+Temporary views in Spark SQL are session-scoped and will disappear if the session that creates
+them terminates. If you want a temporary view that is shared among all sessions and kept alive
+until the Spark application terminates, you can create a global temporary view. Global temporary
+views are tied to a system preserved database `global_temp`, and we must use the qualified name to
+refer to them, e.g. `SELECT * FROM global_temp.view1`.
+
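+For example, a global temporary view can also be created and queried directly in SQL. A minimal
+sketch (the view and table names below are illustrative):
+
+{% highlight sql %}
+CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1 AS a1 FROM tbl
+
+SELECT * FROM global_temp.temp_view
+{% endhighlight %}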
+
@@ -1101,11 +1136,11 @@ USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
- user 'username',
+ user 'username',
password 'password'
)
-INSERT INTO TABLE jdbcTable
+INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
{% endhighlight %}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
index cff9032f52b5..c5770d147a6b 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -54,6 +54,7 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// $example off:programmatic_schema$
+import org.apache.spark.sql.AnalysisException;
// $example on:untyped_ops$
// col("...") is preferable to df.col("...")
@@ -84,7 +85,7 @@ public void setAge(int age) {
}
// $example off:create_ds$
- public static void main(String[] args) {
+ public static void main(String[] args) throws AnalysisException {
// $example on:init_session$
SparkSession spark = SparkSession
.builder()
@@ -101,7 +102,7 @@ public static void main(String[] args) {
spark.stop();
}
- private static void runBasicDataFrameExample(SparkSession spark) {
+ private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
// $example on:create_df$
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
@@ -176,6 +177,31 @@ private static void runBasicDataFrameExample(SparkSession spark) {
// | 19| Justin|
// +----+-------+
// $example off:run_sql$
+
+ // $example on:global_temp_view$
+ // Register the DataFrame as a global temporary view
+ df.createGlobalTempView("people");
+
+ // Global temporary view is tied to a system preserved database `global_temp`
+ spark.sql("SELECT * FROM global_temp.people").show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+
+ // Global temporary view is cross-session
+ spark.newSession().sql("SELECT * FROM global_temp.people").show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:global_temp_view$
}
private static void runDatasetCreationExample(SparkSession spark) {
diff --git a/examples/src/main/python/sql/basic.py b/examples/src/main/python/sql/basic.py
index fdc017aed97c..ebcf66995b47 100644
--- a/examples/src/main/python/sql/basic.py
+++ b/examples/src/main/python/sql/basic.py
@@ -114,6 +114,31 @@ def basic_df_example(spark):
# +----+-------+
# $example off:run_sql$
+ # $example on:global_temp_view$
+ # Register the DataFrame as a global temporary view
+ df.createGlobalTempView("people")
+
+ # Global temporary view is tied to a system preserved database `global_temp`
+ spark.sql("SELECT * FROM global_temp.people").show()
+ # +----+-------+
+ # | age| name|
+ # +----+-------+
+ # |null|Michael|
+ # | 30| Andy|
+ # | 19| Justin|
+ # +----+-------+
+
+ # Global temporary view is cross-session
+ spark.newSession().sql("SELECT * FROM global_temp.people").show()
+ # +----+-------+
+ # | age| name|
+ # +----+-------+
+ # |null|Michael|
+ # | 30| Andy|
+ # | 19| Justin|
+ # +----+-------+
+ # $example off:global_temp_view$
+
def schema_inference_example(spark):
# $example on:schema_inferring$
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
index 129b81d5fbbf..f27c403c5b38 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
@@ -135,6 +135,31 @@ object SparkSQLExample {
// | 19| Justin|
// +----+-------+
// $example off:run_sql$
+
+ // $example on:global_temp_view$
+ // Register the DataFrame as a global temporary view
+ df.createGlobalTempView("people")
+
+ // Global temporary view is tied to a system preserved database `global_temp`
+ spark.sql("SELECT * FROM global_temp.people").show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+
+ // Global temporary view is cross-session
+ spark.newSession().sql("SELECT * FROM global_temp.people").show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:global_temp_view$
}
private def runDatasetCreationExample(spark: SparkSession): Unit = {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 163e3f2fdea4..e3d9a17469a3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -55,7 +55,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.getFunction"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+ // [SPARK-17338][SQL] add global temp view
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView")
)
}
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3c5030722f30..df3bf4254d4d 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -167,7 +167,7 @@ def createExternalTable(self, tableName, path=None, source=None, schema=None, **
@since(2.0)
def dropTempView(self, viewName):
- """Drops the temporary view with the given view name in the catalog.
+ """Drops the local temporary view with the given view name in the catalog.
If the view has been cached before, then it will also be uncached.
>>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
@@ -181,6 +181,22 @@ def dropTempView(self, viewName):
"""
self._jcatalog.dropTempView(viewName)
+ @since(2.1)
+ def dropGlobalTempView(self, viewName):
+ """Drops the global temporary view with the given view name in the catalog.
+ If the view has been cached before, then it will also be uncached.
+
+ >>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
+ >>> spark.table("global_temp.my_table").collect()
+ [Row(_1=1, _2=1)]
+ >>> spark.catalog.dropGlobalTempView("my_table")
+ >>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
+ Traceback (most recent call last):
+ ...
+ AnalysisException: ...
+ """
+ self._jcatalog.dropGlobalTempView(viewName)
+
@ignore_unicode_prefix
@since(2.0)
def registerFunction(self, name, f, returnType=StringType()):
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 7482be8bda5c..8264dcf8a97d 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -386,7 +386,7 @@ def tables(self, dbName=None):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
- Row(tableName=u'table1', isTemporary=True)
+ Row(database=u'', tableName=u'table1', isTemporary=True)
"""
if dbName is None:
return DataFrame(self._ssql_ctx.tables(), self)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0ac481a8a8b5..14e80ea4615e 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -131,7 +131,7 @@ def registerTempTable(self, name):
@since(2.0)
def createTempView(self, name):
- """Creates a temporary view with this DataFrame.
+ """Creates a local temporary view with this DataFrame.
The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
@@ -153,7 +153,7 @@ def createTempView(self, name):
@since(2.0)
def createOrReplaceTempView(self, name):
- """Creates or replaces a temporary view with this DataFrame.
+ """Creates or replaces a local temporary view with this DataFrame.
The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
@@ -169,6 +169,27 @@ def createOrReplaceTempView(self, name):
"""
self._jdf.createOrReplaceTempView(name)
+ @since(2.1)
+ def createGlobalTempView(self, name):
+ """Creates a global temporary view with this DataFrame.
+
+ The lifetime of this temporary view is tied to this Spark application.
+ Throws :class:`TempTableAlreadyExistsException` if the view name already exists in the
+ catalog.
+
+ >>> df.createGlobalTempView("people")
+ >>> df2 = spark.sql("select * from global_temp.people")
+ >>> sorted(df.collect()) == sorted(df2.collect())
+ True
+ >>> df.createGlobalTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL
+ Traceback (most recent call last):
+ ...
+ AnalysisException: u"Temporary table 'people' already exists;"
+ >>> spark.catalog.dropGlobalTempView("people")
+
+ """
+ self._jdf.createGlobalTempView(name)
+
@property
@since(1.4)
def write(self):
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6a94def65f36..7eb5091e1a98 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -111,11 +111,12 @@ statement
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
- | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
+ | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
+ VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
- | CREATE (OR REPLACE)? TEMPORARY VIEW
+ | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
@@ -668,7 +669,7 @@ nonReserved
| MAP | ARRAY | STRUCT
| LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
| DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
- | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
+ | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
| GROUPING | CUBE | ROLLUP
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
| TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
@@ -856,6 +857,7 @@ CACHE: 'CACHE';
UNCACHE: 'UNCACHE';
LAZY: 'LAZY';
FORMATTED: 'FORMATTED';
+GLOBAL: 'GLOBAL';
TEMPORARY: 'TEMPORARY' | 'TEMP';
OPTIONS: 'OPTIONS';
UNSET: 'UNSET';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae8869ff25f2..536d38777f89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -458,12 +458,12 @@ class Analyzer(
i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
case u: UnresolvedRelation =>
val table = u.tableIdentifier
- if (table.database.isDefined && conf.runSQLonFile &&
+ if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
- // If the table does not exist, and the database part is specified, and we support
- // running SQL directly on files, then let's just return the original UnresolvedRelation.
- // It is possible we are matching a query like "select * from parquet.`/path/to/query`".
- // The plan will get resolved later.
+ // If the database part is specified, and we support running SQL directly on files, and
+ // it's not a temporary view, and the table does not exist, then let's just return the
+ // original UnresolvedRelation. It is possible we are matching a query like "select *
+ // from parquet.`/path/to/query`". The plan will get resolved later.
// Note that we are testing (!db_exists || !table_exists) because the catalog throws
// an exception from tableExists if the database does not exist.
u
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
new file mode 100644
index 000000000000..6095ac0bc9c5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.StringUtils
+
+
+/**
+ * A thread-safe manager for global temporary views, providing atomic operations to manage them,
+ * e.g. create, update, remove, etc.
+ *
+ * Note that the view name is always case-sensitive here; callers are responsible for formatting
+ * the view name w.r.t. the case-sensitivity config.
+ *
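+ * A usage sketch (the view name and `viewPlan` are illustrative):
+ * {{{
+ *   val manager = new GlobalTempViewManager("global_temp")
+ *   manager.create("view1", viewPlan, overrideIfExists = false)
+ *   manager.get("view1")    // Some(viewPlan)
+ *   manager.remove("view1") // true
+ * }}}
+ *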
+ * @param database The system preserved virtual database that keeps all the global temporary views.
+ */
+class GlobalTempViewManager(val database: String) {
+
+ /** List of view definitions, mapping from view name to logical plan. */
+ @GuardedBy("this")
+ private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
+
+ /**
+ * Returns the global view definition which matches the given name, or None if not found.
+ */
+ def get(name: String): Option[LogicalPlan] = synchronized {
+ viewDefinitions.get(name)
+ }
+
+ /**
+ * Creates a global temp view, or issues an exception if the view already exists and
+ * `overrideIfExists` is false.
+ */
+ def create(
+ name: String,
+ viewDefinition: LogicalPlan,
+ overrideIfExists: Boolean): Unit = synchronized {
+ if (!overrideIfExists && viewDefinitions.contains(name)) {
+ throw new TempTableAlreadyExistsException(name)
+ }
+ viewDefinitions.put(name, viewDefinition)
+ }
+
+ /**
+ * Updates the global temp view if it exists; returns true if updated, false otherwise.
+ */
+ def update(
+ name: String,
+ viewDefinition: LogicalPlan): Boolean = synchronized {
+ if (viewDefinitions.contains(name)) {
+ viewDefinitions.put(name, viewDefinition)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Removes the global temp view if it exists; returns true if removed, false otherwise.
+ */
+ def remove(name: String): Boolean = synchronized {
+ viewDefinitions.remove(name).isDefined
+ }
+
+ /**
+ * Renames the global temp view if the source view exists and the destination view does not
+ * exist; issues an exception if the source view exists but the destination view already exists.
+ * Returns true if renamed, false otherwise.
+ */
+ def rename(oldName: String, newName: String): Boolean = synchronized {
+ if (viewDefinitions.contains(oldName)) {
+ if (viewDefinitions.contains(newName)) {
+ throw new AnalysisException(
+ s"rename temporary view from '$oldName' to '$newName': destination view already exists")
+ }
+
+ val viewDefinition = viewDefinitions(oldName)
+ viewDefinitions.remove(oldName)
+ viewDefinitions.put(newName, viewDefinition)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Lists the names of all global temporary views.
+ */
+ def listViewNames(pattern: String): Seq[String] = synchronized {
+ StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern)
+ }
+
+ /**
+ * Clears all the global temporary views.
+ */
+ def clear(): Unit = synchronized {
+ viewDefinitions.clear()
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c01c7a3f2bd..e44e30ec648f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -47,6 +48,7 @@ object SessionCatalog {
*/
class SessionCatalog(
externalCatalog: ExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf,
@@ -61,6 +63,7 @@ class SessionCatalog(
conf: CatalystConf) {
this(
externalCatalog,
+ new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString),
DummyFunctionResourceLoader,
functionRegistry,
conf,
@@ -142,8 +145,13 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
- val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
val dbName = formatDatabaseName(dbDefinition.name)
+ if (dbName == globalTempViewManager.database) {
+ throw new AnalysisException(
+ s"${globalTempViewManager.database} is a system preserved database, " +
+ "you cannot create a database with this name.")
+ }
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
@@ -154,7 +162,7 @@ class SessionCatalog(
if (dbName == DEFAULT_DATABASE) {
throw new AnalysisException(s"Can not drop default database")
} else if (dbName == getCurrentDatabase) {
- throw new AnalysisException(s"Can not drop current database `${dbName}`")
+ throw new AnalysisException(s"Can not drop current database `$dbName`")
}
externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
}
@@ -188,6 +196,13 @@ class SessionCatalog(
def setCurrentDatabase(db: String): Unit = {
val dbName = formatDatabaseName(db)
+ if (dbName == globalTempViewManager.database) {
+ throw new AnalysisException(
+ s"${globalTempViewManager.database} is a system preserved database, " +
+ "you cannot use it as current database. To access global temporary views, you should " +
+ "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
+ s"${globalTempViewManager.database}.viewName.")
+ }
requireDbExists(dbName)
synchronized { currentDb = dbName }
}
@@ -329,7 +344,7 @@ class SessionCatalog(
// ----------------------------------------------
/**
- * Create a temporary table.
+ * Create a local temporary view.
*/
def createTempView(
name: String,
@@ -343,19 +358,65 @@ class SessionCatalog(
}
/**
- * Return a temporary view exactly as it was stored.
+ * Create a global temporary view.
+ */
+ def createGlobalTempView(
+ name: String,
+ viewDefinition: LogicalPlan,
+ overrideIfExists: Boolean): Unit = {
+ globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
+ }
+
+ /**
+ * Alters the definition of a local/global temp view matching the given name; returns true if a
+ * temp view is matched and altered, false otherwise.
+ */
+ def alterTempViewDefinition(
+ name: TableIdentifier,
+ viewDefinition: LogicalPlan): Boolean = synchronized {
+ val viewName = formatTableName(name.table)
+ if (name.database.isEmpty) {
+ if (tempTables.contains(viewName)) {
+ createTempView(viewName, viewDefinition, overrideIfExists = true)
+ true
+ } else {
+ false
+ }
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.update(viewName, viewDefinition)
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Return a local temporary view exactly as it was stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
tempTables.get(formatTableName(name))
}
/**
- * Drop a temporary view.
+ * Return a global temporary view exactly as it was stored.
+ */
+ def getGlobalTempView(name: String): Option[LogicalPlan] = {
+ globalTempViewManager.get(formatTableName(name))
+ }
+
+ /**
+ * Drop a local temporary view.
*/
def dropTempView(name: String): Unit = synchronized {
tempTables.remove(formatTableName(name))
}
+ /**
+ * Drop a global temporary view.
+ */
+ def dropGlobalTempView(name: String): Boolean = {
+ globalTempViewManager.remove(formatTableName(name))
+ }
+
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
@@ -371,9 +432,7 @@ class SessionCatalog(
*/
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
val table = formatTableName(name.table)
- if (name.database.isDefined) {
- getTableMetadata(name)
- } else {
+ if (name.database.isEmpty) {
getTempView(table).map { plan =>
CatalogTable(
identifier = TableIdentifier(table),
@@ -381,6 +440,16 @@ class SessionCatalog(
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(getTableMetadata(name))
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.get(table).map { plan =>
+ CatalogTable(
+ identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = plan.output.toStructType)
+ }.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
+ } else {
+ getTableMetadata(name)
}
}
@@ -393,21 +462,25 @@ class SessionCatalog(
*/
def renameTable(oldName: TableIdentifier, newName: String): Unit = synchronized {
val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
- requireDbExists(db)
val oldTableName = formatTableName(oldName.table)
val newTableName = formatTableName(newName)
- if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
- requireTableExists(TableIdentifier(oldTableName, Some(db)))
- requireTableNotExists(TableIdentifier(newTableName, Some(db)))
- externalCatalog.renameTable(db, oldTableName, newTableName)
+ if (db == globalTempViewManager.database) {
+ globalTempViewManager.rename(oldTableName, newTableName)
} else {
- if (tempTables.contains(newTableName)) {
- throw new AnalysisException(
- s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': destination table already exists")
+ requireDbExists(db)
+ if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
+ requireTableExists(TableIdentifier(oldTableName, Some(db)))
+ requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+ externalCatalog.renameTable(db, oldTableName, newTableName)
+ } else {
+ if (tempTables.contains(newTableName)) {
+ throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
+ "destination table already exists")
+ }
+ val table = tempTables(oldTableName)
+ tempTables.remove(oldTableName)
+ tempTables.put(newTableName, table)
}
- val table = tempTables(oldTableName)
- tempTables.remove(oldTableName)
- tempTables.put(newTableName, table)
}
}
@@ -424,17 +497,24 @@ class SessionCatalog(
purge: Boolean): Unit = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.contains(table)) {
- requireDbExists(db)
- // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
- // Instead, log it as an error message.
- if (tableExists(TableIdentifier(table, Option(db)))) {
- externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
- } else if (!ignoreIfNotExists) {
- throw new NoSuchTableException(db = db, table = table)
+ if (db == globalTempViewManager.database) {
+ val viewExists = globalTempViewManager.remove(table)
+ if (!viewExists && !ignoreIfNotExists) {
+ throw new NoSuchTableException(globalTempViewManager.database, table)
}
} else {
- tempTables.remove(table)
+ if (name.database.isDefined || !tempTables.contains(table)) {
+ requireDbExists(db)
+ // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
+ // Instead, log it as an error message.
+ if (tableExists(TableIdentifier(table, Option(db)))) {
+ externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
+ } else if (!ignoreIfNotExists) {
+ throw new NoSuchTableException(db = db, table = table)
+ }
+ } else {
+ tempTables.remove(table)
+ }
}
}
@@ -445,6 +525,9 @@ class SessionCatalog(
* If no database is specified, this will first attempt to return a temporary table/view with
* the same name, then, if that does not exist, return the table/view from the current database.
*
+ * Note that the global temp view database is also valid here; in that case this will return the
+ * global temp view matching the given name.
+ *
* If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
* track the name of the view.
*/
@@ -453,7 +536,11 @@ class SessionCatalog(
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
val relationAlias = alias.getOrElse(table)
- if (name.database.isDefined || !tempTables.contains(table)) {
+ if (db == globalTempViewManager.database) {
+ globalTempViewManager.get(table).map { viewDef =>
+ SubqueryAlias(relationAlias, viewDef, Some(name))
+ }.getOrElse(throw new NoSuchTableException(db, table))
+ } else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
val view = Option(metadata.tableType).collect {
case CatalogTableType.VIEW => name
@@ -472,27 +559,48 @@ class SessionCatalog(
* explicitly specified.
*/
def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
- name.database.isEmpty && tempTables.contains(formatTableName(name.table))
+ val table = formatTableName(name.table)
+ if (name.database.isEmpty) {
+ tempTables.contains(table)
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.get(table).isDefined
+ } else {
+ false
+ }
}
/**
- * List all tables in the specified database, including temporary tables.
+ * List all tables in the specified database, including local temporary tables.
+ *
+ * Note that if the specified database is the global temporary view database, we will list
+ * global temporary views.
*/
def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
/**
- * List all matching tables in the specified database, including temporary tables.
+ * List all matching tables in the specified database, including local temporary tables.
+ *
+ * Note that if the specified database is the global temporary view database, we will list
+ * global temporary views.
*/
def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
val dbName = formatDatabaseName(db)
- requireDbExists(dbName)
- val dbTables =
- externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
- synchronized {
- val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
- .map { t => TableIdentifier(t) }
- dbTables ++ _tempTables
+ val dbTables = if (dbName == globalTempViewManager.database) {
+ globalTempViewManager.listViewNames(pattern).map { name =>
+ TableIdentifier(name, Some(globalTempViewManager.database))
+ }
+ } else {
+ requireDbExists(dbName)
+ externalCatalog.listTables(dbName, pattern).map { name =>
+ TableIdentifier(name, Some(dbName))
+ }
+ }
+ val localTempViews = synchronized {
+ StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name =>
+ TableIdentifier(name)
+ }
}
+ dbTables ++ localTempViews
}
/**
@@ -504,6 +612,8 @@ class SessionCatalog(
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(formatTableName(name.table)).foreach(_.refresh())
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
}
}
@@ -919,6 +1029,7 @@ class SessionCatalog(
}
}
tempTables.clear()
+ globalTempViewManager.clear()
functionRegistry.clear()
// restore built-in functions
FunctionRegistry.builtin.listFunction().foreach { f =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9cfbdffd0258..4b52508740bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
+import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2433,9 +2433,13 @@ class Dataset[T] private[sql](
}
/**
- * Creates a temporary view using the given name. The lifetime of this
+ * Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
+ * A local temporary view is session-scoped: its lifetime is the lifetime of the session that
+ * created it, i.e. it will be automatically dropped when the session terminates. It's not
+ * tied to any database, i.e. we can't use `db1.view1` to reference a local temporary view.
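+ *
+ * For example (the view name is illustrative):
+ * {{{
+ *   df.createTempView("view1")
+ *   spark.sql("SELECT * FROM view1")
+ * }}}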
+ *
* @throws AnalysisException if the view name already exists
*
* @group basic
@@ -2443,21 +2447,51 @@ class Dataset[T] private[sql](
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
- createViewCommand(viewName, replace = false)
+ createTempViewCommand(viewName, replace = false, global = false)
}
+
+
/**
- * Creates a temporary view using the given name. The lifetime of this
+ * Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
- createViewCommand(viewName, replace = true)
+ createTempViewCommand(viewName, replace = true, global = false)
}
- private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
+ /**
+ * Creates a global temporary view using the given name. The lifetime of this
+ * temporary view is tied to this Spark application.
+ *
+ * A global temporary view is cross-session: its lifetime is the lifetime of the Spark
+ * application, i.e. it will be automatically dropped when the application terminates. It's tied
+ * to a system preserved database `global_temp`, and we must use the qualified name to refer to a
+ * global temp view, e.g. `SELECT * FROM global_temp.view1`.
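+ *
+ * For example (the view name is illustrative):
+ * {{{
+ *   df.createGlobalTempView("view1")
+ *   spark.sql("SELECT * FROM global_temp.view1")
+ * }}}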
+ *
+ * @throws TempTableAlreadyExistsException if the view name already exists
+ *
+ * @group basic
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]
+ def createGlobalTempView(viewName: String): Unit = withPlan {
+ createTempViewCommand(viewName, replace = false, global = true)
+ }
+
+ private def createTempViewCommand(
+ viewName: String,
+ replace: Boolean,
+ global: Boolean): CreateViewCommand = {
+ val viewType = if (global) {
+ GlobalTempView
+ } else {
+ LocalTempView
+ }
+
CreateViewCommand(
name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
userSpecifiedColumns = Nil,
@@ -2467,7 +2501,7 @@ class Dataset[T] private[sql](
child = logicalPlan,
allowExisting = false,
replace = replace,
- isTemporary = true)
+ viewType = viewType)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 7f2762c7dac9..717fb291901b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -262,14 +262,32 @@ abstract class Catalog {
options: Map[String, String]): DataFrame
/**
- * Drops the temporary view with the given view name in the catalog.
+ * Drops the local temporary view with the given view name in the catalog.
* If the view has been cached before, then it will also be uncached.
*
+ * A local temporary view is session-scoped: its lifetime is the lifetime of the session that
+ * created it, i.e. it will be automatically dropped when the session terminates. It's not
+ * tied to any database, i.e. we can't use `db1.view1` to reference a local temporary view.
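+ *
+ * For example (the view name is illustrative):
+ * {{{
+ *   spark.catalog.dropTempView("view1")
+ * }}}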
+ *
* @param viewName the name of the view to be dropped.
* @since 2.0.0
*/
def dropTempView(viewName: String): Unit
+ /**
+ * Drops the global temporary view with the given view name in the catalog.
+ * If the view has been cached before, then it will also be uncached.
+ *
+ * A global temporary view is cross-session: its lifetime is the lifetime of the Spark
+ * application, i.e. it will be automatically dropped when the application terminates. It's tied
+ * to a system preserved database `global_temp`, and we must use the qualified name to refer to a
+ * global temp view, e.g. `SELECT * FROM global_temp.view1`.
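+ *
+ * For example (the view name is illustrative; returns true if the view existed and was dropped):
+ * {{{
+ *   spark.catalog.dropGlobalTempView("view1")
+ * }}}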
+ *
+ * @param viewName the name of the view to be dropped.
+ * @since 2.1.0
+ */
+ def dropGlobalTempView(viewName: String): Boolean
+
/**
* Returns true if the table is currently cached in-memory.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 383b3a233fc2..cb45a6d78b9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,15 +21,14 @@ import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
import org.apache.spark.util.Utils
@@ -125,6 +124,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
.mkString("\t")
}
}
+ // SHOW TABLES in Hive only outputs table names, while ours outputs database, table name and isTemp.
+ case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
+ command.executeCollect().map(_.getString(1))
case command: ExecutedCommandExec =>
command.executeCollect().map(_.getString(0))
case other =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 085bb9fc3c6c..137d3c3eea61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
/**
* Concrete parser for Spark SQL statements.
@@ -385,7 +385,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
"CREATE TEMPORARY VIEW ... USING ... instead")
- CreateTempViewUsing(table, schema, replace = true, provider, options)
+ CreateTempViewUsing(table, schema, replace = true, global = false, provider, options)
} else {
CreateTable(tableDesc, mode, None)
}
@@ -401,6 +401,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType),
replace = ctx.REPLACE != null,
+ global = ctx.GLOBAL != null,
provider = ctx.tableProvider.qualifiedName.getText,
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
@@ -1269,7 +1270,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*
* For example:
* {{{
- * CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
+ * CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
* [(column_name [COMMENT column_comment], ...) ]
* [COMMENT view_comment]
* [TBLPROPERTIES (property_name = property_value, ...)]
@@ -1286,6 +1287,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}
+ val viewType = if (ctx.TEMPORARY == null) {
+ PersistedView
+ } else if (ctx.GLOBAL != null) {
+ GlobalTempView
+ } else {
+ LocalTempView
+ }
+
CreateViewCommand(
name = visitTableIdentifier(ctx.tableIdentifier),
userSpecifiedColumns = userSpecifiedColumns,
@@ -1295,7 +1304,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
child = plan(ctx.query),
allowExisting = ctx.EXISTS != null,
replace = ctx.REPLACE != null,
- isTemporary = ctx.TEMPORARY != null)
+ viewType = viewType)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 01ac89868d10..45fa293e5895 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,17 +183,20 @@ case class DropTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
- // issue an exception.
- catalog.getTableMetadataOption(tableName).map(_.tableType match {
- case CatalogTableType.VIEW if !isView =>
- throw new AnalysisException(
- "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
- case o if o != CatalogTableType.VIEW && isView =>
- throw new AnalysisException(
- s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
- case _ =>
- })
+
+ if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) {
+ // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+ // issue an exception.
+ catalog.getTableMetadata(tableName).tableType match {
+ case CatalogTableType.VIEW if !isView =>
+ throw new AnalysisException(
+ "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+ case o if o != CatalogTableType.VIEW && isView =>
+ throw new AnalysisException(
+ s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+ case _ =>
+ }
+ }
try {
sparkSession.sharedState.cacheManager.uncacheQuery(
sparkSession.table(tableName.quotedString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 08de6cd4242c..424ef58d76c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -579,9 +579,10 @@ case class ShowTablesCommand(
databaseName: Option[String],
tableIdentifierPattern: Option[String]) extends RunnableCommand {
- // The result of SHOW TABLES has two columns, tableName and isTemporary.
+ // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
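+ // For example, a returned row may look like Row("default", "my_table", false); the values here
+ // are illustrative.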
override val output: Seq[Attribute] = {
- AttributeReference("tableName", StringType, nullable = false)() ::
+ AttributeReference("database", StringType, nullable = false)() ::
+ AttributeReference("tableName", StringType, nullable = false)() ::
AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
}
@@ -592,9 +593,9 @@ case class ShowTablesCommand(
val db = databaseName.getOrElse(catalog.getCurrentDatabase)
val tables =
tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
- tables.map { t =>
- val isTemp = t.database.isEmpty
- Row(t.table, isTemp)
+ tables.map { tableIdent =>
+ val isTemp = catalog.isTemporaryTable(tableIdent)
+ Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 15340ee921f6..bbcd9c4ef564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,13 +19,46 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+
+/**
+ * ViewType is used to specify the expected view type when we want to create or replace a view in
+ * [[CreateViewCommand]].
+ */
+sealed trait ViewType
+
+/**
+ * LocalTempView means session-scoped local temporary views. Its lifetime is the lifetime of the
+ * session that created it, i.e. it will be automatically dropped when the session terminates. It's
+ * not tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+ */
+object LocalTempView extends ViewType
+
+/**
+ * GlobalTempView means cross-session global temporary views. Its lifetime is the lifetime of the
+ * Spark application, i.e. it will be automatically dropped when the application terminates. It's
+ * tied to a system preserved database `global_temp`, and we must use the qualified name to refer
+ * to a global temp view, e.g. `SELECT * FROM global_temp.view1`.
+ */
+object GlobalTempView extends ViewType
+
+/**
+ * PersistedView means cross-session persisted views. Persisted views stay until they are
+ * explicitly dropped by user command. It's always tied to a database, default to the current
+ * database if not specified.
+ *
+ * Note that an existing persisted view with the same name is not visible to the current session
+ * while a local temporary view with that name exists, unless the view name is qualified by its
+ * database.
+ */
+object PersistedView extends ViewType
/**
@@ -46,10 +79,7 @@ import org.apache.spark.sql.types.StructType
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
- * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
- * at the end of current Spark session. Existing permanent relations with the same
- * name are not visible to the current session while the temporary view exists,
- * unless they are specified with full qualified table name with database prefix.
+ * @param viewType the expected view type to be created with this command.
*/
case class CreateViewCommand(
name: TableIdentifier,
@@ -60,20 +90,21 @@ case class CreateViewCommand(
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
- isTemporary: Boolean)
+ viewType: ViewType)
extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
- if (!isTemporary) {
- require(originalText.isDefined,
- "The table to created with CREATE VIEW must have 'originalText'.")
+ if (viewType == PersistedView) {
+ require(originalText.isDefined, "'originalText' must be provided to create permanent view")
}
if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
}
+ private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView
+
// Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
if (allowExisting && isTemporary) {
throw new AnalysisException(
@@ -99,72 +130,53 @@ case class CreateViewCommand(
s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
}
- val sessionState = sparkSession.sessionState
-
- if (isTemporary) {
- createTemporaryView(sparkSession, analyzedPlan)
- } else {
- // Adds default database for permanent table if it doesn't exist, so that tableExists()
- // only check permanent tables.
- val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val qualifiedName = name.copy(database = Option(database))
-
- if (sessionState.catalog.tableExists(qualifiedName)) {
- val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
- if (allowExisting) {
- // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
- // already exists.
- } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
- throw new AnalysisException(s"$qualifiedName is not a view")
- } else if (replace) {
- // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
- } else {
- // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
- // exists.
- throw new AnalysisException(
- s"View $qualifiedName already exists. If you want to update the view definition, " +
- "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
- }
- } else {
- // Create the view if it doesn't exist.
- sessionState.catalog.createTable(
- prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
- }
- }
- Seq.empty[Row]
- }
-
- private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
- val catalog = sparkSession.sessionState.catalog
- // Projects column names to alias names
- val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+ val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, _)) => Alias(attr, colName)()
+ case (attr, (colName, None)) => Alias(attr, colName)()
+ case (attr, (colName, Some(colComment))) =>
+ val meta = new MetadataBuilder().putString("comment", colComment).build()
+ Alias(attr, colName)(explicitMetadata = Some(meta))
}
sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
- catalog.createTempView(name.table, logicalPlan, replace)
+ val catalog = sparkSession.sessionState.catalog
+ if (viewType == LocalTempView) {
+ catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
+ } else if (viewType == GlobalTempView) {
+ catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
+ } else if (catalog.tableExists(name)) {
+ val tableMetadata = catalog.getTableMetadata(name)
+ if (allowExisting) {
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+ } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+ throw new AnalysisException(s"$name is not a view")
+ } else if (replace) {
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+ } else {
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
+ throw new AnalysisException(
+ s"View $name already exists. If you want to update the view definition, " +
+ "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+ }
+ } else {
+ // Create the view if it doesn't exist.
+ catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+ }
+ Seq.empty[Row]
}
/**
* Returns a [[CatalogTable]] that can be used to save in the catalog. This method canonicalizes
* the SQL based on the analyzed plan, and also creates the proper schema for the view.
*/
- private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
- val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, _)) => Alias(attr, colName)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
- }
-
+ private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
// Validate the view SQL - make sure we can parse it and analyze it.
@@ -176,19 +188,11 @@ case class CreateViewCommand(
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
}
- val viewSchema = if (userSpecifiedColumns.isEmpty) {
- aliasedPlan.schema
- } else {
- StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
- case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
- })
- }
-
CatalogTable(
identifier = name,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = viewSchema,
+ schema = aliasedPlan.schema,
properties = properties,
viewOriginalText = originalText,
viewText = Some(viewSQL),
@@ -222,8 +226,8 @@ case class AlterViewAsCommand(
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
- if (session.sessionState.catalog.isTemporaryTable(name)) {
- session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
+ if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) {
+ // A local/global temp view has been altered; we are done.
} else {
alterPermanentView(session, analyzedPlan)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fa95af2648cf..59fb48ffea59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -40,16 +40,20 @@ case class CreateTable(
override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
}
+/**
+ * Create or replace a local/global temporary view with the given data source.
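+ *
+ * For example (the view name, provider and path are illustrative):
+ * {{{
+ *   CREATE GLOBAL TEMPORARY VIEW view1 USING parquet OPTIONS (path '/path/to/data')
+ * }}}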
+ */
case class CreateTempViewUsing(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
replace: Boolean,
+ global: Boolean,
provider: String,
options: Map[String, String]) extends RunnableCommand {
if (tableIdent.database.isDefined) {
throw new AnalysisException(
- s"Temporary table '$tableIdent' should not have specified a database")
+ s"Temporary view '$tableIdent' should not have specified a database")
}
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -58,10 +62,16 @@ case class CreateTempViewUsing(
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
- sparkSession.sessionState.catalog.createTempView(
- tableIdent.table,
- Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
- replace)
+
+ val catalog = sparkSession.sessionState.catalog
+ val viewDefinition = Dataset.ofRows(
+ sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+
+ if (global) {
+ catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
+ } else {
+ catalog.createTempView(tableIdent.table, viewDefinition, replace)
+ }
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e412e1b4b302..c05bda3f1b52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -94,20 +94,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("database does not exist")
override def listTables(dbName: String): Dataset[Table] = {
- requireDatabaseExists(dbName)
val tables = sessionCatalog.listTables(dbName).map(makeTable)
CatalogImpl.makeDataset(tables, sparkSession)
}
private def makeTable(tableIdent: TableIdentifier): Table = {
val metadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
- val database = metadata.identifier.database
+ val isTemp = sessionCatalog.isTemporaryTable(tableIdent)
new Table(
name = tableIdent.table,
- database = database.orNull,
+ database = metadata.identifier.database.orNull,
description = metadata.comment.orNull,
- tableType = if (database.isEmpty) "TEMPORARY" else metadata.tableType.name,
- isTemporary = database.isEmpty)
+ tableType = if (isTemp) "TEMPORARY" else metadata.tableType.name,
+ isTemporary = isTemp)
}
/**
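`database.isEmpty` is no longer a reliable temporariness test: a global temp view carries the `global_temp` database yet is still temporary, hence the explicit `isTemporaryTable` check. A sketch of what `listTables` reports after this change (names and the exact rendering are illustrative):

```scala
spark.range(1).createOrReplaceTempView("local_v")
spark.range(1).createGlobalTempView("global_v")

// Both rows are TEMPORARY, but only the global view has a database.
spark.catalog.listTables("global_temp").show()
// +--------+-----------+-----------+---------+-----------+
// |    name|   database|description|tableType|isTemporary|
// +--------+-----------+-----------+---------+-----------+
// |global_v|global_temp|       null|TEMPORARY|       true|
// | local_v|       null|       null|TEMPORARY|       true|
// +--------+-----------+-----------+---------+-----------+
```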
@@ -365,7 +364,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
/**
- * Drops the temporary view with the given view name in the catalog.
+ * Drops the local temporary view with the given view name in the catalog.
* If the view has been cached/persisted before, it's also unpersisted.
*
* @param viewName the name of the view to be dropped.
@@ -379,6 +378,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
}
+ /**
+ * Drops the global temporary view with the given view name in the catalog.
+ * If the view has been cached/persisted before, it's also unpersisted.
+ *
+ * @param viewName the name of the view to be dropped.
+ * @group ddl_ops
+ * @since 2.1.0
+ */
+ override def dropGlobalTempView(viewName: String): Boolean = {
+ sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
+ sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, viewDef))
+ sessionCatalog.dropGlobalTempView(viewName)
+ }
+ }
+
/**
* Returns true if the table is currently cached in-memory.
*
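Like `dropTempView`, the new method reports through its Boolean result whether a view was actually dropped, un-caching it first rather than throwing on a miss. A minimal usage sketch:

```scala
spark.range(5).createGlobalTempView("g")

// True: the view existed, was un-cached if necessary, and was dropped.
assert(spark.catalog.dropGlobalTempView("g"))
// False: nothing left to drop; no exception is raised.
assert(!spark.catalog.dropGlobalTempView("g"))
```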
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 9f7d0019c6b9..8759dfe39ce1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -95,6 +95,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
*/
lazy val catalog = new SessionCatalog(
sparkSession.sharedState.externalCatalog,
+ sparkSession.sharedState.globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 6387f0150631..c555a43cd258 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,11 +22,11 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -37,39 +37,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
*/
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
- /**
- * Class for caching query results reused in future executions.
- */
- val cacheManager: CacheManager = new CacheManager
-
- /**
- * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
- */
- val listener: SQLListener = createListenerAndUI(sparkContext)
-
+ // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+ // the config from both Hive and Spark SQL. Finally, set the warehouse config value in sparkConf.
{
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
sparkContext.hadoopConfiguration.addResource(configFile)
}
- }
-
- /**
- * A catalog that interacts with external systems.
- */
- lazy val externalCatalog: ExternalCatalog =
- SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
- SharedState.externalCatalogClassName(sparkContext.conf),
- sparkContext.conf,
- sparkContext.hadoopConfiguration)
-
- /**
- * A classloader used to load all user-added jar.
- */
- val jarClassLoader = new NonClosableMutableURLClassLoader(
- org.apache.spark.util.Utils.getContextOrSparkClassLoader)
- {
// Set the Hive metastore warehouse path to the one we use
val tempConf = new SQLConf
sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
@@ -93,6 +68,48 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
}
+ /**
+ * Class for caching query results reused in future executions.
+ */
+ val cacheManager: CacheManager = new CacheManager
+
+ /**
+ * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+ */
+ val listener: SQLListener = createListenerAndUI(sparkContext)
+
+ /**
+ * A catalog that interacts with external systems.
+ */
+ val externalCatalog: ExternalCatalog =
+ SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
+ SharedState.externalCatalogClassName(sparkContext.conf),
+ sparkContext.conf,
+ sparkContext.hadoopConfiguration)
+
+ /**
+ * A manager for global temporary views.
+ */
+ val globalTempViewManager = {
+ // The system preserved database should not exist in the metastore. However, it's hard to
+ // guarantee this for every session, because case sensitivity differs. Here we always lowercase
+ // the name to make our life easier.
+ val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase
+ if (externalCatalog.databaseExists(globalTempDB)) {
+ throw new SparkException(
+ s"$globalTempDB is a system preserved database, please rename your existing database " +
+ "to resolve the name conflict, or set a different value for " +
+ s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
+ }
+ new GlobalTempViewManager(globalTempDB)
+ }
+
+ /**
+ * A classloader used to load all user-added jars.
+ */
+ val jarClassLoader = new NonClosableMutableURLClassLoader(
+ org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+
/**
* Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
*/
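Because the conflict check runs when `SharedState` is initialized, a pre-existing database with the reserved name fails fast at startup, and the cross-session config is the escape hatch. A sketch, assuming the made-up name `my_global_temp`:

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.globalTempDatabase is internal and read once per application, so
// it has to be set before the first session is created.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.globalTempDatabase", "my_global_temp")
  .getOrCreate()

spark.range(1).createGlobalTempView("v")
spark.table("my_global_temp.v").show()
```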
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 001c1a1d8531..2b35db411e2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -88,11 +88,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
df.createOrReplaceTempView("listtablessuitetable")
assert(
sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq ==
- Row("listtablessuitetable", true) :: Nil)
+ Row("", "listtablessuitetable", true) :: Nil)
assert(
sqlContext.sql("SHOW tables").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
- Row("listtablessuitetable", true) :: Nil)
+ Row("", "listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
@@ -105,11 +105,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
df.createOrReplaceTempView("listtablessuitetable")
assert(
sqlContext.tables("default").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
- Row("listtablessuitetable", true) :: Nil)
+ Row("", "listtablessuitetable", true) :: Nil)
assert(
sqlContext.sql("show TABLES in default").filter("tableName = 'listtablessuitetable'")
- .collect().toSeq == Row("listtablessuitetable", true) :: Nil)
+ .collect().toSeq == Row("", "listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
@@ -122,7 +122,8 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
df.createOrReplaceTempView("listtablessuitetable")
val expectedSchema = StructType(
- StructField("tableName", StringType, false) ::
+ StructField("database", StringType, false) ::
+ StructField("tableName", StringType, false) ::
StructField("isTemporary", BooleanType, false) :: Nil)
Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
new file mode 100644
index 000000000000..391bcb8b35d0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalog.Table
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ globalTempDB = spark.sharedState.globalTempViewManager.database
+ }
+
+ private var globalTempDB: String = _
+
+ test("basic semantic") {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
+
+ // If there is no database in the table name, we try local temp views first and, if not found,
+ // tables/views in the current database, which is "default" in this case. So we expect a
+ // NoSuchTableException here.
+ intercept[NoSuchTableException](spark.table("src"))
+
+ // Use a qualified name to refer to the global temp view explicitly.
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+ // A table name without a database will never refer to a global temp view.
+ intercept[NoSuchTableException](sql("DROP VIEW src"))
+
+ sql(s"DROP VIEW $globalTempDB.src")
+ // The global temp view should be dropped successfully.
+ intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+
+ // We can also use the Dataset API to create a global temp view.
+ Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+ // Use a qualified name to rename a global temp view.
+ sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
+ intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+ checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
+
+ // Use a qualified name to alter a global temp view.
+ sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'")
+ checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b"))
+
+ // We can also use the Catalog API to drop a global temp view.
+ spark.catalog.dropGlobalTempView("src2")
+ intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+ }
+
+ test("global temp view is shared among all sessions") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, 2))
+ val newSession = spark.newSession()
+ checkAnswer(newSession.table(s"$globalTempDB.src"), Row(1, 2))
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ }
+ }
+
+ test("global temp view database should be preserved") {
+ val e = intercept[AnalysisException](sql(s"CREATE DATABASE $globalTempDB"))
+ assert(e.message.contains("system preserved database"))
+
+ val e2 = intercept[AnalysisException](sql(s"USE $globalTempDB"))
+ assert(e2.message.contains("system preserved database"))
+ }
+
+ test("CREATE GLOBAL TEMP VIEW USING") {
+ withTempPath { path =>
+ try {
+ Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+ sql(s"CREATE GLOBAL TEMP VIEW src USING parquet OPTIONS (PATH '${path.getAbsolutePath}')")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+ sql(s"INSERT INTO $globalTempDB.src SELECT 2, 'b'")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a") :: Row(2, "b") :: Nil)
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE should work for global temp view") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b")
+ sql(s"CREATE TABLE cloned LIKE ${globalTempDB}.src")
+ val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("cloned"))
+ assert(tableMeta.schema == new StructType().add("a", "int", false).add("b", "string", false))
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ sql("DROP TABLE default.cloned")
+ }
+ }
+
+ test("list global temp views") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW v1 AS SELECT 3, 4")
+ sql("CREATE TEMP VIEW v2 AS SELECT 1, 2")
+
+ checkAnswer(sql(s"SHOW TABLES IN $globalTempDB"),
+ Row(globalTempDB, "v1", true) ::
+ Row("", "v2", true) :: Nil)
+
+ assert(spark.catalog.listTables(globalTempDB).collect().toSeq.map(_.name) == Seq("v1", "v2"))
+ } finally {
+ spark.catalog.dropGlobalTempView("v1")
+ spark.catalog.dropTempView("v2")
+ }
+ }
+
+ test("should lookup global temp view if and only if global temp db is specified") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW same_name AS SELECT 3, 4")
+ sql("CREATE TEMP VIEW same_name AS SELECT 1, 2")
+
+ checkAnswer(sql("SELECT * FROM same_name"), Row(1, 2))
+
+ // We never look up global temp views if the database is not specified in the table name.
+ spark.catalog.dropTempView("same_name")
+ intercept[AnalysisException](sql("SELECT * FROM same_name"))
+
+ // Use qualified name to lookup a global temp view.
+ checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4))
+ } finally {
+ spark.catalog.dropTempView("same_name")
+ spark.catalog.dropGlobalTempView("same_name")
+ }
+ }
+
+ test("public Catalog should recognize global temp view") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+
+ assert(spark.catalog.tableExists(globalTempDB, "src"))
+ assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
+ name = "src",
+ database = globalTempDB,
+ description = null,
+ tableType = "TEMPORARY",
+ isTemporary = true).toString)
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b5499f2884c6..5489bc61a4ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -969,17 +969,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
""".stripMargin)
checkAnswer(
sql("SHOW TABLES IN default 'show1*'"),
- Row("show1a", true) :: Nil)
+ Row("", "show1a", true) :: Nil)
checkAnswer(
sql("SHOW TABLES IN default 'show1*|show2*'"),
- Row("show1a", true) ::
- Row("show2b", true) :: Nil)
+ Row("", "show1a", true) ::
+ Row("", "show2b", true) :: Nil)
checkAnswer(
sql("SHOW TABLES 'show1*|show2*'"),
- Row("show1a", true) ::
- Row("show2b", true) :: Nil)
+ Row("", "show1a", true) ::
+ Row("", "show2b", true) :: Nil)
assert(
sql("SHOW TABLES").count() >= 2)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 85c509847d8e..85ecf0ce7075 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -41,6 +41,7 @@ import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
@@ -48,6 +49,7 @@ private[sql] class HiveSessionCatalog(
hadoopConf: Configuration)
extends SessionCatalog(
externalCatalog,
+ globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index eb10c11382e8..6d4fe1a941a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -45,6 +45,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override lazy val catalog = {
new HiveSessionCatalog(
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
sparkSession,
functionResourceLoader,
functionRegistry,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
index 57363b7259c6..939fd71b4f1e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -87,11 +87,11 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
assert(
hc.sql("SELECT * FROM moo_table order by name").collect().toSeq ==
df.collect().toSeq.sortBy(_.getString(0)))
- val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+ val tables = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
assert(tables.toSet == Set("moo_table", "mee_table"))
hc.sql("DROP TABLE moo_table")
hc.sql("DROP TABLE mee_table")
- val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+ val tables2 = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
assert(tables2.isEmpty)
hc.sql("USE default")
hc.sql("DROP DATABASE mee_db CASCADE")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 6eeb67510c73..15ba61646d03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -58,10 +58,10 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
// We are using default DB.
checkAnswer(
allTables.filter("tableName = 'listtablessuitetable'"),
- Row("listtablessuitetable", true))
+ Row("", "listtablessuitetable", true))
checkAnswer(
allTables.filter("tableName = 'hivelisttablessuitetable'"),
- Row("hivelisttablessuitetable", false))
+ Row("default", "hivelisttablessuitetable", false))
assert(allTables.filter("tableName = 'hiveindblisttablessuitetable'").count() === 0)
}
}
@@ -71,11 +71,11 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
case allTables =>
checkAnswer(
allTables.filter("tableName = 'listtablessuitetable'"),
- Row("listtablessuitetable", true))
+ Row("", "listtablessuitetable", true))
assert(allTables.filter("tableName = 'hivelisttablessuitetable'").count() === 0)
checkAnswer(
allTables.filter("tableName = 'hiveindblisttablessuitetable'"),
- Row("hiveindblisttablessuitetable", false))
+ Row("listtablessuitedb", "hiveindblisttablessuitetable", false))
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 8ae6868c9848..51670649ad1d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -984,7 +984,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
checkAnswer(
spark.sql("show TABLES in testdb8156").filter("tableName = 'ttt3'"),
- Row("ttt3", false))
+ Row("testdb8156", "ttt3", false))
spark.sql("""use default""")
spark.sql("""drop database if exists testdb8156 CASCADE""")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index b2103b3bfc36..2c772ce2155e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -94,15 +94,15 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("CREATE TABLE show2b(c2 int)")
checkAnswer(
sql("SHOW TABLES IN default 'show1*'"),
- Row("show1a", false) :: Nil)
+ Row("default", "show1a", false) :: Nil)
checkAnswer(
sql("SHOW TABLES IN default 'show1*|show2*'"),
- Row("show1a", false) ::
- Row("show2b", false) :: Nil)
+ Row("default", "show1a", false) ::
+ Row("default", "show2b", false) :: Nil)
checkAnswer(
sql("SHOW TABLES 'show1*|show2*'"),
- Row("show1a", false) ::
- Row("show2b", false) :: Nil)
+ Row("default", "show1a", false) ::
+ Row("default", "show2b", false) :: Nil)
assert(
sql("SHOW TABLES").count() >= 2)
assert(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index f5c605fe5e2f..2af935da689c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -62,15 +62,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
var e = intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
}.getMessage
- assert(e.contains("`default`.`tab1` is not a view"))
+ assert(e.contains("`tab1` is not a view"))
e = intercept[AnalysisException] {
sql("CREATE VIEW tab1 AS SELECT * FROM jt")
}.getMessage
- assert(e.contains("`default`.`tab1` is not a view"))
+ assert(e.contains("`tab1` is not a view"))
e = intercept[AnalysisException] {
sql("ALTER VIEW tab1 AS SELECT * FROM jt")
}.getMessage
- assert(e.contains("`default`.`tab1` is not a view"))
+ assert(e.contains("`tab1` is not a view"))
}
}