core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,13 @@ package object config {
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

// Note: This is a SQL config but needs to be in core because it's cross-session and cannot be
// put in SQLConf.
private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
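For illustration, a minimal pyspark sketch of what this cross-session config controls — the database name `my_global_temp` is a placeholder, and since the entry is marked `.internal()`, overriding it is an assumption here rather than a documented knob:

```python
from pyspark.sql import SparkSession

# Hypothetical override of the internal config; it must be set before the
# session is created because it is read application-wide, not per session.
spark = SparkSession.builder \
    .appName("global-temp-config-demo") \
    .config("spark.sql.globalTempDatabase", "my_global_temp") \
    .getOrCreate()

spark.range(3).createGlobalTempView("nums")
# Global temp views are then qualified by the overridden database name.
spark.sql("SELECT * FROM my_global_temp.nums").show()
```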
45 changes: 40 additions & 5 deletions docs/sql-programming-guide.md
@@ -220,6 +220,41 @@ The `sql` function enables applications to run SQL queries programmatically and
</div>


## Global Temporary View

Temporary views in Spark SQL are session-scoped and will disappear if the session that creates
them terminates. If you want to have a temporary view that is shared among all sessions and kept
alive until the Spark application terminates, you can create a global temporary view. Global
temporary views are tied to a system-preserved database `global_temp`, and we must use the
qualified name to refer to them, e.g. `SELECT * FROM global_temp.view1`.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example global_temp_view scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example global_temp_view java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example global_temp_view python/sql/basic.py %}
</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}

CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl

SELECT * FROM global_temp.temp_view

{% endhighlight %}

</div>

@gatorsmile (Member), Oct 2, 2016:
We need one more </div>

</div>


## Creating Datasets

Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use
@@ -1058,14 +1093,14 @@ the Data Sources API. The following options are supported:
The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows).
</td>
</tr>

<tr>
<td><code>truncate</code></td>
<td>
This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>. (See the sketch after the SQL example below.)
</td>
</tr>

<tr>
<td><code>createTableOptions</code></td>
<td>
@@ -1101,11 +1136,11 @@ USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
{% endhighlight %}

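As referenced in the `truncate` row above, a hedged pyspark sketch of that writer option — the connection details are placeholders, and it assumes options set via `.option()` are merged into the JDBC write path:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-truncate-demo").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# With truncate=true and overwrite mode, Spark should issue TRUNCATE TABLE
# instead of DROP + CREATE, preserving metadata such as indices.
df.write \
    .option("truncate", "true") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          mode="overwrite",
          properties={"user": "username", "password": "password"})
```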
examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -54,6 +54,7 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// $example off:programmatic_schema$
import org.apache.spark.sql.AnalysisException;

// $example on:untyped_ops$
// col("...") is preferable to df.col("...")
@@ -84,7 +85,7 @@ public void setAge(int age) {
}
// $example off:create_ds$

public static void main(String[] args) {
public static void main(String[] args) throws AnalysisException {
// $example on:init_session$
SparkSession spark = SparkSession
.builder()
@@ -101,7 +102,7 @@ public static void main(String[] args) {
spark.stop();
}

private static void runBasicDataFrameExample(SparkSession spark) {
private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
// $example on:create_df$
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

@@ -176,6 +177,31 @@ private static void runBasicDataFrameExample(SparkSession spark) {
// | 19| Justin|
// +----+-------+
// $example off:run_sql$

// $example on:global_temp_view$
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:global_temp_view$
}

private static void runDatasetCreationExample(SparkSession spark) {
25 changes: 25 additions & 0 deletions examples/src/main/python/sql/basic.py
@@ -114,6 +114,31 @@ def basic_df_example(spark):
# +----+-------+
# $example off:run_sql$

# $example on:global_temp_view$
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# $example off:global_temp_view$


def schema_inference_example(spark):
# $example on:schema_inferring$
examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
@@ -135,6 +135,31 @@ object SparkSQLExample {
// | 19| Justin|
// +----+-------+
// $example off:run_sql$

// $example on:global_temp_view$
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:global_temp_view$
}

private def runDatasetCreationExample(spark: SparkSession): Unit = {
4 changes: 3 additions & 1 deletion project/MimaExcludes.scala
@@ -55,7 +55,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.getFunction"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView")
)
}

18 changes: 17 additions & 1 deletion python/pyspark/sql/catalog.py
@@ -167,7 +167,7 @@ def createExternalTable(self, tableName, path=None, source=None, schema=None, **

@since(2.0)
def dropTempView(self, viewName):
"""Drops the temporary view with the given view name in the catalog.
"""Drops the local temporary view with the given view name in the catalog.
If the view has been cached before, then it will also be uncached.

>>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
@@ -181,6 +181,22 @@ def dropTempView(self, viewName):
"""
self._jcatalog.dropTempView(viewName)

@since(2.1)
def dropGlobalTempView(self, viewName):
"""Drops the global temporary view with the given view name in the catalog.
If the view has been cached before, then it will also be uncached.

>>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
>>> spark.table("global_temp.my_table").collect()
[Row(_1=1, _2=1)]
>>> spark.catalog.dropGlobalTempView("my_table")
>>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
AnalysisException: ...
"""
Contributor:
I think this bad case will end up in the python doc. Can we move this test to the tests.py file (it is fine to do it in a follow-up PR)?

Contributor (Author):
I followed the method doc in dropTempView, is it bad?

Contributor:
It will be pretty confusing because this test case will appear in the python doc (and users will see an example that throws exceptions).
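Responding to the thread above, a rough sketch (not part of this PR) of how the failure case could live in python/pyspark/sql/tests.py instead of the doctest; the class name and session setup are hypothetical:

```python
import unittest

from pyspark.sql import Row, SparkSession
from pyspark.sql.utils import AnalysisException


class GlobalTempViewTests(unittest.TestCase):  # hypothetical test class
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[2]").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_drop_global_temp_view(self):
        self.spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
        self.assertEqual(self.spark.table("global_temp.my_table").collect(),
                         [Row(_1=1, _2=1)])
        self.spark.catalog.dropGlobalTempView("my_table")
        # The failing lookup stays out of the rendered API docs this way.
        self.assertRaises(AnalysisException,
                          lambda: self.spark.table("global_temp.my_table"))
```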
self._jcatalog.dropGlobalTempView(viewName)

@ignore_unicode_prefix
@since(2.0)
def registerFunction(self, name, f, returnType=StringType()):
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
@@ -386,7 +386,7 @@ def tables(self, dbName=None):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
Row(database=u'', tableName=u'table1', isTemporary=True)
"""
if dbName is None:
return DataFrame(self._ssql_ctx.tables(), self)
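A small sketch of the new `database` column in the `tables()` output (plain local session assumed; output omitted):

```python
from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder.appName("tables-demo").getOrCreate()
sqlContext = SQLContext(spark.sparkContext)

spark.range(1).createOrReplaceTempView("t")
# Local temporary views report an empty `database`, as in the doctest above.
sqlContext.tables().filter("tableName = 't'").show()
```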
25 changes: 23 additions & 2 deletions python/pyspark/sql/dataframe.py
@@ -131,7 +131,7 @@ def registerTempTable(self, name):

@since(2.0)
def createTempView(self, name):
"""Creates a temporary view with this DataFrame.
"""Creates a local temporary view with this DataFrame.

The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
@@ -153,7 +153,7 @@ def createTempView(self, name):

@since(2.0)
def createOrReplaceTempView(self, name):
"""Creates or replaces a temporary view with this DataFrame.
"""Creates or replaces a local temporary view with this DataFrame.

The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
@@ -169,6 +169,27 @@ def createOrReplaceTempView(self, name):
"""
self._jdf.createOrReplaceTempView(name)

@since(2.1)
def createGlobalTempView(self, name):
"""Creates a global temporary view with this DataFrame.

The lifetime of this temporary view is tied to this Spark application.
Throws :class:`TempTableAlreadyExistsException` if the view name already exists in the
catalog.

>>> df.createGlobalTempView("people")
>>> df2 = spark.sql("select * from global_temp.people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df.createGlobalTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
AnalysisException: u"Temporary table 'people' already exists;"
>>> spark.catalog.dropGlobalTempView("people")

"""
self._jdf.createGlobalTempView(name)

@property
@since(1.4)
def write(self):
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -111,11 +111,12 @@ statement
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
| CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
| CREATE (OR REPLACE)? TEMPORARY VIEW
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
@@ -668,7 +669,7 @@ nonReserved
| MAP | ARRAY | STRUCT
| LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
| DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
| EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
| EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
| GROUPING | CUBE | ROLLUP
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
| TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
@@ -856,6 +857,7 @@ CACHE: 'CACHE';
UNCACHE: 'UNCACHE';
LAZY: 'LAZY';
FORMATTED: 'FORMATTED';
GLOBAL: 'GLOBAL';
TEMPORARY: 'TEMPORARY' | 'TEMP';
OPTIONS: 'OPTIONS';
UNSET: 'UNSET';
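A sketch of the two statement shapes this grammar change admits — the `#createView` AS-query form and the `#createTempViewUsing` data-source form; names and the path are placeholders, and whether each form succeeds end-to-end depends on the corresponding command implementations:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("global-view-syntax-demo").getOrCreate()
spark.range(5).createOrReplaceTempView("tbl")

# CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? VIEW ... AS query   (#createView)
spark.sql("CREATE GLOBAL TEMPORARY VIEW doubled AS SELECT id * 2 AS v FROM tbl")
spark.sql("SELECT * FROM global_temp.doubled").show()

# CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW ... USING provider (#createTempViewUsing)
spark.sql("""
    CREATE GLOBAL TEMPORARY VIEW people_src
    USING json OPTIONS (path 'examples/src/main/resources/people.json')
""")
```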
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -458,12 +458,12 @@ class Analyzer(
i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
case u: UnresolvedRelation =>
val table = u.tableIdentifier
if (table.database.isDefined && conf.runSQLonFile &&
if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
// If the table does not exist, and the database part is specified, and we support
// running SQL directly on files, then let's just return the original UnresolvedRelation.
// It is possible we are matching a query like "select * from parquet.`/path/to/query`".
// The plan will get resolved later.
// If the database part is specified, and we support running SQL directly on files, and
// it's not a temporary view, and the table does not exist, then let's just return the
// original UnresolvedRelation. It is possible we are matching a query like "select *
// from parquet.`/path/to/query`". The plan will get resolved later.
// Note that we are testing (!db_exists || !table_exists) because the catalog throws
// an exception from tableExists if the database does not exist.
u
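An illustrative sketch of the resolution-order fix in this hunk: a database-qualified name that names a temporary view must resolve as the view before the run-SQL-on-files fallback is considered; the parquet path is a placeholder and the fallback assumes the run-SQL-on-files feature is enabled:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("resolution-order-demo").getOrCreate()

# A qualified name that matches a temporary view resolves as the view, even
# though no database named `global_temp` exists in the external catalog.
spark.range(3).createGlobalTempView("t")
spark.sql("SELECT * FROM global_temp.t").show()

# A qualified name that matches nothing can still fall through to running SQL
# directly on files (placeholder path):
spark.sql("SELECT * FROM parquet.`/path/to/data.parquet`").show()
```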