Commit 387d866

imback82 authored and cloud-fan committed
[SPARK-34699][SQL] 'CREATE OR REPLACE TEMP VIEW USING' should uncache correctly
### What changes were proposed in this pull request?

This PR proposes to:

1. Make `CREATE OR REPLACE TEMP VIEW USING` use `TemporaryViewRelation` to store temp views.
2. By doing #1, fix the issue where the temp view being replaced is not uncached.

### Why are the changes needed?

This is part of the ongoing work to wrap all temporary views with `TemporaryViewRelation`: [SPARK-34698](https://issues.apache.org/jira/browse/SPARK-34698). It also fixes a bug where the temp view being replaced is not uncached.

### Does this PR introduce _any_ user-facing change?

Yes. A cached temp view that is replaced with `CREATE OR REPLACE TEMP VIEW USING` is now correctly uncached.

### How was this patch tested?

Added new tests.

Closes apache#31825 from imback82/create_temp_view_using.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent af55373 commit 387d866
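
As a concrete illustration of the user-facing change, the sketch below mirrors the new `CachedTableSuite` test. The `spark` session, the view name `tv`, and the `/tmp/t1` / `/tmp/t2` paths are assumptions for illustration, not part of the patch:

```scala
// Minimal sketch of the behavior this patch fixes (view name and paths are hypothetical).
import spark.implicits._

Seq(1).toDF.write.mode("overwrite").parquet("/tmp/t1")
Seq(2).toDF.write.mode("overwrite").parquet("/tmp/t2")

spark.sql("CREATE TEMPORARY VIEW tv USING parquet OPTIONS (path '/tmp/t1')")
spark.sql("CACHE TABLE tv")
assert(spark.catalog.isCached("tv"))

// Replacing the view with the same relation keeps the cache entry.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW tv USING parquet OPTIONS (path '/tmp/t1')")
assert(spark.catalog.isCached("tv"))

// Replacing it with a different relation now drops the stale cache entry
// (before this fix, the old cache entry was silently kept).
spark.sql("CREATE OR REPLACE TEMPORARY VIEW tv USING parquet OPTIONS (path '/tmp/t2')")
assert(!spark.catalog.isCached("tv"))
```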

File tree

3 files changed: +67 -2 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 21 additions & 2 deletions

@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.command.ViewHelper.createTemporaryViewRelation
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types._

 /**
@@ -90,12 +91,30 @@ case class CreateTempViewUsing(
       options = options)

     val catalog = sparkSession.sessionState.catalog
-    val viewDefinition = Dataset.ofRows(
+    val analyzedPlan = Dataset.ofRows(
       sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan

     if (global) {
+      val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val viewIdent = TableIdentifier(tableIdent.table, Option(db))
+      val viewDefinition = createTemporaryViewRelation(
+        viewIdent,
+        sparkSession,
+        replace,
+        catalog.getRawGlobalTempView,
+        originalText = None,
+        analyzedPlan,
+        aliasedPlan = analyzedPlan)
       catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
     } else {
+      val viewDefinition = createTemporaryViewRelation(
+        tableIdent,
+        sparkSession,
+        replace,
+        catalog.getRawTempView,
+        originalText = None,
+        analyzedPlan,
+        aliasedPlan = analyzedPlan)
       catalog.createTempView(tableIdent.table, viewDefinition, replace)
     }

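As the new tests below demonstrate, the replacement path only drops the cache entry when the view is redefined over a different relation; replacing it with the same relation leaves the cache intact. A rough, illustrative approximation of that decision follows. This is not the actual `ViewHelper` code; `uncacheIfPlanChanged` and its signature are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch of the uncache-on-replace decision (not the actual ViewHelper code).
// `getRawView` stands in for catalog.getRawTempView / catalog.getRawGlobalTempView above.
def uncacheIfPlanChanged(
    spark: SparkSession,
    viewName: String,
    getRawView: String => Option[LogicalPlan],
    newPlan: LogicalPlan): Unit = {
  getRawView(viewName).foreach { oldPlan =>
    // Same relation: keep the existing cache entry; different relation: drop the stale one.
    if (!oldPlan.sameResult(newPlan)) {
      spark.catalog.uncacheTable(viewName)
    }
  }
}
```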

sql/core/src/test/resources/sql-tests/results/show-tables.sql.out

Lines changed: 1 addition & 0 deletions

@@ -127,6 +127,7 @@ Created Time [not included in comparison]
 Last Access [not included in comparison]
 Created By [not included in comparison]
 Type: VIEW
+Table Properties: [view.storingAnalyzedPlan=true]
 Schema: root
 |-- e: integer (nullable = true)

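The golden-file change above reflects that a `USING`-based temp view now stores its analyzed plan (it is created with `originalText = None`, so there is no SQL text to re-parse), which surfaces as the `view.storingAnalyzedPlan` table property. An illustrative way to see this interactively, reusing the hypothetical view `tv` from the sketch above (surrounding output will differ):

```scala
// Inspect the extended description of the temp view; the Table Properties line is what
// the updated show-tables.sql.out golden file now expects.
spark.sql("SHOW TABLE EXTENDED LIKE 'tv'")
  .select("information")
  .show(truncate = false)
// ... Type: VIEW
// ... Table Properties: [view.storingAnalyzedPlan=true]
```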

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 45 additions & 0 deletions

@@ -1509,4 +1509,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty)
     }
   }
+
+  test("SPARK-34699: CREATE TEMP VIEW USING should uncache correctly") {
+    withTempView("tv") {
+      testCreateTemporaryViewUsingWithCache(TableIdentifier("tv"))
+    }
+  }
+
+  test("SPARK-34699: CREATE GLOBAL TEMP VIEW USING should uncache correctly") {
+    withGlobalTempView("global_tv") {
+      val db = spark.sharedState.globalTempViewManager.database
+      testCreateTemporaryViewUsingWithCache(TableIdentifier("global_tv", Some(db)))
+    }
+  }
+
+  private def testCreateTemporaryViewUsingWithCache(ident: TableIdentifier): Unit = {
+    withTempDir { dir =>
+      val path1 = new File(dir, "t1").getCanonicalPath
+      val path2 = new File(dir, "t2").getCanonicalPath
+      Seq(1).toDF.write.parquet(path1)
+      Seq(1).toDF.write.parquet(path2)
+
+      val (tempViewStr, viewName) = if (ident.database.nonEmpty) {
+        ("GLOBAL TEMPORARY VIEW", s"${ident.database.get}.${ident.table}")
+      } else {
+        ("TEMPORARY VIEW", ident.table)
+      }
+
+      sql(s"CREATE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")
+
+      sql(s"CACHE TABLE $viewName")
+      assert(spark.catalog.isCached(viewName))
+
+      // Replacing with the same relation. The cache shouldn't be uncached.
+      sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")
+      assert(spark.catalog.isCached(viewName))
+
+      // Replacing with a different relation. The cache should be cleared.
+      sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path2')")
+      assert(!spark.catalog.isCached(viewName))
+
+      // Validate that the cache is cleared by creating a temp view with the same relation.
+      sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")
+      assert(!spark.catalog.isCached(viewName))
+    }
+  }
 }
