
Commit abec6d7

[SPARK-28341][SQL] create a public API for V2SessionCatalog
## What changes were proposed in this pull request?

The `V2SessionCatalog` has 2 functionalities:
1. work as an adapter: provide v2 APIs and translate calls to the `SessionCatalog`.
2. allow users to extend it, so that they can add hooks to apply custom logic before calling methods of the built-in catalog (session catalog).

To leverage the second functionality, users must extend `V2SessionCatalog`, which is an internal class, and there is no documentation explaining this usage.

This PR does 2 things:
1. refine the documentation of the config `spark.sql.catalog.session`.
2. add a public `CatalogExtension` interface (plus the convenience base class `DelegatingCatalogExtension`) for users to write implementations.

TODOs for follow-up PRs:
1. discuss whether we should allow users to completely overwrite the v2 session catalog with a new one.
2. discuss renaming the session catalog, so that it is less likely to conflict with existing namespace names.

## How was this patch tested?

Existing tests.

Closes #25104 from cloud-fan/session-catalog.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent dadb720 commit abec6d7

18 files changed: +246 −94 lines

CatalogExtension.java (new file, package org.apache.spark.sql.catalog.v2)

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An API to extend the Spark built-in session catalog. Implementation can get the built-in session
+ * catalog from {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions with
+ * some custom logic and call the built-in session catalog at the end. For example, they can
+ * implement {@code createTable}, do something else before calling {@code createTable} of the
+ * built-in session catalog.
+ */
+@Experimental
+public interface CatalogExtension extends TableCatalog {
+
+  /**
+   * This will be called only once by Spark to pass in the Spark built-in session catalog, after
+   * {@link #initialize(String, CaseInsensitiveStringMap)} is called.
+   */
+  void setDelegateCatalog(TableCatalog delegate);
+}
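For illustration, a minimal Scala sketch of implementing this interface directly (the class name, the audit hook, and the log message are hypothetical, not part of this commit): the delegate handed over via `setDelegateCatalog` is stored and every call is forwarded to it, with custom logic layered on top of `loadTable`. The `DelegatingCatalogExtension` class added below provides exactly this forwarding boilerplate, so most implementations would extend it instead.

```scala
import java.util.{Map => JMap}

import org.apache.spark.sql.catalog.v2.{CatalogExtension, Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical extension that logs every table lookup before delegating to the
// built-in session catalog that Spark injects via setDelegateCatalog.
class AuditingSessionCatalog extends CatalogExtension {

  private var delegate: TableCatalog = _

  // Called once by Spark, after initialize(), to hand over the built-in session catalog.
  override def setDelegateCatalog(catalog: TableCatalog): Unit = {
    delegate = catalog
  }

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}

  override def name(): String = delegate.name()

  // Custom logic first, then the built-in session catalog does the real work.
  override def loadTable(ident: Identifier): Table = {
    println(s"audit: loading table $ident")
    delegate.loadTable(ident)
  }

  // The remaining TableCatalog methods simply forward to the delegate.
  override def listTables(namespace: Array[String]): Array[Identifier] =
    delegate.listTables(namespace)

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: JMap[String, String]): Table =
    delegate.createTable(ident, schema, partitions, properties)

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    delegate.alterTable(ident, changes: _*)

  override def dropTable(ident: Identifier): Boolean = delegate.dropTable(ident)

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    delegate.renameTable(oldIdent, newIdent)
}
```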
DelegatingCatalogExtension.java (new file, package org.apache.spark.sql.catalog.v2)

Lines changed: 101 additions & 0 deletions

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A simple implementation of {@link CatalogExtension}, which implements all the catalog functions
+ * by calling the built-in session catalog directly. This is created for convenience, so that users
+ * only need to override some methods where they want to apply custom logic. For example, they can
+ * override {@code createTable}, do something else before calling {@code super.createTable}.
+ */
+@Experimental
+public abstract class DelegatingCatalogExtension implements CatalogExtension {
+
+  private TableCatalog delegate;
+
+  public final void setDelegateCatalog(TableCatalog delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public String name() {
+    return delegate.name();
+  }
+
+  @Override
+  public final void initialize(String name, CaseInsensitiveStringMap options) {}
+
+  @Override
+  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+    return delegate.listTables(namespace);
+  }
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    return delegate.loadTable(ident);
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    delegate.invalidateTable(ident);
+  }
+
+  @Override
+  public boolean tableExists(Identifier ident) {
+    return delegate.tableExists(ident);
+  }
+
+  @Override
+  public Table createTable(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
+    return delegate.createTable(ident, schema, partitions, properties);
+  }
+
+  @Override
+  public Table alterTable(
+      Identifier ident,
+      TableChange... changes) throws NoSuchTableException {
+    return delegate.alterTable(ident, changes);
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    return delegate.dropTable(ident);
+  }
+
+  @Override
+  public void renameTable(
+      Identifier oldIdent,
+      Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
+    delegate.renameTable(oldIdent, newIdent);
+  }
+}
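As the class comment above suggests, a user would typically subclass this and override only the hooks they care about. A minimal Scala sketch, assuming a hypothetical `OwnerCheckingSessionCatalog` that enforces an `owner` table property before handing creation off to the built-in session catalog:

```scala
import java.util.{Map => JMap}

import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

// Hypothetical extension: validate table properties, then let the built-in
// session catalog (reached through super) perform the actual creation.
class OwnerCheckingSessionCatalog extends DelegatingCatalogExtension {

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: JMap[String, String]): Table = {
    require(properties.containsKey("owner"), s"Table $ident must declare an 'owner' property")
    super.createTable(ident, schema, partitions, properties)
  }
}
```

Every method that is not overridden falls through to the delegate wired in by `setDelegateCatalog`, so the subclass stays small.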

sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala

Lines changed: 27 additions & 9 deletions
@@ -27,12 +27,16 @@ import org.apache.spark.sql.internal.SQLConf
  * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
  * the caller to look up a catalog by name.
  */
-class CatalogManager(conf: SQLConf) extends Logging {
+class CatalogManager(conf: SQLConf, defaultSessionCatalog: TableCatalog) extends Logging {
 
   private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]
 
   def catalog(name: String): CatalogPlugin = synchronized {
-    catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
+    if (name.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) {
+      v2SessionCatalog
+    } else {
+      catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
+    }
   }
 
   def defaultCatalog: Option[CatalogPlugin] = {
@@ -47,16 +51,30 @@ class CatalogManager(conf: SQLConf) extends Logging {
     }
   }
 
-  def v2SessionCatalog: Option[CatalogPlugin] = {
-    try {
-      Some(catalog(CatalogManager.SESSION_CATALOG_NAME))
-    } catch {
-      case NonFatal(e) =>
-        logError("Cannot load v2 session catalog", e)
-        None
+  private def loadV2SessionCatalog(): CatalogPlugin = {
+    Catalogs.load(CatalogManager.SESSION_CATALOG_NAME, conf) match {
+      case extension: CatalogExtension =>
+        extension.setDelegateCatalog(defaultSessionCatalog)
+        extension
+      case other => other
     }
   }
 
+  // If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2
+  // session catalog. Otherwise, return the default session catalog.
+  def v2SessionCatalog: CatalogPlugin = {
+    conf.getConf(SQLConf.V2_SESSION_CATALOG).map { customV2SessionCatalog =>
+      try {
+        catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog())
+      } catch {
+        case NonFatal(_) =>
+          logError(
+            "Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
+          defaultSessionCatalog
+      }
+    }.getOrElse(defaultSessionCatalog)
+  }
+
   private def getDefaultNamespace(c: CatalogPlugin) = c match {
     case c: SupportsNamespaces => c.defaultNamespace()
     case _ => Array.empty[String]

sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ trait LookupCatalog extends Logging {
    * This happens when the source implementation extends the v2 TableProvider API and is not listed
    * in the fallback configuration, spark.sql.sources.write.useV1SourceList
    */
-  def sessionCatalog: Option[CatalogPlugin] = catalogManager.v2SessionCatalog
+  def sessionCatalog: CatalogPlugin = catalogManager.v2SessionCatalog
 
   /**
    * Extract catalog plugin and remaining identifier names.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 31 additions & 5 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util
 import java.util.Locale
 
 import scala.collection.mutable
@@ -25,7 +26,7 @@ import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.v2._
-import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform}
+import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -45,6 +46,7 @@ import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssig
 import org.apache.spark.sql.sources.v2.Table
 import org.apache.spark.sql.sources.v2.internal.V1Table
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
@@ -60,6 +62,24 @@ object SimpleAnalyzer extends Analyzer(
   },
   new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
 
+object FakeV2SessionCatalog extends TableCatalog {
+  private def fail() = throw new UnsupportedOperationException
+  override def listTables(namespace: Array[String]): Array[Identifier] = fail()
+  override def loadTable(ident: Identifier): Table = {
+    throw new NoSuchTableException(ident.toString)
+  }
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = fail()
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = fail()
+  override def dropTable(ident: Identifier): Boolean = fail()
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = fail()
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = fail()
+  override def name(): String = fail()
+}
+
 /**
  * Provides a way to keep state during the analysis, this enables us to decouple the concerns
  * of analysis environment from the catalog.
@@ -101,15 +121,21 @@ object AnalysisContext {
  */
 class Analyzer(
     catalog: SessionCatalog,
+    v2SessionCatalog: TableCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
 
+  // Only for tests.
   def this(catalog: SessionCatalog, conf: SQLConf) = {
-    this(catalog, conf, conf.optimizerMaxIterations)
+    this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations)
+  }
+
+  def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = {
+    this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations)
   }
 
-  override val catalogManager: CatalogManager = new CatalogManager(conf)
+  override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog)
 
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
     AnalysisHelper.markInAnalyzer {
@@ -954,7 +980,7 @@
         case scala.Right(tableOpt) =>
           tableOpt.map { table =>
             AlterTable(
-              sessionCatalog.get.asTableCatalog, // table being resolved means this exists
+              sessionCatalog.asTableCatalog,
              Identifier.of(tableName.init.toArray, tableName.last),
              DataSourceV2Relation.create(table),
              changes
@@ -2837,7 +2863,7 @@
       case CatalogObjectIdentifier(Some(v2Catalog), ident) =>
         scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident)))
       case CatalogObjectIdentifier(None, ident) =>
-        catalogManager.v2SessionCatalog.flatMap(loadTable(_, ident)) match {
+        loadTable(catalogManager.v2SessionCatalog, ident) match {
          case Some(_: V1Table) => scala.Right(None)
          case other => scala.Right(other)
        }

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 5 additions & 2 deletions
@@ -1965,9 +1965,12 @@ object SQLConf {
     .createOptional
 
   val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
-    .doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
+    .doc("A catalog implementation that will be used in place of the Spark built-in session " +
+      "catalog for v2 operations. The implementation may extend `CatalogExtension` to be " +
+      "passed the Spark built-in session catalog, so that it may delegate calls to the " +
+      "built-in session catalog.")
     .stringConf
-    .createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
+    .createOptional
 
   val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
     .doc("When true, the upcast will be loose and allows string to atomic types.")
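With the default removed, the built-in session catalog is used unless this config names a replacement. A hedged usage sketch follows (the class name is the hypothetical extension from the earlier example; whether a given statement actually takes the v2 path also depends on the data source):

```scala
import org.apache.spark.sql.SparkSession

// Point the v2 session catalog at a custom CatalogExtension implementation.
// If spark.sql.catalog.session is unset, Spark keeps using the built-in session catalog.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.session", "com.example.OwnerCheckingSessionCatalog")
  .getOrCreate()

// v2 operations that resolve against the session catalog now pass through the
// extension before reaching the built-in catalog.
spark.sql("CREATE TABLE t (id INT) USING parquet TBLPROPERTIES ('owner' = 'data-team')")
```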

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -21,14 +21,15 @@ import java.util
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalog.v2.{CatalogManager, NamespaceChange, SupportsNamespaces}
+import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CatalogManagerSuite extends SparkFunSuite {
 
   test("CatalogManager should reflect the changes of default catalog") {
     val conf = new SQLConf
-    val catalogManager = new CatalogManager(conf)
+    val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
     assert(catalogManager.currentCatalog.isEmpty)
     assert(catalogManager.currentNamespace.sameElements(Array("default")))
 
@@ -42,7 +43,7 @@
 
   test("CatalogManager should keep the current catalog once set") {
     val conf = new SQLConf
-    val catalogManager = new CatalogManager(conf)
+    val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
     assert(catalogManager.currentCatalog.isEmpty)
     conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
     catalogManager.setCurrentCatalog("dummy")
@@ -57,7 +58,7 @@
 
   test("current namespace should be updated when switching current catalog") {
     val conf = new SQLConf
-    val catalogManager = new CatalogManager(conf)
+    val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
     catalogManager.setCurrentNamespace(Array("abc"))
     assert(catalogManager.currentNamespace.sameElements(Array("abc")))
 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 6 additions & 8 deletions
@@ -354,15 +354,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
     val session = df.sparkSession
     val canUseV2 = lookupV2Provider().isDefined
-    val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog
+    val sessionCatalog = session.sessionState.analyzer.sessionCatalog
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case CatalogObjectIdentifier(Some(catalog), ident) =>
         insertInto(catalog, ident)
 
-      case CatalogObjectIdentifier(None, ident)
-          if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
-        insertInto(sessionCatalogOpt.get, ident)
+      case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
+        insertInto(sessionCatalog, ident)
 
       case AsTableIdentifier(tableIdentifier) =>
         insertInto(tableIdentifier)
@@ -488,17 +487,16 @@
 
     val session = df.sparkSession
     val canUseV2 = lookupV2Provider().isDefined
-    val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog
+    val sessionCatalog = session.sessionState.analyzer.sessionCatalog
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case CatalogObjectIdentifier(Some(catalog), ident) =>
         saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
 
-      case CatalogObjectIdentifier(None, ident)
-          if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
+      case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
         // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
         // for now.
-        saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1)
+        saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1)
 
       case AsTableIdentifier(tableIdentifier) =>
         saveAsTable(tableIdentifier)
