@@ -1344,6 +1344,10 @@ class SessionCatalog(
!hiveFunctions.contains(name.funcName.toLowerCase(Locale.ROOT))
}

def isTempFunction(name: String): Boolean = {
isTemporaryFunction(FunctionIdentifier(name))
}

/**
* Return whether this function has been registered in the function registry of the current
* session. If not existed, return false.
@@ -94,6 +94,10 @@ private[sql] trait LookupCatalog extends Logging {
* Extract catalog and identifier from a multi-part name with the current catalog if needed.
* Catalog name takes precedence over identifier, but for a single-part name, identifier takes
* precedence over catalog name.
*
* Note that, this pattern is used to look up permanent catalog objects like table, view,
* function, etc. If you need to look up temp objects like temp view, please do it separately
* before calling this pattern, as temp objects don't belong to any catalog.
Member

The term Catalog is being used in many places. For example, our internal SessionCatalog also manages temp objects.

Contributor Author

> our internal SessionCatalog also manages temp objects

We could move them out into a separate temp view manager, like the GlobalTempViewManager. I'm talking more about the concept here.

*/
object CatalogAndIdentifier {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
@@ -103,16 +107,7 @@ private[sql] trait LookupCatalog extends Logging {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
assert(nameParts.nonEmpty)
if (nameParts.length == 1) {
// If the current catalog is session catalog, the current namespace is not used because
// the single-part name could be referencing a temp view, which doesn't belong to any
// namespaces. An empty namespace will be resolved inside the session catalog
// implementation when a relation is looked up.
val ns = if (CatalogV2Util.isSessionCatalog(currentCatalog)) {
Array.empty[String]
} else {
catalogManager.currentNamespace
}
Some((currentCatalog, Identifier.of(ns, nameParts.head)))
Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
Member

Hmm, this adds the current namespace to the returned identifier? So we can no longer tell whether a returned identifier refers to a temp view?

Contributor

I think ResolveTempViews needs to be updated to look up twice (once with the namespace and once without).

Contributor Author

If you look at SessionCatalog.setCurrentDatabase, we forbid setting global_temp as the current database.

This keeps temp view resolution simple: we look up a temp view using exactly the name parts the user gave. No name expansion is allowed.

The general rule of relation resolution is: we try to look up a temp view first, then tables/permanent views. By the time we call CatalogAndIdentifier, either the temp view lookup has already been tried, or the caller does not want to resolve temp views.

Contributor

> When we call CatalogAndIdentifier, the temp view lookup should already be tried, or the caller side don't want to resolve temp views.

So, ResolveCatalogs should be applied after ResolveTables and ResolveRelations?

Contributor Author

These two are different frameworks, so I don't see a way to guarantee the order. I think what we should do is migrate to the new framework.
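
For readers following this thread, the lookup order described above (temp view first, using the name parts literally; then the catalog, with the current namespace filled in for single-part names) can be sketched as a small standalone model. This is only an illustration under stated assumptions: `Session`, `Resolved`, `TempView`, `CatalogObject`, and `resolve` are hypothetical names, not Spark APIs, and the `global_temp` special case is left out.

```scala
// Hypothetical, simplified model of the resolution order; not Spark's classes.
object ResolutionOrderSketch {
  sealed trait Resolved
  final case class TempView(nameParts: Seq[String]) extends Resolved
  final case class CatalogObject(
      catalog: String,
      namespace: Seq[String],
      name: String) extends Resolved

  final case class Session(
      tempViews: Set[Seq[String]],   // temp views keyed by their literal name parts
      currentCatalog: String,
      currentNamespace: Seq[String],
      catalogs: Set[String])         // names of registered catalogs

  def resolve(nameParts: Seq[String], s: Session): Resolved = {
    require(nameParts.nonEmpty, "name must have at least one part")
    if (s.tempViews.contains(nameParts)) {
      // Step 1: temp views are matched on the literal name parts, no expansion.
      TempView(nameParts)
    } else if (nameParts.length == 1) {
      // Step 2a: a single-part name is an identifier in the current catalog,
      // qualified with the current namespace.
      CatalogObject(s.currentCatalog, s.currentNamespace, nameParts.head)
    } else if (s.catalogs.contains(nameParts.head)) {
      // Step 2b: for multi-part names, a leading catalog name takes precedence.
      CatalogObject(nameParts.head, nameParts.tail.init, nameParts.last)
    } else {
      // Step 2c: otherwise the leading parts are a namespace in the current catalog.
      CatalogObject(s.currentCatalog, nameParts.init, nameParts.last)
    }
  }
}
```

In this model, a single-part name that matches a temp view never reaches the catalog branch, so qualifying single-part names with the current namespace cannot make a temp view look like a catalog object.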

} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
// API does not support view yet, and we have to use v1 commands to deal with global temp
@@ -50,6 +50,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
})
when(manager.currentCatalog).thenReturn(sessionCatalog)
when(manager.v2SessionCatalog).thenReturn(sessionCatalog)
when(manager.currentNamespace).thenReturn(Array.empty[String])
manager
}

@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBui
class ResolveSessionCatalog(
val catalogManager: CatalogManager,
conf: SQLConf,
isView: Seq[String] => Boolean)
isTempView: Seq[String] => Boolean,
isTempFunction: String => Boolean)
extends Rule[LogicalPlan] with LookupCatalog {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
@@ -215,8 +216,9 @@ class ResolveSessionCatalog(
}
AlterDatabaseSetLocationCommand(ns.head, location)

case RenameTableStatement(SessionCatalogAndTable(_, oldName), newNameParts, isView) =>
AlterTableRenameCommand(oldName.asTableIdentifier, newNameParts.asTableIdentifier, isView)
// v1 RENAME TABLE supports temp view.
case RenameTableStatement(TempViewOrV1Table(oldName), newName, isView) =>
AlterTableRenameCommand(oldName.asTableIdentifier, newName.asTableIdentifier, isView)

case DescribeRelation(ResolvedTable(_, ident, _: V1Table), partitionSpec, isExtended) =>
DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)
@@ -228,10 +230,12 @@ class ResolveSessionCatalog(
case DescribeColumnStatement(
SessionCatalogAndTable(catalog, tbl), colNameParts, isExtended) =>
loadTable(catalog, tbl.asIdentifier).collect {
// `V1Table` also includes permanent views.
case v1Table: V1Table =>
DescribeColumnCommand(tbl.asTableIdentifier, colNameParts, isExtended)
}.getOrElse {
if (isView(tbl)) {
if (isTempView(tbl)) {
Contributor

Since tbl comes from SessionCatalogAndTable, could it include the current namespace? If so, and the current namespace is not empty, this check will always return false?

Contributor Author

You have a good point here. I think this is an existing bug. Here we look up the table first (by calling loadTable) and only then look up the temp view. We should look up the temp view first to retain the Spark 2.4 behavior. @imback82 can you help fix it later?

Contributor

Yes, will do after this PR.

// v1 DESCRIBE COLUMN supports temp view.
DescribeColumnCommand(tbl.asTableIdentifier, colNameParts, isExtended)
} else {
throw new AnalysisException("Describing columns is not supported for v2 tables.")
@@ -279,8 +283,9 @@ class ResolveSessionCatalog(
ignoreIfExists = c.ifNotExists)
}

case RefreshTableStatement(SessionCatalogAndTable(_, tbl)) =>
RefreshTable(tbl.asTableIdentifier)
// v1 REFRESH TABLE supports temp view.
case RefreshTableStatement(TempViewOrV1Table(name)) =>
RefreshTable(name.asTableIdentifier)

// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
@@ -315,11 +320,13 @@ class ResolveSessionCatalog(
orCreate = c.orCreate)
}

case DropTableStatement(SessionCatalogAndTable(catalog, tbl), ifExists, purge) =>
DropTableCommand(tbl.asTableIdentifier, ifExists, isView = false, purge = purge)
// v1 DROP TABLE supports temp view.
case DropTableStatement(TempViewOrV1Table(name), ifExists, purge) =>
DropTableCommand(name.asTableIdentifier, ifExists, isView = false, purge = purge)

case DropViewStatement(SessionCatalogAndTable(catalog, viewName), ifExists) =>
DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false)
// v1 DROP TABLE supports temp view.
case DropViewStatement(TempViewOrV1Table(name), ifExists) =>
DropTableCommand(name.asTableIdentifier, ifExists, isView = true, purge = false)

case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
if isSessionCatalog(catalog) =>
@@ -393,12 +400,18 @@ class ResolveSessionCatalog(
ShowCreateTableAsSerdeCommand(v1TableName.asTableIdentifier)

case CacheTableStatement(tbl, plan, isLazy, options) =>
val v1TableName = parseV1Table(tbl, "CACHE TABLE")
CacheTableCommand(v1TableName.asTableIdentifier, plan, isLazy, options)
val name = if (plan.isDefined) {
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
// Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
tbl
} else {
parseTempViewOrV1Table(tbl, "CACHE TABLE")
}
CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)

case UncacheTableStatement(tbl, ifExists) =>
val v1TableName = parseV1Table(tbl, "UNCACHE TABLE")
UncacheTableCommand(v1TableName.asTableIdentifier, ifExists)
val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
UncacheTableCommand(name.asTableIdentifier, ifExists)

case TruncateTableStatement(tbl, partitionSpec) =>
val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
@@ -427,10 +440,6 @@ class ResolveSessionCatalog(
throw new AnalysisException(
s"Namespace name should have only one part if specified: ${ns.get.quoted}")
}
if (tbl.length > 2) {
Contributor Author

@cloud-fan cloud-fan Feb 12, 2020

This is dead code: `val v1TableName = parseV1Table(tbl, sql).asTableIdentifier` already fails earlier if the name has more than two parts.

throw new AnalysisException(
s"Table name should have at most two parts: ${tbl.quoted}")
}
ShowColumnsCommand(db, v1TableName)

case AlterTableRecoverPartitionsStatement(tbl) =>
@@ -470,18 +479,23 @@ class ResolveSessionCatalog(
serdeProperties,
partitionSpec)

case AlterViewAsStatement(tbl, originalText, query) =>
val v1TableName = parseV1Table(tbl, "ALTER VIEW QUERY")
case AlterViewAsStatement(name, originalText, query) =>
val viewName = parseTempViewOrV1Table(name, "ALTER VIEW QUERY")
AlterViewAsCommand(
v1TableName.asTableIdentifier,
viewName.asTableIdentifier,
originalText,
query)

case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
originalText, child, allowExisting, replace, viewType) =>

val v1TableName = parseV1Table(tbl, "CREATE VIEW")
val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
tbl
} else {
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
v1TableName.asTableIdentifier,
userSpecifiedColumns,
@@ -496,56 +510,94 @@ class ResolveSessionCatalog(
case ShowTableProperties(r: ResolvedTable, propertyKey) if isSessionCatalog(r.catalog) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)

case DescribeFunctionStatement(CatalogAndIdentifier(catalog, ident), extended) =>
case DescribeFunctionStatement(nameParts, extended) =>
val functionIdent =
parseSessionCatalogFunctionIdentifier("DESCRIBE FUNCTION", catalog, ident)
parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION")
DescribeFunctionCommand(functionIdent, extended)

case ShowFunctionsStatement(userScope, systemScope, pattern, fun) =>
val (database, function) = fun match {
case Some(CatalogAndIdentifier(catalog, ident)) =>
case Some(nameParts) =>
val FunctionIdentifier(fn, db) =
parseSessionCatalogFunctionIdentifier("SHOW FUNCTIONS", catalog, ident)
parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS")
(db, Some(fn))
case None => (None, pattern)
}
ShowFunctionsCommand(database, function, userScope, systemScope)

case DropFunctionStatement(CatalogAndIdentifier(catalog, ident), ifExists, isTemp) =>
case DropFunctionStatement(nameParts, ifExists, isTemp) =>
val FunctionIdentifier(function, database) =
parseSessionCatalogFunctionIdentifier("DROP FUNCTION", catalog, ident)
parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION")
DropFunctionCommand(database, function, ifExists, isTemp)

case CreateFunctionStatement(CatalogAndIdentifier(catalog, ident),
case CreateFunctionStatement(nameParts,
className, resources, isTemp, ignoreIfExists, replace) =>
val FunctionIdentifier(function, database) =
parseSessionCatalogFunctionIdentifier("CREATE FUNCTION", catalog, ident)
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
replace)
if (isTemp) {
// temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
val database = if (nameParts.length > 2) {
throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'")
} else if (nameParts.length == 2) {
Some(nameParts.head)
} else {
None
}
CreateFunctionCommand(
database,
nameParts.last,
className,
resources,
isTemp,
ignoreIfExists,
replace)
} else {
val FunctionIdentifier(function, database) =
parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION")
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
replace)
}
}

// TODO: move function related v2 statements to the new framework.
private def parseSessionCatalogFunctionIdentifier(
sql: String,
catalog: CatalogPlugin,
functionIdent: Identifier): FunctionIdentifier = {
if (isSessionCatalog(catalog)) {
functionIdent.asMultipartIdentifier match {
case Seq(db, fn) => FunctionIdentifier(fn, Some(db))
case Seq(fn) => FunctionIdentifier(fn, None)
case _ =>
throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
}
} else {
throw new AnalysisException(s"$sql is only supported in v1 catalog")
nameParts: Seq[String],
sql: String): FunctionIdentifier = {
if (nameParts.length == 1 && isTempFunction(nameParts.head)) {
return FunctionIdentifier(nameParts.head)
}
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
val CatalogAndIdentifier(catalog, ident) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException(s"$sql is only supported with v1 tables.")
nameParts match {
case SessionCatalogAndIdentifier(_, ident) =>
if (nameParts.length == 1) {
// If there is only one name part, it means the current catalog is the session catalog.
// Here we don't fill the default database, to keep the error message unchanged for
// v1 commands.
FunctionIdentifier(nameParts.head, None)
} else {
ident.namespace match {
// For name parts like `spark_catalog.t`, we need to fill in the default database so
// that the caller side won't treat it as a temp function.
case Array() if nameParts.head == CatalogManager.SESSION_CATALOG_NAME =>
FunctionIdentifier(
ident.name, Some(catalogManager.v1SessionCatalog.getCurrentDatabase))
case Array(db) => FunctionIdentifier(ident.name, Some(db))
case _ =>
throw new AnalysisException(s"Unsupported function name '$ident'")
}
}

case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
}
ident.asMultipartIdentifier
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
case SessionCatalogAndTable(_, tbl) => tbl
case _ => throw new AnalysisException(s"$sql is only supported with v1 tables.")
}

private def parseTempViewOrV1Table(
nameParts: Seq[String], sql: String): Seq[String] = nameParts match {
case TempViewOrV1Table(name) => name
case _ => throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
}

private def buildCatalogTable(
@@ -584,7 +636,29 @@ class ResolveSessionCatalog(
object SessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case SessionCatalogAndIdentifier(catalog, ident) =>
Some(catalog -> ident.asMultipartIdentifier)
if (nameParts.length == 1) {
// If there is only one name part, it means the current catalog is the session catalog.
// Here we return the original name part, to keep the error message unchanged for
// v1 commands.
Contributor Author

I could remove this, but then I would need to adjust the expected error message in many tests. We can leave it for 3.1.

Some(catalog -> nameParts)
} else {
Some(catalog -> ident.asMultipartIdentifier)
}
case _ => None
}
}

object TempViewOrV1Table {
def unapply(nameParts: Seq[String]): Option[Seq[String]] = nameParts match {
case _ if isTempView(nameParts) => Some(nameParts)
case SessionCatalogAndTable(_, tbl) =>
if (nameParts.head == CatalogManager.SESSION_CATALOG_NAME && tbl.length == 1) {
// For name parts like `spark_catalog.t`, we need to fill in the default database so
Contributor

Unrelated question: why do we allow spark_catalog.t to resolve to spark_catalog.<cur_db>.t? (This differs from the behavior of v2 catalogs.)

Contributor Author

I noticed this as well, and I don't think it is intentional. But this is a different topic, and we have many tests using spark_catalog.t.

We can open another PR to forbid it if we think we shouldn't support this feature.

Contributor

Got it, thanks for the explanation. It makes more sense to forbid it to keep the behavior consistent.

// that the caller side won't treat it as a temp view.
Some(Seq(catalogManager.v1SessionCatalog.getCurrentDatabase, tbl.head))
} else {
Some(tbl)
}
case _ => None
}
}
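
As a reading aid for the `TempViewOrV1Table` extractor above, here is a minimal standalone sketch of the same decision order. It is not the PR's code: `Extractor`, `currentDatabase`, and `otherCatalogs` are hypothetical stand-ins for the rule's dependencies, and the sketch assumes the current catalog is the session catalog (`spark_catalog`).

```scala
// Hypothetical standalone model of the extractor's decision order.
object TempViewOrV1TableSketch {
  // Same value as CatalogManager.SESSION_CATALOG_NAME in Spark.
  val SessionCatalogName = "spark_catalog"

  final class Extractor(
      isTempView: Seq[String] => Boolean, // stand-in for the rule's isTempView
      currentDatabase: String,            // stand-in for the session's current database
      otherCatalogs: Set[String]) {       // names of registered non-session catalogs

    def unapply(nameParts: Seq[String]): Option[Seq[String]] = nameParts match {
      // 1. Temp views win, and their name parts are returned unchanged.
      case _ if isTempView(nameParts) => Some(nameParts)
      // 2. `spark_catalog.t`: fill in the current database so that callers
      //    do not mistake the single remaining part for a temp view name.
      case Seq(SessionCatalogName, tbl) => Some(Seq(currentDatabase, tbl))
      // 3. `spark_catalog.db.t`: drop the catalog part.
      case SessionCatalogName +: rest if rest.nonEmpty => Some(rest)
      // 4. A name that starts with another catalog is not a v1 table.
      case head +: _ if nameParts.length > 1 && otherCatalogs.contains(head) => None
      // 5. Anything else is treated as a name in the session catalog.
      case _ => Some(nameParts)
    }
  }
}
```

With an instance bound to a capitalized `val`, for example `val TempViewOrV1Table = new TempViewOrV1TableSketch.Extractor(_ == Seq("v"), "default", Set("testcat"))`, the name can be used in a pattern such as `case TempViewOrV1Table(name) => ...`, mirroring how the rule uses it.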
@@ -174,7 +174,8 @@ abstract class BaseSessionStateBuilder(
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
new ResolveSessionCatalog(catalogManager, conf, catalog.isView) +:
new ResolveSessionCatalog(
catalogManager, conf, catalog.isTempView, catalog.isTempFunction) +:
customResolutionRules

override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
12 changes: 6 additions & 6 deletions sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -520,7 +520,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
Execute DescribeTableCommand
+- DescribeTableCommand `t`, false
+- DescribeTableCommand `default`.`t`, false


-- !query
@@ -530,7 +530,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
Execute DescribeTableCommand
+- DescribeTableCommand `t`, true
+- DescribeTableCommand `default`.`t`, true
Contributor Author

This is a consequence of removing the hack. Now ResolvedTable gets a qualified table name even for the session catalog.



-- !query
@@ -544,14 +544,14 @@ struct<plan:string>

== Analyzed Logical Plan ==
col_name: string, data_type: string, comment: string
DescribeTableCommand `t`, false
DescribeTableCommand `default`.`t`, false

== Optimized Logical Plan ==
DescribeTableCommand `t`, false
DescribeTableCommand `default`.`t`, false

== Physical Plan ==
Execute DescribeTableCommand
+- DescribeTableCommand `t`, false
+- DescribeTableCommand `default`.`t`, false


-- !query
@@ -571,7 +571,7 @@ struct<plan:string>
-- !query output
== Physical Plan ==
Execute DescribeTableCommand
+- DescribeTableCommand `t`, Map(c -> Us, d -> 2), false
+- DescribeTableCommand `default`.`t`, Map(c -> Us, d -> 2), false


-- !query