Skip to content

Commit ab07c63

Browse files
committed
[SPARK-30799][SQL] "spark_catalog.t" should not be resolved to temp view
### What changes were proposed in this pull request? No v2 command supports temp views and the `ResolveCatalogs`/`ResolveSessionCatalog` framework is designed with this assumption. However, `ResolveSessionCatalog` needs to fallback to v1 commands, which do support temp views (e.g. CACHE TABLE). To work around it, we add a hack in `CatalogAndIdentifier`, which does not expand the given identifier with current namespace if the catalog is session catalog. This works fine in most cases, as temp views should take precedence over tables during lookup. So if `CatalogAndIdentifier` returns a single name "t", the v1 commands can still resolve it to temp views correctly, or resolve it to table "default.t" if temp view doesn't exist. However, if users write `spark_catalog.t`, it shouldn't be resolved to temp views as temp views don't belong to any catalog. `CatalogAndIdentifier` can't distinguish between `spark_catalog.t` and `t`, so the caller side may mistakenly resolve `spark_catalog.t` to a temp view. This PR proposes to fix this issue by 1. remove the hack in `CatalogAndIdentifier`, and clearly document that this shouldn't be used to resolve temp views. 2. update `ResolveSessionCatalog` to explicitly look up temp views first before calling `CatalogAndIdentifier`, for v1 commands that support temp views. ### Why are the changes needed? To avoid releasing a behavior that we should not support. Removing the hack also fixes the problem we hit in https://github.com/apache/spark/pull/27532/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R937 ### Does this PR introduce any user-facing change? yes, now it's not allowed to refer to a temp view with `spark_catalog` prefix. ### How was this patch tested? new tests Closes #27550 from cloud-fan/ns. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 0353cbf commit ab07c63

File tree

9 files changed

+176
-77
lines changed

9 files changed

+176
-77
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,6 +1344,10 @@ class SessionCatalog(
13441344
!hiveFunctions.contains(name.funcName.toLowerCase(Locale.ROOT))
13451345
}
13461346

