@@ -139,8 +139,8 @@ statement
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropTable
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList?
@@ -397,7 +397,7 @@ queryTerm

queryPrimary
: querySpecification #queryPrimaryDefault
| TABLE tableIdentifier #table
| TABLE multipartIdentifier #table
| inlineTable #inlineTableDefault1
| '(' queryNoWith ')' #subquery
;
@@ -536,7 +536,7 @@ identifierComment
;

relationPrimary
: tableIdentifier sample? tableAlias #tableName
: multipartIdentifier sample? tableAlias #tableName
| '(' queryNoWith ')' sample? tableAlias #aliasedQuery
| '(' relation ')' sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
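The grammar hunks above swap `tableIdentifier` for `multipartIdentifier` in `DROP TABLE`, `DROP VIEW`, the `TABLE ...` query form, and plain table references, so names with more than two parts can now parse. A minimal sketch of what this permits at the SQL surface, assuming a SparkSession `spark` and an illustrative catalog name `testcat`:

```scala
// Each statement parses because the rules now accept multipartIdentifier;
// the old tableIdentifier rule allowed at most database.table (two parts).
spark.sql("SELECT * FROM testcat.ns1.ns2.tbl")        // relationPrimary #tableName
spark.sql("TABLE testcat.ns1.ns2.tbl")                // queryPrimary #table
spark.sql("DROP TABLE IF EXISTS testcat.ns1.ns2.tbl") // #dropTable
spark.sql("DROP VIEW IF EXISTS testcat.ns1.v")        // #dropTable
```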
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import static org.apache.spark.sql.catalog.v2.Catalogs.classKey;
import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey;
import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix;
import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@Private
public class CatalogManager {

private final SQLConf conf;

public CatalogManager(SQLConf conf) {
this.conf = conf;
}

/**
* Load a catalog.
*
* @param name a catalog name
* @return a catalog plugin
*/
public CatalogPlugin load(String name) throws SparkException {
return Catalogs.load(name, conf);
}

/**
* Add a catalog.
*
* @param name a catalog name
* @param pluginClassName a catalog plugin class name
* @param options catalog options
*/
public void add(
String name,
String pluginClassName,
CaseInsensitiveStringMap options) {
options.entrySet().stream()
.forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue()));
conf.setConfString(classKey(name), pluginClassName);
}

/**
* Add a catalog without options.
*
* @param name a catalog name
* @param pluginClassName a catalog plugin class name
*/
public void add(
String name,
String pluginClassName) {
add(name, pluginClassName, CaseInsensitiveStringMap.empty());
}

/**
* Remove a catalog.
*
* @param name a catalog name
*/
public void remove(String name) {
conf.unsetConf(classKey(name));
mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream()
.filter(key -> isOptionKey(name, key))
.forEach(conf::unsetConf);
}
}
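The new `CatalogManager` is a thin wrapper over `SQLConf`: `add` stores the plugin class under the catalog's class key and each option under its option-key prefix, `load` delegates to `Catalogs.load`, and `remove` unsets the class key plus every matching option key. A minimal usage sketch, with `mycat` and `com.example.MyCatalogPlugin` as purely illustrative names:

```scala
import org.apache.spark.sql.catalog.v2.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val conf = new SQLConf()
val manager = new CatalogManager(conf)

// Writes spark.sql.catalog.mycat = com.example.MyCatalogPlugin (hypothetical class)
// and spark.sql.catalog.mycat.url = jdbc:example://host into conf.
val options = new java.util.HashMap[String, String]()
options.put("url", "jdbc:example://host")
manager.add("mycat", "com.example.MyCatalogPlugin", new CaseInsensitiveStringMap(options))

val plugin = manager.load("mycat") // instantiates and initializes the plugin
manager.remove("mycat")            // unsets the class key and all option keys
```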
@@ -23,10 +23,8 @@
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@@ -35,6 +33,23 @@ public class Catalogs {
private Catalogs() {
}

public static String classKey(String name) {
return "spark.sql.catalog." + name;
}

public static String optionKeyPrefix(String name) {
return "spark.sql.catalog." + name + ".";
}

public static boolean isOptionKey(String name, String keyName) {
return keyName.startsWith(optionKeyPrefix(name));
}

public static String optionName(String name, String keyName) {
assert(isOptionKey(name, keyName));
return keyName.substring(optionKeyPrefix(name).length());
}

/**
* Load and configure a catalog by name.
* <p>
@@ -49,10 +64,10 @@ private Catalogs() {
*/
public static CatalogPlugin load(String name, SQLConf conf)
throws CatalogNotFoundException, SparkException {
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
String pluginClassName = conf.getConfString(classKey(name), null);
if (pluginClassName == null) {
throw new CatalogNotFoundException(String.format(
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
"Catalog '%s' plugin class not found: %s is not defined", name, classKey(name)));
}

ClassLoader loader = Utils.getContextOrSparkClassLoader();
@@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf)
* @return a case insensitive string map of options starting with spark.sql.catalog.(name).
*/
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

HashMap<String, String> options = new HashMap<>();
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
options.put(matcher.group(1), entry.getValue());
}
}

Map<String, String> options =
mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream()
.filter(e -> isOptionKey(name, e.getKey()))
.collect(Collectors.toMap(
e -> optionName(name, e.getKey()),
e -> e.getValue()));
return new CaseInsensitiveStringMap(options);
}
}
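The `Catalogs` refactor centralizes the configuration-key scheme in `classKey`, `optionKeyPrefix`, `isOptionKey`, and `optionName`, and `catalogOptions` now filters the conf with those helpers instead of a hand-rolled regular expression. The scheme, spelled out:

```scala
import org.apache.spark.sql.catalog.v2.Catalogs

Catalogs.classKey("mycat")                                   // "spark.sql.catalog.mycat"
Catalogs.optionKeyPrefix("mycat")                            // "spark.sql.catalog.mycat."
Catalogs.isOptionKey("mycat", "spark.sql.catalog.mycat.url") // true
Catalogs.optionName("mycat", "spark.sql.catalog.mycat.url")  // "url"
```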
@@ -22,6 +22,8 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* An {@link Identifier} implementation.
@@ -49,6 +51,21 @@ public String name() {
return name;
}

private String quote(String part) {
if (part.contains("`")) {
return part.replace("`", "``");
} else {
return part;
}
}

@Override
public String toString() {
return Stream.concat(Stream.of(namespace), Stream.of(name))
.map(part -> '`' + quote(part) + '`')
.collect(Collectors.joining("."));
}

@Override
public boolean equals(Object o) {
if (this == o) {
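The added `toString` joins the namespace parts and the name with dots, backtick-quoting every part and doubling embedded backticks, so the rendered form can be pasted back into SQL. Expected output, sketched with the `Identifier.of` factory used elsewhere in this PR:

```scala
import org.apache.spark.sql.catalog.v2.Identifier

Identifier.of(Array("a", "b"), "t").toString        // `a`.`b`.`t`
Identifier.of(Array.empty[String], "t`bl").toString // `t``bl` (embedded backtick doubled)
```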
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
@@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) {
public Map<String, String> asCaseSensitiveMap() {
return Collections.unmodifiableMap(original);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
return delegate.equals(that.delegate);
}

@Override
public int hashCode() {
return Objects.hash(delegate);
}
}
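`equals` and `hashCode` are defined on `delegate` rather than `original`, so two maps differing only in key case compare equal, assuming `delegate` holds the case-folded entries (that field sits above the fold of this diff). Sketch:

```scala
import java.util.Collections
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val a = new CaseInsensitiveStringMap(Collections.singletonMap("URL", "x"))
val b = new CaseInsensitiveStringMap(Collections.singletonMap("url", "x"))
a == b                    // true if delegate stores case-folded keys (assumed)
a.hashCode == b.hashCode  // true whenever a == b, both derive from delegate
```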
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalog.v2

import scala.util.{Failure, Success, Try}

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier

@@ -26,27 +28,31 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@Experimental
trait LookupCatalog {

def lookupCatalog: Option[(String) => CatalogPlugin] = None
def lookupCatalog: Option[String => CatalogPlugin] = None

type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)

/**
* Extract catalog plugin and identifier from a multi-part identifier.
*/
object CatalogObjectIdentifier {
def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup =>
parts match {
case Seq(name) =>
(None, Identifier.of(Array.empty, name))
case Seq(catalogName, tail @ _*) =>
try {
val catalog = lookup(catalogName)
(Some(catalog), Identifier.of(tail.init.toArray, tail.last))
} catch {
case _: CatalogNotFoundException =>
(None, Identifier.of(parts.init.toArray, parts.last))
}
}
def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = parts match {
case Seq(name) =>
Some((None, Identifier.of(Array.empty, name)))
case Seq(catalogName, tail @ _*) =>
lookupCatalog match {
case Some(lookup) =>
Try(lookup(catalogName)) match {
case Success(catalog) =>
Some((Some(catalog), Identifier.of(tail.init.toArray, tail.last)))
case Failure(_: CatalogNotFoundException) =>
Some((None, Identifier.of(parts.init.toArray, parts.last)))
case Failure(ex) =>
throw ex
}
case None =>
Some((None, Identifier.of(parts.init.toArray, parts.last)))
}
}
}

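The rewritten extractor now always matches: with no lookup function it yields a catalog-less identifier instead of failing, a `CatalogNotFoundException` means the first part is kept as a namespace rather than a catalog, and any other lookup failure is rethrown. A sketch of the three shapes, assuming a `CatalogPlugin` instance and a single-`String` `CatalogNotFoundException` constructor:

```scala
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}

class Demo(plugin: CatalogPlugin) extends LookupCatalog {
  override def lookupCatalog: Option[String => CatalogPlugin] = Some {
    case "testcat" => plugin // only "testcat" is a known catalog in this sketch
    case other => throw new CatalogNotFoundException(s"catalog $other is not defined")
  }

  def resolve(parts: Seq[String]) = parts match {
    // Seq("t")                  => (None, Identifier.of(Array.empty, "t"))
    // Seq("testcat", "db", "t") => (Some(plugin), Identifier.of(Array("db"), "t"))
    // Seq("db", "t")            => (None, Identifier.of(Array("db"), "t")); "db" is no catalog
    case CatalogObjectIdentifier(maybeCatalog, ident) => (maybeCatalog, ident)
  }
}
```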
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2

import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier, TableIdentifierLike}

/**
* Resolve a multipart identifier to a [[CatalogTableIdentifier]] or a [[TableIdentifier]].
*/
trait TableIdentifierHelper extends LookupCatalog {
import CatalogV2Implicits._

implicit class TableIdentifierHelper(parts: Seq[String]) {
def asCatalogTableIdentifier: TableIdentifierLike = parts match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
CatalogTableIdentifier(catalog.asTableCatalog, ident)

case AsTableIdentifier(tableIdentifier) =>
tableIdentifier
}
}
}
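`asCatalogTableIdentifier` pins down the two resolutions of a multi-part name: a `CatalogTableIdentifier` when the first part names a catalog, otherwise a classic `TableIdentifier` via the `AsTableIdentifier` extractor (defined in `LookupCatalog`, below the fold here); a name fitting neither arm would throw a `MatchError`. A sketch under the same illustrative `testcat`:

```scala
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, TableIdentifierHelper}
import org.apache.spark.sql.catalyst.TableIdentifierLike

class Resolver(catalogs: String => CatalogPlugin) extends TableIdentifierHelper {
  override def lookupCatalog: Option[String => CatalogPlugin] = Some(catalogs)

  // Seq("testcat", "db", "t") => CatalogTableIdentifier(testcat.asTableCatalog, db.t)
  // Seq("db", "t")            => TableIdentifier("t", Some("db")), the session-catalog form
  def resolve(parts: Seq[String]): TableIdentifierLike = parts.asCatalogTableIdentifier
}
```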
@@ -227,9 +227,8 @@ class Analyzer(

def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
plan resolveOperatorsDown {
case u: UnresolvedRelation =>
cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).getOrElse(u)
case u @ UnresolvedRelation(TableIdentifier(table, _)) =>
cteRelations.find(x => resolver(x._1, table)).map(_._2).getOrElse(u)
case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
@@ -721,7 +720,7 @@ class Analyzer(
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
case other => i.copy(table = other)
}
case u: UnresolvedRelation => resolveRelation(u)
case u @ UnresolvedRelation(_: TableIdentifier) => resolveRelation(u)
}

// Look up the table with the given name from catalog. The database we used is decided by the
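Both `Analyzer` hunks narrow their patterns from any `UnresolvedRelation` to one carrying a plain `TableIdentifier`, so relations named with a multi-part identifier fall through to the new catalog resolution path instead of the session-catalog lookup, and CTE substitution compares only the single-part table name. The narrowed pattern, sketched under the assumption that `UnresolvedRelation` can now carry either identifier form (per this PR's `TableIdentifierLike`):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

def isSessionCatalogRelation(u: UnresolvedRelation): Boolean = u match {
  case UnresolvedRelation(TableIdentifier(_, _)) => true // resolved against the session catalog
  case _ => false                                        // left for v2 catalog resolution
}
```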
@@ -22,6 +22,7 @@ import java.util.Locale
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -71,17 +72,18 @@ object ResolveHints {

val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
case ResolvedHint(u: UnresolvedRelation, hint)
if relations.exists(resolver(_, u.tableIdentifier.table)) =>
relations.remove(u.tableIdentifier.table)
case ResolvedHint(u @ UnresolvedRelation(TableIdentifier(table, _)), hint)
if relations.exists(resolver(_, table)) =>
relations.remove(table)
ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
case ResolvedHint(r: SubqueryAlias, hint)
if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))

case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
relations.remove(u.tableIdentifier.table)
case UnresolvedRelation(TableIdentifier(table, _))
if relations.exists(resolver(_, table)) =>
relations.remove(table)
ResolvedHint(plan, createHintInfo(hintName))
case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
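`ResolveHints` gets the same narrowing: a hint name is matched against the table part of a plain `TableIdentifier`, and each matched name is removed from the pending set of hint targets. Illustration, assuming a SparkSession `spark`:

```scala
// "t" is compared against the hinted relation names via the resolver; the scan of t
// is wrapped in ResolvedHint, and "t" is removed from the set of pending hint targets.
spark.sql("SELECT /*+ BROADCAST(t) */ * FROM t JOIN u ON t.id = u.id")
```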