Skip to content

Commit 6170e40

Browse files
rdbluecloud-fan
authored and committed
[SPARK-24252][SQL] Add v2 catalog plugin system
## What changes were proposed in this pull request? This adds a v2 API for adding new catalog plugins to Spark. * Catalog implementations extend `CatalogPlugin` and are loaded via reflection, similar to data sources * `Catalogs` loads and initializes catalogs using configuration from a `SQLConf` * `CaseInsensitiveStringMap` is used to pass configuration to `CatalogPlugin` via `initialize` Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The name property must specify a class that implements `CatalogPlugin`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization along with the catalog name. This replaces #21306, which will be implemented in two multiple parts: the catalog plugin system (this commit) and specific catalog APIs, like `TableCatalog`. ## How was this patch tested? Added test suites for `CaseInsensitiveStringMap` and for catalog loading. Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins. Authored-by: Ryan Blue <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 2036074 commit 6170e40

File tree

6 files changed

+544
-0
lines changed

6 files changed

+544
-0
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalog.v2;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.sql.internal.SQLConf;
22+
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
23+
24+
/**
25+
* A marker interface to provide a catalog implementation for Spark.
26+
* <p>
27+
* Implementations can provide catalog functions by implementing additional interfaces for tables,
28+
* views, and functions.
29+
* <p>
30+
* Catalog implementations must implement this marker interface to be loaded by
31+
* {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
32+
* required public no-arg constructor. After creating an instance, it will be configured by calling
33+
* {@link #initialize(String, CaseInsensitiveStringMap)}.
34+
* <p>
35+
* Catalog implementations are registered to a name by adding a configuration option to Spark:
36+
* {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
37+
* in the Spark configuration that share the catalog name prefix,
38+
* {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
39+
* string map of options in initialization with the prefix removed.
40+
* {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".
41+
*/
42+
@Experimental
43+
public interface CatalogPlugin {
44+
/**
45+
* Called to initialize configuration.
46+
* <p>
47+
* This method is called once, just after the provider is instantiated.
48+
*
49+
* @param name the name used to identify and load this catalog
50+
* @param options a case-insensitive string map of configuration
51+
*/
52+
void initialize(String name, CaseInsensitiveStringMap options);
53+
54+
/**
55+
* Called to get this catalog's name.
56+
* <p>
57+
* This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
58+
* called to pass the catalog's name.
59+
*/
60+
String name();
61+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;

import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@Private
public class Catalogs {
  // Static utility class; never instantiated.
  private Catalogs() {
  }

  /**
   * Load and configure a catalog by name.
   * <p>
   * This loads, instantiates, and initializes the catalog plugin for each call; it does not cache
   * or reuse instances.
   *
   * @param name a String catalog name
   * @param conf a SQLConf
   * @return an initialized CatalogPlugin
   * @throws SparkException If the plugin class cannot be found or instantiated
   */
  public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
    String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
    if (pluginClassName == null) {
      throw new SparkException(String.format(
          "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
    }

    ClassLoader loader = Utils.getContextOrSparkClassLoader();

    try {
      Class<?> pluginClass = loader.loadClass(pluginClassName);

      if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
        throw new SparkException(String.format(
            "Plugin class for catalog '%s' does not implement CatalogPlugin: %s",
            name, pluginClassName));
      }

      // Use getDeclaredConstructor().newInstance() instead of the deprecated
      // Class.newInstance(): constructor exceptions arrive wrapped in
      // InvocationTargetException rather than propagating unchecked.
      CatalogPlugin plugin =
          CatalogPlugin.class.cast(pluginClass.getDeclaredConstructor().newInstance());

      plugin.initialize(name, catalogOptions(name, conf));

      return plugin;

    } catch (ClassNotFoundException e) {
      // Preserve the cause so the original classloader failure is not lost.
      throw new SparkException(String.format(
          "Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName), e);

    } catch (NoSuchMethodException | IllegalAccessException e) {
      throw new SparkException(String.format(
          "Failed to call public no-arg constructor for catalog '%s': %s", name, pluginClassName),
          e);

    } catch (InstantiationException e) {
      // Thrown when the plugin class is abstract or an interface; it carries no cause.
      throw new SparkException(String.format(
          "Cannot instantiate catalog plugin class for catalog '%s': %s", name, pluginClassName),
          e);

    } catch (InvocationTargetException e) {
      // The constructor itself threw; report the underlying exception as the cause.
      throw new SparkException(String.format(
          "Failed while instantiating plugin for catalog '%s': %s", name, pluginClassName),
          e.getCause());
    }
  }

  /**
   * Extracts a named catalog's configuration from a SQLConf.
   *
   * @param name a catalog name
   * @param conf a SQLConf
   * @return a case insensitive string map of options starting with spark.sql.catalog.(name).
   */
  private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
      Matcher matcher = prefix.matcher(entry.getKey());
      // The pattern always defines one capture group, so a match is sufficient.
      if (matcher.matches()) {
        options.put(matcher.group(1), entry.getValue());
      }
    }

    return options;
  }
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.util;

import org.apache.spark.annotation.Experimental;

import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
 * Case-insensitive map of string keys to string values.
 * <p>
 * This is used to pass options to v2 implementations to ensure consistent case insensitivity.
 * <p>
 * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
 * keys converted to lower case.
 * <p>
 * Keys are lower-cased with {@link Locale#ROOT} so behavior does not depend on the default
 * locale. Null keys are not supported: key-based methods call {@code toString()} on the key
 * and will throw {@link NullPointerException} for a null key.
 */
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {

  /** Returns a new, empty case-insensitive map. */
  public static CaseInsensitiveStringMap empty() {
    return new CaseInsensitiveStringMap();
  }

  // Backing map; all keys stored here are already lower-cased.
  private final Map<String, String> delegate;

  private CaseInsensitiveStringMap() {
    this.delegate = new HashMap<>();
  }

  @Override
  public int size() {
    return delegate.size();
  }

  @Override
  public boolean isEmpty() {
    return delegate.isEmpty();
  }

  @Override
  public boolean containsKey(Object key) {
    return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
  }

  @Override
  public boolean containsValue(Object value) {
    return delegate.containsValue(value);
  }

  @Override
  public String get(Object key) {
    return delegate.get(key.toString().toLowerCase(Locale.ROOT));
  }

  @Override
  public String put(String key, String value) {
    return delegate.put(key.toLowerCase(Locale.ROOT), value);
  }

  @Override
  public String remove(Object key) {
    return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
  }

  @Override
  public void putAll(Map<? extends String, ? extends String> m) {
    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
      // Route through put so every key is lower-cased on insertion.
      put(entry.getKey(), entry.getValue());
    }
  }

  @Override
  public void clear() {
    delegate.clear();
  }

  @Override
  public Set<String> keySet() {
    return delegate.keySet();
  }

  @Override
  public Collection<String> values() {
    return delegate.values();
  }

  @Override
  public Set<Map.Entry<String, String>> entrySet() {
    return delegate.entrySet();
  }

  // The Map contract requires equals/hashCode based on entry sets. This class implements
  // Map directly (not via AbstractMap), so delegate explicitly; otherwise Object identity
  // equality would be used, breaking comparisons against other Map implementations.
  @Override
  public boolean equals(Object o) {
    return this == o || delegate.equals(o);
  }

  @Override
  public int hashCode() {
    return delegate.hashCode();
  }

  @Override
  public String toString() {
    return delegate.toString();
  }
}

0 commit comments

Comments
 (0)