
Commit db4a417

[SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation
1 parent a79cb84

17 files changed: +441 −80 lines

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 2 deletions
@@ -139,8 +139,8 @@ statement
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*          #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec     #setTableLocation
     | ALTER TABLE tableIdentifier RECOVER PARTITIONS                  #recoverPartitions
-    | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                  #dropTable
-    | DROP VIEW (IF EXISTS)? tableIdentifier                          #dropTable
+    | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE?              #dropTable
+    | DROP VIEW (IF EXISTS)? multipartIdentifier                      #dropTable
     | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
         VIEW (IF NOT EXISTS)? tableIdentifier
         identifierCommentList?
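
With the rule switched from tableIdentifier to multipartIdentifier, DROP TABLE and DROP VIEW can now name a table through a catalog and a multi-level namespace. A minimal Scala sketch of what this enables, assuming a running SparkSession `spark` and a v2 catalog registered under the illustrative name `testcat` (neither is part of this diff):

// Illustrative only: "testcat", "ns1.ns2" and "tbl" are made-up names.
spark.sql("DROP TABLE IF EXISTS testcat.ns1.ns2.tbl")   // multi-part: catalog.namespace...table
spark.sql("DROP VIEW IF EXISTS db.v1")                  // two-part names still parse as before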
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java (new file)

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import static org.apache.spark.sql.catalog.v2.Catalogs.classKey;
import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey;
import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix;
import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@Private
public class CatalogManager {

  private final SQLConf conf;

  public CatalogManager(SQLConf conf) {
    this.conf = conf;
  }

  /**
   * Load a catalog.
   *
   * @param name a catalog name
   * @return a catalog plugin
   */
  public CatalogPlugin load(String name) throws SparkException {
    return Catalogs.load(name, conf);
  }

  /**
   * Add a catalog.
   *
   * @param name a catalog name
   * @param pluginClassName a catalog plugin class name
   * @param options catalog options
   */
  public void add(
      String name,
      String pluginClassName,
      CaseInsensitiveStringMap options) {
    options.entrySet().stream()
        .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue()));
    conf.setConfString(classKey(name), pluginClassName);
  }

  /**
   * Add a catalog without options.
   *
   * @param name a catalog name
   * @param pluginClassName a catalog plugin class name
   */
  public void add(
      String name,
      String pluginClassName) {
    add(name, pluginClassName, CaseInsensitiveStringMap.empty());
  }

  /**
   * Remove a catalog.
   *
   * @param name a catalog name
   */
  public void remove(String name) {
    conf.unsetConf(classKey(name));
    mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream()
        .filter(key -> isOptionKey(name, key))
        .forEach(conf::unsetConf);
  }
}
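
CatalogManager is a thin wrapper over SQLConf: add() writes the plugin class under classKey(name) and one conf entry per option under optionKeyPrefix(name), load() delegates to Catalogs.load, and remove() unsets all of those keys. A rough Scala sketch of the conf entries that add() produces, with an illustrative catalog name and plugin class that are not from this commit:

import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
// Roughly what catalogManager.add("testcat", "com.example.TestPlugin", {option1 -> value1}) writes:
conf.setConfString("spark.sql.catalog.testcat", "com.example.TestPlugin")   // classKey("testcat")
conf.setConfString("spark.sql.catalog.testcat.option1", "value1")           // optionKeyPrefix + option name
// Catalogs.load("testcat", conf) would now instantiate the plugin and hand it the options;
// remove("testcat") would unset both keys again.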

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java

Lines changed: 26 additions & 16 deletions
@@ -23,10 +23,8 @@
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.util.Utils;

-import java.util.HashMap;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.stream.Collectors;

 import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@@ -35,6 +33,23 @@ public class Catalogs {
   private Catalogs() {
   }

+  public static String classKey(String name) {
+    return "spark.sql.catalog." + name;
+  }
+
+  public static String optionKeyPrefix(String name) {
+    return "spark.sql.catalog." + name + ".";
+  }
+
+  public static boolean isOptionKey(String name, String keyName) {
+    return keyName.startsWith(optionKeyPrefix(name));
+  }
+
+  public static String optionName(String name, String keyName) {
+    assert(isOptionKey(name, keyName));
+    return keyName.substring(optionKeyPrefix(name).length());
+  }
+
   /**
    * Load and configure a catalog by name.
    * <p>

@@ -49,10 +64,10 @@ private Catalogs() {
    */
  public static CatalogPlugin load(String name, SQLConf conf)
      throws CatalogNotFoundException, SparkException {
-    String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
+    String pluginClassName = conf.getConfString(classKey(name), null);
    if (pluginClassName == null) {
      throw new CatalogNotFoundException(String.format(
-        "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
+        "Catalog '%s' plugin class not found: %s is not defined", name, classKey(name)));
    }

    ClassLoader loader = Utils.getContextOrSparkClassLoader();

@@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf)
    * @return a case insensitive string map of options starting with spark.sql.catalog.(name).
    */
  private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
-    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
-    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
-
-    HashMap<String, String> options = new HashMap<>();
-    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
-      Matcher matcher = prefix.matcher(entry.getKey());
-      if (matcher.matches() && matcher.groupCount() > 0) {
-        options.put(matcher.group(1), entry.getValue());
-      }
-    }
-
+    Map<String, String> options =
+        mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream()
+            .filter(e -> isOptionKey(name, e.getKey()))
+            .collect(Collectors.toMap(
+                e -> optionName(name, e.getKey()),
+                e -> e.getValue()));
    return new CaseInsensitiveStringMap(options);
  }
}
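
The refactor centralizes the configuration key scheme — spark.sql.catalog.<name> holds the plugin class, spark.sql.catalog.<name>.<option> holds its options — and swaps the regex scan for a stream filter over getAllConfs. A small self-contained Scala sketch of the same prefix-stripping idea; the map below is an illustrative stand-in for conf.getAllConfs:

val allConfs = Map(
  "spark.sql.catalog.testcat" -> "com.example.TestPlugin",       // plugin class key
  "spark.sql.catalog.testcat.option1" -> "value1",               // catalog option
  "spark.sql.shuffle.partitions" -> "200")                       // unrelated conf, ignored
val prefix = "spark.sql.catalog.testcat."
val options = allConfs.collect {
  case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
}
// options == Map("option1" -> "value1"), mirroring what catalogOptions builds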

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java

Lines changed: 17 additions & 0 deletions
@@ -22,6 +22,8 @@

 import java.util.Arrays;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * An {@link Identifier} implementation.

@@ -49,6 +51,21 @@ public String name() {
     return name;
   }

+  private String quote(String part) {
+    if (part.contains("`")) {
+      return part.replace("`", "``");
+    } else {
+      return part;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return Stream.concat(Stream.of(namespace), Stream.of(name))
+        .map(part -> '`' + quote(part) + '`')
+        .collect(Collectors.joining("."));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
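
The new toString renders an identifier by backquoting every namespace part plus the name and doubling any embedded backticks, so the printed form can be pasted back into SQL. A pure-Scala illustration of the quoting rule, using made-up parts:

def quote(part: String): String = "`" + part.replace("`", "``") + "`"
val parts = Seq("ns1", "weird`name", "tbl")        // namespace parts followed by the table name
println(parts.map(quote).mkString("."))            // prints: `ns1`.`weird``name`.`tbl`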

sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java

Lines changed: 14 additions & 0 deletions
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;

 /**

@@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) {
   public Map<String, String> asCaseSensitiveMap() {
     return Collections.unmodifiableMap(original);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
+    return delegate.equals(that.delegate);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(delegate);
+  }
 }
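
equals and hashCode are defined on the lower-cased delegate map rather than the original keys, which is what lets the test suite below compare a loaded plugin's options against the map it passed to add(). A brief Scala illustration of that case-insensitive equality, with illustrative keys:

import java.util.{HashMap => JHashMap}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val a = new JHashMap[String, String](); a.put("Option1", "value1")
val b = new JHashMap[String, String](); b.put("option1", "value1")
// Keys differ only in case, but the lower-cased delegates are identical, so the wrappers compare equal.
assert(new CaseInsensitiveStringMap(a).equals(new CaseInsensitiveStringMap(b)))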

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 8 additions & 0 deletions
@@ -499,6 +499,14 @@ object OverwritePartitionsDynamic {
   }
 }

+/**
+ * Drop a table.
+ */
+case class DropTable(
+    catalog: TableCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends Command
+

 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
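
DropTable is a children-free Command that carries an already-resolved TableCatalog, the table Identifier, and the IF EXISTS flag. A hedged sketch of how later code might pattern-match it; the helper below is illustrative and not part of this commit:

import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan}

// Illustrative helper: summarizes the command, e.g. before translating it to a physical drop.
def describeDrop(plan: LogicalPlan): Option[String] = plan match {
  case DropTable(catalog, ident, ifExists) =>
    Some(s"drop $ident from catalog ${catalog.name()} (ifExists=$ifExists)")
  case _ => None
}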
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala (new file)

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * A DROP TABLE statement, as parsed from SQL.
 */
case class DropTableStatement(
    tableName: Seq[String],
    ifExists: Boolean,
    isView: Boolean,
    purge: Boolean) extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty

  override def children: Seq[LogicalPlan] = Seq.empty
}
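
DropTableStatement is the catalog-agnostic form produced by the parser: the table name is still a raw Seq[String], and a later resolution step (not shown in this excerpt) is expected to resolve the catalog and identifier and emit the DropTable command above. A minimal sketch of constructing and inspecting one, with made-up name parts:

import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement

val stmt = DropTableStatement(
  tableName = Seq("testcat", "ns", "tbl"),   // illustrative multi-part name
  ifExists = true, isView = false, purge = false)
stmt.tableName match {
  case head +: rest => println(s"possible catalog: $head, rest of the name: ${rest.mkString(".")}")
}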
sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java (new file)

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.HashMap;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;

public class CatalogManagerSuite {

  @Rule
  public final ExpectedException exception = ExpectedException.none();

  CatalogManager catalogManager = new CatalogManager(new SQLConf());

  @Test
  public void testAdd() throws SparkException {
    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
        new HashMap<String, String>() {{
          put("option1", "value1");
          put("option2", "value2");
        }});
    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options);
    CatalogPlugin catalogPlugin = catalogManager.load("testcat");
    assertThat(catalogPlugin.name(), is("testcat"));
    assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
    assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options));
  }

  @Test
  public void testAddWithOption() throws SparkException {
    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
    CatalogPlugin catalogPlugin = catalogManager.load("testcat");
    assertThat(catalogPlugin.name(), is("testcat"));
    assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
    assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty()));
  }

  @Test
  public void testRemove() throws SparkException {
    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
    catalogManager.load("testcat");
    catalogManager.remove("testcat");
    exception.expect(CatalogNotFoundException.class);
    catalogManager.load("testcat");
  }
}