1347+
def isTempFunction(name: String): Boolean = {
1348+
isTemporaryFunction(FunctionIdentifier(name))
1349+
}
1350+
13471351
/**
13481352
* Return whether this function has been registered in the function registry of the current
13491353
* session. If not existed, return false.

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ private[sql] trait LookupCatalog extends Logging {
9494
* Extract catalog and identifier from a multi-part name with the current catalog if needed.
9595
* Catalog name takes precedence over identifier, but for a single-part name, identifier takes
9696
* precedence over catalog name.
97+
*
98+
* Note that, this pattern is used to look up permanent catalog objects like table, view,
99+
* function, etc. If you need to look up temp objects like temp view, please do it separately
100+
* before calling this pattern, as temp objects don't belong to any catalog.
97101
*/
98102
object CatalogAndIdentifier {
99103
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
@@ -103,16 +107,7 @@ private[sql] trait LookupCatalog extends Logging {
103107
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
104108
assert(nameParts.nonEmpty)
105109
if (nameParts.length == 1) {
106-
// If the current catalog is session catalog, the current namespace is not used because
107-
// the single-part name could be referencing a temp view, which doesn't belong to any
108-
// namespaces. An empty namespace will be resolved inside the session catalog
109-
// implementation when a relation is looked up.
110-
val ns = if (CatalogV2Util.isSessionCatalog(currentCatalog)) {
111-
Array.empty[String]
112-
} else {
113-
catalogManager.currentNamespace
114-
}
115-
Some((currentCatalog, Identifier.of(ns, nameParts.head)))
110+
Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
116111
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
117112
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
118113
// API does not support view yet, and we have to use v1 commands to deal with global temp

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
5050
})
5151
when(manager.currentCatalog).thenReturn(sessionCatalog)
5252
when(manager.v2SessionCatalog).thenReturn(sessionCatalog)
53+
when(manager.currentNamespace).thenReturn(Array.empty[String])
5354
manager
5455
}
5556

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 127 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
2222
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
2323
import org.apache.spark.sql.catalyst.plans.logical._
2424
import org.apache.spark.sql.catalyst.rules.Rule
25-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
25+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
2626
import org.apache.spark.sql.connector.expressions.Transform
2727
import org.apache.spark.sql.execution.command._
2828
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBui
3939
class ResolveSessionCatalog(
4040
val catalogManager: CatalogManager,
4141
conf: SQLConf,
42-
isView: Seq[String] => Boolean)
42+
isTempView: Seq[String] => Boolean,
43+
isTempFunction: String => Boolean)
4344
extends Rule[LogicalPlan] with LookupCatalog {
4445
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
4546
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
@@ -238,8 +239,9 @@ class ResolveSessionCatalog(
238239
}
239240
AlterDatabaseSetLocationCommand(ns.head, location)
240241

241-
case RenameTableStatement(SessionCatalogAndTable(_, oldName), newNameParts, isView) =>
242-
AlterTableRenameCommand(oldName.asTableIdentifier, newNameParts.asTableIdentifier, isView)
242+
// v1 RENAME TABLE supports temp view.
243+
case RenameTableStatement(TempViewOrV1Table(oldName), newName, isView) =>
244+
AlterTableRenameCommand(oldName.asTableIdentifier, newName.asTableIdentifier, isView)
243245

244246
case DescribeRelation(ResolvedTable(_, ident, _: V1Table), partitionSpec, isExtended) =>
245247
DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)
@@ -251,10 +253,12 @@ class ResolveSessionCatalog(
251253
case DescribeColumnStatement(
252254
SessionCatalogAndTable(catalog, tbl), colNameParts, isExtended) =>
253255
loadTable(catalog, tbl.asIdentifier).collect {
256+
// `V1Table` also includes permanent views.
254257
case v1Table: V1Table =>
255258
DescribeColumnCommand(tbl.asTableIdentifier, colNameParts, isExtended)
256259
}.getOrElse {
257-
if (isView(tbl)) {
260+
if (isTempView(tbl)) {
261+
// v1 DESCRIBE COLUMN supports temp view.
258262
DescribeColumnCommand(tbl.asTableIdentifier, colNameParts, isExtended)
259263
} else {
260264
throw new AnalysisException("Describing columns is not supported for v2 tables.")
@@ -302,8 +306,9 @@ class ResolveSessionCatalog(
302306
ignoreIfExists = c.ifNotExists)
303307
}
304308

305-
case RefreshTableStatement(SessionCatalogAndTable(_, tbl)) =>
306-
RefreshTable(tbl.asTableIdentifier)
309+
// v1 REFRESH TABLE supports temp view.
310+
case RefreshTableStatement(TempViewOrV1Table(name)) =>
311+
RefreshTable(name.asTableIdentifier)
307312

308313
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
309314
// session catalog and the table provider is not v2.
@@ -338,11 +343,13 @@ class ResolveSessionCatalog(
338343
orCreate = c.orCreate)
339344
}
340345

341-
case DropTableStatement(SessionCatalogAndTable(catalog, tbl), ifExists, purge) =>
342-
DropTableCommand(tbl.asTableIdentifier, ifExists, isView = false, purge = purge)
346+
// v1 DROP TABLE supports temp view.
347+
case DropTableStatement(TempViewOrV1Table(name), ifExists, purge) =>
348+
DropTableCommand(name.asTableIdentifier, ifExists, isView = false, purge = purge)
343349

344-
case DropViewStatement(SessionCatalogAndTable(catalog, viewName), ifExists) =>
345-
DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false)
350+
// v1 DROP TABLE supports temp view.
351+
case DropViewStatement(TempViewOrV1Table(name), ifExists) =>
352+
DropTableCommand(name.asTableIdentifier, ifExists, isView = true, purge = false)
346353

347354
case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
348355
if isSessionCatalog(catalog) =>
@@ -416,12 +423,18 @@ class ResolveSessionCatalog(
416423
ShowCreateTableAsSerdeCommand(v1TableName.asTableIdentifier)
417424

418425
case CacheTableStatement(tbl, plan, isLazy, options) =>
419-
val v1TableName = parseV1Table(tbl, "CACHE TABLE")
420-
CacheTableCommand(v1TableName.asTableIdentifier, plan, isLazy, options)
426+
val name = if (plan.isDefined) {
427+
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
428+
// Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
429+
tbl
430+
} else {
431+
parseTempViewOrV1Table(tbl, "CACHE TABLE")
432+
}
433+
CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
421434

422435
case UncacheTableStatement(tbl, ifExists) =>
423-
val v1TableName = parseV1Table(tbl, "UNCACHE TABLE")
424-
UncacheTableCommand(v1TableName.asTableIdentifier, ifExists)
436+
val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
437+
UncacheTableCommand(name.asTableIdentifier, ifExists)
425438

426439
case TruncateTableStatement(tbl, partitionSpec) =>
427440
val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
@@ -450,10 +463,6 @@ class ResolveSessionCatalog(
450463
throw new AnalysisException(
451464
s"Namespace name should have only one part if specified: ${ns.get.quoted}")
452465
}
453-
if (tbl.length > 2) {
454-
throw new AnalysisException(
455-
s"Table name should have at most two parts: ${tbl.quoted}")
456-
}
457466
ShowColumnsCommand(db, v1TableName)
458467

459468
case AlterTableRecoverPartitionsStatement(tbl) =>
@@ -493,18 +502,23 @@ class ResolveSessionCatalog(
493502
serdeProperties,
494503
partitionSpec)
495504

496-
case AlterViewAsStatement(tbl, originalText, query) =>
497-
val v1TableName = parseV1Table(tbl, "ALTER VIEW QUERY")
505+
case AlterViewAsStatement(name, originalText, query) =>
506+
val viewName = parseTempViewOrV1Table(name, "ALTER VIEW QUERY")
498507
AlterViewAsCommand(
499-
v1TableName.asTableIdentifier,
508+
viewName.asTableIdentifier,
500509
originalText,
501510
query)
502511

503512
case CreateViewStatement(
504513
tbl, userSpecifiedColumns, comment, properties,
505514
originalText, child, allowExisting, replace, viewType) =>
506515

507-
val v1TableName = parseV1Table(tbl, "CREATE VIEW")
516+
val v1TableName = if (viewType != PersistedView) {
517+
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
518+
tbl
519+
} else {
520+
parseV1Table(tbl, "CREATE VIEW")
521+
}
508522
CreateViewCommand(
509523
v1TableName.asTableIdentifier,
510524
userSpecifiedColumns,
@@ -519,56 +533,94 @@ class ResolveSessionCatalog(
519533
case ShowTableProperties(r: ResolvedTable, propertyKey) if isSessionCatalog(r.catalog) =>
520534
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
521535

522-
case DescribeFunctionStatement(CatalogAndIdentifier(catalog, ident), extended) =>
536+
case DescribeFunctionStatement(nameParts, extended) =>
523537
val functionIdent =
524-
parseSessionCatalogFunctionIdentifier("DESCRIBE FUNCTION", catalog, ident)
538+
parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION")
525539
DescribeFunctionCommand(functionIdent, extended)
526540

527541
case ShowFunctionsStatement(userScope, systemScope, pattern, fun) =>
528542
val (database, function) = fun match {
529-
case Some(CatalogAndIdentifier(catalog, ident)) =>
543+
case Some(nameParts) =>
530544
val FunctionIdentifier(fn, db) =
531-
parseSessionCatalogFunctionIdentifier("SHOW FUNCTIONS", catalog, ident)
545+
parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS")
532546
(db, Some(fn))
533547
case None => (None, pattern)
534548
}
535549
ShowFunctionsCommand(database, function, userScope, systemScope)
536550

537-
case DropFunctionStatement(CatalogAndIdentifier(catalog, ident), ifExists, isTemp) =>
551+
case DropFunctionStatement(nameParts, ifExists, isTemp) =>
538552
val FunctionIdentifier(function, database) =
539-
parseSessionCatalogFunctionIdentifier("DROP FUNCTION", catalog, ident)
553+
parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION")
540554
DropFunctionCommand(database, function, ifExists, isTemp)
541555

542-
case CreateFunctionStatement(CatalogAndIdentifier(catalog, ident),
556+
case CreateFunctionStatement(nameParts,
543557
className, resources, isTemp, ignoreIfExists, replace) =>
544-
val FunctionIdentifier(function, database) =
545-
parseSessionCatalogFunctionIdentifier("CREATE FUNCTION", catalog, ident)
546-
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
547-
replace)
558+
if (isTemp) {
559+
// temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
560+
val database = if (nameParts.length > 2) {
561+
throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'")
562+
} else if (nameParts.length == 2) {
563+
Some(nameParts.head)
564+
} else {
565+
None
566+
}
567+
CreateFunctionCommand(
568+
database,
569+
nameParts.last,
570+
className,
571+
resources,
572+
isTemp,
573+
ignoreIfExists,
574+
replace)
575+
} else {
576+
val FunctionIdentifier(function, database) =
577+
parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION")
578+
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
579+
replace)
580+
}
548581
}
549582

583+
// TODO: move function related v2 statements to the new framework.
550584
private def parseSessionCatalogFunctionIdentifier(
551-
sql: String,
552-
catalog: CatalogPlugin,
553-
functionIdent: Identifier): FunctionIdentifier = {
554-
if (isSessionCatalog(catalog)) {
555-
functionIdent.asMultipartIdentifier match {
556-
case Seq(db, fn) => FunctionIdentifier(fn, Some(db))
557-
case Seq(fn) => FunctionIdentifier(fn, None)
558-
case _ =>
559-
throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
560-
}
561-
} else {
562-
throw new AnalysisException(s"$sql is only supported in v1 catalog")
585+
nameParts: Seq[String],
586+
sql: String): FunctionIdentifier = {
587+
if (nameParts.length == 1 && isTempFunction(nameParts.head)) {
588+
return FunctionIdentifier(nameParts.head)
563589
}
564-
}
565590

566-
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
567-
val CatalogAndIdentifier(catalog, ident) = tableName
568-
if (!isSessionCatalog(catalog)) {
569-
throw new AnalysisException(s"$sql is only supported with v1 tables.")
591+
nameParts match {
592+
case SessionCatalogAndIdentifier(_, ident) =>
593+
if (nameParts.length == 1) {
594+
// If there is only one name part, it means the current catalog is the session catalog.
595+
// Here we don't fill the default database, to keep the error message unchanged for
596+
// v1 commands.
597+
FunctionIdentifier(nameParts.head, None)
598+
} else {
599+
ident.namespace match {
600+
// For name parts like `spark_catalog.t`, we need to fill in the default database so
601+
// that the caller side won't treat it as a temp function.
602+
case Array() if nameParts.head == CatalogManager.SESSION_CATALOG_NAME =>
603+
FunctionIdentifier(
604+
ident.name, Some(catalogManager.v1SessionCatalog.getCurrentDatabase))
605+
case Array(db) => FunctionIdentifier(ident.name, Some(db))
606+
case _ =>
607+
throw new AnalysisException(s"Unsupported function name '$ident'")
608+
}
609+
}
610+
611+
case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
570612
}
571-
ident.asMultipartIdentifier
613+
}
614+
615+
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
616+
case SessionCatalogAndTable(_, tbl) => tbl
617+
case _ => throw new AnalysisException(s"$sql is only supported with v1 tables.")
618+
}
619+
620+
private def parseTempViewOrV1Table(
621+
nameParts: Seq[String], sql: String): Seq[String] = nameParts match {
622+
case TempViewOrV1Table(name) => name
623+
case _ => throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
572624
}
573625

574626
private def buildCatalogTable(
@@ -607,7 +659,29 @@ class ResolveSessionCatalog(
607659
object SessionCatalogAndTable {
608660
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
609661
case SessionCatalogAndIdentifier(catalog, ident) =>
610-
Some(catalog -> ident.asMultipartIdentifier)
662+
if (nameParts.length == 1) {
663+
// If there is only one name part, it means the current catalog is the session catalog.
664+
// Here we return the original name part, to keep the error message unchanged for
665+
// v1 commands.
666+
Some(catalog -> nameParts)
667+
} else {
668+
Some(catalog -> ident.asMultipartIdentifier)
669+
}
670+
case _ => None
671+
}
672+
}
673+
674+
object TempViewOrV1Table {
675+
def unapply(nameParts: Seq[String]): Option[Seq[String]] = nameParts match {
676+
case _ if isTempView(nameParts) => Some(nameParts)
677+
case SessionCatalogAndTable(_, tbl) =>
678+
if (nameParts.head == CatalogManager.SESSION_CATALOG_NAME && tbl.length == 1) {
679+
// For name parts like `spark_catalog.t`, we need to fill in the default database so
680+
// that the caller side won't treat it as a temp view.
681+
Some(Seq(catalogManager.v1SessionCatalog.getCurrentDatabase, tbl.head))
682+
} else {
683+
Some(tbl)
684+
}
611685
case _ => None
612686
}
613687
}

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ abstract class BaseSessionStateBuilder(
174174
new FindDataSourceTable(session) +:
175175
new ResolveSQLOnFile(session) +:
176176
new FallBackFileSourceV2(session) +:
177-
new ResolveSessionCatalog(catalogManager, conf, catalog.isView) +:
177+
new ResolveSessionCatalog(
178+
catalogManager, conf, catalog.isTempView, catalog.isTempFunction) +:
178179
customResolutionRules
179180

180181
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =

sql/core/src/test/resources/sql-tests/results/describe.sql.out

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ struct<plan:string>
520520
-- !query output
521521
== Physical Plan ==
522522
Execute DescribeTableCommand
523-
+- DescribeTableCommand `t`, false
523+
+- DescribeTableCommand `default`.`t`, false
524524

525525

526526
-- !query
@@ -530,7 +530,7 @@ struct<plan:string>
530530
-- !query output
531531
== Physical Plan ==
532532
Execute DescribeTableCommand
533-
+- DescribeTableCommand `t`, true
533+
+- DescribeTableCommand `default`.`t`, true
534534

535535

536536
-- !query
@@ -544,14 +544,14 @@ struct<plan:string>
544544

545545
== Analyzed Logical Plan ==
546546
col_name: string, data_type: string, comment: string
547-
DescribeTableCommand `t`, false
547+
DescribeTableCommand `default`.`t`, false
548548

549549
== Optimized Logical Plan ==
550-
DescribeTableCommand `t`, false
550+
DescribeTableCommand `default`.`t`, false
551551

552552
== Physical Plan ==
553553
Execute DescribeTableCommand
554-
+- DescribeTableCommand `t`, false
554+
+- DescribeTableCommand `default`.`t`, false
555555

556556

557557
-- !query
@@ -571,7 +571,7 @@ struct<plan:string>
571571
-- !query output
572572
== Physical Plan ==
573573
Execute DescribeTableCommand
574-
+- DescribeTableCommand `t`, Map(c -> Us, d -> 2), false
574+
+- DescribeTableCommand `default`.`t`, Map(c -> Us, d -> 2), false
575575

576576

577577
-- !query

0 commit comments

Comments
 (0)